‎
loginstudio
  • Overview
  • COMIENZA
    • Quickstart
  • Basicos
    • Modelos Disponibles
    • TEXT API (NEW)
    • TEXT API (ACTUAL)
    • Formas de pago
  • Mejores prácticas
    • RAG
    • Prompteo
  • Saptiva Agents
    • Introducción
    • Instalación
    • Quick Start
    • Tutorial
      • Modelos
      • Mensajes
      • Agentes
      • Equipos
      • Human-in-the-Loop
      • Terminación
      • Manejo De Estados
    • Avanzado
      • Agentes Personalizados
      • Selector Group Chat
      • Memoria
      • Logging
      • Serialización
    • Conceptos Del Núcleo
      • Quick Start
      • Aplicaciones De Agentes & Multi-Agentes
      • Entornos De Ejecución Para Agentes
      • Pila De Aplicación
      • Identidad & Ciclo De Vida Del Agente
      • Tema & Suscripción (Topic & Subscription)
    • Guía De Framework
      • Agente & Entorno De Ejecución De Agentes
      • Mensaje & Comunicación
      • Open Telemetry
    • Guía De Componentes
      • Cliente De Modelo
      • Contexto De Modelo
      • Herramientas (Tools)
    • Patrones De Diseño Multi-Agente
      • Agentes Concurrentes
      • Flujo de Trabajo Secuencial
      • Transferencia De Tareas (Handoffs)
      • Mezcla De Agentes (Mixture Of Agents)
      • Multi-Agent Debate
      • Reflexión (Reflection)
    • Ejemplos
      • Planificación De Viajes
      • Investigación De Empresas
      • Revisión De Literatura
    • PyPi
  • Manuales
  • Model cards
    • Quickstart
      • Model Card: DeepSeek R1 Lite
      • Model Card: LLAMA3.3 70B
      • Model Card: Saptiva Turbo
      • Model Card: Phi 4
      • Model Card: Qwen
      • Model Card: Gemma 3
  • DEFINICIONES
    • Temperature
Con tecnología de GitBook
En esta página
  • Mensajes
  • Manejadores de Mensajes
  • Enrutamiento de Mensajes por Tipo
  • Enrutamiento de Mensajes del Mismo Tipo
  • Mensajería Directa
  • Solicitud/Respuesta
  • Broadcast
  • Suscribirse y Publicar en Temas (Topics)
  • Default Topic y Subscriptions
  1. Saptiva Agents
  2. Guía De Framework

Mensaje & Comunicación

Un agente en Saptiva-Agents puede reaccionar, enviar y publicar mensajes, y los mensajes son el único medio a través del cual los agentes pueden comunicarse entre sí.


Mensajes

Los mensajes son objetos serializables y pueden definirse utilizando:

  • Una subclase de pydantic.BaseModel, o

  • Un dataclass

from dataclasses import dataclass

@dataclass
class TextMessage:
    content: str
    source: str

@dataclass
class ImageMessage:
    url: str
    source: str

Nota

Los mensajes son puramente datos y no deben contener lógica.


Manejadores de Mensajes

Cuando un agente recibe un mensaje, el runtime invoca el manejador de mensajes del agente (on_message()), el cual debe implementar la lógica para manejarlo. Si el mensaje no puede ser manejado, el agente debe lanzar CantHandleException.

La clase base BaseAgent no implementa manejo de mensajes, por lo tanto, no se recomienda implementar directamente on_message() salvo para casos avanzados.

Se recomienda usar la clase RoutedAgent, que proporciona capacidad incorporada de enrutamiento de mensajes.


Enrutamiento de Mensajes por Tipo

La clase RoutedAgent permite asociar tipos de mensajes a funciones manejadoras usando el decorador @message_handler.

from saptiva_agents.core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler

class MyAgent(RoutedAgent):
    @message_handler
    async def on_text_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hola, {message.source}, dijiste {message.content}!")

    @message_handler
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hola, {message.source}, me enviaste {message.url}!")
# Crear runtime y registrar el tipo de agente
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My Agent"))
AgentType(type='my_agent')

Prueba el agente con TextMessage y ImageMessage.

# Probar con ambos mensajes
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="¡Hola Mundo!", source="Usuario"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="Usuario"), agent_id)
await runtime.stop_when_idle()
Hola, Usuario, dijiste ¡Hola Mundo!!
Hola, Usuario, me enviaste https://example.com/image.jpg!

El runtime crea automáticamente una instancia de MyAgent con el ID de agente AgentId("my_agent", "default") al entregar el primer mensaje.


Enrutamiento de Mensajes del Mismo Tipo

A veces queremos manejar un mismo tipo de mensaje con diferentes funciones, por ejemplo, según el remitente. Para eso usamos match en @message_handler.

class RoutedBySenderAgent(RoutedAgent):
    @message_handler(match=lambda msg, ctx: msg.source.startswith("user1"))  # type: ignore
    async def on_user1_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hola desde user1, {message.source}, dijiste {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_user2_message(self, message: TextMessage, ctx: MessageContext) -> None:
        print(f"Hola desde user2, {message.source}, dijiste {message.content}!")

    @message_handler(match=lambda msg, ctx: msg.source.startswith("user2"))  # type: ignore
    async def on_image_message(self, message: ImageMessage, ctx: MessageContext) -> None:
        print(f"Hola {message.source}, me enviaste {message.url}!")
# Registrar y enviar mensajes con distintos remitentes
runtime = SingleThreadedAgentRuntime()
await RoutedBySenderAgent.register(runtime, "my_agent", lambda: RoutedBySenderAgent("Routed by sender agent"))
runtime.start()
agent_id = AgentId("my_agent", "default")
await runtime.send_message(TextMessage(content="Hello, World!", source="user1-test"), agent_id)
await runtime.send_message(TextMessage(content="Hello, World!", source="user2-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user1-test"), agent_id)
await runtime.send_message(ImageMessage(url="https://example.com/image.jpg", source="user2-test"), agent_id)
await runtime.stop_when_idle()
Hola desde user1, user1-test, dijiste Hello, World!!
Hola desde user2, user2-test, dijiste Hello, World!!
Hola user2-test, me enviaste https://example.com/image.jpg!

En el ejemplo anterior, el primer ImageMessage no es manejado porque el campo source del mensaje no coincide con la condición de coincidencia (match) del manejador.


Mensajería Directa

Hay dos formas de comunicación en Saptiva-Agents:

  • Mensajería Directa: el mensaje va a un agente específico.

  • Broadcast: el mensaje se publica en un topic y lo reciben múltiples agentes.

Veamos primero la mensajería directa. Para enviar un mensaje directo a otro agente, dentro de un manejador de mensajes se utiliza el método saptiva_agents.core.BaseAgent.send_message(), y desde el entorno de ejecución se utiliza el método saptiva_agents.core.AgentRuntime.send_message(). Al esperar (await) estas llamadas, se devolverá el valor de retorno del manejador de mensajes del agente receptor. Si el manejador del agente receptor devuelve None, también se devolverá None.

Nota

Si el agente invocado lanza una excepción mientras el emisor está esperando, la excepción será propagada de vuelta al emisor.

Solicitud/Respuesta

La mensajería directa puede utilizarse para escenarios de solicitud/respuesta, donde el emisor espera una respuesta del receptor. El receptor puede responder al mensaje devolviendo un valor desde su manejador de mensajes. Puedes pensar en esto como una llamada a función entre agentes.

Por ejemplo, considera los siguientes agentes:

from dataclasses import dataclass

from saptiva_agents.core import MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler

@dataclass
class Message:
    content: str

class InnerAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> Message:
        return Message(content=f"Hola desde inner, {message.content}")

class OuterAgent(RoutedAgent):
    def __init__(self, description: str, inner_agent_type: str):
        super().__init__(description)
        self.inner_agent_id = AgentId(inner_agent_type, self.id.key)

    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Mensaje recibido: {message.content}")
        # Envía un mensaje directo al agente interno y recibe una respuesta.
        response = await self.send_message(Message(f"Hola desde outer, {message.content}"), self.inner_agent_id)
        print(f"Respuesta desde inner: {response.content}")
# Ejecutar
runtime = SingleThreadedAgentRuntime()
await InnerAgent.register(runtime, "inner_agent", lambda: InnerAgent("InnerAgent"))
await OuterAgent.register(runtime, "outer_agent", lambda: OuterAgent("OuterAgent", "inner_agent"))
runtime.start()
outer_agent_id = AgentId("outer_agent", "default")
await runtime.send_message(Message(content="Hello, World!"), outer_agent_id)
await runtime.stop_when_idle()
Mensaje recibido: Hello, World!
Respuesta desde inner: Hola desde inner, Hola desde outer, Hello, World!

Ambas salidas son producidas por el manejador de mensajes del OuterAgent, sin embargo, la segunda salida se basa en la respuesta del InnerAgent.

En términos generales, la mensajería directa es adecuada para escenarios en los que el emisor y el receptor están estrechamente acoplados: se crean juntos y el emisor está vinculado a una instancia específica del receptor. Por ejemplo, un agente ejecuta llamadas a herramientas enviando mensajes directos a una instancia de ToolAgent, y utiliza las respuestas para formar un bucle de acción-observación.


Broadcast

La difusión (broadcast) es, en esencia, el modelo de publicación/suscripción con temas (topics) y suscripciones. Consulta Topic and Subscription para conocer los conceptos clave.

La diferencia principal entre la mensajería directa y la difusión es que broadcast no puede usarse en escenarios de solicitud/respuesta. Cuando un agente publica un mensaje, es una acción unidireccional: no puede recibir una respuesta de ningún otro agente, incluso si el manejador del agente receptor retorna un valor.

Nota

Si se proporciona una respuesta a un mensaje publicado, esta será descartada.

Nota

Si un agente publica un tipo de mensaje al cual está suscrito, no recibirá el mensaje que publicó. Esto es para prevenir bucles infinitos.

Suscribirse y Publicar en Temas (Topics)

La suscripción basada en tipo (type-based subscription) mapea los mensajes publicados en temas de un tipo específico de tema (topic type) hacia agentes de un tipo específico de agente (agent type). Para que un agente que hereda de RoutedAgent se suscriba a un tema de cierto tipo, puedes usar el decorador de clase type_subscription().

El siguiente ejemplo muestra una clase ReceiverAgent que se suscribe a temas del tipo "default" usando el decorador type_subscription(), y que imprime los mensajes recibidos.

Usa modelo publish/subscribe con topics. No puede usarse para solicitar respuesta.

Nota

Si un agente publica un mensaje y está suscrito al mismo, no recibirá su propio mensaje.

from saptiva_agents.core import RoutedAgent, message_handler, type_subscription, MessageContext

@type_subscription(topic_type="default")
class ReceivingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        print(f"Recibido: {message.content}")

Para publicar un mensaje desde el handler de un agente, usa el método publish_message() y especifica un TopicId. Esta llamada debe ser awaited para permitir que el runtime programe la entrega del mensaje a todos los suscriptores, pero siempre devolverá None. Si un agente lanza una excepción mientras maneja un mensaje publicado, esta será registrada en los logs, pero no será propagada de vuelta al agente que publicó el mensaje.

from saptiva_agents.core import TopicId

class BroadcastingAgent(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        await self.publish_message(
            Message("¡Mensaje desde broadcasting agent!"),
            topic_id=TopicId(type="default", source=self.id.key),
        )

El siguiente ejemplo muestra un BroadcastingAgent que publica un mensaje a un tema al recibir un mensaje.

# Registrar y publicar
from saptiva_agents.core import TypeSubscription, SingleThreadedAgentRuntime

runtime = SingleThreadedAgentRuntime()
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
await BroadcastingAgent.register(runtime, "broadcasting_agent", lambda: BroadcastingAgent("Broadcasting Agent"))
await runtime.add_subscription(TypeSubscription(topic_type="default", agent_type="broadcasting_agent"))

runtime.start()
await runtime.publish_message(Message("¡Hola Mundo desde runtime!"), topic_id=TopicId(type="default", source="default"))
await runtime.stop_when_idle()
Recibido: ¡Hola Mundo desde runtime!
Recibido: ¡Mensaje desde broadcasting agent!

Como se muestra en el ejemplo anterior, también puedes publicar directamente a un tema utilizando el método publish_message() del runtime, sin necesidad de crear una instancia de agente.

Según la salida, puedes ver que el agente receptor recibió dos mensajes: uno fue publicado a través del runtime y el otro fue publicado por el agente emisor (broadcasting agent).


Default Topic y Subscriptions

En el ejemplo anterior, usamos TopicId y TypeSubscription para especificar el tema (topic) y las suscripciones respectivamente. Esta es la forma adecuada para muchos escenarios.

Sin embargo, cuando existe un único ámbito de publicación, es decir, todos los agentes publican y se suscriben a todos los mensajes transmitidos, podemos usar las clases convenientes DefaultTopicId y default_subscription() para simplificar nuestro código.

DefaultTopicId sirve para crear un tema que utiliza "default" como valor predeterminado para el tipo de tema y la clave del agente emisor como valor predeterminado para el origen del tema. default_subscription() se utiliza para crear una suscripción de tipo que se suscribe al tema por defecto.

Podemos simplificar BroadcastingAgent usando DefaultTopicId y default_subscription().

from saptiva_agents.core import DefaultTopicId, default_subscription

@default_subscription
class BroadcastingAgentDefaultTopic(RoutedAgent):
    @message_handler
    async def on_my_message(self, message: Message, ctx: MessageContext) -> None:
        await self.publish_message(
            Message("¡Mensaje desde broadcasting agent!"),
            topic_id=DefaultTopicId(),
        )
runtime = SingleThreadedAgentRuntime()
await BroadcastingAgentDefaultTopic.register(runtime, "broadcasting_agent", lambda: BroadcastingAgentDefaultTopic("Broadcasting Agent"))
await ReceivingAgent.register(runtime, "receiving_agent", lambda: ReceivingAgent("Receiving Agent"))
runtime.start()
await runtime.publish_message(Message("¡Hola Mundo desde runtime!"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Recibido: ¡Hola Mundo desde runtime!
Recibido: ¡Mensaje desde broadcasting agent!

Nota

Si tu escenario permite que todos los agentes publiquen y se suscriban a todos los mensajes transmitidos, usa DefaultTopicId y default_subscription() para decorar tus clases de agente.

AnteriorAgente & Entorno De Ejecución De AgentesSiguienteOpen Telemetry

Última actualización hace 29 días