协作相关
src/magentic_ui/
├── teams/
│ ├── orchestrator/
│ │ ├── _group_chat.py # GroupChat 类定义,管理参与者的状态
│ │ ├── _orchestrator.py # Orchestrator 类定义,维护对话的状态
│ │ └── orchestrator_config.py # 相关的配置
│ └── roundrobin_orchestrator.py # 轮询状态的管理
├── agents/
│ ├── file_surfer/
│ │ └── _file_surfer.py
│ ├── web_surfer/
│ │ └── _web_surfer.py
│ └── users/
│ ├── _dummy_user_proxy.py
│ └── _metadata_user_proxy.py
├── backend/
│ ├── teammanager/
│ └── _teammanager.py
└── task_team.py
核心文件的核心函数
-
_group_chat.py:
-
__init__:智能体团队的创建
-
-
_orchestrator.py:
-
__init__:协调器的初始化
-
_orchestrate_step_planning:进行规划
-
_orchestrate_step_execution:分配合适的智能体进行执行
-
handle_agent_response:处理智能体的响应(例如:某个Agent:我已完成任务,xxx)
-
handle_start:将所有的消息转发给智能体,消息流转的入口点
-
-
_file_surfer.py:
-
on_messages:消息的转化
-
on_messages_stream:自己的智能体消息的响应
-
-
_web_surfer.py:
-
on_messages:消息的转化
-
on_messages_stream:自己的智能体消息的响应
-
_handle_action:处理具体的网页操作,返回操作结果消息
-
_handle_tool_call:处理工具调用,返回工具调用结果消息
-
-
_dummy_user_proxy.py:
-
on_messages:消息的转化
-
on_messages_stream:自己的智能体消息的响应
-
-
_metadata_user_proxy.py:
-
on_messages:消息的转化
-
on_messages_stream:自己的智能体消息的响应
-
-
_teammanager.py:
-
run_stream:团队的运行相关,顶级
-
-
task_team.py:
-
get_task_team:创建智能体的实例
-
关系的一些区分处理:
handle_start 和 _orchestrate_step_execution之间的关系:
-
消息的流转过程
-
handle_start 是入口点,负责接收原始消息并转发给所有智能体.
-
消息添加到message_histroy中保存
-
调用_orchestrate_step 开始协调过程,是进行计划还是分发给智能体执行。
-
_orchestrate_step_execution 处理的是经过的具体消息,也就是planning后的消息
-
-
执行流程
-
handle_start 启动整个对话流程
-
_orchestrate_step 根据当前状态决定是进入规划模式还是执行模式
-
orchestrate_step_execution 负责具体的执行阶段
-
-
通过self._state进行信息的共享
智能体之间的协作:
初始化阶段
-
智能体的创建(src\magentic_ui\task_team.py)下的get_task_team
async def get_task_team(...):# 1. 创建各个智能体实例file_surfer = FileSurfer(name="file_surfer",model_client=model_client_file_surfer,work_dir=paths.internal_run_dir,bind_dir=paths.external_run_dir,model_context_token_limit=magentic_ui_config.model_context_token_limit,approval_guard=approval_guard,)# 2.创建并注册智能体team = GroupChat(participants=[web_surfer, user_proxy, coder_agent, file_surfer],orchestrator_config=orchestrator_config,model_client=model_client_orch,memory_provider=memory_provider,)# 3. 初始化团队await team.lazy_init()return team
-
团队的创建(src\magentic_ui\teams\orchestrator\_group_chat.py)下的GroupChat,需要有参与的智能体,模型的客户端,协调器
class GroupChat(BaseGroupChat, Component[GroupChatConfig]):def __init__(self,participants: List[ChatAgent],model_client: ChatCompletionClient,orchestrator_config: OrchestratorConfig,...):# 1. 调用父类初始化super().__init__(participants,group_chat_manager_name="Orchestrator",group_chat_manager_class=Orchestrator,...)# 2. 初始化内部变量self._orchestrator_config = orchestrator_configself._model_client = model_clientself._message_factory = MessageFactory()self._memory_provider = memory_provider
- 协调器的初始化(src\magentic_ui\teams\orchestrator\_orchestrator.py)下的Orchestrato
class Orchestrator(BaseGroupChatManager):def __init__(self,name: str,group_topic_type: str,output_topic_type: str,participant_topic_types: List[str],participant_names: List[str],participant_descriptions: List[str],...):# 初始化基础属性self._model_client = model_clientself._config = configself._user_agent_topic = "user_proxy"self._web_agent_topic = "web_surfer"
消息相关
团队相关
teammanager.py VS task_team.py VS _orchestrator.py
-
teammanager.py是管理团队的生命周期
-
task_team.py只是团队的创建,不管生命周期
-
_orchestrator.py仅是团队的内部协调,进行通信
用户相关
用户代理是如何加入到团队中?
# src\magentic_ui\task_team.py
# 用户代理的创建
user_proxy: DummyUserProxy | MetadataUserProxy | UserProxyAgentif magentic_ui_config.user_proxy_type == "dummy":user_proxy = DummyUserProxy(name="user_proxy")elif magentic_ui_config.user_proxy_type == "metadata":assert (magentic_ui_config.task is not None), "Task must be provided for metadata user proxy"assert (magentic_ui_config.hints is not None), "Hints must be provided for metadata user proxy"assert (magentic_ui_config.answer is not None), "Answer must be provided for metadata user proxy"user_proxy = MetadataUserProxy(name="user_proxy",description="Metadata User Proxy Agent",task=magentic_ui_config.task,helpful_task_hints=magentic_ui_config.hints,task_answer=magentic_ui_config.answer,model_client=model_client_orch,)else:user_proxy_input_func = make_agentchat_input_func(input_func)user_proxy = UserProxyAgent(description=USER_PROXY_DESCRIPTION,name="user_proxy",input_func=user_proxy_input_func,)# 轮询代理if websurfer_loop_team:# simplified team of only the web surferteam = RoundRobinGroupChat(participants=[web_surfer, user_proxy],max_turns=10000,)await team.lazy_init()return team# 由LLM智能选择team = GroupChat(participants=[web_surfer, user_proxy, coder_agent, file_surfer],orchestrator_config=orchestrator_config,model_client=model_client_orch,memory_provider=memory_provider,)await team.lazy_init()return team
_metadata_user_proxy.py为例,它是创建复杂的用户代理,另一个是创建简单的用户代理
async def on_messages_stream(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken
) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:"""处理输入消息并生成响应流这是核心方法,处理所有消息逻辑"""# 1. 严格模式下的提示重写if (self.how_helpful == "strict"and self.helpful_task_hintsand self.helpful_task_hints != "Helpful hints are not available for this task."and self.rewritten_helpful_task_hints is None):self.rewritten_helpful_task_hints = await self._rewrite_helpful_hints(cancellation_token)# 2. 消息处理chat_messages = thread_to_context(list(messages),agent_name=self.name,is_multimodal=self._model_client.model_info["vision"],)self._chat_history.extend(chat_messages)# 3. 阶段判断if ("type" in messages[-1].metadataand messages[-1].metadata["type"] == "plan_message"):self.have_encountered_plan_message = Trueself.in_planning_phase = Trueelse:if self.have_encountered_plan_message:self.in_planning_phase = Falseelse:self.in_planning_phase = True# 4. 模型上下文准备await self._model_context.clear()system_message = SystemMessage(content=self._get_system_message())await self._model_context.add_message(system_message)# 5. 添加聊天历史for msg in self._chat_history:await self._model_context.add_message(msg)# 6. 获取token限制的历史记录token_limited_history = await self._model_context.get_messages()# 7. 规划阶段处理if self.in_planning_phase:if self.simulated_user_type in ["co-planning", "co-planning-and-execution"]:if (self.max_co_planning_rounds is Noneor self.current_co_planning_round < self.max_co_planning_rounds):# 生成规划响应result = await self._model_client.create(messages=token_limited_history,cancellation_token=cancellation_token,)yield Response(chat_message=TextMessage(content=result.content,source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "llm","helpful_task_hints": self.rewritten_helpful_task_hintsif self.rewritten_helpful_task_hintselse self.helpful_task_hints,},),inner_messages=[],)self.current_co_planning_round += 1else:# 达到最大规划轮次yield Response(chat_message=TextMessage(content="accept",source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "accept",},),inner_messages=[],)else:# 非协作规划模式yield Response(chat_message=TextMessage(content="accept",source=self.name,metadata={"co_planning_round": str(self.current_co_planning_round),"user_plan_reply": "accept",},),inner_messages=[],)# 8. 执行阶段处理else:if self.simulated_user_type in ["co-execution", "co-planning-and-execution"]:if (self.max_co_execution_rounds is Noneor self.current_co_execution_round < self.max_co_execution_rounds):# 生成执行响应result = await self._model_client.create(messages=token_limited_history,cancellation_token=cancellation_token,)yield Response(chat_message=TextMessage(content=result.content,source=self.name,metadata={"user_execution_reply": "llm"},),inner_messages=[],)self.current_co_execution_round += 1else:# 达到最大执行轮次yield Response(chat_message=TextMessage(content="I don't know, you figure it out, don't ask me again.",source=self.name,),)else:# 非协作执行模式yield Response(chat_message=TextMessage(content="I don't know, you figure it out, don't ask me again.",source=self.name,metadata={"user_execution_reply": "idk"},),)
感觉像用户代理这块,不需要进行LLM的调用,我认为的消息流程是:用户输入-->用户代理-->Orchestrator-->智能体-->LLM-->(智能体-->LLM)可能多个-->Orchesatrator-->用户代理-->用户界面的显示。由于在智能体那块以及调用了LLM,有了确切的解决,我认为正确是:
-
用户代理只负责消息的传递和显示
-
Orchestrator负责任务的规划和分配(交给智能体)
-
智能体负责调用LLM和处理任务
-
响应通过用户代理直接显示给用户
原始消息从哪里来的,怎么传到handle_start?
@rpcasync def handle_start(self, message: GroupChatStart, ctx: MessageContext) -> None:
# @rpc 是远程过程调用,运行函数被远程调用,就像调用本地函数一样
# message 来自于外部的调用.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py
文件中的 run_stream 方法,就是负责将初始值(初始消息)通过 GroupChatStart 消息传递
给群聊管理器(GroupChatManager)的地方。具体流程如下:'''
对第一点的self.team的解释,实际上self.team就是代指GroupChat
路径:src\magentic_ui\teams\orchestrator\_group_chat.py下的run_stream谁去调用?
不会直接 new 一个 GroupChat 然后直接 run_stream,而是通过 TeamManager 统一管理
GroupChat.run_stream 的实际调用,主要是通过 TeamManager 的 self.team.run_stream
实现的,这就是 “间接调用”
(本文件的)14,15,16行在src\magentic_ui\backend\teammanager\teammanager.py下的run_stream 调用
的313到315行代码中async for message in self.team.run_stream(task=task, cancellation_token=cancellation_token):
其中的self.team.run_stream也就代指了GroupChat.run_stream
src\magentic_ui\backend\teammanager\teammanager.py 的 run_stream 由
src\magentic_ui\backend\web\managers\connection.py 下start_stream方法的run_stream调用
'''
1.调用 self.team.run_stream(task=...),其中 task 可以是字符串、消息对象、
消息列表,或者为 None。2.run_stream 方法会把 task 转换成 messages,然后封装到 GroupChatStart(
messages=messages) 这个消息对象里。(路径:.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py)if task is None:# 如果 task 是 None,则 messages 也是 Nonepasselif isinstance(task, str):# 如果 task 是字符串,就会被包装成一个 TextMessage,放到 messages 列表里messages = [TextMessage(content=task, source="user")]elif isinstance(task, BaseChatMessage):# 如果 task 是单个消息对象,也会被放到 messages 列表里。messages = [task]elif isinstance(task, list):# 如果 task 是消息对象列表,则直接赋值给 messages。...# 也就是把 messages 作为参数,封装进 GroupChatStart 消息对象里 499行await self._runtime.send_message(GroupChatStart(messages=messages),recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),cancellation_token=cancellation_token,)3.通过 self._runtime.send_message(...),把 GroupChatStart
消息发送给群聊管理器(GroupChatManager)。
# .venv/Lib/site-packages/autogen_agentchat/teams/_group_chat/_base_group_chat.py 中的run_stream方法await self._runtime.send_message(GroupChatStart(messages=messages),recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id),cancellation_token=cancellation_token,)
# recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id)
# 明确指定了接收者就是群聊管理器, _group_chat_manager_topic_type 就是Orchestrator这个群聊管理器实例4.群聊管理器收到 GroupChatStart 后,会调用 handle_start 方法,正式启动群聊流程。
# 对于 GroupChatStart 消息,自动调用handle_start方法,message参数就是刚收到的 GroupChatStart消息
self.team
是一个团队对象,类型是GroupChat
,在业务层(比如 WebSocket、CLI、API)不会直接操作 GroupChat
,而是通过 TeamManager
这个中间层来操作团队。现在到了GroupChat.run_stream
了,它内部继承了.venv\Lib\site-packages\autogen_agentchat\teams\_group_chat\_base_group_chat.py
下的run_stream
,这个就是上面代码框中描述的第二点。现在把消息封装到GroupChatStart
-
WebSocket 路由/管理器调用 WebSocketManager.start_stream
-
WebSocketManager.start_stream 调用 TeamManager.run_stream
-
TeamManager.run_stream 内部调用 self.team.run_stream
-
self.team 实际上就是 GroupChat 或其子类的实例
-
最终执行到 GroupChat.run_stream