Mensaje & Comunicación

Un agente en Saptiva-Agents puede reaccionar, enviar y publicar mensajes, y los mensajes son el único medio a través del cual los agentes pueden comunicarse entre sí.


Mensajes

Los mensajes son objetos serializables y pueden definirse utilizando:

  • Una subclase de pydantic.BaseModel, o

  • Un dataclass

from dataclasses import dataclass

@dataclass
class TextMessage:
    content: str
    source: str

@dataclass
class ImageMessage:
    url: str
    source: str

Nota

Los mensajes son puramente datos y no deben contener lógica.


Manejadores de Mensajes

Cuando un agente recibe un mensaje, el runtime invoca el manejador de mensajes del agente (on_message()), el cual debe implementar la lógica para manejarlo. Si el mensaje no puede ser manejado, el agente debe lanzar CantHandleException.

La clase base BaseAgent no implementa manejo de mensajes, por lo tanto, no se recomienda implementar directamente on_message() salvo para casos avanzados.

Se recomienda usar la clase RoutedAgent, que proporciona capacidad incorporada de enrutamiento de mensajes.


Enrutamiento de Mensajes por Tipo

La clase base RoutedAgent proporciona un mecanismo para asociar tipos de mensajes con funciones manejadoras (handlers) usando el decorador @message_handler(), lo que evita que los desarrolladores tengan que implementar directamente el método on_message().

Por ejemplo, el siguiente agente basado en enrutamiento por tipo responde de forma distinta a TextMessage e ImageMessage utilizando diferentes funciones manejadoras:

from saptiva_agents.core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler

class MyAgent(RoutedAgent):
    @message_handler
    async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hola, {message.source}, dijiste {message.content}!")

    @message_handler
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hola, {message.source}, me enviaste {message.url}!")
# Crear runtime y registrar el tipo de agente
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')

Prueba el agente con TextMessage y ImageMessage.

# Probar con ambos mensajes
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="¡Hola Mundo!", source="Usuario"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="Usuario"), agent_id)
await runtime.stop_when_idle()
Hola, Usuario, dijiste ¡Hola Mundo!!
Hola, Usuario, me enviaste https://example.com/image.jpg!

El runtime crea automáticamente una instancia de MyAgent con el ID de agente AgentId("my_agent", "default") al entregar el primer mensaje.


Enrutamiento de Mensajes del Mismo Tipo

En algunos escenarios, es útil enrutar mensajes del mismo tipo a diferentes funciones manejadoras. Por ejemplo, puede que desees manejar de forma distinta los mensajes que provienen de distintos agentes emisores. Para ello, puedes usar el parámetro match del decorador @message_handler().

El parámetro match permite asociar múltiples handlers al mismo tipo de mensaje, según condiciones adicionales. Este parámetro acepta una función que recibe el mensaje y el MessageContext, y devuelve un valor booleano indicando si ese handler debe procesar el mensaje. Los handlers con match se evalúan en orden alfabético de nombre de función.

A continuación se muestra un ejemplo de un agente que enruta los mensajes en función del agente emisor utilizando match:

class RoutedBySenderAgent(RoutedAgent):
    @message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignore
    async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hola desde user1, {message.source}, dijiste {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hola desde user2, {message.source}, dijiste {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hola {message.source}, me enviaste {message.url}!")

El agente anterior utiliza el campo source del mensaje para identificar al agente emisor. También puedes usar el campo sender del MessageContext para determinar el agente emisor mediante su ID, si está disponible.

Probemos este agente con mensajes que contienen distintos valores en el campo source:

# Registrar y enviar mensajes con distintos remitentes
runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hola desde user1, user1-test, dijiste Hello, World!!
Hola desde user2, user2-test, dijiste Hello, World!!
Hola user2-test, me enviaste https://example.com/image.jpg!

En el ejemplo anterior, el primer ImageMessage no es manejado porque el campo source del mensaje no coincide con la condición de coincidencia (match) del manejador.


Mensajería Directa

Hay dos formas de comunicación en Saptiva-Agents:

  • Mensajería Directa: el mensaje va a un agente específico.

  • Broadcast: el mensaje se publica en un topic y lo reciben múltiples agentes.

Veamos primero la mensajería directa. Para enviar un mensaje directo a otro agente, dentro de un manejador de mensajes se utiliza el método saptiva_agents.core.BaseAgent.send_message(), y desde el entorno de ejecución se utiliza el método saptiva_agents.core.AgentRuntime.send_message(). Al esperar (await) estas llamadas, se devolverá el valor de retorno del manejador de mensajes del agente receptor. Si el manejador del agente receptor devuelve None, también se devolverá None.

Nota

Si el agente invocado lanza una excepción mientras el emisor está esperando, la excepción será propagada de vuelta al emisor.

Solicitud/Respuesta

La mensajería directa puede utilizarse para escenarios de solicitud/respuesta, donde el emisor espera una respuesta del receptor. El receptor puede responder al mensaje devolviendo un valor desde su manejador de mensajes. Puedes pensar en esto como una llamada a función entre agentes.

Por ejemplo, considera los siguientes agentes:

from dataclasses import dataclass

from saptiva_agents.core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler

@dataclass
class Message:
    content: str

class InnerAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"Hola desde inner, {message.content}")

class OuterAgent(RoutedAgent):
    def __init__(self, description: str, inner_agent_type: str):
        super().__init__(description)
        self.inner_agent_id = AgentId(inner_agent_type, self.id.key)

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Mensaje recibido: {message.content}")
        # Envía un mensaje directo al agente interno y recibe una respuesta.
        response = await self.send_message(Message(f"Hola desde outer, {message.content}"), self.inner_agent_id)
        print(f"Respuesta desde inner: {response.content}")

Al recibir un mensaje, el OuterAgent envía un mensaje directo al InnerAgent y recibe una respuesta de vuelta.

Podemos probar estos agentes enviando un Message al OuterAgent.

# Ejecutar
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Mensaje recibido: Hello, World!
Respuesta desde inner: Hola desde inner, Hola desde outer, Hello, World!

Ambas salidas son producidas por el manejador de mensajes del OuterAgent, sin embargo, la segunda salida se basa en la respuesta del InnerAgent.

En términos generales, la mensajería directa es adecuada para escenarios en los que el emisor y el receptor están estrechamente acoplados: se crean juntos y el emisor está vinculado a una instancia específica del receptor. Por ejemplo, un agente ejecuta llamadas a herramientas enviando mensajes directos a una instancia de ToolAgent, y utiliza las respuestas para formar un bucle de acción-observación.


Broadcast

La difusión (broadcast) es, en esencia, el modelo de publicación/suscripción con temas (topics) y suscripciones. Consulta Topic and Subscription para conocer los conceptos clave.

La diferencia principal entre la mensajería directa y la difusión es que broadcast no puede usarse en escenarios de solicitud/respuesta. Cuando un agente publica un mensaje, es una acción unidireccional: no puede recibir una respuesta de ningún otro agente, incluso si el manejador del agente receptor retorna un valor.

Nota

Si se proporciona una respuesta a un mensaje publicado, esta será descartada.

Nota

Si un agente publica un tipo de mensaje al cual está suscrito, no recibirá el mensaje que publicó. Esto es para prevenir bucles infinitos.


Suscribirse y Publicar en Temas (Topics)

La suscripción basada en tipo (type-based subscription) mapea los mensajes publicados en temas de un tipo específico de tema (topic type) hacia agentes de un tipo específico de agente (agent type). Para que un agente que hereda de RoutedAgent se suscriba a un tema de cierto tipo, puedes usar el decorador de clase type_subscription().

El siguiente ejemplo muestra una clase ReceiverAgent que se suscribe a temas del tipo "default" usando el decorador type_subscription(), y que imprime los mensajes recibidos.

from saptiva_agents.core import RoutedAgent, message_handler, type_subscription, MessageContext

@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Recibido: {message.content}")

Para publicar un mensaje desde el handler de un agente, usa el método publish_message() y especifica un TopicId. Esta llamada debe ser awaited para permitir que el runtime programe la entrega del mensaje a todos los suscriptores, pero siempre devolverá None. Si un agente lanza una excepción mientras maneja un mensaje publicado, esta será registrada en los logs, pero no se propagará de vuelta al agente que publicó el mensaje.

El siguiente ejemplo muestra un BroadcastingAgent que publica un mensaje en un tópico al recibir uno.

from saptiva_agents.core import TopicId

class BroadcastingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        await self.publish_message(
            Message("¡Mensaje desde broadcasting agent!"),
            topic_id=TopicId(type="default", source=self.id.key),
        )

BroadcastingAgent publica mensajes en un tópico con tipo "default" y con el campo source asignado a la clave del agente (agent key) de la instancia.

Las suscripciones se registran en el entorno de ejecución del agente (runtime), ya sea como parte del registro del tipo de agente o mediante un método de la API por separado.

A continuación se muestra cómo registrar una TypeSubscription:

  • Para el agente receptor, se utiliza el decorador @type_subscription().

  • Para el agente emisor (BroadcastingAgent), el registro se realiza sin decorador.

# Registrar y publicar
from saptiva_agents.core import TypeSubscription, SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()

# Opción 1: con el decorador type_subscription
# El decorador de clase type_subscription agrega automáticamente una TypeSubscription
# al runtime cuando se registra el agente.
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))

# Opción 2: con TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))

# Iniciar el runtime y publicar un mensaje.
runtime.start()
await runtime.publish_message(Message("¡Hola Mundo desde runtime!"), topic_id=TopicId(type="default", source="default"))
await runtime.stop_when_idle()
Recibido: ¡Hola Mundo desde runtime!
Recibido: ¡Mensaje desde broadcasting agent!

Como se muestra en el ejemplo anterior, también puedes publicar directamente a un tema utilizando el método publish_message() del runtime, sin necesidad de crear una instancia de agente.

Según la salida, puedes ver que el agente receptor recibió dos mensajes: uno fue publicado a través del runtime y el otro fue publicado por el agente emisor (broadcasting agent).


Default Topic y Subscriptions

En el ejemplo anterior, usamos TopicId y TypeSubscription para especificar el tema (topic) y las suscripciones respectivamente. Esta es la forma adecuada para muchos escenarios.

Sin embargo, cuando existe un único ámbito de publicación, es decir, todos los agentes publican y se suscriben a todos los mensajes transmitidos, podemos usar las clases convenientes DefaultTopicId y default_subscription() para simplificar nuestro código.

DefaultTopicId sirve para crear un tema que utiliza "default" como valor predeterminado para el tipo de tema y la clave del agente emisor como valor predeterminado para el origen del tema. default_subscription() se utiliza para crear una suscripción de tipo que se suscribe al tema por defecto.

Podemos simplificar BroadcastingAgent usando DefaultTopicId y default_subscription().

from saptiva_agents.core import DefaultTopicId, default_subscription

@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        await self.publish_message(
            Message("¡Mensaje desde broadcasting agent!"),
            topic_id=DefaultTopicId(),
        )

Cuando el entorno de ejecución llama a register() para registrar un tipo de agente, se crea una instancia de TypeSubscription cuyo tipo de tópico (topic type) utiliza "default" como valor predeterminado, y el tipo de agente (agent type) corresponde al mismo tipo de agente que se está registrando en ese contexto.

runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent"))
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("¡Hola Mundo desde runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Recibido: ¡Hola Mundo desde runtime!
Recibido: ¡Mensaje desde broadcasting agent!

Nota

Si tu escenario permite que todos los agentes publiquen y se suscriban a todos los mensajes transmitidos, usa DefaultTopicId y default_subscription() para decorar tus clases de agente.

Última actualización