跳转至

Magentic Orchestration 模式:任务账本 + 停滞检测

解决什么问题

复杂行程规划(10 天多国跨境游)涉及大量子任务,有些任务互相依赖,有些会卡住。 普通 Manager-Worker 只会"分活→等结果",如果某个 Worker 卡死了,整个流程就瘫了。

Magentic Orchestration 引入两个关键机制: - 任务账本 (Task Ledger):全局可见的任务看板,记录每个子任务的状态 - 停滞检测 (Stall Detection):定期扫描账本,发现卡住的任务并介入

复杂度

⭐⭐⭐⭐ 中高。需要状态机 + 定时检测 + 恢复策略。

和其他模式的关系

模式 谁决定下一步 什么时候用
Magentic Orchestration Orchestrator 根据账本状态调度,含停滞检测 任务有依赖、可能卡住、需要自动恢复
Manager-Worker Manager 分发后等结果 子任务独立、不担心卡住
ReAct Agent 自己决定下一步行动 单 Agent 循环,不需要多任务调度

Magentic 比 Manager-Worker 更健壮(有停滞检测和恢复),比 ReAct 更适合多任务并行(不是单 Agent 循环)。

角色关系

┌────────────────────────────────┐
│         Orchestrator           │
│  ┌─────────────────────────┐  │
│  │     Task Ledger          │  │  ← 全局任务看板
│  │  [✓] 机票查询            │  │
│  │  [⏳] 酒店预订 (已超时)   │  │
│  │  [○] 签证办理            │  │
│  └─────────────────────────┘  │
│  ┌─────────────────────────┐  │
│  │   Stall Detector         │  │  ← 检测停滞、触发恢复
│  └─────────────────────────┘  │
└──┬──────────┬───────────┬─────┘
   ▼          ▼           ▼
 Agent1    Agent2      Agent3

完整 Python 实现

"""
Magentic Orchestration: 10天日韩跨境游规划
任务账本追踪所有子任务,停滞检测器发现卡住的任务并触发恢复。
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    STALLED = "stalled"
    RECOVERED = "recovered"


# ── MockLLM ──────────────────────────────────────────────
class MockLLM:
    def __init__(self):
        self.call_log: list[dict] = []
        self._simulate_stall = True  # 模拟酒店任务卡住

    def chat(self, role: str, prompt: str) -> str:
        self.call_log.append({"role": role, "prompt": prompt[:80]})

        if role == "orchestrator" and "拆解" in prompt:
            return json.dumps({"tasks": [
                {"id": "flight", "desc": "查询上海→东京→首尔→上海机票", "deps": []},
                {"id": "hotel_jp", "desc": "预订东京5晚酒店", "deps": ["flight"]},
                {"id": "hotel_kr", "desc": "预订首尔5晚酒店", "deps": ["flight"]},
                {"id": "visa", "desc": "确认日韩签证", "deps": []},
                {"id": "itinerary", "desc": "生成10日行程", "deps": ["flight", "hotel_jp", "hotel_kr"]},
            ]})

        if role == "flight_worker":
            return json.dumps({"航班": ["MU523 上海→东京", "KE2710 东京→首尔", "MU5042 首尔→上海"]})

        if role == "hotel_jp_worker":
            if self._simulate_stall:
                self._simulate_stall = False
                return "__STALL__"  # 模拟卡住
            return json.dumps({"酒店": "新宿格拉斯丽", "价格": "¥500/晚"})

        if role == "hotel_kr_worker":
            return json.dumps({"酒店": "明洞乐天", "价格": "¥400/晚"})

        if role == "visa_worker":
            return json.dumps({"日本签证": "已有", "韩国签证": "需办理,5工作日"})

        if role == "itinerary_worker":
            return json.dumps({"行程": "D1-5东京(浅草+秋叶原+富士山) → D6-10首尔(景福宫+明洞+DMZ)"})

        if role == "orchestrator" and "恢复" in prompt:
            return "重试hotel_jp任务,使用备用供应商"

        return "{}"


# ── Task Ledger ──────────────────────────────────────────
@dataclass
class TaskEntry:
    task_id: str
    desc: str
    deps: list[str]
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    started_at: float = 0.0
    timeout_sec: float = 5.0  # 演示用短超时

@dataclass
class TaskLedger:
    """全局任务看板:记录所有子任务状态。"""
    entries: dict[str, TaskEntry] = field(default_factory=dict)

    def add(self, task_id: str, desc: str, deps: list[str]):
        self.entries[task_id] = TaskEntry(task_id=task_id, desc=desc, deps=deps)

    def is_ready(self, task_id: str) -> bool:
        entry = self.entries[task_id]
        if entry.status != TaskStatus.PENDING:
            return False
        for dep in entry.deps:
            if self.entries[dep].status not in (TaskStatus.DONE, TaskStatus.RECOVERED):
                return False
        return True

    def get_stalled(self, current_time: float) -> list[str]:
        stalled = []
        for tid, entry in self.entries.items():
            if entry.status == TaskStatus.RUNNING:
                if current_time - entry.started_at > entry.timeout_sec:
                    stalled.append(tid)
        return stalled

    def summary(self) -> str:
        lines = []
        icons = {TaskStatus.DONE: "✓", TaskStatus.RUNNING: "⏳", TaskStatus.PENDING: "○",
                 TaskStatus.STALLED: "⚠", TaskStatus.RECOVERED: "♻"}
        for tid, e in self.entries.items():
            lines.append(f"  [{icons.get(e.status, '?')}] {tid}: {e.desc} ({e.status.value})")
        return "\n".join(lines)


# ── Worker ───────────────────────────────────────────────
@dataclass
class Worker:
    role: str
    llm: MockLLM = field(repr=False)

    def execute(self, desc: str) -> dict | None:
        raw = self.llm.chat(self.role, desc)
        if raw == "__STALL__":
            return None  # 模拟卡住
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return {"raw": raw}


# ── Orchestrator ─────────────────────────────────────────
@dataclass
class MagenticOrchestrator:
    llm: MockLLM = field(repr=False)
    ledger: TaskLedger = field(default_factory=TaskLedger)
    workers: dict[str, Worker] = field(default_factory=dict)
    tick_count: int = 0

    def decompose(self, request: str):
        raw = self.llm.chat("orchestrator", f"请拆解任务: {request}")
        tasks = json.loads(raw)["tasks"]
        for t in tasks:
            self.ledger.add(t["id"], t["desc"], t["deps"])

    def register_worker(self, task_id: str, worker: Worker):
        self.workers[task_id] = worker

    def _run_ready_tasks(self):
        for tid in list(self.ledger.entries.keys()):
            if self.ledger.is_ready(tid):
                entry = self.ledger.entries[tid]
                entry.status = TaskStatus.RUNNING
                entry.started_at = time.time() - 10  # 模拟已经过了很久
                worker = self.workers.get(tid)
                if worker:
                    result = worker.execute(entry.desc)
                    if result is not None:
                        entry.result = result
                        entry.status = TaskStatus.DONE
                        print(f"  [✓] {tid} 完成")
                    else:
                        print(f"  [⏳] {tid} 执行中... (可能卡住)")

    def _detect_and_recover_stalls(self):
        stalled = self.ledger.get_stalled(time.time())
        for tid in stalled:
            entry = self.ledger.entries[tid]
            entry.status = TaskStatus.STALLED
            print(f"  [⚠] 检测到停滞: {tid}")

            # 请 LLM 决定恢复策略
            recovery = self.llm.chat("orchestrator", f"任务{tid}停滞,请决定恢复策略")
            print(f"  [♻] 恢复策略: {recovery}")

            # 重试
            worker = self.workers.get(tid)
            if worker:
                result = worker.execute(entry.desc)
                if result:
                    entry.result = result
                    entry.status = TaskStatus.RECOVERED
                    print(f"  [♻] {tid} 已恢复")

    def run(self, request: str) -> dict:
        print(f"[Orchestrator] 收到: {request}")
        self.decompose(request)
        print(f"[Orchestrator] 任务账本:\n{self.ledger.summary()}\n")

        for tick in range(5):  # 最多 5 个 tick
            self.tick_count = tick
            print(f"--- Tick {tick+1} ---")
            self._run_ready_tasks()
            self._detect_and_recover_stalls()
            print(f"账本状态:\n{self.ledger.summary()}\n")

            # 全部完成?
            all_done = all(
                e.status in (TaskStatus.DONE, TaskStatus.RECOVERED)
                for e in self.ledger.entries.values()
            )
            if all_done:
                print("[Orchestrator] 所有任务完成!")
                break

        return {tid: e.result for tid, e in self.ledger.entries.items()}


# ── 运行入口 ─────────────────────────────────────────────
def main():
    llm = MockLLM()

    orch = MagenticOrchestrator(llm=llm)
    orch.decompose("规划10天日韩跨境游,从上海出发")

    # 注册 Workers
    orch.register_worker("flight", Worker("flight_worker", llm))
    orch.register_worker("hotel_jp", Worker("hotel_jp_worker", llm))
    orch.register_worker("hotel_kr", Worker("hotel_kr_worker", llm))
    orch.register_worker("visa", Worker("visa_worker", llm))
    orch.register_worker("itinerary", Worker("itinerary_worker", llm))

    results = orch.run("规划10天日韩跨境游,从上海出发")

    # 验证停滞检测生效
    hotel_jp = orch.ledger.entries["hotel_jp"]
    assert hotel_jp.status == TaskStatus.RECOVERED, f"期望 RECOVERED,实际 {hotel_jp.status}"
    print(f"\n✓ 停滞检测 + 恢复成功,共 {len(llm.call_log)} 次 LLM 调用")


if __name__ == "__main__":
    main()

运行记录

[Orchestrator] 收到: 规划10天日韩跨境游,从上海出发
[Orchestrator] 任务账本:
  [○] flight: 查询上海→东京→首尔→上海机票 (pending)
  [○] hotel_jp: 预订东京5晚酒店 (pending)
  [○] hotel_kr: 预订首尔5晚酒店 (pending)
  [○] visa: 确认日韩签证 (pending)
  [○] itinerary: 生成10日行程 (pending)

--- Tick 1 ---
  [✓] flight 完成
  [✓] visa 完成
  [⏳] hotel_jp 执行中... (可能卡住)
  [⚠] 检测到停滞: hotel_jp
  [♻] 恢复策略: 重试hotel_jp任务,使用备用供应商
  [♻] hotel_jp 已恢复

--- Tick 2 ---
  [✓] hotel_kr 完成
  [✓] itinerary 完成
[Orchestrator] 所有任务完成!

✓ 停滞检测 + 恢复成功,共 10 次 LLM 调用

踩坑记录

现象 trace 信号 修法
超时太短 正常任务被误判为停滞 stall_detected 事件频繁出现在正常执行的任务上 根据任务类型设不同 timeout
恢复后依赖 下游任务拿到旧数据 下游任务的输入包含 recovered 任务的旧版本结果 恢复后重新触发所有依赖它的任务
无限重试 同一任务反复卡住反复重试 同一 task_id 的 stall_detected 事件出现 3+ 次 设 max_retries,超过就标记失败
状态竞态 并发修改 ledger ledger 中同一 task 状态在同一 tick 内被改多次 用锁保护 ledger,或单线程 tick 循环

工程备忘

  1. 任务账本是核心:所有状态集中管理,任何组件都能查看全局进度。
  2. Tick 循环:每个 tick 做"执行就绪任务→检测停滞→恢复",类似游戏主循环。
  3. 恢复策略多样化:重试、换 Worker、降级、跳过——让 LLM 根据上下文选择。
  4. 依赖图:任务之间的依赖用 DAG 管理,is_ready 检查所有前置任务完成。
  5. 与 Manager-Worker 的区别:Manager-Worker 没有全局账本和停滞检测,任务卡了就等着。
  6. 与 Graph Orchestration 的区别:Graph 是静态有向图;Magentic 的账本是动态可变的。
  7. 生产建议:账本持久化到 Redis/DB,支持跨进程查看和恢复。

读完以后

如果任务不会卡住、不需要停滞检测,退回更简单的 Manager-Worker。 如果任务之间的依赖关系是静态的、用有向图就够表达,看 Graph Orchestration。 如果 Agent 之间需要自主协作、没有中心调度,看 Swarm Blackboard

参考资料