Magentic Orchestration 模式:任务账本 + 停滞检测
解决什么问题
复杂行程规划(10 天多国跨境游)涉及大量子任务,有些任务互相依赖,有些会卡住。 普通 Manager-Worker 只会"分活→等结果",如果某个 Worker 卡死了,整个流程就瘫了。
Magentic Orchestration 引入两个关键机制: - 任务账本 (Task Ledger):全局可见的任务看板,记录每个子任务的状态 - 停滞检测 (Stall Detection):定期扫描账本,发现卡住的任务并介入
复杂度
⭐⭐⭐⭐ 中高。需要状态机 + 定时检测 + 恢复策略。
和其他模式的关系
| 模式 | 谁决定下一步 | 什么时候用 |
|---|---|---|
| Magentic Orchestration | Orchestrator 根据账本状态调度,含停滞检测 | 任务有依赖、可能卡住、需要自动恢复 |
| Manager-Worker | Manager 分发后等结果 | 子任务独立、不担心卡住 |
| ReAct | Agent 自己决定下一步行动 | 单 Agent 循环,不需要多任务调度 |
Magentic 比 Manager-Worker 更健壮(有停滞检测和恢复),比 ReAct 更适合多任务并行(不是单 Agent 循环)。
角色关系
┌────────────────────────────────┐
│ Orchestrator │
│ ┌─────────────────────────┐ │
│ │ Task Ledger │ │ ← 全局任务看板
│ │ [✓] 机票查询 │ │
│ │ [⏳] 酒店预订 (已超时) │ │
│ │ [○] 签证办理 │ │
│ └─────────────────────────┘ │
│ ┌─────────────────────────┐ │
│ │ Stall Detector │ │ ← 检测停滞、触发恢复
│ └─────────────────────────┘ │
└──┬──────────┬───────────┬─────┘
▼ ▼ ▼
Agent1 Agent2 Agent3
完整 Python 实现
"""
Magentic Orchestration: 10天日韩跨境游规划
任务账本追踪所有子任务,停滞检测器发现卡住的任务并触发恢复。
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
DONE = "done"
STALLED = "stalled"
RECOVERED = "recovered"
# ── MockLLM ──────────────────────────────────────────────
class MockLLM:
def __init__(self):
self.call_log: list[dict] = []
self._simulate_stall = True # 模拟酒店任务卡住
def chat(self, role: str, prompt: str) -> str:
self.call_log.append({"role": role, "prompt": prompt[:80]})
if role == "orchestrator" and "拆解" in prompt:
return json.dumps({"tasks": [
{"id": "flight", "desc": "查询上海→东京→首尔→上海机票", "deps": []},
{"id": "hotel_jp", "desc": "预订东京5晚酒店", "deps": ["flight"]},
{"id": "hotel_kr", "desc": "预订首尔5晚酒店", "deps": ["flight"]},
{"id": "visa", "desc": "确认日韩签证", "deps": []},
{"id": "itinerary", "desc": "生成10日行程", "deps": ["flight", "hotel_jp", "hotel_kr"]},
]})
if role == "flight_worker":
return json.dumps({"航班": ["MU523 上海→东京", "KE2710 东京→首尔", "MU5042 首尔→上海"]})
if role == "hotel_jp_worker":
if self._simulate_stall:
self._simulate_stall = False
return "__STALL__" # 模拟卡住
return json.dumps({"酒店": "新宿格拉斯丽", "价格": "¥500/晚"})
if role == "hotel_kr_worker":
return json.dumps({"酒店": "明洞乐天", "价格": "¥400/晚"})
if role == "visa_worker":
return json.dumps({"日本签证": "已有", "韩国签证": "需办理,5工作日"})
if role == "itinerary_worker":
return json.dumps({"行程": "D1-5东京(浅草+秋叶原+富士山) → D6-10首尔(景福宫+明洞+DMZ)"})
if role == "orchestrator" and "恢复" in prompt:
return "重试hotel_jp任务,使用备用供应商"
return "{}"
# ── Task Ledger ──────────────────────────────────────────
@dataclass
class TaskEntry:
task_id: str
desc: str
deps: list[str]
status: TaskStatus = TaskStatus.PENDING
result: Any = None
started_at: float = 0.0
timeout_sec: float = 5.0 # 演示用短超时
@dataclass
class TaskLedger:
"""全局任务看板:记录所有子任务状态。"""
entries: dict[str, TaskEntry] = field(default_factory=dict)
def add(self, task_id: str, desc: str, deps: list[str]):
self.entries[task_id] = TaskEntry(task_id=task_id, desc=desc, deps=deps)
def is_ready(self, task_id: str) -> bool:
entry = self.entries[task_id]
if entry.status != TaskStatus.PENDING:
return False
for dep in entry.deps:
if self.entries[dep].status not in (TaskStatus.DONE, TaskStatus.RECOVERED):
return False
return True
def get_stalled(self, current_time: float) -> list[str]:
stalled = []
for tid, entry in self.entries.items():
if entry.status == TaskStatus.RUNNING:
if current_time - entry.started_at > entry.timeout_sec:
stalled.append(tid)
return stalled
def summary(self) -> str:
lines = []
icons = {TaskStatus.DONE: "✓", TaskStatus.RUNNING: "⏳", TaskStatus.PENDING: "○",
TaskStatus.STALLED: "⚠", TaskStatus.RECOVERED: "♻"}
for tid, e in self.entries.items():
lines.append(f" [{icons.get(e.status, '?')}] {tid}: {e.desc} ({e.status.value})")
return "\n".join(lines)
# ── Worker ───────────────────────────────────────────────
@dataclass
class Worker:
role: str
llm: MockLLM = field(repr=False)
def execute(self, desc: str) -> dict | None:
raw = self.llm.chat(self.role, desc)
if raw == "__STALL__":
return None # 模拟卡住
try:
return json.loads(raw)
except json.JSONDecodeError:
return {"raw": raw}
# ── Orchestrator ─────────────────────────────────────────
@dataclass
class MagenticOrchestrator:
llm: MockLLM = field(repr=False)
ledger: TaskLedger = field(default_factory=TaskLedger)
workers: dict[str, Worker] = field(default_factory=dict)
tick_count: int = 0
def decompose(self, request: str):
raw = self.llm.chat("orchestrator", f"请拆解任务: {request}")
tasks = json.loads(raw)["tasks"]
for t in tasks:
self.ledger.add(t["id"], t["desc"], t["deps"])
def register_worker(self, task_id: str, worker: Worker):
self.workers[task_id] = worker
def _run_ready_tasks(self):
for tid in list(self.ledger.entries.keys()):
if self.ledger.is_ready(tid):
entry = self.ledger.entries[tid]
entry.status = TaskStatus.RUNNING
entry.started_at = time.time() - 10 # 模拟已经过了很久
worker = self.workers.get(tid)
if worker:
result = worker.execute(entry.desc)
if result is not None:
entry.result = result
entry.status = TaskStatus.DONE
print(f" [✓] {tid} 完成")
else:
print(f" [⏳] {tid} 执行中... (可能卡住)")
def _detect_and_recover_stalls(self):
stalled = self.ledger.get_stalled(time.time())
for tid in stalled:
entry = self.ledger.entries[tid]
entry.status = TaskStatus.STALLED
print(f" [⚠] 检测到停滞: {tid}")
# 请 LLM 决定恢复策略
recovery = self.llm.chat("orchestrator", f"任务{tid}停滞,请决定恢复策略")
print(f" [♻] 恢复策略: {recovery}")
# 重试
worker = self.workers.get(tid)
if worker:
result = worker.execute(entry.desc)
if result:
entry.result = result
entry.status = TaskStatus.RECOVERED
print(f" [♻] {tid} 已恢复")
def run(self, request: str) -> dict:
print(f"[Orchestrator] 收到: {request}")
self.decompose(request)
print(f"[Orchestrator] 任务账本:\n{self.ledger.summary()}\n")
for tick in range(5): # 最多 5 个 tick
self.tick_count = tick
print(f"--- Tick {tick+1} ---")
self._run_ready_tasks()
self._detect_and_recover_stalls()
print(f"账本状态:\n{self.ledger.summary()}\n")
# 全部完成?
all_done = all(
e.status in (TaskStatus.DONE, TaskStatus.RECOVERED)
for e in self.ledger.entries.values()
)
if all_done:
print("[Orchestrator] 所有任务完成!")
break
return {tid: e.result for tid, e in self.ledger.entries.items()}
# ── 运行入口 ─────────────────────────────────────────────
def main():
llm = MockLLM()
orch = MagenticOrchestrator(llm=llm)
orch.decompose("规划10天日韩跨境游,从上海出发")
# 注册 Workers
orch.register_worker("flight", Worker("flight_worker", llm))
orch.register_worker("hotel_jp", Worker("hotel_jp_worker", llm))
orch.register_worker("hotel_kr", Worker("hotel_kr_worker", llm))
orch.register_worker("visa", Worker("visa_worker", llm))
orch.register_worker("itinerary", Worker("itinerary_worker", llm))
results = orch.run("规划10天日韩跨境游,从上海出发")
# 验证停滞检测生效
hotel_jp = orch.ledger.entries["hotel_jp"]
assert hotel_jp.status == TaskStatus.RECOVERED, f"期望 RECOVERED,实际 {hotel_jp.status}"
print(f"\n✓ 停滞检测 + 恢复成功,共 {len(llm.call_log)} 次 LLM 调用")
if __name__ == "__main__":
main()
运行记录
[Orchestrator] 收到: 规划10天日韩跨境游,从上海出发
[Orchestrator] 任务账本:
[○] flight: 查询上海→东京→首尔→上海机票 (pending)
[○] hotel_jp: 预订东京5晚酒店 (pending)
[○] hotel_kr: 预订首尔5晚酒店 (pending)
[○] visa: 确认日韩签证 (pending)
[○] itinerary: 生成10日行程 (pending)
--- Tick 1 ---
[✓] flight 完成
[✓] visa 完成
[⏳] hotel_jp 执行中... (可能卡住)
[⚠] 检测到停滞: hotel_jp
[♻] 恢复策略: 重试hotel_jp任务,使用备用供应商
[♻] hotel_jp 已恢复
--- Tick 2 ---
[✓] hotel_kr 完成
[✓] itinerary 完成
[Orchestrator] 所有任务完成!
✓ 停滞检测 + 恢复成功,共 10 次 LLM 调用
踩坑记录
| 坑 | 现象 | trace 信号 | 修法 |
|---|---|---|---|
| 超时太短 | 正常任务被误判为停滞 | stall_detected 事件频繁出现在正常执行的任务上 | 根据任务类型设不同 timeout |
| 恢复后依赖 | 下游任务拿到旧数据 | 下游任务的输入包含 recovered 任务的旧版本结果 | 恢复后重新触发所有依赖它的任务 |
| 无限重试 | 同一任务反复卡住反复重试 | 同一 task_id 的 stall_detected 事件出现 3+ 次 | 设 max_retries,超过就标记失败 |
| 状态竞态 | 并发修改 ledger | ledger 中同一 task 状态在同一 tick 内被改多次 | 用锁保护 ledger,或单线程 tick 循环 |
工程备忘
- 任务账本是核心:所有状态集中管理,任何组件都能查看全局进度。
- Tick 循环:每个 tick 做"执行就绪任务→检测停滞→恢复",类似游戏主循环。
- 恢复策略多样化:重试、换 Worker、降级、跳过——让 LLM 根据上下文选择。
- 依赖图:任务之间的依赖用 DAG 管理,
is_ready检查所有前置任务完成。 - 与 Manager-Worker 的区别:Manager-Worker 没有全局账本和停滞检测,任务卡了就等着。
- 与 Graph Orchestration 的区别:Graph 是静态有向图;Magentic 的账本是动态可变的。
- 生产建议:账本持久化到 Redis/DB,支持跨进程查看和恢复。
读完以后
如果任务不会卡住、不需要停滞检测,退回更简单的 Manager-Worker。 如果任务之间的依赖关系是静态的、用有向图就够表达,看 Graph Orchestration。 如果 Agent 之间需要自主协作、没有中心调度,看 Swarm Blackboard。