【LangGraph】Human

    正在检查是否收录...

借助LangGraph的Human in the loop,实现一个命令行交互的程序,AI执行shell命令前都要用户同意。

前言

看langgraph官方文档感觉human in the loop貌似还挺简单的,但实际上手时,那文档看得我云里雾里的。更详细的Guides和Reference,恕我能力有限,悲摧的也没看懂。作为试验,我想做一个功能:本地执行shell命令,每次执行前都要用户确认。左看官方文档, 右去西天请ChatGPT老祖。ChatGPT说得头头是道,Copilot也反复调试,但就是不能用。就这。。。看来碰到新东西AI就十分拉胯。最终,认真看了半天文档,没借助GPT,总算捣鼓出来一个最简版。

自从AI能力越来越强,大多时候自己更习惯直接让AI帮忙解决问题,越来越懒得看文档。自己找饭吃的能力还是得留着,不能光靠AI喂饭。

运行效果

试验嘛,交互就是命令行了。效果大概这样

$ python custom_workflow.py AI助手已启动,输入 'quit'、'exit' 或 'q' 退出程序 User: 今天的日期是什么 Assistant: 今天的日期是 2025-09-03。 User: 合肥的天气怎么样 Assistant: 合肥的天气总是阳光明媚! User: 查看下本地内存占用 Assistant: Do you approve executing this command: free -h? Please answer 'yes' or 'no'. User: yes Assistant: 当前本地内存占用情况如下 total used free shared buff/cache available 内存: 62Gi 10Gi 46Gi 157Mi 6.5Gi 52Gi 交换: 3.8Gi 0B 3.8Gi User: disk呢? Assistant: Do you approve executing this command: df -h? Please answer 'yes' or 'no'. User: yes Assistant: 当前磁盘使用情况如下: 文件系统 大小 已用 可用 已用% 挂载点 udev 32G 0 32G 0% /dev tmpfs 6.3G 1.8M 6.3G 1% /run /dev/mapper/debian-root 234G 29G 194G 13% / tmpfs 32G 37M 32G 1% /dev/shm efivarfs 128K 40K 84K 32% /sys/firmware/efi/efivars tmpfs 5.0M 12K 5.0M 1% /run/lock tmpfs 1.0M 0 1.0M 0% /run/credentials/systemd-journald.service tmpfs 32G 49M 32G 1% /tmp /dev/nvme1n1p1 989M 256M 666M 28% /boot /dev/mapper/debian-home 676G 196G 446G 31% /home /dev/nvme0n1p1 300M 39M 262M 13% /boot/efi tmpfs 6.3G 4.1M 6.3G 1% /run/user/1000 User: 非常好 Assistant: 谢谢!如果您有其他问题或需要进一步的帮助,请随时告诉我。😊 User: quit Goodbye! 

Code

注释写得够详细的了,具体可以直接看注释。LLM用的是阿里千问,注意替换成自己的。

checkpointer用的是内存,在生产环境,可以把checkpointer换成sqlite、postgres、redis等。

log就是个写日志文件的模块,不输出到控制台,之前调试的时候用来发给LLM做诊断,比较简单就不贴了。

在python命令行交互程序中,最好引用下readline模块,不然输入中文会碰到退格键没法正常用的问题,而且方向键也没法用。

""" Human in the loop 示例, 每当AI需要执行shell命令时, 都需要经过用户确认 """ import os import readline # 引入readlint模块以增强命令行输入体验。Linux环境的Python标准库内置 from datetime import datetime import subprocess import traceback from langchain_core.runnables import RunnableConfig from langchain_core.tools import tool from langchain_core.messages import HumanMessage from langchain_openai import ChatOpenAI from langgraph.checkpoint.memory import InMemorySaver from langgraph.graph import END, START, StateGraph, MessagesState from langgraph.graph.state import CompiledStateGraph from langgraph.prebuilt import ToolNode, tools_condition from langgraph.types import Command, interrupt from langgraph.prebuilt import create_react_agent # 自定义一个简单的文件型日志记录器 from log import logger # 设置API密钥 os.environ["OPENAI_API_KEY"] = "" # 初始化语言模型 llm = ChatOpenAI( model="qwen-plus", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) # 定义工具函数 @tool def get_date() -> str: """获取今天的日期。 Returns: str: 当前日期,格式为 YYYY-MM-DD """ logger.info("Getting date") return datetime.now().strftime("%Y-%m-%d") @tool def get_weather(city: str) -> str: """获取指定城市的天气信息。 Args: city (str): 城市名称 Returns: str: 天气信息描述 """ logger.info("Getting weather") return f"It's always sunny in {city}!" @tool def execute_command(command: str) -> str: """本地执行shell命令, 每次执行前需要用户确认 Args: command (str): 要执行的命令 Returns: str: 命令执行结果或拒绝信息 """ # 使用interrupt函数暂停执行并请求用户确认 # interrupt会将控制权交还给用户,等待用户输入 decision = interrupt({"query": f"Do you approve executing this command: {command}? Please answer 'yes' or 'no'."}) logger.info(f"Decision: {decision}") # 根据用户决策决定是否执行命令 if decision == "yes": logger.info(f"Executing command, {command}") try: # 执行命令并获取结果 result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10) result = result.stdout or result.stderr return result except subprocess.TimeoutExpired: return "Command timed out" except Exception as e: return f"Error executing command: {str(e)}" else: logger.info("Command execution denied by user") return "Command execution denied by user" # 定义可用工具列表 tools = [get_weather, get_date, execute_command] # 创建ReAct代理,它可以根据需要自动调用工具 agent = create_react_agent( model=llm, tools=tools, prompt="You are a helpful assistant." ) # 创建工具节点,用于执行工具调用 tool_node = ToolNode(tools=tools) # 创建内存检查点保存器,用于保存对话状态 memory = InMemorySaver() # 配置运行时参数,使用固定的线程ID config = RunnableConfig(configurable={"thread_id": "1"}) def create_graph() -> CompiledStateGraph: """创建并返回工作流图。 Returns: CompiledStateGraph: 编译后的工作流图 """ # 创建状态图,使用MessagesState作为状态类型 graph_builder = StateGraph(MessagesState) # 添加节点 graph_builder.add_node("agent", agent) # AI代理节点 graph_builder.add_node("tools", tool_node) # 工具执行节点 # 添加边 graph_builder.add_edge(START, "agent") # 从开始节点连接到代理节点 graph_builder.add_edge("tools", "agent") # 从工具节点连接回代理节点 # 添加条件边,根据代理的决策决定下一步 graph_builder.add_conditional_edges( "agent", tools_condition, # 条件函数,判断是否需要调用工具 {"tools": "tools", END: END} # 映射:需要工具时转到工具节点,否则结束 ) # 编译图并返回,使用内存保存器来保存状态 return graph_builder.compile(checkpointer=memory) def handle_user_decision(user_input: str) -> bool: """处理用户对中断的响应。 Args: user_input (str): 用户的输入 Returns: bool: 如果处理了中断返回True,否则返回False """ # 创建图实例 graph = create_graph() # 获取当前状态 current_state = graph.get_state(config) # 检查是否有待处理的中断 if not current_state.next: logger.warning("No pending interrupts to handle.") return False # 没有待处理的中断 # 根据用户输入决定如何响应中断 if user_input.lower() == "yes": # 用户确认,继续执行 graph.invoke(Command(resume="yes"), config=config) else: # 用户拒绝,取消执行 graph.invoke(Command(resume="no"), config=config) return True # 处理了中断 def graph_invoke(user_input: str): """处理用户输入并执行相应操作。 Args: user_input (str): 用户输入的文本 """ # 首先尝试处理用户对中断的响应 interrupt_handled = handle_user_decision(user_input) # 如果已经处理了中断,则不再继续处理用户输入,而是显示结果 if interrupt_handled: # 获取处理后的状态并显示结果 graph = create_graph() current_state = graph.get_state(config) if current_state.values and 'messages' in current_state.values: # 显示最新的消息内容 messages = current_state.values['messages'] if messages: last_message = messages[-1] if hasattr(last_message, 'content'): print("Assistant:", last_message.content) return # 如果没有待处理的中断,则正常处理用户输入 graph = create_graph() resp = graph.invoke({"messages": [HumanMessage(content=user_input)]}, config=config) logger.debug(f"response: {resp}") # 检查是否有中断需要处理 if "interrupt" in resp: interrupt_data = resp["interrupt"] interrupt = interrupt_data[0] if interrupt_data else None if not interrupt or not hasattr(interrupt, "value"): logger.error("Invalid interrupt data") return interrupt_value = interrupt.value # 显示中断请求给用户 print(f"Assistant: {interrupt_value['query']}") else: # 直接显示AI的响应 print("Assistant:", resp["messages"][-1].content) logger.debug(f"Snapshot state: {graph.get_state(config)}") logger.debug(f"Snapshot next: {graph.get_state(config).next}") # 程序入口点 if name == "main": """主程序循环,处理用户输入并生成响应。""" print("AI助手已启动,输入 'quit'、'exit' 或 'q' 退出程序") while True: try: # 获取用户输入 user_input = input("User: ").strip() logger.info(f"User input: {user_input}") # 检查退出命令 if user_input.lower() in ["quit", "exit", "q"]: print("Goodbye!") break # 处理用户输入 graph_invoke(user_input) except KeyboardInterrupt: # 处理Ctrl+C中断 print("\nGoodbye!") break except Exception as e: # 记录并显示错误信息 logger.error(f"Error occurred: {traceback.format_exc()}") print(f"Error: {traceback.format_exc()}") break 

本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19071001/human-in-the-loop-of-langgraph

评论

昵称
邮箱
主页