来源:市场资讯
(来源:DeepHub IMBA)
生产里真正有分量的工作流是能批量处理几千份保险理赔、跑完一周的销售触达节奏、跨系统对账等等的复杂工作,而这些是没办法塞进一次对话轮次里。因为他们的处理时间以天为单位,而不是秒。
一旦动手做这类长时运行的 agent,会遇到一个问题:大多数 agent 架构本质上是无状态的,每次交互都从数据库里把 context 重新拼回来。拼回来的过程中推理链丢了,软信号也丢了,那些让前一步决策看起来合理的置信度梯度也丢了。
Cloud Next 26 上 Google 宣布 Agent Runtime 开始支持长时运行 agent,状态可以维持七天。围绕这次发布,本文整理出五种把生产系统和脆弱 demo 区分开来的设计模式。
Fleet Orchestration
跨天工作流里最常见的崩盘方式就是 context 丢失。一个 agent 花四个小时处理 400 份文档,跑到第 301 份时报错——没有 checkpointing,整个流程要从零再来。
长时运行 agent 必须把执行状态持久化到一个安全的 cloud sandbox 里。agent 拥有 bash 命令权限和一个沙箱化的文件系统,所以可以把中间结果落到磁盘,把处理日志写下来这样故障时也能体面地恢复。
所以这里就需要把 agent 当成一个长期跑着的服务进程,而不是一次请求处理函数。就像写一条要吞掉几百万条记录的数据管道——进度要 checkpoint,部分失败要兜住,幂等性要保证。
每 30 份文档落一次盘,是在持久性和计算开销之间一个比较舒服的折中。出错时agent 直接从最近一次保存的状态接着跑。
# Pattern 1: Checkpoint-and-Resumedef process_documents(docs, checkpoint_file="state.json"):state = load_checkpoint(checkpoint_file) or {"processed": 0, "results": []}for i in range(state["processed"], len(docs)):try:result = agent.analyze(docs[i])state["results"].append(result)state["processed"] = i + 1# 每 30 份文档执行一次 checkpointif (i + 1) % 30 == 0:save_checkpoint(checkpoint_file, state)except Exception as e:save_checkpoint(checkpoint_file, state) # 在崩溃前保存raise ereturn state["results"]
Delegated Approval(Human-in-the-Loop)
几乎每个框架都在宣传自己支持 human-in-the-loop。但落到实现层面,多数版本都很糙:把状态序列化成 JSON,发一个 webhook 出去,剩下的就指望有人会去看。
问题会越积越多。
JSON 序列化会丢掉那些隐式的推理 context;通知则要和几十条其他告警一起抢眼球。等到几小时后终于有人回过头来响应,agent 又得反序列化、重建 context,并默认期间一切照旧——这是个非常脆弱的假设。
长时运行 agent 的处理方式不一样。走到审批节点时,它就在原地暂停。整套执行状态完整保留下来:推理链、工作记忆、tool call 历史、待执行的动作,全都不动。
关键在于资源效率。agent 等 24 小时让人来点 Approve,这 24 小时对 agent 是死时间,对人是工作时间。暂停期间 agent 不消耗任何计算;亚秒级冷启动让恢复几乎没有延迟代价。
# Pattern 2: Delegated Approval@agent.tooldef request_human_approval(action_plan: dict, context: str):"""暂停 agent 执行并请求人类审核。"""approval_id = db.create_approval_request(plan=action_plan,context=context,status="pending")# 将执行控制权交还给 orchestrator# 在 webhook 触发之前,agent 消耗 0 计算资源raise SuspendExecution(reason="human_approval_required",resume_webhook=f"/api/resume/{approval_id}")
Memory-Layered Context
跑七天的 agent 需要的远不止 session 状态。它得记住前几次 session 里的东西、几周前用户留下的偏好,以及那些任何单次对话都装不下的组织信息。
Memory-Layered Context 把长期存储和工作记忆拆开,由严格的策略来管。
分层记忆在这里就变成必需品。一边是 Memory Bank——长期记忆,动态地把对话整理沉淀下来;一边是 Memory Profiles——工作记忆,专门服务那些低延迟、高准确度的细节查询。
但其中藏着一个坑:记忆漂移。agent 从少数几次非典型交互里"学到"某个程序性捷径可以接受,接下来可能就把这个捷径泛化到所有场景。多个 agent 共用同一个记忆池时,数据泄漏也是个真实存在的风险。
不能放任 agent 随便往向量数据库里写东西。治理它们的方式应当和治理 microservices 一样,由三个核心组件支撑:
Agent Identity:相当于 agent 的 IAM,决定它能访问哪些 memory bank 和 tool。
Agent Registry:相当于服务发现,记录哪些 agent 在跑、它们的 prompt 版本是哪一版、当前处于什么执行状态。
Agent Gateway:相当于 API gateway,根据组织策略评估每一次请求——比如阻止 agent 把 PII 写入长期记忆。
# Pattern 3: Memory-Layered Contextclass AgentGateway:def __init__(self, identity_provider, policy_engine):self.iam = identity_providerself.policies = policy_enginedef write_to_memory_bank(self, agent_id, data):# 1. 验证身份if not self.iam.can_write(agent_id, "long_term_memory"):raise UnauthorizedError()# 2. 执行策略(例如 PII 脱敏)safe_data = self.policies.redact_pii(data)# 3. 写入受管理的存储vector_db.upsert(collection="memory_bank",metadata={"source_agent": agent_id},content=safe_data)
Ambient Processing
不是所有长时运行 agent 都和人类有交互。还有一类是 ambient 的:盯着事件、消费数据流、在后台直接动手,全程不需要任何用户提示。
Ambient Processing 让 agent 在无人值守状态下持续运行,按事件触发动作。
举个例子:一个直接挂在 Pub/Sub 流上的内容审核 agent。它会跑上好几天,处理源源不断进来的用户内容,自己维护一份关于趋势和模式的状态,只在确实需要时才升级给人类。
这里最关键的架构决定又回到治理上。内容策略不应该被硬编码进 agent 本身,而应该在 Agent Gateway 里定义。策略一变,只更新一次 Gateway,整个 fleet 的 ambient agent 立刻拿到新规则。
# Pattern 4: Ambient Processingasync def ambient_moderation_agent(pubsub_stream):"""持续运行,对事件做出反应而无需提示。"""async for event in pubsub_stream.listen("user_content"):# agent 自主评估内容analysis = await agent.evaluate(event.text)if analysis.flagged:if analysis.confidence > 0.95:# 高置信度时自动处理await api.ban_user(event.user_id)else:# 升级边缘案例await request_human_approval(action_plan={"action": "ban", "user": event.user_id},context=analysis.reasoning)
Fleet Orchestration
生产里很少出现一个 agent 单打独斗的情况。常见的形态是:一个 coordinator agent 把子任务分派给若干 specialist agent,每个 specialist 独立运行,时长各不相同。
Fleet Orchestration 用一个 coordinator 来统筹一群独立的 specialist agent。
拿销售开发流程举例:Coordinator Agent 把工作分给五个 specialist:Research Agent 负责收集每个 lead 的公开数据;Scoring Agent 按匹配度和意图信号给 lead 打分排序;Draft Agent 写出第一封个性化消息;Outreach Agent 选合适的渠道把消息发出去;Reporting Agent 收尾,把整轮跑完后的结果汇总。
每个 specialist 拥有独立的 Agent Identity,因此只能访问自己需要的资源;通过 Agent Gateway 走自己的策略校验;在 Agent Registry 里有独立条目。Coordinator Agent 维护全局状态,处理 specialist 之间的交接。
把每个 specialist 当成独立单元,一个直接的好处是更新可以分开做。Scoring Agent 的排序逻辑要改进,发新版本上去就行,不会牵连到 fleet 里的其他成员。
# Pattern 5: Fleet Orchestrationasync def coordinator_agent(lead_list):results = []for lead in lead_list:# 1. Research Agent — 收集 lead 的公开数据research = await fleet.call("research_agent", target=lead)# 2. Scoring Agent — 根据匹配度和意图信号对 lead 排名score = await fleet.call("scoring_agent", data=research)if score > 80:# 3. Draft Agent — 撰写个性化的首条消息draft = await fleet.call("draft_agent",context=research,tone="professional")# 4. Outreach Agent — 通过合适的渠道发送消息await fleet.call("outreach_agent",lead=lead,message=draft)results.append({"lead": lead, "score": score, "draft": draft})# 5. Reporting Agent — 总结整个运行过程await fleet.call("reporting_agent", summary=results)
总结
这五种模式反映的是 AI agent 在真实世界里运转方式的一次推进。它们也直接对应着 agentic AI 的下一次转向——agent 不再只是被动响应,而是能够持续存在、自我治理、规模化协同。
底层原则其实很简单:结构化、有状态、被治理的执行,比无状态的请求-响应循环是更基础。
把确定性的 checkpointing 和概率性的推理拆开;用原地暂停取代序列化成 JSON 再恢复;让记忆通过身份和策略来约束,而不是盲目信任 agent 的写入行为——做到这几点,工作流里的推理链就能被保留下来,必要时还能复原。
agent 本身就是进程。其余的都是脚手架。
生产级 AI 不是单轮里把 agent 调得多聪明,而是看它能否在很多轮、很多天、很多次交接之间保持可靠。把已知和推断区分清楚的团队,以及一开始就把治理嵌进系统而不是事后再补的团队,会决定公司未来怎么用 AI。
作者:Ana Bildea, PhD
热门跟贴