LangGraph MCP - 使用LangGraph实现多智能体架构(七)

news/2025/10/28 14:53:12/文章来源:https://www.cnblogs.com/rslai/p/19171798

在 LangChain 体系中,LangChain 主要集成了和大语言模型交互的能力,而 LangGraph 主要实现了复杂的流程调度。将这两个能力结合起来,就可以实现一个复杂的多智能体。

一、多智能体典型的组装方式

二、实现一个简单的多智能体

  • 首先,通过一个 SuperVisor 节点,对用户的输入进行分类,然后根据分类结果,选择不同的 Agent 节点进行处理;
  • 接下来,每个 Agent 节点,都可以使用不同的工具进行处理,最后将处理结果汇总,再返回给 SuperVisor 节点;
  • 最后,SuperVisor 节点再将结果返回给用户。

三、完成多智能体交互(不包含智能体中具体功能)

1、新建一个目录 multi_agent

2、创建文件 director.py ,并添加如下代码:

import osfrom dotenv import load_dotenv
from operator import add
from tkinter import END
from typing import Annotated, TypedDict
from IPython.display import Image, displayfrom langchain_core.messages import AnyMessage
from langchain.schema import HumanMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph
from langgraph.constants import START, END
from langchain_openai import ChatOpenAIload_dotenv()
api_key = os.getenv("API_KEY", "")
base_url = os.getenv("BASE_URL", "")
llm = ChatOpenAI(model="deepseek-chat",openai_api_key=api_key,openai_api_base=base_url,temperature=0.85,max_tokens=8000
)nodes = ["supervisor", "travel", "joke", "couplet", "other"]# 配置状态
class State(TypedDict):messages: Annotated[list[AnyMessage], add]type: str# 监督(仲裁)节点
def supervisor_node(state: State):print(">>> supervisor_node")writer = get_stream_writer()writer({"node": ">>> supervisor_node"})# 根据用户的问题,对问题进行分类。分类结果保存到 state.type 中sys_prompt = """你是一个专业的客服助手,负责对用户的问题进行分类,并将任务分给其他的 Agent 执行。如果用户的问题是和旅游路线规划相关的,那就返回 travel。如果用户的问题是希望讲一个笑话,那就返回 joke。如果用户的问题是希望对一个对联,那就返回 couplet。如果是其他的问题,返回 other。除了这几个选项外,不要返回任何其他的内容。"""prompts = [{"role": "system", "content": sys_prompt},{"role": "user", "content": state["messages"][0]}]# 如果已经有 type 属性,表示问题已经由节点处理完成,就可以直接返回了if "type" in state:writer({"supervisor_step": f"已获得 {state['type']} 智能体处理结果"})return {"type": END}else:response = llm.invoke(prompts)typeRes = response.contentwriter({"supervisor_step": f"问题分类结果:{typeRes}"})if typeRes in nodes:return {"type": typeRes}else:raise ValueError("type is not in (travel, joke, couplet, other)")return {}# 旅游路线规划节点
def travel_node(state: State):print(">>> travel_node")writer = get_stream_writer()writer({"node": ">>> travel_node"})return {"messages": [HumanMessage(content="travel_node")], "type": "travel"}# 笑话节点
def joke_node(state: State):print(">>> joke_node")writer = get_stream_writer()writer({"node": ">>> joke_node"})return {"messages": [HumanMessage(content="joke_node")], "type": "joke"}# 对联节点
def couplet_node(state: State):print(">>> couplet_node")writer = get_stream_writer()writer({"node": ">>> couplet_node"})return {"messages": [HumanMessage(content="couplet_node")], "type": "couplet"}# 其他节点
def other_node(state: State):print(">>> other_node")writer = get_stream_writer()writer({"node": ">>> other_node"})return {"messages": [HumanMessage(content="我暂时无法回答这个问题")], "type": "other"}# 条件路由
def routing_func(state: State):if state["type"] == "travel":return "travel_node"elif state["type"] == "joke":return "joke_node"elif state["type"] == "couplet":return "couplet_node"elif state["type"] == END:return ENDelse:return "other_node"# 构建图
builder = StateGraph(State)
# 添加节点
builder.add_node("supervisor_node", supervisor_node)
builder.add_node("travel_node", travel_node)
builder.add_node("joke_node", joke_node)
builder.add_node("couplet_node", couplet_node)
builder.add_node("other_node", other_node)
# 添加边
builder.add_edge(START, "supervisor_node")
builder.add_conditional_edges("supervisor_node", routing_func, ["travel_node", "joke_node", "couplet_node", "other_node", END])
builder.add_edge("travel_node", "supervisor_node")
builder.add_edge("joke_node", "supervisor_node")
builder.add_edge("couplet_node", "supervisor_node")
builder.add_edge("other_node", "supervisor_node")# 编译图
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)# 执行任务的测试代码
if __name__ == "__main__":config = {"configurable": {"thread_id": "1"}}# 执行任务 - 旅游路线规划for event in graph.stream({"messages": ["请帮我规划一条从天安门到颐和园的旅游路线"]}, config, stream_mode="custom"):print(event)# # 执行任务 - 笑话# for event in graph.stream({"messages": ["给我讲一个郭德纲的笑话"]}, config, stream_mode="custom"):#     print(event)# # 执行任务 - 对联# for event in graph.stream({"messages": ["请帮我对“你好”进行对联"]}, config, stream_mode="custom"):#     print(event)# # 执行任务 - 其他# res = graph.invoke({"messages": ["今天天气如何?"]}, config, stream_mode="values")# print(res["messages"][-1].content)# image_data = graph.get_graph().draw_mermaid_png()# with open("graph.png", "wb") as f:#     f.write(image_data)# display(Image(image_data))

运行后如下图:

生成 graph 图后如下:

源代码: 提取码: dzn8

四、实现笑话助手(调用大模型)

在刚才的代码中,找到 joke_node ,替换成如下代码:

# 笑话节点
def joke_node(state: State):print(">>> joke_node")writer = get_stream_writer()writer({"node": ">>> joke_node"})sys_prompt = "你是一个笑话大师,根据用户的问题,写一个不超过100个字的笑话。"prompts = [{"role": "system", "content": sys_prompt},{"role": "user", "content": state["messages"][0]}]response = llm.invoke(prompts)return {"messages": [AIMessage(content=response.content)], "type": "joke"}

第二将 stream_mode="custom" 改为 stream_mode="values"

运行后结果如下:

源代码: 提取码: usix

五、实现路线规划助手(调用MCP)

在刚才的代码中,找到 travel_node_async ,替换成如下代码:

# 异步的旅游路线规划节点
async def travel_node_async(state: State):print(">>> travel_node")writer = get_stream_writer()writer({"node": ">>> travel_node"})sys_prompt = "你是一个专业的旅行规划助手,根据用户的问题,生成一个旅游路线规划。请用中文回答,并返回一个不超过100个字的规划。"prompts = [{"role": "system", "content": sys_prompt},{"role": "user", "content": state["messages"][0]}]client = MultiServerMCPClient({"server": {"url": "http://127.0.0.1:3002/mcp_atlas", # 服务器地址,需要根据 MCP Server 配置修改 "transport": "streamable_http"}})tools = await client.get_tools()agent = create_react_agent(model=llm, tools=tools)# 使用异步调用response = await agent.ainvoke({"messages": prompts})for message in response["messages"]:print(message)writer({"travel_step": response["messages"][-1].content})return {"messages": [AIMessage(content=response["messages"][-1].content)], "type": "travel"}# 包装成同步函数供 LangGraph 使用
def travel_node(state: State):return asyncio.run(travel_node_async(state))

由于 MCP 是异步调用,所有增加了 travel_node_async 方法,然后在 travel_node 中包装成同步函数供 LangGraph 使用。

再找到 执行任务 - 旅游路线规划 将其修改为,如下代码:

    # # 执行任务 - 旅游路线规划# for event in graph.stream({"messages": ["请帮我规划一条从天安门到颐和园的旅游路线"]}, config, stream_mode="custom"):#     print(event)# 执行任务 - 其他res = graph.invoke({"messages": ["请帮我规划一条从天安门到颐和园的旅游路线"]}, config, stream_mode="values")print(res["messages"][-1].content) 

使用如下命令启动 MCP server,mcp_server.py 代码在下面网盘中

python mcp_server.py
重新运行 python director.py 后结果如下:

源代码:  提取码: wy83

六、实现对联助手(调用RAG)

RAG 业务逻辑,先去找一些参考资料,将其向量化后保存到向量数据库中。当用户提出问题时,先去向量数据库中找相关的条目,然后将找到的信息一起发给大语言模型,大语言模型参考这些找到的数据再给出相应的答案。

6.1、下载对联数据集

后续要用这个对联数据集来做知识库,后续会将其存入向量数据库。

访问  搜索对联,如下图

如下图,根据提示下载数据集

下载成功后,可以打开数据集文件(train.csv)看一下,内容如下。

在代码目录中创建 resource 目录,将刚才看的数据集文件 train.csv 复制到此目录中。

6.2、安装向量数据库

LangGraph 支持很多种向量数据库,这里为了简单安装的是 Redis 向量数据库。

docker 安装命令如下:(注意需要想方法上网)

docker run -p 6397:6397 redis/redis-stack-server:latest  
安装成功后可以再安装一个 Redis 图形工具,例如:RDM

6.3、将数据集保存到向量数据库中

在 resource 目录中创建文件 couplet_loader.py ,并添加如下代码:

# 把对联数据保存到 Redis 向量数据库中import os
import redis
from langchain_community.embeddings import OllamaEmbeddings# 使用 OllamaEmbeddings
embedding_model = OllamaEmbeddings(model="bge-m3:latest",base_url="http://172.31.12.104:11434"  # 自定义服务器地址
)# 保存向量数据库
redis_url = "redis://localhost:6379"redis_client = redis.from_url(redis_url)
print(redis_client.ping()) # 测试连接 返回 True 表示连接成功from langchain_redis import RedisConfig, RedisVectorStore
config = RedisConfig(index_name="couplet",redis_url=redis_url,
)
vector_store = RedisVectorStore(embeddings=embedding_model, config=config)# 按行读取 train.csv 文件
lines = []
with open("../resource/train.csv", "r", encoding="utf-8") as file:for line in file:print(line)lines.append(line.strip())# 把 lines 中的文本添加到向量数据库中
vector_store.add_texts(lines)

运行完成后,显示如下图:

在 RDM 中查看,如下图:(由于这个数据集有14M,所以存入的时间较长多等下)

6.3、修改智能体代码

打开 director.py 文件,找到 couplet_node 替换成如下代码:

# 对联节点
def couplet_node(state: State):print(">>> couplet_node")writer = get_stream_writer()writer({"node": ">>> couplet_node"})# 用户输入的内容query = state["messages"][0]# 使用 OllamaEmbeddingsembedding_model = OllamaEmbeddings(model="bge-m3:latest",base_url="http://172.31.12.104:11434"  # 自定义服务器地址)# 保存向量数据库redis_url = "redis://localhost:6379"redis_client = redis.from_url(redis_url)print(redis_client.ping()) # 测试连接 返回 True 表示连接成功from langchain_redis import RedisConfig, RedisVectorStoreconfig = RedisConfig(index_name="couplet",redis_url=redis_url,)vector_store = RedisVectorStore(embeddings=embedding_model, config=config)# 从向量库中召回samples = []scored_results = vector_store.similarity_search_with_score(query, k=10)for doc, score in scored_results:samples.append(doc.page_content)# 构建提示词prompt_template = ChatPromptTemplate.from_messages([("system", f"""你是一个专业的对联大师,你的任务是根据用户给出的上联设计下联。回答时,可以参考下面的参考对联。参考对联:{samples}请用中文回答问题。"""),("user", "{text}")])prompts = prompt_template.invoke({"samples": samples, "text": query})writer({"couplet_step": prompts})response = llm.invoke(prompts)return {"messages": [AIMessage(content=response.content)], "type": "couplet"}

再找到 执行任务 - 对联 替换为如下代码:

    # # 执行任务 - 对联# for event in graph.stream({"messages": ["给我对一个对联,上联是: 金榜题名时"]}, config, stream_mode="custom"):#     print(event)# # 执行任务 - 对联res = graph.invoke({"messages": ["给我对一个对联,上联是: 金榜题名时"]}, config, stream_mode="values")print(res["messages"][-1].content) 

运行代码,如下图:

如果想看提示词和调用过程,将上面注释打开再将下面注释后重新运行,如下图:

源代码:  提取码: hq5b

七、将以上多智能体封装成一个 service

创建 director_service.py 文件,添加如下代码:

import randomfrom director import graphconfig = {"configurable": {"thread_id": random.randint(1, 10000)}
}# query = "给我讲一个关于郭德纲的笑话"
query = "给我对一个对联,上联是: 金榜题名时"
res = graph.invoke({"messages": [query]}, config, stream_mode="values"
)
print(res["messages"][-1].content)

运行后如下图:

源代码: 提取码: 6yi3

八、将以上多智能体封装成一个 web 页面

创建 director_fe.py 文件,添加如下代码:

# grdio 前端
import gradio as gr
import randomfrom director import graphdef process_input(text):config = {"configurable": {"thread_id": random.randint(1, 10000)}}result = graph.invoke({"messages": [text]}, config, stream_mode="values")return result["messages"][-1].contentwith gr.Blocks() as demo:gr.Markdown("# LangGraph Multi-Agent")with gr.Row():with gr.Column():gr.Markdown("## 可以问路线规划,对对联,讲笑话,快来试试吧。")input_text = gr.Textbox(label="问题*", placeholder="请输入你的问题", value="讲一个关于郭德纲的笑话")btn_start = gr.Button("Start", variant="primary")with gr.Column():output_text = gr.Textbox(label="Output")btn_start.click(fn=process_input, inputs=[input_text], outputs=[output_text])demo.launch()

运行成功后,如下图:

浏览器中访问,上图地址。

输入问题后,点击 start 按钮,如下图:

源代码: 提取码: 9xwg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/948808.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DP 复习 - L

本文写于2025.10.28,旨在复习所有做过的 DP 题目,以及它们引出的 DP 思想。 一、DP 概念 1. 概念 DP 是 Dynamic Programming 的简称,专指动态规划算法。 2. 性质 能用 DP 求解的问题,必须满足如下三个性质:最优子…

完整教程:swin-transformer架构解析和源码解析

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025年沈阳/北京/东三省制造业企业商业秘密保护权威推荐榜单:高新技术企业与上市公司数据安全解决方案精选

2025年沈阳/北京/东三省制造业企业商业秘密保护权威推荐榜单:高新技术企业与上市公司数据安全解决方案精选 在当今数字化经济时代,商业秘密保护已成为制造业企业特别是高新技术企业和上市公司的核心竞争力所在。随着…

2025沈阳/北京/东三省制造业企业商业秘密保护厂家推荐大宸商业,专业合规护航企业发展

2025沈阳/北京/东三省制造业企业商业秘密保护厂家推荐大宸商业,专业合规护航企业发展在当今激烈的市场竞争环境中,商业秘密保护已成为制造业企业核心竞争力的重要组成部分。随着数字化转型的加速推进,制造业企业的技…

LangGraph MCP - 使用LangGraph构建多智能体工作流(六)

一、流式输出大模型调用结果 之前提到 Graph 的流式输出有几种不同的模式,这里展示 messages 模式,是用来监控大语言模型的 Token 记录的。 代码在 stream_mode_messages.py 文件中,内容如下:# -*- coding: utf-8 …

告别卡顿与等待,Rancher Vai 让集群操作“秒响应”

有时候,创新不只是功能更多,而是让等待更少。Rancher Vai,让每一次操作都快得刚刚好。如果你正用 Rancher 在大规模环境中管理 Kubernetes,那么你一定知道,UI 性能不只是“锦上添花”——它对效率至关重要。Ranch…

2025 年机械设备铝型材,轻型铝型材,定制铝型材厂家最新推荐,产能、专利、环保三维数据透视

引言 随着工业自动化进程加速,机械设备、轻型结构等领域对铝型材的定制化需求持续攀升,但市场产品质量与服务能力差异显著。为精准筛选优质厂家,本次测评联合行业权威机构,依据 GB/T 6892-2025 及 ISO 9001 标准体…

2025 年铝型材框架、铝型材围栏、6063 铝型材、重型铝型材厂家最新推荐 —— 产能、专利、环保三维数据透视

引言 随着工业自动化与高端制造领域的快速发展,铝型材框架、围栏等结构件的应用场景持续拓展,6063 合金因优异的成型性与重型铝型材的高承载性成为核心需求材料。但市场中厂家产能差异达 10 倍以上,专利密度悬殊,部…

LangGraph MCP - Graph(五)

Graph 是 LangGraph 的核心,它以有向无环图的方式来整合多个 Agent,构建更复杂的 Agent 大模型应用,形成更复杂的工作流。并且提供了很多产品级的特性,保证这些应用可以更稳定高效的执行。 Graph 主要包含三个基本…

乐维社区“专家坐诊”第303期问答

问题一 Q:lwops 这个暴露的端口是啥哈。 我看日志输出 没检测到服务端是active状态。 我重新还了台部署。 ip改变了。 客户端那边改配置文件部署么。 这个重启的命令是啥哈 好像智能kill 后 自动起来。我去配置文件里…

Phpstudy博客网站apache2日志分析python代码 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

如何配置Docker通过代理去pull镜像

概述 如果Docker所在宿主机无法直接与互联网连通,需要通过代理去上网的话,直接通过docker pull xxx拉取镜像将会失败, 本文主要是记录一下如何配置代理服务器让Docker能够正常拉取外网镜像。 说明: 本文几乎完全照…

2025年不停电双电源开关最新标杆厂家推荐:容磁电气,静态转换双电源开关|STS高速双电源开关|UTS高速双电源开关|ATS高速双电源开关|防晃电新标准​

在工业生产对供电可靠性要求持续提升的 2025 年,容磁电气 (杭州) 有限公司凭借对高速双电源开关领域的技术深耕与全场景适配能力,成为工业企业、数据中心、医疗设施等场景采购清单中的 “优选品牌”。自成立以来,公…

2025 年 10 月铝合金型材,工业铝型材,3030 铝型材,铝型材防护罩厂家最新推荐,产能、专利、环保三维数据透视!

引言 为破解工业铝型材采购中的品质甄别、产能匹配与服务适配难题,有色金属工业协会联合权威质检机构开展 2025 年度专项测评,覆盖 23 个省份 120 余家源头厂家,采用 “产能 - 专利 - 环保” 三维评估体系。产能维度…

大数值的精度与格式化显示问题

大数值的精度与格式化显示问题Posted on 2025-10-28 14:37 江城2211 阅读(0) 评论(0) 收藏 举报问题描述: Java后端的默认序列化方法:当数值过大(如 1000000000000000000.00)或过小(如 0.0000000001)时,会…

LangGraph MCP - 使用 LangGraph构建单独的 Agent(四)

一、什么是 Agent ? Agent 是 LangGraph 中的一个核心概念。LangGraph 中 Agent 封装了访问大模型、调用 Tools、保存 ChatMemory 等等这些基础能力。可以完成一系列基于大模型构建的任务。同时,又可以随时干预 Agen…

深入解析:GESP2025年9月认证C++一级( 第三部分编程题(2)金字塔)

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025 年碳化硅金刚线切割机,石墨金刚线切割机,陶瓷金刚线切割机厂家最新推荐,产能、专利、适配性三维数据透视

引言 随着第三代半导体、新能源等产业加速升级,碳化硅(莫氏硬度 9.5)、石墨(脆性易崩边)、陶瓷(高硬度低导热)等材料的精密切割需求激增,2025 年相关设备市场规模预计突破 138 亿元。但行业普遍存在材料适配性…

2025 年 10 月油石、保温材料、玉石、石英金刚线切割机厂家最新推荐,产能、专利、环保三维数据透视

引言 油石、保温材料、玉石、石英等材料因硬度、脆性差异大,对金刚线切割机的定制化适配性、切割精度与环保性提出严苛要求。为破解采购选型难题,本次测评联合行业协会,依据《金刚石绳锯切割机技术条件》标准,构建…