一、什么是 Agent ?
Agent 是 LangGraph 中的一个核心概念。LangGraph 中 Agent 封装了访问大模型、调用 Tools、保存 ChatMemory 等等这些基础能力。可以完成一系列基于大模型构建的任务。同时,又可以随时干预 Agent 执行进度,对关键步骤随时做出调整。
二、Agent 常用的 Stream 流式输出
以下代码在 链接: agent_ stream.zip (提取码: excj)中。注意修改 .env 中的 api_key 配置。
2.1、invoke - 普通调用,一次性消息输出
invoke.py 文件,代码如下:
# -*- coding: utf-8 -*-
import osfrom dotenv import load_dotenv
from langgraph.prebuilt import create_react_agent
from langchain_deepseek import ChatDeepSeekload_dotenv()
os.environ["DEEPSEEK_API_KEY"] = os.getenv("API_KEY", "")
model = ChatDeepSeek(model="deepseek-chat")# 创建 agent
agent = create_react_agent(model=model, tools=[],prompt="You are helpful assistant")# 调用 agent
result = agent.invoke({"messages": [{"role": "user", "content": "你是谁?你能做什么"}]})# 循环打印结果
for message in result["messages"]:print("==========")print(f"{type(message).__name__}: {message}")
运行后如下图,可以看出它是一次请求返回的所有内容。
2.2、stream - 流失消息调用
stream_mode 有4中选项:
- values: 默认值,一次拿到所有的chunk(字典格式的消息)。
- updates: 流式输出,每个工具调用的每个步骤(包括,事件类型、运行ID、步骤等详细信息)。
- messages: 流式输出,完整的大语言消息对象。
- custom: 自定义格式输出。
2.3、stream_mode = messages
stream_messages.py 文件,代码如下:
# -*- coding: utf-8 -*-
import osfrom dotenv import load_dotenv
from langgraph.prebuilt import create_react_agent
from langchain_deepseek import ChatDeepSeekload_dotenv()
os.environ["DEEPSEEK_API_KEY"] = os.getenv("API_KEY", "")
model = ChatDeepSeek(model="deepseek-chat")# 创建 agent
agent = create_react_agent(model=model, tools=[],prompt="You are helpful assistant")# 调用 agent
for chunk in agent.stream({"messages": [{"role": "user", "content": "你是谁?你能做什么"}]},stream_mode="messages",
):print(chunk)
运行后如下图,可以看出它收到一个token返回就输出一个。
2.4、stream_mode = values(默认值)
stream_values.py 文件,代码如下:
# -*- coding: utf-8 -*-
import osfrom dotenv import load_dotenv
from langgraph.prebuilt import create_react_agent
from langchain_deepseek import ChatDeepSeekload_dotenv()
os.environ["DEEPSEEK_API_KEY"] = os.getenv("API_KEY", "")
model = ChatDeepSeek(model="deepseek-chat")# 创建 agent
agent = create_react_agent(model=model, tools=[],prompt="You are helpful assistant")# 调用 agent
for chunk in agent.stream({"messages": [{"role": "user", "content": "你是谁?你能做什么"}]},
):print(chunk)
运行后如下图,可以看出它是凑齐一个一个完整的消息再输出,在多Agent场景就看出区别了。
三、消息记忆
记忆召回,是实现多轮会话的关键。
例如:用户输入两个问题,如下。
1、今天天津天气如何? 2、北京呢?
如果没有记忆,那 LLM 就不能理解 北京呢? 是在问北京的天气。
LangGraph 将消息记忆分为:
- 短期记忆:用于当前对话中的历史消息记忆。LangGraph 将它封装成 CheckPoint
- 长期记忆:外部存储,一般是用户级别消息记忆。LangGraph 将它封装成 Store
以下代码在 链接: agent_memory.zip (提取码: f7qa)中。注意修改 .env 中的 api_key 配置。
3.1、短期记忆 CheckPoint
在 LangGraph 中只需指定 CheckPoint 属性,就可以实现短期记忆。
另外,在使用 checkpointer 时,需要指定一个单独的 thread_id 来区分不同的对话。以下例子中 thread_id 写死的是 1,到时候根据你的需要产生唯一 id 即可。
short_term_memory.py 文件,代码如下:
# -*- coding: utf-8 -*-
import osfrom dotenv import load_dotenv
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
from langchain_deepseek import ChatDeepSeekcheckpointer = InMemorySaver()load_dotenv()
os.environ["DEEPSEEK_API_KEY"] = os.getenv("API_KEY", "")
model = ChatDeepSeek(model="deepseek-chat")def get_weater(city: str) -> str:"""获取某个城市的天气"""return f"{city}的天气是晴朗的"# 创建 agent
agent = create_react_agent(model=model, tools=[get_weater],checkpointer=checkpointer)config = {"configurable": {"thread_id": "1"}
}# 调用 agent
result1 = agent.invoke({"messages": [{"role": "user", "content": "今天天津天气如何?"}]}, config=config
)# 循环打印结果
for message in result1["messages"]:print("==========")print(f"{type(message).__name__}: {message}")print("+++++++++++++++++++++++++++++++")# 调用 agent
result2 = agent.invoke({"messages": [{"role": "user", "content": "北京呢?"}]}, config=config
)# 循环打印结果
for message in result2["messages"]:print("==========")print(f"{type(message).__name__}: {message}")
运行后如下图,可以看到第二个 invoke 的时候携带了第一个请求的内容。
可以看出,每次都会携带短期记忆,如果每次都携带所有的内容会浪费 token,所以需要按照一定的规则定期清理。
LangGraph 中管理短期记忆的方法有如下两种:
- Summarization(总结): 用大模型的方式,对短期记忆进行总结,然后再把总结的结果作为新的短期记忆。
- Trimming(删除): 直接把短期记忆总最旧的消息删除 。
只需要在 create_react_agent 的 pre_model_hook 中设置相应的 Summarization 或 Trimming 即可。
3.2、长期记忆 Store
与短期记忆最大区别在于,短期记忆通过 thread_id 来区分不同的对话,而长期记忆通过 namespace 来区分不同的命名空间。
比如,你想让智能体记住用户的名字或爱好,就可以把这些保存到长期记忆中。
long_term_memory.py 文件,代码如下:
# -*- coding: utf-8 -*-
import osfrom dotenv import load_dotenv
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_store
from langgraph.store.memory import InMemoryStore
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langchain_deepseek import ChatDeepSeek# 定义长期记忆
store = InMemoryStore()
# 添加一些测试数据
store.put(("users",),"user_123",{"name": "张三","age": 30,"email": "zhangsan@example.com"}
)@tool(return_direct=True)
def get_user_info(config: RunnableConfig) -> str:"""获取用户信息"""# 获取长期存储。获取后这个存储组件可以读也可以写store = get_store()# store.put(# ("users",),# "user_456",# {# "name": "李四",# "age": 25,# "email": "lisi@example.com"# }# )# 获取用户 IDuser_id = config["configurable"].get("user_id")user_info = store.get(("users",), user_id)return str(user_info.value) if user_info else "Unknown user"load_dotenv()
os.environ["DEEPSEEK_API_KEY"] = os.getenv("API_KEY", "")
model = ChatDeepSeek(model="deepseek-chat")# 创建 agent
agent = create_react_agent(model=model, tools=[get_user_info],store=store)# 调用 agent
result1 = agent.invoke({"messages": [{"role": "user", "content": "查找用户信息"}]}, config={"configurable": {"user_id": "user_123"}}
)# 循环打印结果
for message in result1["messages"]:print("==========")print(f"{type(message).__name__}: {message}")
运行后如下图,可以看到 invoke 后从 store 中取回了用户信息。
当然这里展示的 store 是 InMemoryStore,后面可以实现存入数据库或文件。
四、Human-in-the-loop (人类监督)
Human-in-the-loop 也是 LangGraph 的常用核心功能。LangGraph 提供了 interruput() 方法添加人类监督。
比如,当 LangGraph 要使用某些 tools 时,可以中断,并等待用户确认后,再执行的效果。
以下代码在 链接: agent_interrupt.zip (提取码: 4uwr)中。注意修改 .env 中的 api_key 配置。
agent_interrupt.py 文件,代码如下:
# -*- coding: utf-8 -*-
import os
from tkinter import Nfrom dotenv import load_dotenv
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
from langchain_deepseek import ChatDeepSeek
from langgraph.types import Command# 需要 人工审查/批准 的敏感工具示例
@tool(return_direct=True)
def book_hotel(hotel_name: str):"""可以帮助客户预定宾馆或酒店Args:hotel_name (str): 客户指定的宾馆或酒店名称Returns:str: 预定结果确认信息"""response = interrupt(f"正准备执行 'book_hotel' 工具预定宾馆,相关参数名 {{'hostl_name': {hotel_name}}}。""选择 ok,表示同意,或者选择 edit,提出补充意见。")if response["type"] == "ok":passelif response["type"] == "edit":hotel_name = response["args"]["hotel_name"]else:return ValueError(f"Unknown response type: {response['type']}")return f"已成功在 {hotel_name} 预定了一个房间。"checkpointer = InMemorySaver()load_dotenv()
os.environ["DEEPSEEK_API_KEY"] = os.getenv("API_KEY", "")
model = ChatDeepSeek(model="deepseek-chat")# 创建 agent
agent = create_react_agent(model=model, tools=[book_hotel],checkpointer=checkpointer)config = {"configurable": {"thread_id": "2"}
}# 执行过程中会输出一个 Interrupt 响应,提示正在等待用户输入确认
for chunk in agent.stream({"messages": [{"role": "user", "content": "我想在图灵宾馆预定一个房间"}]}, config=config
):print(chunk)print("############\n")# 通过 Agent 提交一个 Command 请求,来继续完成之前的任务
for chunk in agent.stream(Command(resume={"type": "ok"}),# Command(resume={"type": "edit", "args": {"hotel_name": "三号宾馆"}}),config=config
):print(chunk)print("############\n")
运行后如下图,可以看到输出了一个 interrupt,然后通过 command 恢复执行。