跳转至

Manager-Worker 模式:经理分活,工人执行

解决什么问题

旅行规划涉及多个并行子任务——查机票、订酒店、排行程——一个 Agent 全做会上下文爆炸。 Manager-Worker 把决策(拆任务、合结果)和执行(调 API、跑搜索)分离: Manager 只管"分活+收活",Worker 只管"干活+汇报"。

场景 为什么选它
旅行规划 机票/酒店/景点可并行查询
数据管道 ETL 各阶段可独立执行
报告生成 章节可并行撰写再合并

复杂度

⭐⭐ 中低。核心就是一个分发循环 + Worker 注册表。

和其他模式的关系

模式 谁决定下一步 什么时候用
Manager-Worker Manager 拆任务、分发、合并 子任务独立、可并行
Agents-as-Tools 主 Agent 按需 tool_call 专家 对话中动态决定调谁
Prompt Chaining Python 代码写死步骤 流程已知、不需要动态拆解

Manager-Worker 比 Prompt Chaining 灵活(任务由模型拆解),比 Agents-as-Tools 结构化(先拆后做,不是边聊边调)。

角色关系

┌─────────────┐
│   Manager    │  ← 拆解任务、合并结果
└──┬───┬───┬──┘
   │   │   │     分发子任务
   ▼   ▼   ▼
 ┌───┐┌───┐┌───┐
 │ W1 ││ W2 ││ W3 │  ← 独立执行、返回结果
 └───┘└───┘└───┘
  • Manager → Worker:单向指令
  • Worker → Manager:单向回报
  • Worker 之间:无直接通信

完整 Python 实现

"""
Manager-Worker 旅行规划示例
Manager 把"北京5日游"拆成3个子任务,分给不同 Worker。
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any


# ── MockLLM ──────────────────────────────────────────────
class MockLLM:
    """用字典映射模拟 LLM 调用,方便离线测试。"""

    def __init__(self):
        self.call_log: list[dict] = []

    def chat(self, role: str, prompt: str) -> str:
        self.call_log.append({"role": role, "prompt": prompt[:80]})

        if role == "manager" and "拆解" in prompt:
            return json.dumps({
                "subtasks": [
                    {"id": "flight", "desc": "查询上海→北京往返机票"},
                    {"id": "hotel",  "desc": "查询北京5晚酒店"},
                    {"id": "itinerary", "desc": "生成北京5日景点行程"},
                ]
            })

        if role == "manager" and "合并" in prompt:
            return "【北京5日游方案】机票: 东航MU5100 ¥1200 | 酒店: 王府井大饭店 ¥600/晚 | 行程: 故宫→长城→颐和园→天坛→南锣鼓巷"

        if role == "flight_worker":
            return json.dumps({"航班": "东航MU5100", "价格": "¥1200", "时间": "08:30-11:00"})

        if role == "hotel_worker":
            return json.dumps({"酒店": "王府井大饭店", "价格": "¥600/晚", "评分": 4.7})

        if role == "itinerary_worker":
            return json.dumps({"行程": ["故宫", "长城", "颐和园", "天坛", "南锣鼓巷"]})

        return "{}"


# ── Worker ───────────────────────────────────────────────
@dataclass
class Worker:
    worker_id: str
    role: str          # 对应 MockLLM 中的角色
    llm: MockLLM = field(repr=False)

    def execute(self, task_desc: str) -> dict:
        """执行单个子任务,返回结构化结果。"""
        raw = self.llm.chat(self.role, task_desc)
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return {"raw": raw}


# ── Manager ──────────────────────────────────────────────
@dataclass
class Manager:
    llm: MockLLM = field(repr=False)
    workers: dict[str, Worker] = field(default_factory=dict)

    def register_worker(self, task_type: str, worker: Worker):
        self.workers[task_type] = worker

    def decompose(self, user_request: str) -> list[dict]:
        """让 LLM 把用户需求拆成子任务列表。"""
        raw = self.llm.chat("manager", f"请拆解任务: {user_request}")
        return json.loads(raw).get("subtasks", [])

    def dispatch(self, subtasks: list[dict]) -> dict[str, Any]:
        """按 task id 分发给对应 Worker。"""
        results: dict[str, Any] = {}
        for task in subtasks:
            tid = task["id"]
            worker = self.workers.get(tid)
            if worker is None:
                results[tid] = {"error": f"无可用 Worker: {tid}"}
                continue
            results[tid] = worker.execute(task["desc"])
            print(f"  [Worker:{tid}] 完成 → {results[tid]}")
        return results

    def merge(self, results: dict[str, Any]) -> str:
        """让 LLM 合并所有子结果,生成最终方案。"""
        prompt = f"请合并以下子结果: {json.dumps(results, ensure_ascii=False)}"
        return self.llm.chat("manager", prompt)

    def run(self, user_request: str) -> str:
        """完整流程:拆解 → 分发 → 合并。"""
        print(f"[Manager] 收到请求: {user_request}")

        subtasks = self.decompose(user_request)
        print(f"[Manager] 拆解为 {len(subtasks)} 个子任务:")
        for st in subtasks:
            print(f"  - {st['id']}: {st['desc']}")

        results = self.dispatch(subtasks)
        final = self.merge(results)
        print(f"[Manager] 最终方案: {final}")
        return final


# ── 运行入口 ─────────────────────────────────────────────
def main():
    llm = MockLLM()

    # 创建 Workers
    w_flight = Worker("w1", "flight_worker", llm)
    w_hotel  = Worker("w2", "hotel_worker", llm)
    w_itin   = Worker("w3", "itinerary_worker", llm)

    # 创建 Manager 并注册
    mgr = Manager(llm=llm)
    mgr.register_worker("flight", w_flight)
    mgr.register_worker("hotel", w_hotel)
    mgr.register_worker("itinerary", w_itin)

    # 执行
    result = mgr.run("帮我规划北京5日游,从上海出发")

    # 验证
    assert "故宫" in result
    assert "东航" in result or "MU5100" in result
    assert len(llm.call_log) == 5  # 1拆解 + 3执行 + 1合并
    print(f"\n✓ 共 {len(llm.call_log)} 次 LLM 调用,全部通过")


if __name__ == "__main__":
    main()

运行记录

[Manager] 收到请求: 帮我规划北京5日游,从上海出发
[Manager] 拆解为 3 个子任务:
  - flight: 查询上海→北京往返机票
  - hotel: 查询北京5晚酒店
  - itinerary: 生成北京5日景点行程
  [Worker:flight] 完成 → {'航班': '东航MU5100', '价格': '¥1200', '时间': '08:30-11:00'}
  [Worker:hotel] 完成 → {'酒店': '王府井大饭店', '价格': '¥600/晚', '评分': 4.7}
  [Worker:itinerary] 完成 → {'行程': ['故宫', '长城', '颐和园', '天坛', '南锣鼓巷']}
[Manager] 最终方案: 【北京5日游方案】机票: 东航MU5100 ¥1200 | 酒店: 王府井大饭店 ¥600/晚 | 行程: 故宫→长城→颐和园→天坛→南锣鼓巷

✓ 共 5 次 LLM 调用,全部通过

踩坑记录

现象 trace 信号 修法
Manager 自己干活 上下文膨胀、回复变慢 Manager 的 prompt 里出现具体 API 调用或数据处理 严格只让 Manager 做拆解/合并,不碰具体数据
Worker 结果格式不一致 合并时 JSON 解析炸了 merge 阶段抛 JSONDecodeError 或 KeyError 给每个 Worker 统一输出 schema + 异常兜底
子任务遗漏 用户说"还要签证"但 Manager 没拆出来 subtasks 列表缺少用户明确提到的需求 拆解后加一轮"检查是否遗漏"的自检 prompt
并发竞态 两个 Worker 改同一个 state 同一 key 在 results 中被覆盖、值不稳定 Worker 之间禁止共享状态,全走 Manager 汇总

工程备忘

  1. Worker 注册表用字典就够,按 task_type 映射,别搞复杂的服务发现。
  2. 并行执行:示例为串行方便理解,生产用 asyncio.gather 或线程池。
  3. 超时控制:给每个 Worker 设 timeout,Manager 等不到就跳过并标记失败。
  4. 重试策略:Worker 失败后 Manager 可选重试/换 Worker/降级。
  5. 结果 schema:所有 Worker 返回 {"status": "ok"|"error", "data": {...}},Manager 不用猜格式。
  6. 日志追踪:每个子任务带 trace_id,方便排查哪一步出了问题。
  7. 与其他模式对比:如果子任务之间有依赖关系,考虑用 Graph Orchestration;如果需要 Worker 之间讨论,考虑用 Group Chat。

读完以后

如果 Worker 要独立对外、被主 Agent 像函数一样调用,看 Agents-as-Tools。 如果任务容易卡住、需要停滞检测和自动恢复,看 Magentic Orchestration。 如果子任务之间有严格先后顺序和条件分支,看 Graph Orchestration

参考资料