跳转至

第六章:Workflow 与 Agent Loop

第五章的 ReAct Agent 每一步都由模型决定下一步做什么。这很灵活,但不是所有任务都需要这种灵活性。

如果你的旅游助手每次都是"查天气→搜景点→查交通→出方案",步骤固定、顺序不变,让模型每轮自己决定下一步反而是浪费——它可能会跳过某个步骤,或者做多余的事。

这一章把控制流分成两类:

  • Workflow — 步骤由你的代码决定,模型只负责每一步的执行。
  • Agent Loop — 步骤由模型决定,你的代码只负责执行工具和管理循环。

它们不是对立的,而是一个频谱的两端。


6.1 Workflow

6.1.1 Prompt Chaining(提示链)

最简单的 Workflow:把任务拆成几个步骤,每步调一次 LLM,上一步的输出作为下一步的输入。

import json
from agent_patterns_lab.runtime.mock_model import MockLLM
from agent_patterns_lab.runtime.types import Message


def prompt_chain(steps: list[dict], initial_input: str) -> list[dict]:
    """
    执行一条 prompt chain。

    steps: [{"name": "步骤名", "system": "system prompt", "model": MockLLM}]
    initial_input: 用户原始输入

    返回每一步的结果列表。
    """
    results = []
    current_input = initial_input

    for i, step in enumerate(steps):
        messages = [
            Message(role="system", content=step["system"]),
            Message(role="user", content=current_input),
        ]
        output = step["model"].complete(messages)
        results.append({"step": step["name"], "output": output})
        print(f"── {step['name']} ──")
        print(f"输入: {current_input[:80]}{'...' if len(current_input) > 80 else ''}")
        print(f"输出: {output[:80]}{'...' if len(output) > 80 else ''}")
        print()
        # 下一步的输入 = 这一步的输出
        current_input = output

    return results

逐行解释:

  • prompt_chain 接收一个步骤列表和初始输入。每一步有自己的 system prompt 和模型实例。
  • 循环里做的事很简单:构建 messages → 调模型 → 存结果 → 把输出传给下一步。
  • 每一步都是独立的 LLM 调用,context 互不污染。第二步看不到第一步的对话历史,只看到第一步的输出结果。

旅游助手的三步 chain:

# 步骤 1:分析需求
model_analyze = MockLLM([
    '{"needs": ["天气信息", "茶相关景点", "交通方案"], "constraints": ["一天时间", "喜欢喝茶"], "city": "杭州"}'
])

# 步骤 2:查询信息(假设已经查了工具,把结果整理成结构化数据)
model_research = MockLLM([
    '{"weather": "阵雨转多云 18-23°C", "places": ["梅家坞", "中国茶叶博物馆"], "transport": "地铁+公交45分钟"}'
])

# 步骤 3:生成行程
model_plan = MockLLM([
    "根据查询结果,为您安排杭州一日游行程:\n\n上午:中国茶叶博物馆(室内,避雨)\n下午:梅家坞品龙井茶\n交通:地铁+公交约45分钟\n\n提醒:明天有阵雨,建议带伞。"
])

steps = [
    {
        "name": "需求分析",
        "system": "分析用户的旅游需求,提取关键信息。输出 JSON: {needs, constraints, city}",
        "model": model_analyze,
    },
    {
        "name": "信息查询",
        "system": "根据需求分析结果,整理查询到的信息。输出 JSON: {weather, places, transport}",
        "model": model_research,
    },
    {
        "name": "行程生成",
        "system": "根据查询到的信息,生成完整的一日游行程。输出纯文本。",
        "model": model_plan,
    },
]

results = prompt_chain(steps, "我喜欢喝茶,明天去杭州一日游")

输出:

── 需求分析 ──
输入: 我喜欢喝茶,明天去杭州一日游
输出: {"needs": ["天气信息", "茶相关景点", "交通方案"], "constraints": ["一天时间", "喜欢喝茶"...

── 信息查询 ──
输入: {"needs": ["天气信息", "茶相关景点", "交通方案"], "constraints": ["一天时间", "喜欢喝茶"...
输出: {"weather": "阵雨转多云 18-23°C", "places": ["梅家坞", "中国茶叶博物馆"], "transport"...

── 行程生成 ──
输入: {"weather": "阵雨转多云 18-23°C", "places": ["梅家坞", "中国茶叶博物馆"], "transport"...
输出: 根据查询结果,为您安排杭州一日游行程:

上午:中国茶叶博物馆(室内,避雨)
下午:梅家坞品龙...

Prompt Chaining 和 ReAct 的区别:

Prompt Chaining ReAct
谁决定下一步 你的代码(硬编码顺序) 模型(根据 observation)
步骤数量 固定 动态
每步的 context 只有上一步输出 累积所有历史
适用场景 流程固定、步骤明确 流程不确定、需要根据中间结果调整

6.1.2 Routing(路由)

不是所有用户输入都走同一条 chain。有时候你需要先判断输入的类型,然后走不同的处理路径。

规则路由: 用关键词或正则匹配。

import re
from dataclasses import dataclass
from typing import Callable


@dataclass
class Route:
    """一条路由规则。"""
    name: str
    pattern: str  # 正则表达式
    handler: Callable[[str], str]


class RuleRouter:
    """基于规则的路由器。"""

    def __init__(self, routes: list[Route], fallback: Callable[[str], str]):
        self.routes = routes
        self.fallback = fallback

    def route(self, user_input: str) -> str:
        for r in self.routes:
            if re.search(r.pattern, user_input):
                print(f"  匹配路由: {r.name} (pattern: {r.pattern})")
                return r.handler(user_input)
        print(f"  无匹配,走 fallback")
        return self.fallback(user_input)

逐行解释:

  • Route 把路由名、正则表达式和处理函数绑在一起。
  • RuleRouter.route 按顺序匹配正则,命中第一个就执行对应的 handler。都没命中走 fallback。
  • 简单粗暴,但对确定性高的场景够用了。
def handle_weather(text: str) -> str:
    return "调用天气 API → 返回天气信息"

def handle_itinerary(text: str) -> str:
    return "调用行程规划 chain → 返回行程"

def handle_general(text: str) -> str:
    return "通用对话 → 直接 LLM 回答"

router = RuleRouter(
    routes=[
        Route("天气查询", r"天气|气温|下雨|穿什么", handle_weather),
        Route("行程规划", r"行程|安排|一日游|规划", handle_itinerary),
    ],
    fallback=handle_general,
)

test_inputs = [
    "明天杭州天气怎么样?",
    "帮我安排一个杭州一日游",
    "杭州有什么好吃的?",
]

for inp in test_inputs:
    print(f"输入: {inp}")
    result = router.route(inp)
    print(f"结果: {result}\n")

输出:

输入: 明天杭州天气怎么样?
  匹配路由: 天气查询 (pattern: 天气|气温|下雨|穿什么)
结果: 调用天气 API → 返回天气信息

输入: 帮我安排一个杭州一日游
  匹配路由: 行程规划 (pattern: 行程|安排|一日游|规划)
结果: 调用行程规划 chain → 返回行程

输入: 杭州有什么好吃的?
  无匹配,走 fallback
结果: 通用对话 → 直接 LLM 回答

LLM 路由: 当用户输入不容易用正则匹配时,让模型来判断。

class LLMRouter:
    """用 LLM 判断路由。"""

    def __init__(self, model: MockLLM, routes: dict[str, Callable[[str], str]]):
        self.model = model
        self.routes = routes

    def route(self, user_input: str) -> str:
        route_names = list(self.routes.keys())
        system = (
            f"判断用户输入属于以下哪个类别,只返回类别名,不要解释。\n"
            f"类别: {', '.join(route_names)}"
        )
        messages = [
            Message(role="system", content=system),
            Message(role="user", content=user_input),
        ]
        category = self.model.complete(messages).strip()
        print(f"  LLM 判断类别: {category}")

        handler = self.routes.get(category)
        if handler is None:
            print(f"  类别 '{category}' 未注册,走第一个路由")
            handler = list(self.routes.values())[0]
        return handler(user_input)


model_router = MockLLM(["行程规划", "美食推荐"])

llm_router = LLMRouter(
    model=model_router,
    routes={
        "天气查询": handle_weather,
        "行程规划": handle_itinerary,
        "美食推荐": lambda x: "调用美食搜索 → 返回推荐",
        "通用对话": handle_general,
    },
)

print("输入: 周末想带孩子去杭州玩两天")
llm_router.route("周末想带孩子去杭州玩两天")
print()
print("输入: 杭州有什么好吃的?")
llm_router.route("杭州有什么好吃的?")

输出:

输入: 周末想带孩子去杭州玩两天
  LLM 判断类别: 行程规划
结果: 调用行程规划 chain → 返回行程

输入: 杭州有什么好吃的?
  LLM 判断类别: 美食推荐
结果: 调用美食搜索 → 返回推荐

两种路由的选择:

方式 延迟 成本 准确度 适用场景
规则路由 <1ms 0 取决于正则覆盖率 类别少、关键词明确
LLM 路由 100-500ms 一次 API 调用 高(语义理解) 类别多、用户输入多样

实践中常用混合方式:先用规则路由处理明确的情况,没命中的再走 LLM 路由。

6.1.3 Parallelization(并行化)

有些步骤之间没有依赖关系,可以同时执行。

import concurrent.futures
from dataclasses import dataclass


@dataclass
class ParallelTask:
    name: str
    system: str
    model: MockLLM
    input_text: str


def run_parallel(tasks: list[ParallelTask]) -> dict[str, str]:
    """并行执行多个 LLM 调用。"""
    results = {}

    def execute_one(task: ParallelTask) -> tuple[str, str]:
        messages = [
            Message(role="system", content=task.system),
            Message(role="user", content=task.input_text),
        ]
        output = task.model.complete(messages)
        return task.name, output

    with concurrent.futures.ThreadPoolExecutor(max_workers=len(tasks)) as executor:
        futures = [executor.submit(execute_one, t) for t in tasks]
        for future in concurrent.futures.as_completed(futures):
            name, output = future.result()
            results[name] = output
            print(f"  完成: {name}")

    return results


# 三个独立查询可以并行
tasks = [
    ParallelTask(
        name="天气",
        system="返回天气信息,JSON 格式",
        model=MockLLM(['{"weather": "阵雨转多云", "temp": "18-23°C"}']),
        input_text="杭州明天天气",
    ),
    ParallelTask(
        name="景点",
        system="返回景点列表,JSON 格式",
        model=MockLLM(['["梅家坞", "中国茶叶博物馆", "西湖"]']),
        input_text="杭州茶相关景点",
    ),
    ParallelTask(
        name="美食",
        system="返回美食推荐,JSON 格式",
        model=MockLLM(['["龙井虾仁", "西湖醋鱼", "叫花鸡"]']),
        input_text="杭州特色美食",
    ),
]

print("并行查询开始:")
results = run_parallel(tasks)
print(f"\n结果: {json.dumps(results, ensure_ascii=False, indent=2)}")

输出:

并行查询开始:
  完成: 天气
  完成: 景点
  完成: 美食

结果: {
  "天气": "{\"weather\": \"阵雨转多云\", \"temp\": \"18-23°C\"}",
  "景点": "[\"梅家坞\", \"中国茶叶博物馆\", \"西湖\"]",
  "美食": "[\"龙井虾仁\", \"西湖醋鱼\", \"叫花鸡\"]"
}

三个查询同时执行,总延迟约等于最慢的那一个,而不是三个的总和。

并行的前提条件:各任务之间没有数据依赖。如果"搜景点"需要先知道天气才能筛选室内/室外,那它们就不能并行。


6.2 Agent Loop

6.2.1 Workflow 和 Agent 的边界

Workflow 和 Agent 不是二选一,而是一个频谱:

← 代码控制多                                    模型控制多 →

Prompt Chain    Routing    Parallelization    ReAct    完全自主 Agent
   │              │              │              │           │
   │  步骤固定     │  分支固定     │  并行固定     │  步骤动态   │
   │  模型执行     │  模型分类     │  模型执行     │  模型决策   │

向左走的理由:可预测性高、调试简单、成本低。你知道它一定会执行三步,一步一步跑,出了问题知道是哪一步。

向右走的理由:灵活性高。用户的需求不固定,有时候不需要查天气(因为用户自己说了"明天晴天"),有时候需要多查一步("帮我看看附近有没有停车场")。

经验法则:能用 Workflow 就用 Workflow。只有当步骤无法提前确定时,才用 Agent Loop。

6.2.2 循环控制

Agent Loop 是一个 while 循环。任何 while 循环都需要终止条件,否则就是死循环。

四种终止控制:

import time
from dataclasses import dataclass, field


@dataclass
class LoopControl:
    """Agent Loop 的终止控制。"""
    max_steps: int = 10          # 最大步数
    timeout_s: float = 30.0      # 最大运行时间(秒)
    max_stall: int = 3           # 连续无进展的最大轮数
    max_token_budget: int = 8000 # token 预算上限

    # 内部状态
    _step: int = field(default=0, init=False)
    _start_time: float = field(default=0.0, init=False)
    _stall_count: int = field(default=0, init=False)
    _total_tokens: int = field(default=0, init=False)
    _last_action: str = field(default="", init=False)

    def start(self):
        self._start_time = time.time()
        self._step = 0
        self._stall_count = 0
        self._total_tokens = 0

    def record_step(self, action: str, tokens_used: int):
        """记录一步的执行。"""
        self._step += 1
        self._total_tokens += tokens_used

        # 检查是否 stall(和上一步做了同样的动作)
        if action == self._last_action:
            self._stall_count += 1
        else:
            self._stall_count = 0
        self._last_action = action

    def should_stop(self) -> tuple[bool, str]:
        """返回 (是否停止, 原因)。"""
        if self._step >= self.max_steps:
            return True, f"达到最大步数 {self.max_steps}"

        elapsed = time.time() - self._start_time
        if elapsed >= self.timeout_s:
            return True, f"超时 {self.timeout_s}s(已用 {elapsed:.1f}s)"

        if self._stall_count >= self.max_stall:
            return True, f"连续 {self.max_stall} 步无进展(重复动作: {self._last_action})"

        if self._total_tokens >= self.max_token_budget:
            return True, f"token 预算耗尽(已用 {self._total_tokens}/{self.max_token_budget})"

        return False, ""

逐段解释:

  • max_steps — 最基本的安全阀。防止无限循环。
  • timeout_s — 时间上限。有些工具调用可能很慢(外部 API 超时),纯靠步数控制不住。
  • max_stall — 检测"原地踏步"。如果模型连续 3 次调用同一个工具(通常意味着它在等一个不会改变的结果),强制停止。
  • max_token_budget — token 总预算。防止 context 膨胀失控。

演示:

ctrl = LoopControl(max_steps=5, timeout_s=10.0, max_stall=2, max_token_budget=3000)
ctrl.start()

# 模拟 5 步执行
demo_steps = [
    ("get_weather", 300),
    ("search_places", 450),
    ("search_places", 450),   # 和上一步相同!stall_count = 1
    ("search_places", 450),   # 还是相同!stall_count = 2 → 触发 stall 终止
    ("get_transport", 400),   # 不会到这一步
]

for action, tokens in demo_steps:
    ctrl.record_step(action, tokens)
    should_stop, reason = ctrl.should_stop()
    print(f"Step {ctrl._step}: action={action}, tokens={tokens}, "
          f"total_tokens={ctrl._total_tokens}, stall={ctrl._stall_count}")
    if should_stop:
        print(f"  → 停止: {reason}")
        break

输出:

Step 1: action=get_weather, tokens=300, total_tokens=300, stall=0
Step 2: action=search_places, tokens=450, total_tokens=750, stall=0
Step 3: action=search_places, tokens=450, total_tokens=1200, stall=1
Step 4: action=search_places, tokens=450, total_tokens=1650, stall=2
  → 停止: 连续 2 步无进展(重复动作: search_places)

第四步触发了 stall 检测。模型连续三次调 search_places,说明它卡住了——可能是工具返回的结果不够用,也可能是模型没理解结果。无论原因,继续循环没有意义。

6.2.3 Evaluator-Optimizer(评估-优化循环)

ReAct 循环的一个变体:在 Agent 生成结果后,用另一个 LLM(或同一个 LLM 换个 prompt)来评估结果质量,不合格就要求改进。

@dataclass
class EvalResult:
    """评估结果。"""
    passed: bool
    score: float       # 0.0 - 1.0
    feedback: str      # 改进建议


class EvaluatorOptimizer:
    """评估-优化循环。"""

    def __init__(
        self,
        generator_model: MockLLM,
        evaluator_model: MockLLM,
        generator_system: str,
        evaluator_system: str,
        max_rounds: int = 3,
        pass_threshold: float = 0.8,
    ):
        self.generator = generator_model
        self.evaluator = evaluator_model
        self.generator_system = generator_system
        self.evaluator_system = evaluator_system
        self.max_rounds = max_rounds
        self.pass_threshold = pass_threshold

    def run(self, task: str) -> str:
        current_input = task

        for round_num in range(1, self.max_rounds + 1):
            print(f"── Round {round_num} ──")

            # ── 生成 ──
            gen_messages = [
                Message(role="system", content=self.generator_system),
                Message(role="user", content=current_input),
            ]
            draft = self.generator.complete(gen_messages)
            print(f"生成: {draft[:100]}{'...' if len(draft) > 100 else ''}")

            # ── 评估 ──
            eval_messages = [
                Message(role="system", content=self.evaluator_system),
                Message(role="user", content=f"任务: {task}\n\n草稿:\n{draft}"),
            ]
            eval_raw = self.evaluator.complete(eval_messages)
            eval_result = json.loads(eval_raw)
            print(f"评估: score={eval_result['score']}, "
                  f"passed={eval_result['passed']}, "
                  f"feedback={eval_result['feedback']}")

            if eval_result["passed"]:
                print(f"\n通过! 最终输出:\n{draft}")
                return draft

            # ── 带着反馈重新生成 ──
            current_input = (
                f"原始任务: {task}\n\n"
                f"上一版草稿:\n{draft}\n\n"
                f"评估反馈: {eval_result['feedback']}\n\n"
                f"请根据反馈改进。"
            )

        print(f"\n达到最大轮数 {self.max_rounds},返回最后一版。")
        return draft

逐段解释:

  • generator_model 负责生成内容,evaluator_model 负责评估质量。可以是同一个模型不同 prompt,也可以是不同模型。
  • 每轮做三件事:(1) 生成草稿,(2) 评估草稿,(3) 不合格就带着反馈重新生成。
  • pass_threshold 定义"合格"的标准。评估分数达到阈值就通过。
  • max_rounds 防止无限优化。实践中 2-3 轮就够了——如果 3 轮还不合格,通常是任务本身有问题,不是优化不够。

旅游行程的评估-优化:

# 第一轮生成:缺少天气考虑
# 第二轮生成:根据反馈加了天气应对
generator = MockLLM([
    # Round 1 生成
    "杭州一日游行程:\n上午:西湖骑行环湖\n下午:梅家坞品茶\n傍晚:河坊街小吃",
    # Round 2 生成(改进版)
    ("杭州一日游行程(阵雨天气版):\n"
     "上午:中国茶叶博物馆(室内,避雨)\n"
     "下午:梅家坞品龙井茶(半室外,下午转多云)\n"
     "傍晚:河坊街小吃(有棚区域)\n"
     "备选:如全天大雨,改去浙江省博物馆+南宋御街室内商圈\n"
     "提醒:带伞,穿防滑鞋"),
])

evaluator = MockLLM([
    # Round 1 评估:不合格
    json.dumps({
        "passed": False,
        "score": 0.5,
        "feedback": "行程没有考虑天气因素。明天杭州有阵雨,全天室外活动(西湖骑行)不合适。需要加入天气应对方案和备选室内行程。"
    }, ensure_ascii=False),
    # Round 2 评估:合格
    json.dumps({
        "passed": True,
        "score": 0.9,
        "feedback": "行程合理,考虑了天气因素,有室内备选方案。"
    }, ensure_ascii=False),
])

eo = EvaluatorOptimizer(
    generator_model=generator,
    evaluator_model=evaluator,
    generator_system="你是杭州旅游行程规划师。",
    evaluator_system=(
        "评估旅游行程质量。检查:天气适配、时间合理性、备选方案。\n"
        '返回 JSON: {"passed": bool, "score": float, "feedback": str}'
    ),
    max_rounds=3,
    pass_threshold=0.8,
)

final = eo.run("明天杭州阵雨转多云,安排一日游行程(用户喜欢喝茶)")

输出:

── Round 1 ──
生成: 杭州一日游行程:
上午:西湖骑行环湖
下午:梅家坞品茶
傍晚:河坊街小吃
评估: score=0.5, passed=False, feedback=行程没有考虑天气因素。明天杭州有阵雨,全天室外活动(西湖骑行)不合适。需要加入天气应对方案和备选室内行程。

── Round 2 ──
生成: 杭州一日游行程(阵雨天气版):
上午:中国茶叶博物馆(室内,避雨)
下午:梅家坞品龙井茶(半室外,下午转多云...
评估: score=0.9, passed=True, feedback=行程合理,考虑了天气因素,有室内备选方案。

通过! 最终输出:
杭州一日游行程(阵雨天气版):
上午:中国茶叶博物馆(室内,避雨)
下午:梅家坞品龙井茶(半室外,下午转多云)
傍晚:河坊街小吃(有棚区域)
备选:如全天大雨,改去浙江省博物馆+南宋御街室内商圈
提醒:带伞,穿防滑鞋

第一轮的行程忽略了阵雨。评估器指出问题后,第二轮生成了天气适配版本,包含室内备选方案。

Evaluator-Optimizer 和 ReAct 的关系:

ReAct Evaluator-Optimizer
循环目的 收集信息做决策 迭代改进输出质量
外部工具 有(查天气、搜景点等) 通常没有(纯 LLM 评估)
评估方 没有独立评估——模型自己判断够不够 有独立评估器
典型轮数 3-8 轮 2-3 轮
适用场景 需要外部数据的任务 需要高质量文本输出的任务

两者可以组合:先用 ReAct 收集数据、生成初版行程,再用 Evaluator-Optimizer 打磨行程质量。


本章回顾

模式 控制方 适用场景 实现复杂度
Prompt Chaining 代码 固定步骤、前后依赖
Routing 代码+模型 输入类型不同、走不同路径
Parallelization 代码 多个独立子任务
ReAct 模型 步骤不确定、需要根据观察决定
Evaluator-Optimizer 代码+模型 需要迭代改进输出质量

选择策略:从左到右,先用最简单的能解决问题的模式。

  1. 步骤固定?→ Prompt Chaining
  2. 需要分支?→ Routing
  3. 子任务独立?→ Parallelization
  4. 步骤不确定?→ ReAct
  5. 输出质量要高?→ Evaluator-Optimizer

下一章: 第七章:模式总览 —— 当 ReAct 基线不够用时,按失败类型选择设计模式。