跳转至

Graph Orchestration 模式:有向图控制流

解决什么问题

旅行规划的步骤之间有严格的先后顺序和分支逻辑: 必须先订机票才能选酒店,签证过了才能出行程,预算超标要走"砍需求"分支。

Graph Orchestration 用有向图 (DAG/StateGraph) 显式定义节点和边, 每个节点是一个 Agent/函数,边是条件跳转。类似 LangGraph 的设计思路。

场景 为什么选它
旅行规划流水线 步骤有严格依赖 + 条件分支
审批流程 通过/拒绝走不同路径
数据处理 DAG Airflow 式的任务编排

复杂度

⭐⭐⭐ 中等。核心是图定义 + 状态传递 + 条件路由。

和其他模式的关系

模式 谁决定下一步 什么时候用
Graph Orchestration 图的边(含条件边)决定流转 步骤有严格依赖和条件分支
Prompt Chaining Python 代码写死线性步骤 流程线性、无分支
Manager-Worker Manager 动态拆任务再分发 子任务由模型决定,不是预定义的图

Graph 比 Prompt Chaining 更灵活(支持条件分支和循环),比 Manager-Worker 更可预测(结构编译期确定)。

角色关系

┌──────────┐
│  START    │
└────┬─────┘
     ▼
┌──────────┐    ┌───────────┐
│ 查机票    │───►│ 查酒店     │
└──────────┘    └─────┬─────┘
                      ▼
               ┌──────────────┐
               │  预算检查      │
               └──┬────────┬──┘
          ≤预算  │        │ >预算
                 ▼        ▼
          ┌─────────┐ ┌──────────┐
          │ 生成行程  │ │ 砍需求    │──► 回到查酒店
          └────┬────┘ └──────────┘
               ▼
          ┌─────────┐
          │   END    │
          └─────────┘

完整 Python 实现

"""
Graph Orchestration: 泰国旅行规划 DAG
用有向图定义: 查机票 → 查酒店 → 预算检查 → 生成行程(或砍需求后重来)。
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Callable


# ── MockLLM ──────────────────────────────────────────────
class MockLLM:
    def __init__(self):
        self.call_log: list[dict] = []
        self._budget_check_count = 0

    def chat(self, role: str, prompt: str) -> str:
        self.call_log.append({"role": role, "prompt": prompt[:80]})

        if role == "flight_node":
            return json.dumps({"航班": "春秋9C8951", "价格": 1500, "路线": "上海→曼谷"})

        if role == "hotel_node":
            state_str = prompt
            if "budget_level" in state_str and "economy" in state_str:
                return json.dumps({"酒店": "考山路青旅", "每晚": 100, "总价": 500})
            return json.dumps({"酒店": "暹罗凯宾斯基", "每晚": 800, "总价": 4000})

        if role == "budget_node":
            self._budget_check_count += 1
            # 第一次超预算,第二次(砍需求后)通过
            if self._budget_check_count == 1:
                return json.dumps({"总花费": 7500, "预算": 5000, "超标": True})
            return json.dumps({"总花费": 4000, "预算": 5000, "超标": False})

        if role == "cut_node":
            return json.dumps({"策略": "降级酒店为经济型", "budget_level": "economy"})

        if role == "itinerary_node":
            return json.dumps({"行程": "D1曼谷大皇宫 D2水上市场 D3清迈古城 D4丛林飞跃 D5返程"})

        return "{}"


# ── State ────────────────────────────────────────────────
@dataclass
class GraphState:
    """在节点之间传递的全局状态。"""
    data: dict[str, Any] = field(default_factory=dict)

    def set(self, key: str, value: Any):
        self.data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        return self.data.get(key, default)

    def __repr__(self):
        return f"State({self.data})"


# ── Node ─────────────────────────────────────────────────
@dataclass
class GraphNode:
    """图中的一个节点,包装了一个执行函数。"""
    name: str
    fn: Callable[[GraphState], GraphState]

    def execute(self, state: GraphState) -> GraphState:
        print(f"  [Node: {self.name}] 执行中...")
        return self.fn(state)


# ── Edge ─────────────────────────────────────────────────
@dataclass
class ConditionalEdge:
    """条件边:根据 state 决定下一个节点。"""
    condition: Callable[[GraphState], str]  # 返回目标节点名


# ── StateGraph ───────────────────────────────────────────
@dataclass
class StateGraph:
    nodes: dict[str, GraphNode] = field(default_factory=dict)
    edges: dict[str, str | ConditionalEdge] = field(default_factory=dict)
    entry: str = ""

    def add_node(self, name: str, fn: Callable[[GraphState], GraphState]):
        self.nodes[name] = GraphNode(name=name, fn=fn)

    def add_edge(self, from_node: str, to_node: str):
        """无条件边。"""
        self.edges[from_node] = to_node

    def add_conditional_edge(self, from_node: str, condition: Callable[[GraphState], str]):
        """条件边。"""
        self.edges[from_node] = ConditionalEdge(condition=condition)

    def set_entry(self, name: str):
        self.entry = name

    def run(self, initial_state: GraphState, max_steps: int = 10) -> GraphState:
        state = initial_state
        current = self.entry
        step = 0

        print(f"[Graph] 开始执行,入口: {current}")
        while current != "__END__" and step < max_steps:
            step += 1
            node = self.nodes.get(current)
            if node is None:
                print(f"  [Error] 节点 {current} 不存在")
                break

            state = node.execute(state)

            # 确定下一个节点
            edge = self.edges.get(current, "__END__")
            if isinstance(edge, ConditionalEdge):
                current = edge.condition(state)
                print(f"  [路由] → {current}")
            elif isinstance(edge, str):
                current = edge
                if current != "__END__":
                    print(f"  [跳转] → {current}")

        print(f"[Graph] 执行完毕,共 {step} 步")
        return state


# ── 构建旅行规划图 ───────────────────────────────────────
def build_travel_graph(llm: MockLLM) -> StateGraph:
    graph = StateGraph()

    # 节点函数
    def flight_fn(state: GraphState) -> GraphState:
        raw = llm.chat("flight_node", "查机票")
        result = json.loads(raw)
        state.set("flight", result)
        state.set("flight_cost", result["价格"])
        print(f"    结果: {result}")
        return state

    def hotel_fn(state: GraphState) -> GraphState:
        budget_level = state.get("budget_level", "normal")
        raw = llm.chat("hotel_node", f"查酒店 budget_level={budget_level}")
        result = json.loads(raw)
        state.set("hotel", result)
        state.set("hotel_cost", result["总价"])
        print(f"    结果: {result}")
        return state

    def budget_fn(state: GraphState) -> GraphState:
        raw = llm.chat("budget_node", "检查预算")
        result = json.loads(raw)
        state.set("budget_check", result)
        print(f"    结果: {result}")
        return state

    def cut_fn(state: GraphState) -> GraphState:
        raw = llm.chat("cut_node", "砍需求")
        result = json.loads(raw)
        state.set("budget_level", result.get("budget_level", "economy"))
        print(f"    策略: {result}")
        return state

    def itinerary_fn(state: GraphState) -> GraphState:
        raw = llm.chat("itinerary_node", "生成行程")
        result = json.loads(raw)
        state.set("itinerary", result)
        print(f"    结果: {result}")
        return state

    # 注册节点
    graph.add_node("flight", flight_fn)
    graph.add_node("hotel", hotel_fn)
    graph.add_node("budget_check", budget_fn)
    graph.add_node("cut_requirements", cut_fn)
    graph.add_node("itinerary", itinerary_fn)

    # 连边
    graph.set_entry("flight")
    graph.add_edge("flight", "hotel")
    graph.add_edge("hotel", "budget_check")

    # 条件边: 预算检查
    def budget_router(state: GraphState) -> str:
        check = state.get("budget_check", {})
        if check.get("超标"):
            return "cut_requirements"
        return "itinerary"

    graph.add_conditional_edge("budget_check", budget_router)
    graph.add_edge("cut_requirements", "hotel")  # 砍完需求回去重选酒店
    graph.add_edge("itinerary", "__END__")

    return graph


# ── 运行入口 ─────────────────────────────────────────────
def main():
    llm = MockLLM()
    graph = build_travel_graph(llm)

    state = GraphState()
    state.set("destination", "泰国")
    state.set("days", 5)
    state.set("budget", 5000)

    final_state = graph.run(state)

    # 验证
    assert final_state.get("itinerary") is not None
    assert final_state.get("budget_level") == "economy"  # 经历了砍需求
    assert llm._budget_check_count == 2  # 检查了两次预算
    print(f"\n✓ 图执行完成,经历了预算超标→砍需求→重选酒店的循环")
    print(f"  共 {len(llm.call_log)} 次 LLM 调用")


if __name__ == "__main__":
    main()

运行记录

[Graph] 开始执行,入口: flight
  [Node: flight] 执行中...
    结果: {'航班': '春秋9C8951', '价格': 1500, '路线': '上海→曼谷'}
  [跳转] → hotel
  [Node: hotel] 执行中...
    结果: {'酒店': '暹罗凯宾斯基', '每晚': 800, '总价': 4000}
  [跳转] → budget_check
  [Node: budget_check] 执行中...
    结果: {'总花费': 7500, '预算': 5000, '超标': True}
  [路由] → cut_requirements
  [Node: cut_requirements] 执行中...
    策略: {'策略': '降级酒店为经济型', 'budget_level': 'economy'}
  [跳转] → hotel
  [Node: hotel] 执行中...
    结果: {'酒店': '考山路青旅', '每晚': 100, '总价': 500}
  [跳转] → budget_check
  [Node: budget_check] 执行中...
    结果: {'总花费': 4000, '预算': 5000, '超标': False}
  [路由] → itinerary
  [Node: itinerary] 执行中...
    结果: {'行程': 'D1曼谷大皇宫 D2水上市场 D3清迈古城 D4丛林飞跃 D5返程'}
[Graph] 执行完毕,共 7 步

✓ 图执行完成,经历了预算超标→砍需求→重选酒店的循环
  共 7 次 LLM 调用

踩坑记录

现象 trace 信号 修法
无限循环 预算永远超标,砍了还超 step 数触达 max_steps 且未到 END 设 max_steps + 砍需求时记录已砍次数
状态丢失 节点 B 读不到节点 A 写的数据 节点 B 的 state 中缺少节点 A 写入的 key 所有数据统一写到 GraphState,不要用局部变量
图太复杂 10+ 节点难以理解 图可视化后边数 > 节点数的 2 倍 用 Mermaid/Graphviz 可视化图结构
条件边遗漏 某个分支没处理,直接跳到 END 条件路由返回了未注册的节点名 条件函数加 else 兜底,返回默认节点

工程备忘

  1. 图定义与执行分离:先 build_graph() 定义结构,再 graph.run() 执行,方便测试和复用。
  2. 状态不可变原则:建议每个节点返回新 state 而非原地修改(这里为简化用了 mutable)。
  3. LangGraph 映射:本示例的 StateGraph 对应 LangGraph 的 StateGraphadd_conditional_edge 对应 add_conditional_edges
  4. 子图嵌套:复杂流程可以把一组节点封装成子图,外层图调用子图节点。
  5. 与 Magentic 的区别:Graph 是静态结构,编译期确定;Magentic 的任务账本是运行时动态变化的。
  6. 可视化:建议自动生成 Mermaid 图,代码改了图也跟着变。
  7. 持久化检查点:长流程在每个节点执行后保存 state 快照,支持断点恢复。

读完以后

如果流程是线性的、不需要条件分支,退回更简单的 Prompt Chaining。 如果任务拆解需要模型动态决定、不适合预定义图结构,看 Manager-Worker。 如果运行时任务可能卡住、需要停滞检测,看 Magentic Orchestration

参考资料