Agentes Concurrentes

En esta sección, exploramos el uso de múltiples agentes trabajando concurrentemente. Cubrimos tres patrones principales:

  1. Mensaje Único y Múltiples Procesadores Demuestra cómo un único mensaje puede ser procesado simultáneamente por múltiples agentes suscritos al mismo tópico.

  2. Múltiples Mensajes y Múltiples Procesadores Ilustra cómo tipos específicos de mensajes pueden ser dirigidos a agentes dedicados según los tópicos.

  3. Mensajería Directa Se enfoca en enviar mensajes entre agentes y desde el runtime hacia los agentes.

import asyncio
from dataclasses import dataclass

from saptiva_agents.core import (
    AgentId,
    ClosureAgent,
    ClosureContext,
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    default_subscription,
    message_handler,
    type_subscription,
)
@dataclass
class Task:
    task_id: str


@dataclass
class TaskResponse:
    task_id: str
    result: str

Mensaje Único y Múltiples Procesadores

El primer patrón muestra cómo un único mensaje puede ser procesado por múltiples agentes simultáneamente:

  • Cada agente Processor se suscribe al tópico por defecto usando el decorador default_subscription().

  • Al publicar un mensaje en el tópico por defecto, todos los agentes registrados procesarán el mensaje de forma independiente.

Nota

Abajo, estamos suscribiendo Processor usando el decorador default_subscription(), pero hay una manera alternativa de suscribir un agente sin decoradores como se muestra en Suscribirse y Publicar en Tópicos, lo que permite que una misma clase de agente se suscriba a diferentes tópicos.

Below, we are subscribing Processor using the default_subscription() decorator, there’s an alternative way to subscribe an agent without using decorators altogether as shown in Subscribe and Publish to Topics, this way the same agent class can be subscribed to different topics.

@default_subscription
class Processor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"{self._description} starting task {message.task_id}")
        await asyncio.sleep(2)  # Simula trabajo
        print(f"{self._description} finished task {message.task_id}")
runtime = SingleThreadedAgentRuntime()

await Processor.register(runtime, "agent_1", lambda: Processor("Agente 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agente 2"))

runtime.start()

await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())

await runtime.stop_when_idle()
Agente 1 starting task task-1
Agente 2 starting task task-1
Agente 1 finished task task-1
Agente 2 finished task task-1

Múltiples Mensajes y Múltiples Procesadores

Este patrón demuestra cómo enrutar diferentes tipos de mensajes a procesadores específicos:

  • UrgentProcessor se suscribe al tópico "urgent"

  • NormalProcessor se suscribe al tópico "normal"

Hacemos que un agente se suscriba a un tipo de tópico específico usando el decorador type_subscription().

TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")

@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Procesador urgente empezando tarea {message.task_id}")
        await asyncio.sleep(1)  # Simula trabajo
        print(f"Procesador urgente terminó tarea {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Resultado por Procesador Urgente")
        await self.publish_message(task_response, topic_id=task_results_topic_id)

@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> None:
        print(f"Procesador normal empezando tarea {message.task_id}")
        await asyncio.sleep(3)  # Simula trabajo
        print(f"Procesador normal terminó tarea {message.task_id}")

        task_response = TaskResponse(task_id=message.task_id, result="Resultado por Procesador Normal")
        await self.publish_message(task_response, topic_id=task_results_topic_id)

Después de registrar los agentes, publicamos mensajes en los tópicos “urgent” y “normal”:

runtime = SingleThreadedAgentRuntime()

await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Procesador Urgente"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Procesador Normal"))

runtime.start()

await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))

await runtime.stop_when_idle()
Procesador normal empezando tarea normal-1
Procesador urgente empezando tarea urgent-1
Procesador urgente terminó tarea urgent-1
Procesador normal terminó tarea normal-1

Recolectando Resultados

En el ejemplo anterior usamos print, pero en aplicaciones reales normalmente queremos recolectar y procesar resultados programáticamente.

Usamos un ClosureAgent para recolectar los mensajes publicados por ambos procesadores:

queue = asyncio.Queue[TaskResponse]()


async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
    await queue.put(message)


runtime.start()

CLOSURE_AGENT_TYPE = "collect_result_agent"
await ClosureAgent.register_closure(
    runtime,
    CLOSURE_AGENT_TYPE,
    collect_result,
    subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
)

await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))

await runtime.stop_when_idle()
Procesador normal empezando tarea normal-1
Procesador urgente empezando tarea urgent-1
Procesador urgente terminó tarea urgent-1
Procesador normal terminó tarea normal-1
while not queue.empty():
    print(await queue.get())
TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')
TaskResponse(task_id='normal-1', result='Results by Normal Processor')

Mensajes Directos

Este patrón se enfoca en mensajes directos. Demostramos dos formas:

  • Mensajería directa entre agentes

  • Envío de mensajes desde el runtime a agentes específicos

Puntos clave:

  • Los mensajes se dirigen usando AgentId.

  • El emisor puede esperar una respuesta del agente objetivo.

  • Solo registramos una vez WorkerAgent, pero enviamos tareas a dos instancias distintas.

  • ¿Cómo? Como se explicó en Ciclo de vida del agente, el runtime crea instancias si no existen cuando recibe un AgentId.

class WorkerAgent(RoutedAgent):
    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
        print(f"{self.id} empezando tarea {message.task_id}")
        await asyncio.sleep(2)  # Simula trabajo
        print(f"{self.id} terminó tarea {message.task_id}")
        return TaskResponse(task_id=message.task_id, result=f"Resultado por {self.id}")

class DelegatorAgent(RoutedAgent):
    def __init__(self, description: str, worker_type: str):
        super().__init__(description)
        self.worker_instances = [AgentId(worker_type, f"{worker_type}-1"), AgentId(worker_type, f"{worker_type}-2")]

    @message_handler
    async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
        print(f"Delegador recibió la tarea {message.task_id}.")

        subtask1 = Task(task_id="task-part-1")
        subtask2 = Task(task_id="task-part-2")

        worker1_result, worker2_result = await asyncio.gather(
            self.send_message(subtask1, self.worker_instances[0]),
            self.send_message(subtask2, self.worker_instances[1])
        )

        combined_result = f"Parte 1: {worker1_result.result}, Parte 2: {worker2_result.result}"
        task_response = TaskResponse(task_id=message.task_id, result=combined_result)
        return task_response
runtime = SingleThreadedAgentRuntime()

await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent("Agente Trabajador"))
await DelegatorAgent.register(runtime, "delegator", lambda: DelegatorAgent("Agente Delegador", "worker"))

runtime.start()

delegator = AgentId("delegator", "default")
response = await runtime.send_message(Task(task_id="main-task"), recipient=delegator)

print(f"Resultado final: {response.result}")
await runtime.stop_when_idle()
Delegador recibió la tarea main-task.
worker/worker-1 empezando tarea task-part-1
worker/worker-2 empezando tarea task-part-2
worker/worker-1 terminó tarea task-part-1
worker/worker-2 terminó tarea task-part-2
Resultado final: Parte 1: Resultado por worker/worker-1, Parte 2: Resultado por worker/worker-2

Última actualización