Human-in-the-Loop
En la sección anterior, Equipos, hemos visto cómo crear, observar y controlar un equipo de agentes. Esta sección se centrará en cómo interactuar con el equipo desde su aplicación y proporcionar retroalimentación humana al equipo.
Hay dos formas principales de interactuar con el equipo desde tu aplicación:
Durante la ejecución de un equipo, ya sea a través de
run()
orun_stream()
, proporciona retroalimentación a través de unUserProxyAgent
..Una vez que la ejecución finalice, proporcione retroalimentación a través de la entrada a la siguiente llamada a
run()
orun_stream()
.
Cubriremos ambos métodos en esta sección.
Proporcionar Retroalimentación Durante una Ejecución
El UserProxyAgent
es un agente especial incorporado que actúa como un intermediario para que un usuario proporcione retroalimentación al equipo.
Para usar el UserProxyAgent
, puedes crear una instancia y agregarla al equipo antes de ejecutar el equipo. El equipo decidirá cuándo llamar al UserProxyAgent
para solicitar comentarios del usuario.
Por ejemplo, en un equipo de RoundRobinGroupChat
, se llama al UserProxyAgent
en el orden en que se pasa al equipo, mientras que en un equipo de SelectorGroupChat
, el mensaje del selector o la función del selector determina cuándo se llama al UserProxyAgent
.
El siguiente diagrama ilustra cómo puedes utilizar UserProxyAgent
para obtener retroalimentación del usuario durante la ejecución del equipo:
Las flechas en negrita indican el flujo de control durante la ejecución de un equipo: cuando el equipo llama al UserProxyAgent
, transfiere el control a la aplicación/usuario, y espera a recibir retroalimentación; una vez que se brinda dicha retroalimentación, el control se transfiere de vuelta al equipo y este continúa su ejecución.
Debido a la naturaleza bloqueante de este enfoque, se recomienda usarlo solo para interacciones cortas que requieran retroalimentación inmediata del usuario, como solicitar aprobación o desaprobación con un clic de botón, o una alerta que requiera atención inmediata, de lo contrario fallará la tarea.
Aquí tienes un ejemplo de cómo utilizar el UserProxyAgent
en un RoundRobinGroupChat
para una tarea de generación de poema:
from saptiva_agents import QWEN_MODEL
from saptiva_agents.agents import AssistantAgent, UserProxyAgent
from saptiva_agents.conditions import TextMentionTermination
from saptiva_agents.teams import RoundRobinGroupChat
from saptiva_agents.ui import Console
from saptiva_agents.base import SaptivaAIChatCompletionClient
# Crear los agentes.
model_client = SaptivaAIChatCompletionClient(model=QWEN_MODEL, api_key="TU_SAPTIVA_API_KEY")
assistant = AssistantAgent("assistant", model_client=model_client)
user_proxy = UserProxyAgent("user_proxy", input_func=input) # Usa input() para obtener datos del usuario desde la consola.
# Cree la condición de terminación que finalizará la conversación cuando el usuario diga "APPROVE".
termination = TextMentionTermination("APPROVE")
# Crear el equipo.
team = RoundRobinGroupChat([assistant, user_proxy], termination_condition=termination)
# Ejecutar la conversación y transmitirla a la consola.
stream = team.run_stream(task="Write a 4-line poem about the ocean.")
# Use asyncio.run(...) when running in a script.
await Console(stream)
---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
In endless blue where whispers play,
The ocean's waves dance night and day.
A world of depths, both calm and wild,
Nature's heart, forever beguiled.
TERMINATE
---------- user_proxy ----------
APPROVE
TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Write a 4-line poem about the ocean.', type='TextMessage'), TextMessage(source='assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=43), metadata={}, content="In endless blue where whispers play, \nThe ocean's waves dance night and day. \nA world of depths, both calm and wild, \nNature's heart, forever beguiled. \nTERMINATE", type='TextMessage'), UserInputRequestedEvent(source='user_proxy', models_usage=None, metadata={}, request_id='2622a0aa-b776-4e54-9e8f-4ecbdf14b78d', content='', type='UserInputRequestedEvent'), TextMessage(source='user_proxy', models_usage=None, metadata={}, content='APPROVE', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")
A partir de la salida de la consola, puedes ver que el equipo solicitó comentarios del usuario a través de user_proxy
para aprobar el poema generado.
Puedes proporcionar tu propia función de entrada al UserProxyAgent
para personalizar el proceso de retroalimentación. Por ejemplo, cuando el equipo funciona como un servicio web, puedes usar una función de entrada personalizada para esperar mensajes de una conexión de socket web. El siguiente fragmento de código muestra un ejemplo de función de entrada personalizada al usar el FastAPI web framework:
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
await websocket.accept()
async def _user_input(prompt: str, cancellation_token: CancellationToken | None) -> str:
data = await websocket.receive_json() # Esperar el mensaje del usuario desde el websocket.
message = TextMessage.model_validate(data) # Asumir que el mensaje del usuario es un TextMessage.
return message.content
# Crear un proxy de usuario con una función de entrada
# Ejecuta el equipo con el proxy de usuario
# ...
Ver el FastAPI sample para un ejemplo completo.
Para integración de ChainLit con UserProxyAgent
, ver el ChainLit sample.
Proporcionando Retroalimentación para la Próxima Ejecución
A menudo, una aplicación o un usuario interactúa con el equipo de agentes en un bucle interactivo: el equipo se ejecuta hasta la finalización, la aplicación o el usuario proporciona retroalimentación, y el equipo se ejecuta nuevamente con la retroalimentación.
Este enfoque es útil en una sesión persistente con comunicación asíncrona entre el equipo y la aplicación/usuario: Una vez que un equipo termina una ejecución, la aplicación guarda el estado del equipo, lo almacena en un almacenamiento persistente y reanuda el equipo cuando llega la retroalimentación.
El siguiente diagrama ilustra el flujo de control en este enfoque:
Existen dos maneras de implementar este enfoque:
Establezca el número máximo de turnos para que el equipo siempre se detenga después del número especificado de turnos.
Utilice condiciones de terminación como
TextMentionTermination
yHandoffTermination
para permitir que el equipo decida cuándo detenerse y devolver el control, según el estado interno del equipo.
Puedes usar ambos métodos juntos para lograr el comportamiento deseado.
Uso de Turnos Máximos
Este método te permite pausar el equipo para obtener la entrada del usuario estableciendo un número máximo de turnos. Por ejemplo, puedes configurar el equipo para detenerse después de que el primer agente responda estableciendo max_turns
a 1. Esto es particularmente útil en escenarios donde se requiere una participación continua del usuario, como en un chatbot.
Para implementar esto, establezca el parámetro max_turns
en el constructor de RoundRobinGroupChat()
.
team = RoundRobinGroupChat([...], max_turns=1)
Una vez que el equipo se detenga, el conteo de turnos se restablecerá. Cuando reanudes el equipo, comenzará desde 0 nuevamente. Sin embargo, se conservará el estado interno del equipo; por ejemplo, el RoundRobinGroupChat
se reanudará desde el próximo agente en la lista con el mismo historial de conversación.
Aquí tienes un ejemplo de cómo usar max_turns
en un RoundRobinGroupChat
para una tarea de generación de poesía con un máximo de 1 turno:
from saptiva_agents import LLAMA_MODEL
from saptiva_agents.agents import AssistantAgent
from saptiva_agents.teams import RoundRobinGroupChat
from saptiva_agents.ui import Console
from saptiva_agents.base import SaptivaAIChatCompletionClient
# Crear los agentes.
model_client = SaptivaAIChatCompletionClient(
model=LLAMA_MODEL,
api_key="TU_SAPTIVA_API_KEY",
)
assistant = AssistantAgent("assistant", model_client=model_client)
# Cree el equipo estableciendo un número máximo de turnos en 1.
team = RoundRobinGroupChat([assistant], max_turns=1)
task = "Write a 4-line poem about the ocean."
while True:
# Ejecuta la conversación y transmite a la consola.
stream = team.run_stream(task=task)
# utiliza asyncio.run(...) cuando ejecutes en un script.
await Console(stream)
# Obten la respuesta del usuario.
task = input("Enter your feedback (type 'exit' to leave): ")
if task.lower().strip() == "exit":
break
---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
Endless waves in a dance with the shore,
Whispers of secrets in tales from the roar,
Beneath the vast sky, where horizons blend,
The ocean’s embrace is a timeless friend.
TERMINATE
[Prompt tokens: 46, Completion tokens: 48]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 46
Total completion tokens: 48
Duration: 1.63 seconds
---------- user ----------
Can you make it about a person and its relationship with the ocean
---------- assistant ----------
She walks along the tide, where dreams intertwine,
With every crashing wave, her heart feels aligned,
In the ocean's embrace, her worries dissolve,
A symphony of solace, where her spirit evolves.
TERMINATE
[Prompt tokens: 117, Completion tokens: 49]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 117
Total completion tokens: 49
Duration: 1.21 seconds
Puede ver que el equipo se detuvo inmediatamente después de que un agente respondió.
Uso de Condiciones de Terminación
Ya hemos visto varios ejemplos de condiciones de terminación en las secciones anteriores. En esta sección, nos centramos en HandoffTermination
, que detiene al equipo cuando un agente envía un mensaje HandoffMessaage
.
Vamos a crear un equipo con un único agente AssistantAgent
con una configuración de transferencia y ejecutar el equipo con una tarea que requiere información adicional del usuario porque el agente no tiene las herramientas relevantes para continuar procesando la tarea.
from saptiva_agents import QWEN_MODEL
from saptiva_agents.agents import AssistantAgent
from saptiva_agents.base import Handoff
from saptiva_agents.conditions import HandoffTermination, TextMentionTermination
from saptiva_agents.teams import RoundRobinGroupChat
from saptiva_agents.ui import Console
from saptiva_agents.base import SaptivaAIChatCompletionClient
# Define el cliente de modelo de Saptiva.
model_client = SaptivaAIChatCompletionClient(
model=QWEN_MODEL,
api_key="TU_SAPTIVA_API_KEY"
)
# Crea un asistente lazy que siempre entregue al usuario.
lazy_agent = AssistantAgent(
"lazy_assistant",
model_client=model_client,
handoffs=[Handoff(target="user", message="Transfer to user.")],
system_message="Si no puedes completar la tarea, transfiérela al usuario. De lo contrario, cuando termines, responde con 'TERMINATE'.",
)
# Definir una condición de terminación que verifique los mensajes de transferencia.
handoff_termination = HandoffTermination(target="user")
# Definir una condición de terminación que busque una mención de texto especifica.
text_termination = TextMentionTermination("TERMINATE")
# Crear un equipo de un solo agente con el asistente perezoso y ambas condiciones de terminación.
lazy_agent_team = RoundRobinGroupChat([lazy_agent], termination_condition=handoff_termination | text_termination)
# Ejecuta el equipo y transmite a la consola.
task = "What is the weather in New York?"
await Console(lazy_agent_team.run_stream(task=task), output_stats=True)
---------- user ----------
What is the weather in New York?
---------- lazy_assistant ----------
[FunctionCall(id='call_EAcMgrLGHdLw0e7iJGoMgxuu', arguments='{}', name='transfer_to_user')]
[Prompt tokens: 69, Completion tokens: 12]
---------- lazy_assistant ----------
[FunctionExecutionResult(content='Transfer to user.', call_id='call_EAcMgrLGHdLw0e7iJGoMgxuu')]
---------- lazy_assistant ----------
Transfer to user.
---------- Summary ----------
Number of messages: 4
Finish reason: Handoff to user from lazy_assistant detected.
Total prompt tokens: 69
Total completion tokens: 12
Duration: 0.69 seconds
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the weather in New York?', type='TextMessage'), ToolCallRequestEvent(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=69, completion_tokens=12), content=[FunctionCall(id='call_EAcMgrLGHdLw0e7iJGoMgxuu', arguments='{}', name='transfer_to_user')], type='ToolCallRequestEvent'), ToolCallExecutionEvent(source='lazy_assistant', models_usage=None, content=[FunctionExecutionResult(content='Transfer to user.', call_id='call_EAcMgrLGHdLw0e7iJGoMgxuu')], type='ToolCallExecutionEvent'), HandoffMessage(source='lazy_assistant', models_usage=None, target='user', content='Transfer to user.', context=[], type='HandoffMessage')], stop_reason='Handoff to user from lazy_assistant detected.')
Puede ver que el equipo se detuvo debido a que se detectó el mensaje de transferencia. Continuemos con el equipo proporcionando la información que el agente necesita.
await Console(lazy_agent_team.run_stream(task="The weather in New York is sunny."))
---------- user ----------
The weather in New York is sunny.
---------- lazy_assistant ----------
Great! Enjoy the sunny weather in New York! Is there anything else you'd like to know?
---------- lazy_assistant ----------
TERMINATE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='The weather in New York is sunny.', type='TextMessage'), TextMessage(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=110, completion_tokens=21), content="Great! Enjoy the sunny weather in New York! Is there anything else you'd like to know?", type='TextMessage'), TextMessage(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=137, completion_tokens=5), content='TERMINATE', type='TextMessage')], stop_reason="Text 'TERMINATE' mentioned")
Puedes ver que el equipo continuó después de que el usuario proporcionó la información.
Última actualización