Mensaje & Comunicación
Un agente en Saptiva-Agents puede reaccionar, enviar y publicar mensajes, y los mensajes son el único medio a través del cual los agentes pueden comunicarse entre sí.
Mensajes
Los mensajes son objetos serializables y pueden definirse utilizando:
Una subclase de
pydantic.BaseModel
, oUn
dataclass
from dataclasses import dataclass
@dataclass
class TextMessage:
content: str
source: str
@dataclass
class ImageMessage:
url: str
source: str
Manejadores de Mensajes
Cuando un agente recibe un mensaje, el runtime invoca el manejador de mensajes del agente (on_message()
), el cual debe implementar la lógica para manejarlo.
Si el mensaje no puede ser manejado, el agente debe lanzar CantHandleException
.
La clase base
BaseAgent
no implementa manejo de mensajes, por lo tanto, no se recomienda implementar directamenteon_message()
salvo para casos avanzados.
Se recomienda usar la clase RoutedAgent
, que proporciona capacidad incorporada de enrutamiento de mensajes.
Enrutamiento de Mensajes por Tipo
La clase base RoutedAgent
proporciona un mecanismo para asociar tipos de mensajes con funciones manejadoras (handlers) usando el decorador @message_handler()
, lo que evita que los desarrolladores tengan que implementar directamente el método on_message()
.
Por ejemplo, el siguiente agente basado en enrutamiento por tipo responde de forma distinta a TextMessage
e ImageMessage
utilizando diferentes funciones manejadoras:
from saptiva_agents.core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
class MyAgent(RoutedAgent):
@message_handler
async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hola, {message.source}, dijiste {message.content}!")
@message_handler
async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
print(f"Hola, {message.source}, me enviaste {message.url}!")
# Crear runtime y registrar el tipo de agente
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')
Prueba el agente con TextMessage
y ImageMessage
.
# Probar con ambos mensajes
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="¡Hola Mundo!", source="Usuario"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="Usuario"), agent_id)
await runtime.stop_when_idle()
Hola, Usuario, dijiste ¡Hola Mundo!!
Hola, Usuario, me enviaste https://example.com/image.jpg!
El runtime crea automáticamente una instancia de MyAgent
con el ID de agente AgentId("my_agent", "default")
al entregar el primer mensaje.
Enrutamiento de Mensajes del Mismo Tipo
En algunos escenarios, es útil enrutar mensajes del mismo tipo a diferentes funciones manejadoras.
Por ejemplo, puede que desees manejar de forma distinta los mensajes que provienen de distintos agentes emisores.
Para ello, puedes usar el parámetro match
del decorador @message_handler()
.
El parámetro match
permite asociar múltiples handlers al mismo tipo de mensaje, según condiciones adicionales.
Este parámetro acepta una función que recibe el mensaje y el MessageContext
, y devuelve un valor booleano indicando si ese handler debe procesar el mensaje.
Los handlers con match
se evalúan en orden alfabético de nombre de función.
A continuación se muestra un ejemplo de un agente que enruta los mensajes en función del agente emisor utilizando match
:
class RoutedBySenderAgent(RoutedAgent):
@message_handler(match=lambda msg, ctx: msg.source.startswith("user1")) # type: ignore
async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hola desde user1, {message.source}, dijiste {message.content}!")
@message_handler(match=lambda msg, ctx: msg.source.startswith("user2")) # type: ignore
async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
print(f"Hola desde user2, {message.source}, dijiste {message.content}!")
@message_handler(match=lambda msg, ctx: msg.source.startswith("user2")) # type: ignore
async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
print(f"Hola {message.source}, me enviaste {message.url}!")
El agente anterior utiliza el campo source
del mensaje para identificar al agente emisor.
También puedes usar el campo sender
del MessageContext
para determinar el agente emisor mediante su ID, si está disponible.
Probemos este agente con mensajes que contienen distintos valores en el campo source
:
# Registrar y enviar mensajes con distintos remitentes
runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hola desde user1, user1-test, dijiste Hello, World!!
Hola desde user2, user2-test, dijiste Hello, World!!
Hola user2-test, me enviaste https://example.com/image.jpg!
En el ejemplo anterior, el primer ImageMessage
no es manejado porque el campo source
del mensaje no coincide con la condición de coincidencia (match
) del manejador.
Mensajería Directa
Hay dos formas de comunicación en Saptiva-Agents:
Mensajería Directa: el mensaje va a un agente específico.
Broadcast: el mensaje se publica en un topic y lo reciben múltiples agentes.
Veamos primero la mensajería directa. Para enviar un mensaje directo a otro agente, dentro de un manejador de mensajes se utiliza el método saptiva_agents.core.BaseAgent.send_message()
, y desde el entorno de ejecución se utiliza el método saptiva_agents.core.AgentRuntime.send_message()
. Al esperar (await
) estas llamadas, se devolverá el valor de retorno del manejador de mensajes del agente receptor. Si el manejador del agente receptor devuelve None
, también se devolverá None
.
Solicitud/Respuesta
La mensajería directa puede utilizarse para escenarios de solicitud/respuesta, donde el emisor espera una respuesta del receptor. El receptor puede responder al mensaje devolviendo un valor desde su manejador de mensajes. Puedes pensar en esto como una llamada a función entre agentes.
Por ejemplo, considera los siguientes agentes:
from dataclasses import dataclass
from saptiva_agents.core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
@dataclass
class Message:
content: str
class InnerAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
return Message(content=f"Hola desde inner, {message.content}")
class OuterAgent(RoutedAgent):
def __init__(self, description: str, inner_agent_type: str):
super().__init__(description)
self.inner_agent_id = AgentId(inner_agent_type, self.id.key)
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
print(f"Mensaje recibido: {message.content}")
# Envía un mensaje directo al agente interno y recibe una respuesta.
response = await self.send_message(Message(f"Hola desde outer, {message.content}"), self.inner_agent_id)
print(f"Respuesta desde inner: {response.content}")
Al recibir un mensaje, el OuterAgent
envía un mensaje directo al InnerAgent
y recibe una respuesta de vuelta.
Podemos probar estos agentes enviando un Message
al OuterAgent
.
# Ejecutar
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Mensaje recibido: Hello, World!
Respuesta desde inner: Hola desde inner, Hola desde outer, Hello, World!
Ambas salidas son producidas por el manejador de mensajes del OuterAgent
, sin embargo, la segunda salida se basa en la respuesta del InnerAgent
.
En términos generales, la mensajería directa es adecuada para escenarios en los que el emisor y el receptor están estrechamente acoplados: se crean juntos y el emisor está vinculado a una instancia específica del receptor. Por ejemplo, un agente ejecuta llamadas a herramientas enviando mensajes directos a una instancia de ToolAgent
, y utiliza las respuestas para formar un bucle de acción-observación.
Broadcast
La difusión (broadcast) es, en esencia, el modelo de publicación/suscripción con temas (topics) y suscripciones. Consulta Topic and Subscription para conocer los conceptos clave.
La diferencia principal entre la mensajería directa y la difusión es que broadcast no puede usarse en escenarios de solicitud/respuesta. Cuando un agente publica un mensaje, es una acción unidireccional: no puede recibir una respuesta de ningún otro agente, incluso si el manejador del agente receptor retorna un valor.
Suscribirse y Publicar en Temas (Topics)
La suscripción basada en tipo (type-based subscription) mapea los mensajes publicados en temas de un tipo específico de tema (topic type) hacia agentes de un tipo específico de agente (agent type).
Para que un agente que hereda de RoutedAgent
se suscriba a un tema de cierto tipo, puedes usar el decorador de clase type_subscription()
.
El siguiente ejemplo muestra una clase ReceiverAgent
que se suscribe a temas del tipo "default"
usando el decorador type_subscription()
, y que imprime los mensajes recibidos.
from saptiva_agents.core import RoutedAgent, message_handler, type_subscription, MessageContext
@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
print(f"Recibido: {message.content}")
Para publicar un mensaje desde el handler de un agente, usa el método publish_message()
y especifica un TopicId
. Esta llamada debe ser awaited para permitir que el runtime programe la entrega del mensaje a todos los suscriptores, pero siempre devolverá None
.
Si un agente lanza una excepción mientras maneja un mensaje publicado, esta será registrada en los logs, pero no se propagará de vuelta al agente que publicó el mensaje.
El siguiente ejemplo muestra un BroadcastingAgent
que publica un mensaje en un tópico al recibir uno.
from saptiva_agents.core import TopicId
class BroadcastingAgent(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
await self.publish_message(
Message("¡Mensaje desde broadcasting agent!"),
topic_id=TopicId(type="default", source=self.id.key),
)
BroadcastingAgent
publica mensajes en un tópico con tipo "default"
y con el campo source
asignado a la clave del agente (agent key
) de la instancia.
Las suscripciones se registran en el entorno de ejecución del agente (runtime), ya sea como parte del registro del tipo de agente o mediante un método de la API por separado.
A continuación se muestra cómo registrar una TypeSubscription
:
Para el agente receptor, se utiliza el decorador
@type_subscription()
.Para el agente emisor (
BroadcastingAgent
), el registro se realiza sin decorador.
# Registrar y publicar
from saptiva_agents.core import TypeSubscription, SingleThreadedAgentRuntime
runtime = SingleThreadedAgentRuntime()
# Opción 1: con el decorador type_subscription
# El decorador de clase type_subscription agrega automáticamente una TypeSubscription
# al runtime cuando se registra el agente.
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
# Opción 2: con TypeSubscription
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))
# Iniciar el runtime y publicar un mensaje.
runtime.start()
await runtime.publish_message(Message("¡Hola Mundo desde runtime!"), topic_id=TopicId(type="default", source="default"))
await runtime.stop_when_idle()
Recibido: ¡Hola Mundo desde runtime!
Recibido: ¡Mensaje desde broadcasting agent!
Como se muestra en el ejemplo anterior, también puedes publicar directamente a un tema utilizando el método publish_message()
del runtime, sin necesidad de crear una instancia de agente.
Según la salida, puedes ver que el agente receptor recibió dos mensajes: uno fue publicado a través del runtime y el otro fue publicado por el agente emisor (broadcasting agent).
Default Topic y Subscriptions
En el ejemplo anterior, usamos TopicId
y TypeSubscription
para especificar el tema (topic) y las suscripciones respectivamente. Esta es la forma adecuada para muchos escenarios.
Sin embargo, cuando existe un único ámbito de publicación, es decir, todos los agentes publican y se suscriben a todos los mensajes transmitidos, podemos usar las clases convenientes DefaultTopicId
y default_subscription()
para simplificar nuestro código.
DefaultTopicId
sirve para crear un tema que utiliza "default"
como valor predeterminado para el tipo de tema y la clave del agente emisor como valor predeterminado para el origen del tema. default_subscription()
se utiliza para crear una suscripción de tipo que se suscribe al tema por defecto.
Podemos simplificar BroadcastingAgent
usando DefaultTopicId
y default_subscription()
.
from saptiva_agents.core import DefaultTopicId, default_subscription
@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
@message_handler
async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
await self.publish_message(
Message("¡Mensaje desde broadcasting agent!"),
topic_id=DefaultTopicId(),
)
Cuando el entorno de ejecución llama a register()
para registrar un tipo de agente, se crea una instancia de TypeSubscription
cuyo tipo de tópico (topic type
) utiliza "default"
como valor predeterminado, y el tipo de agente (agent type
) corresponde al mismo tipo de agente que se está registrando en ese contexto.
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent"))
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("¡Hola Mundo desde runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Recibido: ¡Hola Mundo desde runtime!
Recibido: ¡Mensaje desde broadcasting agent!
Última actualización