Manager-Worker 模式:经理分活,工人执行
解决什么问题
旅行规划涉及多个并行子任务——查机票、订酒店、排行程——一个 Agent 全做会上下文爆炸。 Manager-Worker 把决策(拆任务、合结果)和执行(调 API、跑搜索)分离: Manager 只管"分活+收活",Worker 只管"干活+汇报"。
| 场景 | 为什么选它 |
|---|---|
| 旅行规划 | 机票/酒店/景点可并行查询 |
| 数据管道 | ETL 各阶段可独立执行 |
| 报告生成 | 章节可并行撰写再合并 |
复杂度
⭐⭐ 中低。核心就是一个分发循环 + Worker 注册表。
和其他模式的关系
| 模式 | 谁决定下一步 | 什么时候用 |
|---|---|---|
| Manager-Worker | Manager 拆任务、分发、合并 | 子任务独立、可并行 |
| Agents-as-Tools | 主 Agent 按需 tool_call 专家 | 对话中动态决定调谁 |
| Prompt Chaining | Python 代码写死步骤 | 流程已知、不需要动态拆解 |
Manager-Worker 比 Prompt Chaining 灵活(任务由模型拆解),比 Agents-as-Tools 结构化(先拆后做,不是边聊边调)。
角色关系
┌─────────────┐
│ Manager │ ← 拆解任务、合并结果
└──┬───┬───┬──┘
│ │ │ 分发子任务
▼ ▼ ▼
┌───┐┌───┐┌───┐
│ W1 ││ W2 ││ W3 │ ← 独立执行、返回结果
└───┘└───┘└───┘
- Manager → Worker:单向指令
- Worker → Manager:单向回报
- Worker 之间:无直接通信
完整 Python 实现
"""
Manager-Worker 旅行规划示例
Manager 把"北京5日游"拆成3个子任务,分给不同 Worker。
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any
# ── MockLLM ──────────────────────────────────────────────
class MockLLM:
"""用字典映射模拟 LLM 调用,方便离线测试。"""
def __init__(self):
self.call_log: list[dict] = []
def chat(self, role: str, prompt: str) -> str:
self.call_log.append({"role": role, "prompt": prompt[:80]})
if role == "manager" and "拆解" in prompt:
return json.dumps({
"subtasks": [
{"id": "flight", "desc": "查询上海→北京往返机票"},
{"id": "hotel", "desc": "查询北京5晚酒店"},
{"id": "itinerary", "desc": "生成北京5日景点行程"},
]
})
if role == "manager" and "合并" in prompt:
return "【北京5日游方案】机票: 东航MU5100 ¥1200 | 酒店: 王府井大饭店 ¥600/晚 | 行程: 故宫→长城→颐和园→天坛→南锣鼓巷"
if role == "flight_worker":
return json.dumps({"航班": "东航MU5100", "价格": "¥1200", "时间": "08:30-11:00"})
if role == "hotel_worker":
return json.dumps({"酒店": "王府井大饭店", "价格": "¥600/晚", "评分": 4.7})
if role == "itinerary_worker":
return json.dumps({"行程": ["故宫", "长城", "颐和园", "天坛", "南锣鼓巷"]})
return "{}"
# ── Worker ───────────────────────────────────────────────
@dataclass
class Worker:
worker_id: str
role: str # 对应 MockLLM 中的角色
llm: MockLLM = field(repr=False)
def execute(self, task_desc: str) -> dict:
"""执行单个子任务,返回结构化结果。"""
raw = self.llm.chat(self.role, task_desc)
try:
return json.loads(raw)
except json.JSONDecodeError:
return {"raw": raw}
# ── Manager ──────────────────────────────────────────────
@dataclass
class Manager:
llm: MockLLM = field(repr=False)
workers: dict[str, Worker] = field(default_factory=dict)
def register_worker(self, task_type: str, worker: Worker):
self.workers[task_type] = worker
def decompose(self, user_request: str) -> list[dict]:
"""让 LLM 把用户需求拆成子任务列表。"""
raw = self.llm.chat("manager", f"请拆解任务: {user_request}")
return json.loads(raw).get("subtasks", [])
def dispatch(self, subtasks: list[dict]) -> dict[str, Any]:
"""按 task id 分发给对应 Worker。"""
results: dict[str, Any] = {}
for task in subtasks:
tid = task["id"]
worker = self.workers.get(tid)
if worker is None:
results[tid] = {"error": f"无可用 Worker: {tid}"}
continue
results[tid] = worker.execute(task["desc"])
print(f" [Worker:{tid}] 完成 → {results[tid]}")
return results
def merge(self, results: dict[str, Any]) -> str:
"""让 LLM 合并所有子结果,生成最终方案。"""
prompt = f"请合并以下子结果: {json.dumps(results, ensure_ascii=False)}"
return self.llm.chat("manager", prompt)
def run(self, user_request: str) -> str:
"""完整流程:拆解 → 分发 → 合并。"""
print(f"[Manager] 收到请求: {user_request}")
subtasks = self.decompose(user_request)
print(f"[Manager] 拆解为 {len(subtasks)} 个子任务:")
for st in subtasks:
print(f" - {st['id']}: {st['desc']}")
results = self.dispatch(subtasks)
final = self.merge(results)
print(f"[Manager] 最终方案: {final}")
return final
# ── 运行入口 ─────────────────────────────────────────────
def main():
llm = MockLLM()
# 创建 Workers
w_flight = Worker("w1", "flight_worker", llm)
w_hotel = Worker("w2", "hotel_worker", llm)
w_itin = Worker("w3", "itinerary_worker", llm)
# 创建 Manager 并注册
mgr = Manager(llm=llm)
mgr.register_worker("flight", w_flight)
mgr.register_worker("hotel", w_hotel)
mgr.register_worker("itinerary", w_itin)
# 执行
result = mgr.run("帮我规划北京5日游,从上海出发")
# 验证
assert "故宫" in result
assert "东航" in result or "MU5100" in result
assert len(llm.call_log) == 5 # 1拆解 + 3执行 + 1合并
print(f"\n✓ 共 {len(llm.call_log)} 次 LLM 调用,全部通过")
if __name__ == "__main__":
main()
运行记录
[Manager] 收到请求: 帮我规划北京5日游,从上海出发
[Manager] 拆解为 3 个子任务:
- flight: 查询上海→北京往返机票
- hotel: 查询北京5晚酒店
- itinerary: 生成北京5日景点行程
[Worker:flight] 完成 → {'航班': '东航MU5100', '价格': '¥1200', '时间': '08:30-11:00'}
[Worker:hotel] 完成 → {'酒店': '王府井大饭店', '价格': '¥600/晚', '评分': 4.7}
[Worker:itinerary] 完成 → {'行程': ['故宫', '长城', '颐和园', '天坛', '南锣鼓巷']}
[Manager] 最终方案: 【北京5日游方案】机票: 东航MU5100 ¥1200 | 酒店: 王府井大饭店 ¥600/晚 | 行程: 故宫→长城→颐和园→天坛→南锣鼓巷
✓ 共 5 次 LLM 调用,全部通过
踩坑记录
| 坑 | 现象 | trace 信号 | 修法 |
|---|---|---|---|
| Manager 自己干活 | 上下文膨胀、回复变慢 | Manager 的 prompt 里出现具体 API 调用或数据处理 | 严格只让 Manager 做拆解/合并,不碰具体数据 |
| Worker 结果格式不一致 | 合并时 JSON 解析炸了 | merge 阶段抛 JSONDecodeError 或 KeyError | 给每个 Worker 统一输出 schema + 异常兜底 |
| 子任务遗漏 | 用户说"还要签证"但 Manager 没拆出来 | subtasks 列表缺少用户明确提到的需求 | 拆解后加一轮"检查是否遗漏"的自检 prompt |
| 并发竞态 | 两个 Worker 改同一个 state | 同一 key 在 results 中被覆盖、值不稳定 | Worker 之间禁止共享状态,全走 Manager 汇总 |
工程备忘
- Worker 注册表用字典就够,按 task_type 映射,别搞复杂的服务发现。
- 并行执行:示例为串行方便理解,生产用
asyncio.gather或线程池。 - 超时控制:给每个 Worker 设 timeout,Manager 等不到就跳过并标记失败。
- 重试策略:Worker 失败后 Manager 可选重试/换 Worker/降级。
- 结果 schema:所有 Worker 返回
{"status": "ok"|"error", "data": {...}},Manager 不用猜格式。 - 日志追踪:每个子任务带 trace_id,方便排查哪一步出了问题。
- 与其他模式对比:如果子任务之间有依赖关系,考虑用 Graph Orchestration;如果需要 Worker 之间讨论,考虑用 Group Chat。
读完以后
如果 Worker 要独立对外、被主 Agent 像函数一样调用,看 Agents-as-Tools。 如果任务容易卡住、需要停滞检测和自动恢复,看 Magentic Orchestration。 如果子任务之间有严格先后顺序和条件分支,看 Graph Orchestration。