来源:市场资讯

(来源:DeepHub IMBA)

ReAct(Reason + Act)架构要解决的问题是开放式研究里最经典的问题。本文要做的是一个 Research Brief Agent:会上网搜索、抓取真实 URL、压缩证据,最终产出一份带真实引用的结构化简报。重点不在于功能,而在于 正确写法——不再依赖那种脆弱的 "Thought: / Action:" 字符串解析。

打开网易新闻 查看精彩图片

早期 ReAct 留下的问题

ReAct 论文最初证明了,让 LLM 在动手之前先把推理写出来,效果会明显更好。

那时候的实现可以说就是 prompt hack。给模型一段这样的提示:You have access to tools. You must use this format: Thought: [your thought], Action: [tool_name], Action Input: [tool_input].

模型吐回一段字符串,Python 用正则去抠工具名和参数,工具运行的结果再以 Observation: [result] 的形式拼回 prompt 里。

demo 阶段勉强能跑到了生产环境就问题成堆。模型对格式的幻觉源源不断:一会儿漏掉 Action Input: 前缀,一会儿调用一个根本不存在的工具,正则当场就崩。

打开网易新闻 查看精彩图片

2026 年的 ReAct:原生工具调用

这套写法早就被淘汰了,但Reason、Act、Observe 这三段核心节奏依然成立,只是执行模型完全换了一种思路。

现在的工具使用系统不再做字符串解析,而是原生的、结构化的 API tool calling。schema 校验由 LLM 提供方负责——OpenAI、Anthropic、Google 都是如此——严格性放在他们那一侧。

新的 ReAct 循环大致是这样:

  1. Reason:LLM 看一遍会话历史,判断还缺什么信息。

  2. Act:LLM 发出一段严格的 JSON tool call payload,例如 {"name": "search_web", "arguments": {"query": "react agent failures"}}。

  3. Observe:LangGraph 运行时执行工具,把带有结果的 ToolMessage 追加回 state。

循环一直跑到 LLM 觉得证据足够,然后输出一段普通的文本回复,而不是再发一个 tool call。

打开网易新闻 查看精彩图片

Research Brief Agent:State 与 Schema

动手开始写。第一件事是定义 state schema。

确定性 workflow 里的 state 通常是一组离散字段——raw_diff、has_critical_findings 之类。但开放式 ReAct 循环里,state 主要表现为一份 append-only 的消息账本。

只有消息还不够。引用也要追踪:不光要让 LLM 写出一段总结,还得拿到一份能在 UI 里渲染的具体引用列表。

from typing import Annotated, TypedDictfrom langchain_core.messages import BaseMessagefrom langgraph.graph.message import add_messagesimport operatorclass ResearchState(TypedDict):topic: str# 会话的核心账本messages: Annotated[list[BaseMessage], add_messages]# 在循环过程中累积起来的证据库citations: Annotated[list[dict[str, str]], operator.add]seen_urls: Annotated[list[str], operator.add]# 防止无限循环的控制变量step_count: intmax_steps: intstagnant_turns: intfinal_brief: str

注意 reducer 的作用,add_messages 让新消息追加而不是覆盖,operator.add 给 citations 和 URL 列表做的是同样的事。在循环里维护历史,靠的就是这两件小工具。

打开网易新闻 查看精彩图片

Search 与 Fetch

把图连起来之前得先有工具,一个常见错误是给 agent 一个返回完整原始 HTML 的工具——第一轮循环还没结束,上下文窗口就已经被冲爆。

下面两个普通的 HTTP 工具就够了:search_web 找候选链接,fetch_url 拉真正的正文。

import jsonimport urllib.requestimport urllib.parsefrom langchain_core.tools import toolimport reimport htmldef _http_get(url: str, timeout: int = 12) -> str:with httpx.Client(timeout=timeout, headers={"User-Agent": "Mozilla/5.0"}) as client:response = client.get(url)response.raise_for_status()return response.text@tooldef search_web(query: str, max_results: int = 5) -> str:"""Search the web via DuckDuckGo Instant Answer API. Returns JSON list."""try:params = urllib.parse.urlencode({"q": query, "format": "json"})payload = _http_get(f"https://api.duckduckgo.com/?{params}")data = json.loads(payload)# ...(提取 URL 与 snippet 的解析逻辑)...# 简洁起见,假设这里返回的 JSON 字符串结构为:# [{"url": "...", "title": "...", "snippet": "..."}]return json.dumps(results[:max_results])except Exception as exc:return json.dumps([{"url": "", "title": "error", "snippet": str(exc)}])@tooldef fetch_url(url: str) -> str:"""Fetch and compress a URL into JSON: {url,title,snippet}."""try:raw_html = _http_get(url)# 剥掉 script、style 与 HTML 标签,留下纯文本no_script = re.sub(r"]*>.*?", " ", raw_html, flags=re.IGNORECASE | re.DOTALL)no_style = re.sub(r"]*>.*?", " ", no_script, flags=re.IGNORECASE | re.DOTALL)text = re.sub(r"<[^>]+>", " ", no_style)clean_text = html.unescape(re.sub(r"\s+", " ", text)).strip()# 只取前 2000 个字符,省一点上下文return json.dumps({"url": url, "title": "Fetched Page", "snippet": clean_text[:2000]})except Exception as exc:return json.dumps({"url": url, "title": "error", "snippet": str(exc)})

错误处理的写法值得多看一眼:httpx 抛超时,捕获之后把异常转成 JSON 字符串返回。一个失效链接不应当能整死一整张图的执行。错误以字符串形式回流,LLM 把它读作一次 observation,自然会去尝试别的链接。

打开网易新闻 查看精彩图片

图的形状:比你想的小

ReAct 图的结构出乎意料地简单——本质上只要两个主节点:一个让 LLM 推理,一个执行工具。

节点和路由是这样写的:

from langgraph.graph import StateGraph, START, ENDfrom langgraph.prebuilt import ToolNodefrom langchain_core.messages import SystemMessage, AIMessageasync def reason_node(state: ResearchState, llm_with_tools) -> dict:system = SystemMessage(content=("You are a research brief agent. Use tools to gather evidence and cite URLs. ""Prefer search first, then fetch_url for top links. ""When you have enough evidence, return a concise final brief. ""Never fabricate citations."# LLM 看一遍完整消息历史,再决定下一步动作response = await llm_with_tools.ainvoke([system] + state["messages"])return {"messages": [response],"step_count": state["step_count"] + 1,def route_after_reason(state: ResearchState) -> str:# 硬停止条件if state["step_count"] >= state["max_steps"]:return "finalize"last_message = state["messages"][-1]# LLM 决定调用工具,则导向工具执行节点if isinstance(last_message, AIMessage) and last_message.tool_calls:return "tools"# 没有 tool call,说明 LLM 认为研究阶段结束return "finalize"

reason_node 是认知工作真正发生的地方,整段消息历史会一次性喂给模型;信息不够时,模型会回一条带 tool_calls 的 AIMessage。

路由函数 route_after_reason 检查这条消息:包含 tool calls 就把图切到 tools 节点。LangGraph 提供了一个预置的 ToolNode,它会自动解包参数、执行 Python 函数,并产出 ToolMessage 形式的 observation。

拦截器:清洗 Observation

如果让 ToolNode 把结果直接倒回 state 就此打住,等于错失了一个机会。引用应该在过程中就抽出来,不必等到写最终简报时再让 LLM 凭记忆复盘每一个 URL。

工具节点之后再加一个 sanitize 节点。

import jsondef sanitize_observation_node(state: ResearchState) -> dict:# 取最近的 tool messagesrecent_messages = state["messages"][-3:]new_citations = []new_urls = []for msg in recent_messages:if getattr(msg, "type", "") != "tool":continuetry:# 解析工具返回的 JSON 字符串data = json.loads(str(msg.content))items = data if isinstance(data, list) else [data]for item in items:url = item.get("url", "").strip()if url and url not in state["seen_urls"]:new_urls.append(url)new_citations.append({"url": url,/* Lines 230-231 omitted */"snippet": item.get("snippet", "")except Exception:pass# 跟踪 agent 是否陷入找不到任何新内容的状态stagnant_turns = state["stagnant_turns"] + 1 if not new_urls else 0return {"citations": new_citations,"seen_urls": new_urls,"stagnant_turns": stagnant_turns,}

这个节点拦下原始的工具输出做一次解析,更新结构化的 citations 账本,顺手也维护 stagnant_turns。Agent 连续三轮没找到任何新 URL,那就是卡住了。

持久化与可重放

一个开放式 agent 的最大一个问题是无法收敛。你问的是 2026 年的实践,它一头扎进 2018 年的 API 文档——这种情况发生时,你需要清楚地知道是哪些搜索结果把它带歪的。

State 不过是一份消息列表,不持久化的话,脚本一退出就什么都没了。前面几篇文章里造好的 Postgres checkpointer 现在派上用场。

import hashlibfrom langgraph.checkpoint.postgres.aio import AsyncPostgresSaverfrom psycopg_pool import AsyncConnectionPooldef topic_to_thread_id(topic: str) -> str:digest = hashlib.sha1(topic.encode("utf-8")).hexdigest()[:10]return f"research-{digest}"# 在 run 函数内部:db_uri = "postgresql://postgres:postgres@localhost:5432/postgres"async with AsyncConnectionPool(conninfo=db_uri, max_size=10) as pool:checkpointer = AsyncPostgresSaver(pool)await checkpointer.setup()app = graph.compile(checkpointer=checkpointer)thread_id = topic_to_thread_id(topic)config = {"configurable": {"thread_id": thread_id}}# 运行图……

把 topic 哈希成 thread_id,相同主题二次运行时就会从中断点续上,而不是从零开跑。还有一个更现实的好处:整个 ReAct 循环会在数据库里留下完整可查询的记录。

把图拼起来

各部分拼起来之后,整个 workflow 看起来很顺:agent 推理,工具执行,state 被清洗,循环再来一遍。条件触发后,落到 finalization 阶段。

async def finalize_node(state: ResearchState, llm) -> dict:# 从清洗过的账本里组一段干净的证据块citation_block = "\n".join(f"- {c['title']} | {c['url']} | {c['snippet'][:180]}"for c in state["citations"][:8]prompt = (f"Topic: {state['topic']}\n\n""Write a concise research brief in 8-12 bullets.\n""Each bullet should be evidence-backed where possible.\n""Then add a short 'Citations' section with URL list only.\n\n"f"Evidence:\n{citation_block if citation_block else '- No citations collected.'}"response = await llm.ainvoke([HumanMessage(content=prompt)])return {"final_brief": str(response.content),# 构建执行图def build_graph(tools, llm):llm_with_tools = llm.bind_tools(tools)tool_node = ToolNode(tools)builder = StateGraph(ResearchState)builder.add_node("reason", lambda state: reason_node(state, llm_with_tools))builder.add_node("tools", tool_node)builder.add_node("sanitize", sanitize_observation_node)builder.add_node("finalize", lambda state: finalize_node(state, llm))builder.add_edge(START, "reason")builder.add_conditional_edges("reason",route_after_reason,{"tools": "tools", "finalize": "finalize"}builder.add_edge("tools", "sanitize")# 用于检查是否停滞的另一条条件边def route_after_sanitize(state):if state["step_count"] >= state["max_steps"] or state["stagnant_turns"] >= 3:return "finalize"return "reason"builder.add_conditional_edges("sanitize",route_after_sanitize,{"reason": "reason", "finalize": "finalize"}builder.add_edge("finalize", END)return builder

第 100 次工具调用问题

这段代码现在跑起来,多数时候表现得很漂亮:搜索、抓几个页面、写出一份扎实的简报。

但是有时候它就根本停不下来。

LLM 会判定手头的信息还差点意思,于是换个略不一样的关键词再搜一次,再抓一个页面,再搜一次。每次新增的 ToolMessage 都在拉长上下文窗口,agent 渐渐失焦被自己堆出来的海量文本搞混了。

循环不加约束的后果非常具体——一个 agent 可以心安理得地连发 100 次 tool call,把 API 预算烧光,最后在超过模型最大 token 限制时直接挂掉。

路由里那个 max_steps 检查只是基础款;stagnant_turns 用来抓住卡住的瞬间。生产环境要的远不止这些:实时监控 token 用量、为特定工具加熔断器(circuit breaker)、给 agent 留一个 "逃生舱" 在彻底找不到答案时去找人介入——一整套约束都得跟上。

打开网易新闻 查看精彩图片

完整可运行代码

下面是 Research Brief Agent 的完整脚本。保存好,配置 GEMINI_API_KEY,本地起一个 Postgres 实例,跑起来看效果。

import argparseimport asyncioimport hashlibimport htmlimport jsonimport operatorimport osimport refrom urllib.parse import urlencodefrom typing import Annotated, Any, TypedDictfrom dotenv import load_dotenvfrom langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessagefrom langchain_core.tools import toolfrom langchain_google_genai import ChatGoogleGenerativeAIfrom langgraph.checkpoint.postgres.aio import AsyncPostgresSaverfrom langgraph.graph import END, START, StateGraphfrom langgraph.graph.message import add_messagesfrom langgraph.prebuilt import ToolNodefrom psycopg_pool import AsyncConnectionPoolimport httpxload_dotenv()def _http_get(url: str, timeout: int = 12) -> str:with httpx.Client(timeout=timeout, headers={"User-Agent": "Mozilla/5.0"}) as client:response = client.get(url)response.raise_for_status()return response.textdef _clean_text_from_html(raw_html: str) -> tuple[str, str]:title_match = re.search(r"]*>(.*?)", raw_html, flags=re.IGNORECASE | re.DOTALL)title = html.unescape(title_match.group(1).strip()) if title_match else "Untitled"no_script = re.sub(r"]*>.*?", " ", raw_html, flags=re.IGNORECASE | re.DOTALL)no_style = re.sub(r"]*>.*?", " ", no_script, flags=re.IGNORECASE | re.DOTALL)text = re.sub(r"<[^>]+>", " ", no_style)text = html.unescape(re.sub(r"\s+", " ", text)).strip()return title, text@tooldef search_web(query: str, max_results: int = 5) -> str:"""Search the web via DuckDuckGo Instant Answer API. Returns JSON list."""try:params = urlencode("q": query,"format": "json","no_html": "1","no_redirect": "1","skip_disambig": "1",payload = _http_get(f"https://api.duckduckgo.com/?{params}")data = json.loads(payload)results: list[dict[str, str]] = []abstract_url = (data.get("AbstractURL") or "").strip()if abstract_url:results.append("url": abstract_url,"title": (data.get("Heading") or "DuckDuckGo Abstract").strip(),"snippet": (data.get("AbstractText") or "").strip(),def _collect_topics(items: list[dict[str, Any]]) -> None:for item in items:if "FirstURL" in item:results.append({/* Lines 437-440 omitted */}elif "Topics" in item and isinstance(item["Topics"], list):_collect_topics(item["Topics"])related = data.get("RelatedTopics")if isinstance(related, list):_collect_topics(related)deduped: dict[str, dict[str, str]] = {}for item in results:if item["url"]:deduped[item["url"]] = itemtrimmed = list(deduped.values())[: max(1, max_results)]return json.dumps(trimmed)except Exception as exc:return json.dumps([{"url": "", "title": "search_error", "snippet": str(exc)}])@tooldef fetch_url(url: str) -> str:"""Fetch and compress a URL into JSON: {url,title,snippet}."""try:raw_html = _http_get(url)title, text = _clean_text_from_html(raw_html)return json.dumps({"url": url, "title": title, "snippet": text[:900]})except Exception as exc:return json.dumps({"url": url, "title": "fetch_error", "snippet": str(exc)})class ResearchState(TypedDict):topic: strmessages: Annotated[list[BaseMessage], add_messages]citations: Annotated[list[dict[str, str]], operator.add]seen_urls: Annotated[list[str], operator.add]step_count: intmax_steps: intstagnant_turns: intfinal_brief: strdef parse_citations_from_tool_messages(messages: list[BaseMessage]) -> tuple[list[dict[str, str]], list[str]]:citations: list[dict[str, str]] = []seen_urls: list[str] = []for message in messages:if getattr(message, "type", "") != "tool":continuecontent = getattr(message, "content", "")if not isinstance(content, str):continuetry:parsed = json.loads(content)except Exception:continuecandidates = parsed if isinstance(parsed, list) else [parsed]for item in candidates:if not isinstance(item, dict):continueurl = str(item.get("url", "")).strip()if not url:continueseen_urls.append(url)citations.append("url": url,/* Lines 511-512 omitted */"snippet": str(item.get("snippet", "")).strip(),return citations, seen_urlsdef compact_unique_citations(citations: list[dict[str, str]]) -> list[dict[str, str]]:deduped: dict[str, dict[str, str]] = {}for c in citations:url = c.get("url", "")if not url:continueif url not in deduped:deduped[url] = creturn list(deduped.values())async def reason_node(state: ResearchState, llm_with_tools: Any) -> dict[str, Any]:system = SystemMessage(content=("You are a research brief agent. Use tools to gather evidence and cite URLs. ""Prefer search first, then fetch_url for top links. ""When you have enough evidence, return a concise final brief. ""Never fabricate citations."response = await llm_with_tools.ainvoke([system] + state["messages"])return {"messages": [response],"step_count": state["step_count"] + 1,def route_after_reason(state: ResearchState) -> str:if state["step_count"] >= state["max_steps"]:return "finalize"last = state["messages"][-1]if isinstance(last, AIMessage) and last.tool_calls:return "tools"return "finalize"def sanitize_observation_node(state: ResearchState) -> dict[str, Any]:citations, seen = parse_citations_from_tool_messages(state["messages"][-3:])already_seen = set(state["seen_urls"])new_seen = [u for u in seen if u and u not in already_seen]new_citations = [c for c in citations if c["url"] in set(new_seen)]new_url_count = len(new_seen)stagnant_turns = state["stagnant_turns"] + 1 if new_url_count == 0 else 0return {"citations": new_citations,"seen_urls": new_seen,"stagnant_turns": stagnant_turns,def route_after_sanitize(state: ResearchState) -> str:if state["step_count"] >= state["max_steps"]:return "finalize"if state["stagnant_turns"] >= 3:return "finalize"return "reason"async def finalize_node(state: ResearchState, llm: Any) -> dict[str, Any]:citation_block = "\n".join(f"- {c['title']} | {c['url']} | {c['snippet'][:180]}"for c in compact_unique_citations(state["citations"])[:8]prompt = (f"Topic: {state['topic']}\n\n""Write a concise research brief in 8-12 bullets.\n""Each bullet should be evidence-backed where possible.\n""Then add a short 'Citations' section with URL list only.\n\n"f"Evidence:\n{citation_block if citation_block else '- No citations collected.'}"response = await llm.ainvoke([HumanMessage(content=prompt)])return {"final_brief": str(response.content),"messages": [AIMessage(content=f"Final brief generated with {len(state['citations'])} citations.")],def build_graph(tools: list[Any], llm: Any):llm_with_tools = llm.bind_tools(tools)tool_node = ToolNode(tools)builder = StateGraph(ResearchState)async def _reason(state: ResearchState) -> dict[str, Any]:return await reason_node(state, llm_with_tools)async def _finalize(state: ResearchState) -> dict[str, Any]:return await finalize_node(state, llm)builder.add_node("reason", _reason)builder.add_node("tools", tool_node)builder.add_node("sanitize", sanitize_observation_node)builder.add_node("finalize", _finalize)builder.add_edge(START, "reason")builder.add_conditional_edges("reason", route_after_reason, {"tools": "tools", "finalize": "finalize"})builder.add_edge("tools", "sanitize")builder.add_conditional_edges("sanitize", route_after_sanitize, {"reason": "reason", "finalize": "finalize"})builder.add_edge("finalize", END)return builderdef topic_to_thread_id(topic: str) -> str:digest = hashlib.sha1(topic.encode("utf-8")).hexdigest()[:10]return f"research-{digest}"async def run(topic: str, max_steps: int) -> None:db_uri = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/postgres")model_name = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")llm = ChatGoogleGenerativeAI(model=model_name, temperature=0.1)tools = [search_web, fetch_url]graph = build_graph(tools=tools, llm=llm)async with AsyncConnectionPool(conninfo=db_uri, max_size=10) as pool:checkpointer = AsyncPostgresSaver(pool)await checkpointer.setup()app = graph.compile(checkpointer=checkpointer)thread_id = topic_to_thread_id(topic)config = {"configurable": {"thread_id": thread_id}}initial: ResearchState = {"topic": topic,"messages": [HumanMessage(content=f"Research this topic: {topic}")],"citations": [],"seen_urls": [],"step_count": 0,"max_steps": max_steps,"stagnant_turns": 0,"final_brief": "",result = await app.ainvoke(initial, config=config)final_brief = result.get("final_brief", "").strip()citations = compact_unique_citations(result.get("citations", []))print("\n" + "=" * 70)print(f"Thread ID: {thread_id}")print(f"Model: {model_name}")print(f"Tools loaded: {[getattr(t, 'name', str(t)) for t in tools]}")print("=" * 70)print("\nResearch Brief:\n")print(final_brief or "(No final brief generated.)")print("\nCitations:\n")if citations:for idx, c in enumerate(citations, start=1):print(f"{idx}. {c['url']}")else:print("No citations captured.")def parse_args() -> argparse.Namespace:parser = argparse.ArgumentParser(description="ReAct research agent (LangGraph + Postgres).")parser.add_argument("--topic",type=str,default="Practical use cases of ReAct agents in 2026 and common failure modes",help="Research topic to investigate.",parser.add_argument("--max-steps",type=int,default=10,help="Maximum ReAct loop steps before forced finalization.",return parser.parse_args()if __name__ == "__main__":args = parse_args()asyncio.run(run(topic=args.topic, max_steps=args.max_steps))

总结

到这一步,一个能跑的 ReAct 循环已经具备:原生 tool calling,state 持久化,脆弱的字符串解析换成了结构化的证据收集。

但成本失控的问题还存在。后续我们会把这个循环锁紧——硬性约束、token 预算、循环检测算法一起上,把这份原型变成可以放到生产环境长期运行的模式。

by Anubhav