Graph Orchestration 模式:有向图控制流
解决什么问题
旅行规划的步骤之间有严格的先后顺序和分支逻辑: 必须先订机票才能选酒店,签证过了才能出行程,预算超标要走"砍需求"分支。
Graph Orchestration 用有向图 (DAG/StateGraph) 显式定义节点和边, 每个节点是一个 Agent/函数,边是条件跳转。类似 LangGraph 的设计思路。
| 场景 | 为什么选它 |
|---|---|
| 旅行规划流水线 | 步骤有严格依赖 + 条件分支 |
| 审批流程 | 通过/拒绝走不同路径 |
| 数据处理 DAG | Airflow 式的任务编排 |
复杂度
⭐⭐⭐ 中等。核心是图定义 + 状态传递 + 条件路由。
和其他模式的关系
| 模式 | 谁决定下一步 | 什么时候用 |
|---|---|---|
| Graph Orchestration | 图的边(含条件边)决定流转 | 步骤有严格依赖和条件分支 |
| Prompt Chaining | Python 代码写死线性步骤 | 流程线性、无分支 |
| Manager-Worker | Manager 动态拆任务再分发 | 子任务由模型决定,不是预定义的图 |
Graph 比 Prompt Chaining 更灵活(支持条件分支和循环),比 Manager-Worker 更可预测(结构编译期确定)。
角色关系
┌──────────┐
│ START │
└────┬─────┘
▼
┌──────────┐ ┌───────────┐
│ 查机票 │───►│ 查酒店 │
└──────────┘ └─────┬─────┘
▼
┌──────────────┐
│ 预算检查 │
└──┬────────┬──┘
≤预算 │ │ >预算
▼ ▼
┌─────────┐ ┌──────────┐
│ 生成行程 │ │ 砍需求 │──► 回到查酒店
└────┬────┘ └──────────┘
▼
┌─────────┐
│ END │
└─────────┘
完整 Python 实现
"""
Graph Orchestration: 泰国旅行规划 DAG
用有向图定义: 查机票 → 查酒店 → 预算检查 → 生成行程(或砍需求后重来)。
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Callable
# ── MockLLM ──────────────────────────────────────────────
class MockLLM:
def __init__(self):
self.call_log: list[dict] = []
self._budget_check_count = 0
def chat(self, role: str, prompt: str) -> str:
self.call_log.append({"role": role, "prompt": prompt[:80]})
if role == "flight_node":
return json.dumps({"航班": "春秋9C8951", "价格": 1500, "路线": "上海→曼谷"})
if role == "hotel_node":
state_str = prompt
if "budget_level" in state_str and "economy" in state_str:
return json.dumps({"酒店": "考山路青旅", "每晚": 100, "总价": 500})
return json.dumps({"酒店": "暹罗凯宾斯基", "每晚": 800, "总价": 4000})
if role == "budget_node":
self._budget_check_count += 1
# 第一次超预算,第二次(砍需求后)通过
if self._budget_check_count == 1:
return json.dumps({"总花费": 7500, "预算": 5000, "超标": True})
return json.dumps({"总花费": 4000, "预算": 5000, "超标": False})
if role == "cut_node":
return json.dumps({"策略": "降级酒店为经济型", "budget_level": "economy"})
if role == "itinerary_node":
return json.dumps({"行程": "D1曼谷大皇宫 D2水上市场 D3清迈古城 D4丛林飞跃 D5返程"})
return "{}"
# ── State ────────────────────────────────────────────────
@dataclass
class GraphState:
"""在节点之间传递的全局状态。"""
data: dict[str, Any] = field(default_factory=dict)
def set(self, key: str, value: Any):
self.data[key] = value
def get(self, key: str, default: Any = None) -> Any:
return self.data.get(key, default)
def __repr__(self):
return f"State({self.data})"
# ── Node ─────────────────────────────────────────────────
@dataclass
class GraphNode:
"""图中的一个节点,包装了一个执行函数。"""
name: str
fn: Callable[[GraphState], GraphState]
def execute(self, state: GraphState) -> GraphState:
print(f" [Node: {self.name}] 执行中...")
return self.fn(state)
# ── Edge ─────────────────────────────────────────────────
@dataclass
class ConditionalEdge:
"""条件边:根据 state 决定下一个节点。"""
condition: Callable[[GraphState], str] # 返回目标节点名
# ── StateGraph ───────────────────────────────────────────
@dataclass
class StateGraph:
nodes: dict[str, GraphNode] = field(default_factory=dict)
edges: dict[str, str | ConditionalEdge] = field(default_factory=dict)
entry: str = ""
def add_node(self, name: str, fn: Callable[[GraphState], GraphState]):
self.nodes[name] = GraphNode(name=name, fn=fn)
def add_edge(self, from_node: str, to_node: str):
"""无条件边。"""
self.edges[from_node] = to_node
def add_conditional_edge(self, from_node: str, condition: Callable[[GraphState], str]):
"""条件边。"""
self.edges[from_node] = ConditionalEdge(condition=condition)
def set_entry(self, name: str):
self.entry = name
def run(self, initial_state: GraphState, max_steps: int = 10) -> GraphState:
state = initial_state
current = self.entry
step = 0
print(f"[Graph] 开始执行,入口: {current}")
while current != "__END__" and step < max_steps:
step += 1
node = self.nodes.get(current)
if node is None:
print(f" [Error] 节点 {current} 不存在")
break
state = node.execute(state)
# 确定下一个节点
edge = self.edges.get(current, "__END__")
if isinstance(edge, ConditionalEdge):
current = edge.condition(state)
print(f" [路由] → {current}")
elif isinstance(edge, str):
current = edge
if current != "__END__":
print(f" [跳转] → {current}")
print(f"[Graph] 执行完毕,共 {step} 步")
return state
# ── 构建旅行规划图 ───────────────────────────────────────
def build_travel_graph(llm: MockLLM) -> StateGraph:
graph = StateGraph()
# 节点函数
def flight_fn(state: GraphState) -> GraphState:
raw = llm.chat("flight_node", "查机票")
result = json.loads(raw)
state.set("flight", result)
state.set("flight_cost", result["价格"])
print(f" 结果: {result}")
return state
def hotel_fn(state: GraphState) -> GraphState:
budget_level = state.get("budget_level", "normal")
raw = llm.chat("hotel_node", f"查酒店 budget_level={budget_level}")
result = json.loads(raw)
state.set("hotel", result)
state.set("hotel_cost", result["总价"])
print(f" 结果: {result}")
return state
def budget_fn(state: GraphState) -> GraphState:
raw = llm.chat("budget_node", "检查预算")
result = json.loads(raw)
state.set("budget_check", result)
print(f" 结果: {result}")
return state
def cut_fn(state: GraphState) -> GraphState:
raw = llm.chat("cut_node", "砍需求")
result = json.loads(raw)
state.set("budget_level", result.get("budget_level", "economy"))
print(f" 策略: {result}")
return state
def itinerary_fn(state: GraphState) -> GraphState:
raw = llm.chat("itinerary_node", "生成行程")
result = json.loads(raw)
state.set("itinerary", result)
print(f" 结果: {result}")
return state
# 注册节点
graph.add_node("flight", flight_fn)
graph.add_node("hotel", hotel_fn)
graph.add_node("budget_check", budget_fn)
graph.add_node("cut_requirements", cut_fn)
graph.add_node("itinerary", itinerary_fn)
# 连边
graph.set_entry("flight")
graph.add_edge("flight", "hotel")
graph.add_edge("hotel", "budget_check")
# 条件边: 预算检查
def budget_router(state: GraphState) -> str:
check = state.get("budget_check", {})
if check.get("超标"):
return "cut_requirements"
return "itinerary"
graph.add_conditional_edge("budget_check", budget_router)
graph.add_edge("cut_requirements", "hotel") # 砍完需求回去重选酒店
graph.add_edge("itinerary", "__END__")
return graph
# ── 运行入口 ─────────────────────────────────────────────
def main():
llm = MockLLM()
graph = build_travel_graph(llm)
state = GraphState()
state.set("destination", "泰国")
state.set("days", 5)
state.set("budget", 5000)
final_state = graph.run(state)
# 验证
assert final_state.get("itinerary") is not None
assert final_state.get("budget_level") == "economy" # 经历了砍需求
assert llm._budget_check_count == 2 # 检查了两次预算
print(f"\n✓ 图执行完成,经历了预算超标→砍需求→重选酒店的循环")
print(f" 共 {len(llm.call_log)} 次 LLM 调用")
if __name__ == "__main__":
main()
运行记录
[Graph] 开始执行,入口: flight
[Node: flight] 执行中...
结果: {'航班': '春秋9C8951', '价格': 1500, '路线': '上海→曼谷'}
[跳转] → hotel
[Node: hotel] 执行中...
结果: {'酒店': '暹罗凯宾斯基', '每晚': 800, '总价': 4000}
[跳转] → budget_check
[Node: budget_check] 执行中...
结果: {'总花费': 7500, '预算': 5000, '超标': True}
[路由] → cut_requirements
[Node: cut_requirements] 执行中...
策略: {'策略': '降级酒店为经济型', 'budget_level': 'economy'}
[跳转] → hotel
[Node: hotel] 执行中...
结果: {'酒店': '考山路青旅', '每晚': 100, '总价': 500}
[跳转] → budget_check
[Node: budget_check] 执行中...
结果: {'总花费': 4000, '预算': 5000, '超标': False}
[路由] → itinerary
[Node: itinerary] 执行中...
结果: {'行程': 'D1曼谷大皇宫 D2水上市场 D3清迈古城 D4丛林飞跃 D5返程'}
[Graph] 执行完毕,共 7 步
✓ 图执行完成,经历了预算超标→砍需求→重选酒店的循环
共 7 次 LLM 调用
踩坑记录
| 坑 | 现象 | trace 信号 | 修法 |
|---|---|---|---|
| 无限循环 | 预算永远超标,砍了还超 | step 数触达 max_steps 且未到 END | 设 max_steps + 砍需求时记录已砍次数 |
| 状态丢失 | 节点 B 读不到节点 A 写的数据 | 节点 B 的 state 中缺少节点 A 写入的 key | 所有数据统一写到 GraphState,不要用局部变量 |
| 图太复杂 | 10+ 节点难以理解 | 图可视化后边数 > 节点数的 2 倍 | 用 Mermaid/Graphviz 可视化图结构 |
| 条件边遗漏 | 某个分支没处理,直接跳到 END | 条件路由返回了未注册的节点名 | 条件函数加 else 兜底,返回默认节点 |
工程备忘
- 图定义与执行分离:先
build_graph()定义结构,再graph.run()执行,方便测试和复用。 - 状态不可变原则:建议每个节点返回新 state 而非原地修改(这里为简化用了 mutable)。
- LangGraph 映射:本示例的
StateGraph对应 LangGraph 的StateGraph,add_conditional_edge对应add_conditional_edges。 - 子图嵌套:复杂流程可以把一组节点封装成子图,外层图调用子图节点。
- 与 Magentic 的区别:Graph 是静态结构,编译期确定;Magentic 的任务账本是运行时动态变化的。
- 可视化:建议自动生成 Mermaid 图,代码改了图也跟着变。
- 持久化检查点:长流程在每个节点执行后保存 state 快照,支持断点恢复。
读完以后
如果流程是线性的、不需要条件分支,退回更简单的 Prompt Chaining。 如果任务拆解需要模型动态决定、不适合预定义图结构,看 Manager-Worker。 如果运行时任务可能卡住、需要停滞检测,看 Magentic Orchestration。
参考资料
- LangGraph 文档 — Graph Orchestration 的主流实现
- LangGraph StateGraph Tutorial
- Apache Airflow — DAG 任务编排的经典参考(非 LLM 场景)