DeepAgents - Human in the loop
在开始之前,先理清几个关键概念:
| 概念 | 说明 |
|---|---|
| interrupt(中断) | 当 Agent 准备调用某个被监控的 tool 时,HumanInTheLoopMiddleware调用 LangGraph 的interrupt()暂停图执行,并抛出包含action_requests和review_configs的请求 |
| checkpoint(检查点) | 中断时图状态会被持久化。必须配置 checkpointer,否则中断后无法恢复。生产环境建议用AsyncPostgresSaver,测试用InMemorySaver |
version="v2" | LangGraph 1.0 的 v2 模式,ainvoke()返回GraphOutput对象(含.interrupts属性),astream()的updates流中会出现__interrupt__事件 |
| Command(resume=) | 用户做出决策后,用Command(resume={"decisions": [...]})从断点恢复执行 |
| Decision(决策) | 四种类型:approve(批准)、reject(拒绝并反馈)、edit(修改参数后执行)、respond(人类直接回答,跳过 tool 执行) |
执行生命周期
用户提问 → Agent 调用 LLM 生成回复 → LLM 决定调用 tool(如 execute_shell_command) → after_model 钩子:检查 tool 是否在 interrupt_on 中 → 是:构建 HITLRequest → interrupt() → 暂停 ⌛ → 否:继续执行 → 人类做出决策(approve / reject / edit / respond) → 恢复执行 → 执行/拒绝 tool → LLM 生成最终回复 → 返回流程逻辑
以 Chainlit 聊天应用为交互载体,消息处理流程如下:
- 用户在聊天页面发送消息(如"检查下系统负载")
- Agent 调用 LLM 生成回复,LLM 决定调用
execute_shell_command HumanInTheLoopMiddleware检测到该 tool 在interrupt_on列表中,触发中断- Chainlit 应用检测到中断,向用户展示审批提示
- 用户回复
批准/拒绝,应用用Command(resume=)恢复执行 - Agent 根据决策执行或拒绝 tool,最终返回结果给用户
配置中断
首先需要在创建 Agent 时配置HumanInTheLoopMiddleware:
from deepagents import create_deep_agent from langchain.agents.middleware import HumanInTheLoopMiddleware agent = create_deep_agent( model=llm, tools=[execute_shell_command], checkpointer=checkpointer, # 必须配置! system_prompt="你是一位智能助手...", middleware=[ HumanInTheLoopMiddleware( interrupt_on={ # 对 execute_shell_command 进行审批 "execute_shell_command": { "allowed_decisions": ["approve", "reject"] } } ), ], )interrupt_on是一个字典,key 为 tool 名称,value 的可配置项:
True— 允许所有四种决策(approve / edit / reject / respond)False— 不拦截该 tool(等同于不写){"allowed_decisions": [...]}— 只允许指定决策类型- 还可以配置
when谓词按参数条件判断是否拦截、description自定义中断提示文本
invoke 模式中的实现
v2 模式下的ainvoke()返回GraphOutput对象,可通过.interrupts属性直接获取中断数据,不需要去查 state。
检测中断
resp = await agent.ainvoke( input={"messages": [HumanMessage(content=query)]}, config=config, version="v2", ) if resp.interrupts: # 存在中断,resp.interrupts 是 Interrupt 对象的元组 interrupt = resp.interrupts[0] # interrupt.value 是 HITLRequest,包含 action_requests 和 review_configs print(interrupt.value["action_requests"])恢复中断
用户做出决策后,用Command(resume=)恢复:
from langgraph.types import Command await agent.ainvoke( Command(resume={ "decisions": [{"type": "approve"}] # 或 {"type": "reject", "message": "..."} }), config=config, # 必须用同一个 thread_id version="v2", )关键:如何分辨"新消息"还是"中断恢复"
在聊天应用中,用户发来的每条消息都走同一个@cl.on_message处理函数。用户说"检查负载"和回复"批准"都只是文本。解决方法是——调用前先检查是否有待处理的中断:
# 检查当前会话是否有待处理的中断 state = await agent.aget_state(config) if state.next: # 有待处理中断 → 本次消息是审批回复,构建 resume 命令 cmd = Command(resume={"decisions": [{"type": "approve"}]}) await agent.ainvoke(cmd, config=config, version="v2") else: # 无中断 → 正常对话 resp = await agent.ainvoke( {"messages": [HumanMessage(content=query)]}, config=config, version="v2" )state.next不为空表示图执行被暂停了(有中断等待处理)。
stream 模式中的实现
流式模式需要用stream_mode=["messages", "updates"](官方推荐同时开启两种流):
messages流:获取 LLM 的 token 级输出updates流:检测中断事件__interrupt__
async for chunk in agent.astream( input=input_data, stream_mode=["messages", "updates"], version="v2", config=config, ): if chunk["type"] == "messages": msg, _meta = chunk["data"] # msg 是 AIMessageChunk,包含 content 和 tool_calls if isinstance(msg, AIMessageChunk) and msg.content: yield extract_text(msg) # 流式输出文本 elif chunk["type"] == "updates": if "__interrupt__" in chunk["data"]: interrupt = chunk["data"]["__interrupt__"][0] yield format_question(interrupt) # 输出审批问题stream 模式的恢复与 invoke 类似——在调用astream()之前同样要先检查state.next来判断是正常对话还是中断恢复。
完整示例
下面是核心代码。
checkpointer和llm的配置函数、日志模块等非核心代码省略。
Agent 封装(internal/agent/agent.py核心部分)
# --- 审批关键词匹配 --- _APPROVE_KEYWORDS = frozenset( {"yes", "accept", "approve", "ok", "是", "允许", "同意", "批准"} ) def _parse_decision(query: str) -> str: return "approve" if query.strip().lower() in _APPROVE_KEYWORDS else "reject" def _build_resume_command(decision_type: str, actions_count: int) -> Command: item = {"type": decision_type} if decision_type == "reject": item["message"] = "user rejected this action" return Command(resume={"decisions": [item for _ in range(actions_count)]}) def _extract_text(message) -> str: """从消息中提取纯文本(兼容 str 和 list[dict] 两种 content 格式)。""" if not message or not hasattr(message, "content"): return "" content = message.content if isinstance(content, str): return content if isinstance(content, list): return "".join( b.get("text", "") for b in content if isinstance(b, dict) and b.get("type") == "text" ) return "" def _format_interrupt_question(interrupt) -> str: """将中断数据格式化为用户的审批问题。""" action_requests = interrupt.value.get("action_requests", []) review_configs = interrupt.value.get("review_configs", []) allowed = ( review_configs[0].get("allowed_decisions", ["approve", "reject"]) if review_configs else ["approve", "reject"] ) lines = [] for req in action_requests: lines.append( "Do you approve me to execute this action?\n\n" f"- name: {req['name']}\n" f"- args: `{req['args']}`\n" ) lines.append(f"Input your decision: {', '.join(allowed)}\n") return "\n".join(lines) class AIAgent: # ... __init__, _init_deep_agent, _init_tools 省略 ... async def _has_pending_interrupt(self, config: RunnableConfig) -> bool: state = await self._agent.aget_state(config) return bool(state.next) # --- invoke 模式 --- async def ainvoke(self, query: str, config: RunnableConfig) -> str: if not self._agent: await self._init_deep_agent() # 优先处理中断恢复 if await self._has_pending_interrupt(config): state = await self._agent.aget_state(config) actions_count = len( state.interrupts[0].value["action_requests"] ) decision = _parse_decision(query) cmd = _build_resume_command(decision, actions_count) await self._agent.ainvoke(cmd, config=config, version="v2") # 恢复后取最新消息 state = await self._agent.aget_state(config) if state.values and "messages" in state.values: return _extract_text(state.values["messages"][-1]) return "Oops, something went wrong." # 正常对话 resp = await self._agent.ainvoke( input={"messages": [HumanMessage(content=query)]}, config=config, version="v2", ) if resp.interrupts: return _format_interrupt_question(resp.interrupts[0]) return _extract_text(resp.value["messages"][-1]) # --- stream 模式 --- async def astream(self, query: str, config: RunnableConfig): if not self._agent: await self._init_deep_agent() # 判断是中断恢复还是正常对话 state = await self._agent.aget_state(config) if state.next: actions_count = len( state.interrupts[0].value["action_requests"]