Agentes Concurrentes
En esta sección, exploramos el uso de múltiples agentes trabajando concurrentemente. Cubrimos tres patrones principales:
Mensaje Único y Múltiples Procesadores Demuestra cómo un único mensaje puede ser procesado simultáneamente por múltiples agentes suscritos al mismo tópico.
Múltiples Mensajes y Múltiples Procesadores Ilustra cómo tipos específicos de mensajes pueden ser dirigidos a agentes dedicados según los tópicos.
Mensajería Directa Se enfoca en enviar mensajes entre agentes y desde el runtime hacia los agentes.
import asyncio
from dataclasses import dataclass
from saptiva_agents.core import (
AgentId,
ClosureAgent,
ClosureContext,
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
TopicId,
TypeSubscription,
default_subscription,
message_handler,
type_subscription,
)
@dataclass
class Task:
task_id: str
@dataclass
class TaskResponse:
task_id: str
result: str
Mensaje Único y Múltiples Procesadores
El primer patrón muestra cómo un único mensaje puede ser procesado por múltiples agentes simultáneamente:
Cada agente Processor se suscribe al tópico por defecto usando el decorador
default_subscription()
.Al publicar un mensaje en el tópico por defecto, todos los agentes registrados procesarán el mensaje de forma independiente.
Below, we are subscribing Processor
using the default_subscription()
decorator, there’s an alternative way to subscribe an agent without using decorators altogether as shown in Subscribe and Publish to Topics, this way the same agent class can be subscribed to different topics.
@default_subscription
class Processor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"{self._description} starting task {message.task_id}")
await asyncio.sleep(2) # Simula trabajo
print(f"{self._description} finished task {message.task_id}")
runtime = SingleThreadedAgentRuntime()
await Processor.register(runtime, "agent_1", lambda: Processor("Agente 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agente 2"))
runtime.start()
await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Agente 1 starting task task-1
Agente 2 starting task task-1
Agente 1 finished task task-1
Agente 2 finished task task-1
Múltiples Mensajes y Múltiples Procesadores
Este patrón demuestra cómo enrutar diferentes tipos de mensajes a procesadores específicos:
UrgentProcessor
se suscribe al tópico "urgent"NormalProcessor
se suscribe al tópico "normal"
Hacemos que un agente se suscriba a un tipo de tópico específico usando el decorador type_subscription()
.
TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")
@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Procesador urgente empezando tarea {message.task_id}")
await asyncio.sleep(1) # Simula trabajo
print(f"Procesador urgente terminó tarea {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Resultado por Procesador Urgente")
await self.publish_message(task_response, topic_id=task_results_topic_id)
@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Procesador normal empezando tarea {message.task_id}")
await asyncio.sleep(3) # Simula trabajo
print(f"Procesador normal terminó tarea {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Resultado por Procesador Normal")
await self.publish_message(task_response, topic_id=task_results_topic_id)
Después de registrar los agentes, publicamos mensajes en los tópicos “urgent” y “normal”:
runtime = SingleThreadedAgentRuntime()
await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Procesador Urgente"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Procesador Normal"))
runtime.start()
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
await runtime.stop_when_idle()
Procesador normal empezando tarea normal-1
Procesador urgente empezando tarea urgent-1
Procesador urgente terminó tarea urgent-1
Procesador normal terminó tarea normal-1
Recolectando Resultados
En el ejemplo anterior usamos print
, pero en aplicaciones reales normalmente queremos recolectar y procesar resultados programáticamente.
Usamos un ClosureAgent
para recolectar los mensajes publicados por ambos procesadores:
queue = asyncio.Queue[TaskResponse]()
async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
await queue.put(message)
runtime.start()
CLOSURE_AGENT_TYPE = "collect_result_agent"
await ClosureAgent.register_closure(
runtime,
CLOSURE_AGENT_TYPE,
collect_result,
subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
)
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
await runtime.stop_when_idle()
Procesador normal empezando tarea normal-1
Procesador urgente empezando tarea urgent-1
Procesador urgente terminó tarea urgent-1
Procesador normal terminó tarea normal-1
while not queue.empty():
print(await queue.get())
TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')
TaskResponse(task_id='normal-1', result='Results by Normal Processor')
Mensajes Directos
Este patrón se enfoca en mensajes directos. Demostramos dos formas:
Mensajería directa entre agentes
Envío de mensajes desde el runtime a agentes específicos
Puntos clave:
Los mensajes se dirigen usando
AgentId
.El emisor puede esperar una respuesta del agente objetivo.
Solo registramos una vez
WorkerAgent
, pero enviamos tareas a dos instancias distintas.¿Cómo? Como se explicó en Ciclo de vida del agente, el runtime crea instancias si no existen cuando recibe un
AgentId
.
class WorkerAgent(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
print(f"{self.id} empezando tarea {message.task_id}")
await asyncio.sleep(2) # Simula trabajo
print(f"{self.id} terminó tarea {message.task_id}")
return TaskResponse(task_id=message.task_id, result=f"Resultado por {self.id}")
class DelegatorAgent(RoutedAgent):
def __init__(self, description: str, worker_type: str):
super().__init__(description)
self.worker_instances = [AgentId(worker_type, f"{worker_type}-1"), AgentId(worker_type, f"{worker_type}-2")]
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
print(f"Delegador recibió la tarea {message.task_id}.")
subtask1 = Task(task_id="task-part-1")
subtask2 = Task(task_id="task-part-2")
worker1_result, worker2_result = await asyncio.gather(
self.send_message(subtask1, self.worker_instances[0]),
self.send_message(subtask2, self.worker_instances[1])
)
combined_result = f"Parte 1: {worker1_result.result}, Parte 2: {worker2_result.result}"
task_response = TaskResponse(task_id=message.task_id, result=combined_result)
return task_response
runtime = SingleThreadedAgentRuntime()
await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent("Agente Trabajador"))
await DelegatorAgent.register(runtime, "delegator", lambda: DelegatorAgent("Agente Delegador", "worker"))
runtime.start()
delegator = AgentId("delegator", "default")
response = await runtime.send_message(Task(task_id="main-task"), recipient=delegator)
print(f"Resultado final: {response.result}")
await runtime.stop_when_idle()
Delegador recibió la tarea main-task.
worker/worker-1 empezando tarea task-part-1
worker/worker-2 empezando tarea task-part-2
worker/worker-1 terminó tarea task-part-1
worker/worker-2 terminó tarea task-part-2
Resultado final: Parte 1: Resultado por worker/worker-1, Parte 2: Resultado por worker/worker-2
Última actualización