【TIDE DIARY 4.1】Agentic RAG Explained

Published 2025-11-12 13:01:52 · Source: https://www.cnblogs.com/gccbuaa/p/19213586

Agentic RAG in Practice: From a Basic Implementation to Production Deployment

Based on the survey paper "Agentic Retrieval-Augmented Generation: A Survey on Agentic RAG", this article provides complete code implementations and a hands-on guide to help developers quickly build an agentic retrieval-augmented generation system.

System Architecture Overview

Agentic RAG introduces autonomous agents and workflow management to significantly improve the adaptability and reasoning ability of traditional RAG systems. This article starts from a basic implementation and incrementally builds a complete Agentic RAG system.

Technical evolution path

  • Stage 1: Basic RAG implementation - establish core retrieval and generation capabilities
  • Stage 2: Workflow enhancement - add adaptive query processing
  • Stage 3: Multi-agent system - implement specialized division of labor and collaboration
  • Stage 4: Production deployment - build a scalable service architecture

Environment Setup and Dependencies

1. Basic environment configuration

# Create a Python environment
python -m venv agentic_rag_env
source agentic_rag_env/bin/activate  # Linux/Mac
# agentic_rag_env\Scripts\activate  # Windows

# Install core dependencies
pip install langchain openai chromadb tiktoken
pip install fastapi uvicorn pydantic  # API service
pip install sqlalchemy psycopg2-binary  # database support
pip install "unstructured[all]"  # document parsing

2. Environment variable configuration

Create a .env file:

OPENAI_API_KEY=your_openai_api_key_here
DATABASE_URL=postgresql://user:password@localhost/rag_db
VECTOR_STORE_PATH=./vector_store
LOG_LEVEL=INFO
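The article never shows how these variables reach the process. In practice one would typically use the python-dotenv package (`load_dotenv()`); purely to illustrate what that involves, here is a minimal hand-rolled loader — the function name `load_env_file` and its behavior are my own sketch, not part of the article's stack:

```python
import os

def load_env_file(path: str) -> dict:
    """Parse KEY=VALUE lines from a .env file; skip blanks and '#' comments.
    Values are exported into os.environ only when not already set."""
    loaded = {}
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")  # split on the first '=' only
            loaded[key.strip()] = value.strip()
            os.environ.setdefault(key.strip(), value.strip())
    return loaded
```

Splitting on the first `=` only matters for values like the `DATABASE_URL` above, which can themselves contain `=` or other delimiters.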

Stage 1: Basic RAG Implementation

1.1 Data preprocessing pipeline

import os
import re
from typing import List, Dict, Any
from langchain.document_loaders import DirectoryLoader, TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

class DataProcessor:
    """Data preprocessing pipeline."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )

    def load_documents(self, directory: str, file_pattern: str = "**/*.pdf") -> List[Document]:
        """Load documents from a directory."""
        loader = DirectoryLoader(
            directory,
            glob=file_pattern,
            loader_cls=PyPDFLoader,  # adjust to match the file type
            show_progress=True
        )
        return loader.load()

    def process_documents(self, documents: List[Document]) -> List[Document]:
        """Process documents: clean, split, and enrich metadata."""
        processed_docs = []
        for doc in documents:
            # Basic text cleaning
            cleaned_content = self.clean_text(doc.page_content)
            # Text splitting
            chunks = self.text_splitter.split_text(cleaned_content)
            # Create a Document per chunk with enriched metadata
            for i, chunk in enumerate(chunks):
                enhanced_metadata = doc.metadata.copy()
                enhanced_metadata.update({
                    "chunk_id": i,
                    "total_chunks": len(chunks),
                    "source_file": os.path.basename(doc.metadata.get("source", "unknown")),
                    "chunk_size": len(chunk)
                })
                processed_docs.append(Document(page_content=chunk, metadata=enhanced_metadata))
        return processed_docs

    def clean_text(self, text: str) -> str:
        """Clean raw text."""
        # Collapse runs of whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters (adjust as needed)
        text = re.sub(r'[^\w\s.,!?;:()\-]', '', text)
        return text.strip()
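The `chunk_overlap` parameter means adjacent chunks share trailing/leading characters, so a sentence cut at a chunk boundary still appears whole in at least one chunk. `RecursiveCharacterTextSplitter` additionally prefers to break on paragraph and sentence separators; the overlap mechanic itself reduces to this sketch (`split_with_overlap` is illustrative, not LangChain's implementation):

```python
def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list:
    """Fixed-size character chunks where each chunk starts
    chunk_size - chunk_overlap characters after the previous one,
    so neighbouring chunks share chunk_overlap characters of context."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]
```

With the article's defaults (1000/200), every chunk repeats the last 200 characters of its predecessor — roughly 20% storage overhead traded for robustness at chunk boundaries.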

1.2 Vector store initialization

from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

class VectorStoreManager:
    """Vector store manager."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.persist_directory = persist_directory
        self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
        self.vectorstore = None
        self.retriever = None

    def initialize_vectorstore(self, documents: List[Document]):
        """Initialize the vector store."""
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )
        print(f"Vector store initialized; {len(documents)} document chunks processed")

    def setup_retriever(self, search_type: str = "similarity", k: int = 5):
        """Set up the retriever."""
        if not self.vectorstore:
            raise ValueError("Vector store is not initialized")
        base_retriever = self.vectorstore.as_retriever(
            search_type=search_type,
            search_kwargs={"k": k}
        )
        # Add contextual compression
        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        compressor = LLMChainExtractor.from_llm(llm)
        self.retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=base_retriever
        )
        return self.retriever

    def similarity_search(self, query: str, k: int = 5) -> List[Document]:
        """Similarity search."""
        if not self.vectorstore:
            raise ValueError("Vector store is not initialized")
        return self.vectorstore.similarity_search(query, k=k)
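`similarity_search` ranks chunks by the distance between the query embedding and each chunk embedding (Chroma uses an approximate index under the hood; the metric depends on collection settings). Stripped of indexing, the ranking step is just the following — a deliberately naive sketch, not Chroma's actual code:

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm

def top_k_indices(query_vec: list, doc_vecs: list, k: int) -> list:
    """Indices of the k document vectors most similar to the query."""
    order = sorted(range(len(doc_vecs)),
                   key=lambda i: cosine_similarity(query_vec, doc_vecs[i]),
                   reverse=True)
    return order[:k]
```

A real vector store replaces this O(n) scan with an approximate nearest-neighbor index, which is why retrieval stays fast as the corpus grows.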

1.3 Basic RAG workflow

from langchain.chat_models import ChatOpenAI
import json

class BasicRAGWorkflow:
    """Basic RAG workflow implementation."""

    def __init__(self, embedding_model: str = "text-embedding-ada-002",
                 llm_model: str = "gpt-3.5-turbo"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.llm = ChatOpenAI(model=llm_model, temperature=0)
        self.vectorstore_manager = VectorStoreManager()
        self.retriever = None

    def initialize_system(self, documents_directory: str):
        """Initialize the system."""
        # 1. Data preprocessing
        processor = DataProcessor()
        raw_documents = processor.load_documents(documents_directory)
        processed_documents = processor.process_documents(raw_documents)
        # 2. Vector store initialization
        self.vectorstore_manager.initialize_vectorstore(processed_documents)
        self.retriever = self.vectorstore_manager.setup_retriever()
        print(f"System initialized; {len(processed_documents)} document chunks processed")

    def query(self, query: str, include_sources: bool = True) -> Dict[str, Any]:
        """Run a query."""
        if not self.retriever:
            raise ValueError("System not initialized; call initialize_system() first")
        # 1. Retrieval
        relevant_docs = self.retriever.get_relevant_documents(query)
        # 2. Context construction
        context = self._build_context(relevant_docs)
        # 3. Generation
        prompt = self._build_prompt(query, context)
        response = self.llm.predict(prompt)
        result = {
            "query": query,
            "response": response,
            "context_used": context
        }
        if include_sources:
            result["sources"] = [
                {
                    "content": doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content,
                    "metadata": doc.metadata
                }
                for doc in relevant_docs
            ]
        return result

    def _build_context(self, documents: List[Document]) -> str:
        """Build the context string."""
        context_parts = []
        for i, doc in enumerate(documents):
            source_info = doc.metadata.get("source_file", "unknown")
            context_parts.append(f"[Document {i+1} - source: {source_info}]\n{doc.page_content}")
        return "\n\n".join(context_parts)

    def _build_prompt(self, query: str, context: str) -> str:
        """Build the prompt."""
        return f"""Answer the question based on the context provided below. If the context does not contain enough information to answer, say so honestly.

Context:
{context}

Question: {query}

Provide an accurate and complete answer based on the context:"""

1.4 Testing the basic functionality

def test_basic_rag():
    """Test basic RAG functionality."""
    rag = BasicRAGWorkflow()
    # Initialize the system (assumes documents live under ./documents)
    rag.initialize_system("./documents")
    # Test queries
    test_queries = [
        "What is machine learning?",
        "What is the difference between supervised and unsupervised learning?",
        "Summarize the deep learning application scenarios mentioned in the documents"
    ]
    for query in test_queries:
        print(f"\nQuestion: {query}")
        result = rag.query(query)
        print(f"Answer: {result['response']}")
        print(f"Referenced documents: {len(result.get('sources', []))}")

Stage 2: Adaptive Workflow Enhancement

2.1 Query complexity classification

from enum import Enum

class QueryComplexity(Enum):
    STRAIGHTFORWARD = "straightforward"  # answerable directly
    SIMPLE = "simple"                    # needs single-step retrieval
    COMPLEX = "complex"                  # needs multi-step retrieval

class QueryClassifier:
    """Query classifier."""

    def __init__(self, llm):
        self.llm = llm

    def classify(self, query: str) -> QueryComplexity:
        """Classify query complexity."""
        classification_prompt = f"""
Analyze the complexity of the following query and return the matching category:

Categories:
- "straightforward": a simple factual question answerable from general knowledge
- "simple": requires retrieving specific documents but only single-step processing
- "complex": requires multi-step reasoning, multiple retrievals, or synthesis

Query: {query}

Return exactly one of: straightforward, simple, complex
"""
        response = self.llm.predict(classification_prompt).strip().lower()
        if "straightforward" in response:
            return QueryComplexity.STRAIGHTFORWARD
        elif "simple" in response:
            return QueryComplexity.SIMPLE
        else:
            return QueryComplexity.COMPLEX
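Note the parsing order in `classify`: any reply that is not recognizably "straightforward" or "simple" falls through to COMPLEX, the fail-safe default (an unnecessary extra retrieval round only costs latency; skipping a needed one costs accuracy). The fallback logic in isolation — `parse_complexity` is my own name for it:

```python
def parse_complexity(reply: str) -> str:
    """Map a raw LLM reply to one of the three labels; unrecognized
    replies default to 'complex' as the fail-safe choice."""
    reply = reply.strip().lower()
    if "straightforward" in reply:
        return "straightforward"
    if "simple" in reply:
        return "simple"
    return "complex"
```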

2.2 Adaptive RAG workflow

import time
from datetime import datetime

class AdaptiveRAGWorkflow:
    """Adaptive RAG workflow."""

    def __init__(self, basic_rag: BasicRAGWorkflow):
        self.basic_rag = basic_rag
        self.classifier = QueryClassifier(basic_rag.llm)
        self.metrics = {
            "total_queries": 0,
            "complexity_distribution": {c: 0 for c in QueryComplexity},
            "avg_processing_time": 0.0
        }

    def execute_query(self, query: str) -> Dict[str, Any]:
        """Execute an adaptive query."""
        start_time = time.time()
        self.metrics["total_queries"] += 1
        # 1. Complexity classification
        complexity = self.classifier.classify(query)
        self.metrics["complexity_distribution"][complexity] += 1
        # 2. Pick a workflow based on complexity
        if complexity == QueryComplexity.STRAIGHTFORWARD:
            result = self._direct_generation_workflow(query)
        elif complexity == QueryComplexity.SIMPLE:
            result = self._single_step_workflow(query)
        else:
            result = self._multi_step_workflow(query)
        # 3. Record metrics (incremental running average)
        processing_time = time.time() - start_time
        self.metrics["avg_processing_time"] = (
            (self.metrics["avg_processing_time"] * (self.metrics["total_queries"] - 1) + processing_time)
            / self.metrics["total_queries"]
        )
        result.update({
            "complexity": complexity.value,
            "processing_time_seconds": round(processing_time, 2),
            "timestamp": datetime.now().isoformat()
        })
        return result

    def _direct_generation_workflow(self, query: str) -> Dict[str, Any]:
        """Direct generation workflow."""
        response = self.basic_rag.llm.predict(query)
        return {
            "workflow_type": "direct_generation",
            "query": query,
            "response": response,
            "sources": [],
            "iterations": 1
        }

    def _single_step_workflow(self, query: str) -> Dict[str, Any]:
        """Single-step retrieval workflow."""
        return self.basic_rag.query(query)

    def _multi_step_workflow(self, query: str, max_iterations: int = 3) -> Dict[str, Any]:
        """Multi-step retrieval workflow."""
        iterations = []
        current_query = query
        for iteration in range(max_iterations):
            # Run retrieval
            result = self.basic_rag.query(current_query)
            iterations.append({
                "iteration": iteration + 1,
                "query": current_query,
                "response": result["response"],
                "sources_count": len(result.get("sources", []))
            })
            if iteration < max_iterations - 1:
                # Check whether further refinement is needed
                needs_refinement = self._check_refinement_need(query, result["response"])
                if not needs_refinement:
                    break
                # Refine the query
                current_query = self._optimize_query(query, result["response"], iterations)
            else:
                # Last iteration: one final refinement pass
                current_query = self._final_optimization(query, iterations)
                final_result = self.basic_rag.query(current_query)
                iterations.append({
                    "iteration": iteration + 2,
                    "query": current_query,
                    "response": final_result["response"],
                    "sources_count": len(final_result.get("sources", []))
                })
        return {
            "workflow_type": "multi_step_retrieval",
            "original_query": query,
            "final_response": iterations[-1]["response"],
            "iterations": iterations,
            "total_iterations": len(iterations)
        }

    def _check_refinement_need(self, original_query: str, current_response: str) -> bool:
        """Check whether further refinement is needed."""
        refinement_prompt = f"""
Original question: {original_query}
Current answer: {current_response}

Assess whether the current answer fully resolves the original question. Consider:
1. Does the answer cover every aspect of the question?
2. Is anything vague or inaccurate?
3. Are more details or concrete examples needed?

If the answer is sufficient, reply "NO". If it needs improvement, reply "YES" with a brief reason.
"""
        response = self.basic_rag.llm.predict(refinement_prompt)
        return "YES" in response.upper()

    def _optimize_query(self, original_query: str, current_response: str, iterations: List[Dict]) -> str:
        """Refine the query."""
        iteration_summary = "\n".join([
            f"Round {it['iteration']}: {it['query']} -> found {it['sources_count']} relevant documents"
            for it in iterations
        ])
        optimization_prompt = f"""
Based on the retrieval history so far, refine the query to get more precise results:

Original question: {original_query}
Current answer: {current_response}
Retrieval history:
{iteration_summary}

Generate a more precise query, focusing on:
1. More specific keywords
2. A clearer search direction
3. Aspects missed so far

New query:
"""
        return self.basic_rag.llm.predict(optimization_prompt)

    def _final_optimization(self, original_query: str, iterations: List[Dict]) -> str:
        """Final refinement."""
        optimization_prompt = f"""
Drawing on multiple retrieval rounds, generate the most precise final query for the question below:

Original question: {original_query}
Retrieval rounds so far: {len(iterations)}

Taking the full retrieval history into account, generate the query most likely to surface comprehensive, accurate information:
"""
        return self.basic_rag.llm.predict(optimization_prompt)

    def get_metrics(self) -> Dict[str, Any]:
        """Return system metrics."""
        return self.metrics.copy()
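`execute_query` maintains `avg_processing_time` as an incremental running mean, so no per-query latency list has to be kept. The update rule in isolation (`update_running_mean` is an illustrative name, not from the article):

```python
def update_running_mean(mean: float, count: int, new_value: float) -> float:
    """One incremental-mean step, as in AdaptiveRAGWorkflow.execute_query:
    `count` already includes the new observation."""
    return (mean * (count - 1) + new_value) / count
```

Folding in the values 2, 4, 6 one at a time yields 2, 3, 4 — identical to computing the batch average at each step, but in O(1) memory.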

Stage 3: Multi-Agent System Implementation

3.1 Base agent framework

from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
import json

class BaseAgent(ABC):
    """Abstract base class for agents."""

    def __init__(self, name: str, description: str, llm):
        self.name = name
        self.description = description
        self.llm = llm
        self.performance_metrics = {
            "requests_processed": 0,
            "success_rate": 0,
            "avg_processing_time": 0.0
        }

    @abstractmethod
    def process(self, query: str, context: Dict = None) -> Dict[str, Any]:
        """Process a query (abstract)."""
        pass

    def update_metrics(self, success: bool, processing_time: float):
        """Update performance metrics with incremental averages.
        The success rate is updated on every request (failures included),
        so it stays an accurate ratio."""
        self.performance_metrics["requests_processed"] += 1
        n = self.performance_metrics["requests_processed"]
        prev_successes = self.performance_metrics["success_rate"] * (n - 1)
        self.performance_metrics["success_rate"] = (prev_successes + (1 if success else 0)) / n
        prev_avg = self.performance_metrics["avg_processing_time"]
        self.performance_metrics["avg_processing_time"] = (prev_avg * (n - 1) + processing_time) / n

class SemanticSearchAgent(BaseAgent):
    """Semantic search agent."""

    def __init__(self, llm, vectorstore_manager: VectorStoreManager):
        super().__init__(
            name="SemanticSearchAgent",
            description="Handles semantic document search and relevance retrieval",
            llm=llm
        )
        self.vectorstore_manager = vectorstore_manager

    def process(self, query: str, context: Dict = None) -> Dict[str, Any]:
        start_time = time.time()
        try:
            # Run the semantic search
            docs = self.vectorstore_manager.similarity_search(query, k=5)
            # Score relevance
            relevance_scores = self._evaluate_relevance(query, docs)
            # Keep only sufficiently relevant documents, paired with their scores
            filtered = [
                (doc, score) for doc, score in zip(docs, relevance_scores)
                if score >= 0.6  # relevance threshold
            ]
            result = {
                "agent": self.name,
                "query": query,
                "documents_found": len(filtered),
                "documents": [
                    {
                        "content": doc.page_content[:300] + "..." if len(doc.page_content) > 300 else doc.page_content,
                        "metadata": doc.metadata,
                        "relevance_score": score
                    }
                    for doc, score in filtered
                ],
                "status": "success"
            }
            self.update_metrics(True, time.time() - start_time)
            return result
        except Exception as e:
            self.update_metrics(False, time.time() - start_time)
            return {
                "agent": self.name,
                "query": query,
                "error": str(e),
                "status": "error"
            }

    def _evaluate_relevance(self, query: str, docs: List[Document]) -> List[float]:
        """Score each document's relevance to the query."""
        scores = []
        for doc in docs:
            evaluation_prompt = f"""
Query: {query}
Document content: {doc.page_content[:500]}

Rate the document's relevance to the query as a score between 0.0 and 1.0:
"""
            try:
                score_text = self.llm.predict(evaluation_prompt)
                score = float(score_text.strip())
                scores.append(min(max(score, 0.0), 1.0))
            except Exception:
                scores.append(0.5)  # fallback score
        return scores

class QueryAnalysisAgent(BaseAgent):
    """Query analysis agent."""

    def __init__(self, llm):
        super().__init__(
            name="QueryAnalysisAgent",
            description="Analyzes query intent, complexity, and information needs",
            llm=llm
        )

    def process(self, query: str, context: Dict = None) -> Dict[str, Any]:
        start_time = time.time()
        try:
            analysis_prompt = f"""
Analyze the following query and provide a structured analysis:

Query: {query}

Return the analysis as JSON with these fields:
- "intent": the main intent (information lookup, comparison, step-by-step guidance, etc.)
- "complexity": complexity level (low, medium, high)
- "information_needs": list of information types required
- "suggested_approach": recommended processing approach
- "potential_challenges": likely challenges
"""
            analysis_text = self.llm.predict(analysis_prompt)
            analysis_result = json.loads(analysis_text)
            result = {
                "agent": self.name,
                "query": query,
                "analysis": analysis_result,
                "status": "success"
            }
            self.update_metrics(True, time.time() - start_time)
            return result
        except Exception as e:
            self.update_metrics(False, time.time() - start_time)
            return {
                "agent": self.name,
                "query": query,
                "error": str(e),
                "status": "error"
            }
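`_evaluate_relevance` calls `float(score_text.strip())`, which throws whenever the model pads its answer (e.g. "Score: 0.8") and silently drops to the 0.5 fallback. A more forgiving parser — a hypothetical helper, not part of the article's code — extracts the first number and clamps it to the valid range:

```python
import re

def parse_relevance_score(reply: str, default: float = 0.5) -> float:
    """Extract the first number from an LLM reply and clamp it to [0, 1];
    return `default` when no number appears at all."""
    match = re.search(r"\d+(?:\.\d+)?", reply)
    if match is None:
        return default
    return min(max(float(match.group()), 0.0), 1.0)
```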

3.2 Coordinator agent and workflow management

class CoordinatorAgent:
    """Coordinator agent."""

    def __init__(self, llm):
        self.llm = llm
        self.agents = {}
        self.workflow_history = []

    def register_agent(self, agent: BaseAgent):
        """Register an agent."""
        self.agents[agent.name] = agent

    def plan_workflow(self, query: str, context: Dict = None) -> List[str]:
        """Plan the workflow."""
        available_agents = list(self.agents.keys())
        agent_descriptions = "\n".join(
            f"- {agent.name}: {agent.description}" for agent in self.agents.values()
        )
        planning_prompt = f"""
Plan a processing workflow for the query below using the available agents:

Query: {query}
Available agents: {', '.join(available_agents)}
Agent descriptions:
{agent_descriptions}

Return the plan as JSON with these fields:
- "agent_sequence": the agent execution sequence
- "reasoning": the rationale for the plan
"""
        plan_text = self.llm.predict(planning_prompt)
        plan = json.loads(plan_text)
        return plan.get("agent_sequence", [])

    def execute_workflow(self, query: str) -> Dict[str, Any]:
        """Execute the workflow."""
        workflow_start = time.time()
        workflow_id = f"workflow_{len(self.workflow_history) + 1}_{int(time.time())}"
        # 1. Workflow planning
        agent_sequence = self.plan_workflow(query)
        # 2. Run agents in sequence
        intermediate_results = {}
        context = {"original_query": query}
        for agent_name in agent_sequence:
            if agent_name in self.agents:
                agent = self.agents[agent_name]
                print(f"Running agent: {agent_name}")
                result = agent.process(query, context)
                intermediate_results[agent_name] = result
                context.update({f"{agent_name}_result": result})
                if result.get("status") == "error":
                    print(f"Agent {agent_name} failed: {result.get('error')}")
                    # Depending on the error type, decide whether to continue
        # 3. Result integration
        integrated_response = self._integrate_results(query, intermediate_results)
        # 4. Record workflow history
        workflow_result = {
            "workflow_id": workflow_id,
            "timestamp": datetime.now().isoformat(),
            "query": query,
            "agent_sequence": agent_sequence,
            "intermediate_results": intermediate_results,
            "final_response": integrated_response,
            "processing_time": time.time() - workflow_start
        }
        self.workflow_history.append(workflow_result)
        return workflow_result

    def _integrate_results(self, query: str, agent_results: Dict[str, Any]) -> str:
        """Integrate results from multiple agents."""
        # Build a summary of the results
        results_summary = []
        for agent_name, result in agent_results.items():
            if result.get("status") == "success":
                if "documents" in result:
                    summary = f"{agent_name}: found {result['documents_found']} relevant documents"
                elif "analysis" in result:
                    summary = f"{agent_name}: analysis complete - {result['analysis'].get('intent', 'N/A')}"
                else:
                    summary = f"{agent_name}: processing complete"
                results_summary.append(summary)
        summary_text = "\n".join(results_summary)
        integration_prompt = f"""
Generate a consolidated answer based on the multi-agent results below:

Original question: {query}
Processing steps:
{summary_text}

Details:
{json.dumps(agent_results, indent=2, ensure_ascii=False)}

Produce a comprehensive, accurate answer that integrates all relevant information:
"""
        return self.llm.predict(integration_prompt)

    def get_workflow_history(self, limit: int = 10) -> List[Dict]:
        """Return recent workflow history."""
        return self.workflow_history[-limit:]

    def get_system_metrics(self) -> Dict[str, Any]:
        """Return system metrics."""
        metrics = {}
        for agent_name, agent in self.agents.items():
            metrics[agent_name] = agent.performance_metrics
        total_workflows = len(self.workflow_history)
        successful_workflows = len([w for w in self.workflow_history if all(
            r.get("status") == "success" for r in w["intermediate_results"].values()
        )])
        metrics["coordinator"] = {
            "total_workflows": total_workflows,
            "success_rate": successful_workflows / total_workflows if total_workflows > 0 else 0,
            "avg_workflow_time": sum(w["processing_time"] for w in self.workflow_history) / total_workflows if total_workflows > 0 else 0
        }
        return metrics
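`plan_workflow` feeds the model's reply straight into `json.loads`, which fails as soon as the model wraps its JSON in Markdown fences or adds surrounding prose. A tolerant extraction step is a common hardening measure; a hedged sketch (`extract_json` is my own helper, not from the article):

```python
import json
import re

def extract_json(reply: str) -> dict:
    """Parse JSON out of an LLM reply: try the raw string first, then fall
    back to the outermost {...} span (covers code fences and extra prose)."""
    try:
        return json.loads(reply)
    except json.JSONDecodeError:
        pass
    match = re.search(r"\{.*\}", reply, re.DOTALL)
    if match:
        return json.loads(match.group())
    raise ValueError("no JSON object found in LLM reply")
```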

3.3 Multi-agent system initialization

def setup_multi_agent_system(documents_directory: str) -> CoordinatorAgent:
    """Set up the multi-agent system."""
    from langchain.chat_models import ChatOpenAI
    # Initialize the LLM
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
    # Create the coordinator agent
    coordinator = CoordinatorAgent(llm)
    # Initialize the base components
    basic_rag = BasicRAGWorkflow()
    basic_rag.initialize_system(documents_directory)
    # Create and register the individual agents
    semantic_agent = SemanticSearchAgent(llm, basic_rag.vectorstore_manager)
    query_analysis_agent = QueryAnalysisAgent(llm)
    coordinator.register_agent(semantic_agent)
    coordinator.register_agent(query_analysis_agent)
    print("Multi-agent system initialized")
    return coordinator

Stage 4: Production Deployment

4.1 FastAPI service

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import uuid
import asyncio

app = FastAPI(title="Agentic RAG API", version="1.0.0")

# Global system instances
rag_system = None
adaptive_system = None
coordinator_system = None

class QueryRequest(BaseModel):
    query: str
    workflow_type: str = "adaptive"  # basic, adaptive, multi_agent
    include_sources: bool = True

class QueryResponse(BaseModel):
    response_id: str
    query: str
    response: str
    workflow_type: str
    processing_time: float
    sources: Optional[List[Dict]] = None
    metadata: Optional[Dict] = None

class SystemMetrics(BaseModel):
    basic_rag: Dict
    adaptive_rag: Dict
    multi_agent: Dict
    timestamp: str

def _extract_sources_from_workflow(workflow_result: Dict) -> List[Dict]:
    """Extract source documents from a workflow result."""
    sources = []
    for agent_name, result in workflow_result["intermediate_results"].items():
        if "documents" in result:
            sources.extend(result["documents"])
    return sources

@app.on_event("startup")
async def startup_event():
    """Initialize the systems on startup."""
    global rag_system, adaptive_system, coordinator_system
    # Initialize the basic RAG system
    rag_system = BasicRAGWorkflow()
    rag_system.initialize_system("./documents")
    # Initialize adaptive RAG
    adaptive_system = AdaptiveRAGWorkflow(rag_system)
    # Initialize the multi-agent system
    coordinator_system = setup_multi_agent_system("./documents")
    print("Agentic RAG system started")

@app.post("/query", response_model=QueryResponse)
async def handle_query(request: QueryRequest):
    """Handle a query request."""
    start_time = time.time()
    response_id = str(uuid.uuid4())
    try:
        if request.workflow_type == "basic":
            result = rag_system.query(request.query, request.include_sources)
            workflow_metadata = {"type": "basic"}
        elif request.workflow_type == "adaptive":
            result = adaptive_system.execute_query(request.query)
            workflow_metadata = {
                "type": "adaptive",
                "complexity": result.get("complexity"),
                "iterations": result.get("total_iterations", 1)
            }
        elif request.workflow_type == "multi_agent":
            workflow_result = coordinator_system.execute_workflow(request.query)
            result = {
                "response": workflow_result["final_response"],
                "sources": _extract_sources_from_workflow(workflow_result)
            }
            workflow_metadata = {
                "type": "multi_agent",
                "agent_sequence": workflow_result["agent_sequence"],
                "workflow_id": workflow_result["workflow_id"]
            }
        else:
            raise HTTPException(status_code=400, detail="Unsupported workflow_type")
        processing_time = time.time() - start_time
        return QueryResponse(
            response_id=response_id,
            query=request.query,
            # multi-step adaptive results expose "final_response" instead of "response"
            response=result.get("response") or result.get("final_response", ""),
            workflow_type=request.workflow_type,
            processing_time=round(processing_time, 2),
            sources=result.get("sources"),
            metadata=workflow_metadata
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error while handling query: {str(e)}")

@app.get("/metrics")
async def get_system_metrics():
    """Return system metrics."""
    basic_metrics = getattr(rag_system, "metrics", {}) or {}
    adaptive_metrics = adaptive_system.get_metrics() if adaptive_system else {}
    multi_agent_metrics = coordinator_system.get_system_metrics() if coordinator_system else {}
    return SystemMetrics(
        basic_rag=basic_metrics,
        adaptive_rag=adaptive_metrics,
        multi_agent=multi_agent_metrics,
        timestamp=datetime.now().isoformat()
    )

@app.get("/health")
async def health_check():
    """Health check."""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "system": "Agentic RAG API"
    }

4.2 Docker deployment configuration

Create a Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the documents directory
RUN mkdir -p documents

# Expose the port
EXPOSE 8000

# Startup command (no --reload outside development)
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Create a docker-compose.yml:

version: '3.8'

services:
  agentic-rag:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://user:password@db:5432/rag_db
      - VECTOR_STORE_PATH=/app/vector_store
    volumes:
      - ./documents:/app/documents
      - ./vector_store:/app/vector_store
    depends_on:
      - db

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=rag_db
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

4.3 Monitoring and logging configuration

import logging
from logging.handlers import RotatingFileHandler
import sys

def setup_logging():
    """Configure logging."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            RotatingFileHandler('agentic_rag.log', maxBytes=10485760, backupCount=5),
            logging.StreamHandler(sys.stdout)
        ]
    )

class PerformanceMonitor:
    """Performance monitor."""

    def __init__(self):
        self.metrics = {
            "query_latency": [],
            "error_rates": {},
            "resource_usage": {}
        }

    def record_query(self, workflow_type: str, processing_time: float, success: bool):
        """Record query performance."""
        self.metrics["query_latency"].append({
            "workflow_type": workflow_type,
            "processing_time": processing_time,
            "timestamp": datetime.now().isoformat()
        })
        # Keep only the most recent 1000 records
        if len(self.metrics["query_latency"]) > 1000:
            self.metrics["query_latency"] = self.metrics["query_latency"][-1000:]

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate a performance report."""
        latencies = [q["processing_time"] for q in self.metrics["query_latency"]]
        return {
            "total_queries": len(latencies),
            "avg_latency": sum(latencies) / len(latencies) if latencies else 0,
            "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0,
            "max_latency": max(latencies) if latencies else 0
        }
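`get_performance_report` uses a nearest-rank estimate for `p95_latency`: sort the latencies and index at `int(len * 0.95)`. Isolated below, with the index clamped to the last element (a guard the report relies on implicitly, since `int(n * 0.95) < n` holds for n ≥ 1; for small samples this estimate is coarse):

```python
def nearest_rank_percentile(values: list, q: float) -> float:
    """Nearest-rank percentile as in get_performance_report: sort and
    index at int(len * q), clamped to the last element."""
    if not values:
        return 0.0
    ordered = sorted(values)
    return ordered[min(int(len(ordered) * q), len(ordered) - 1)]
```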

Complete Test Example

def comprehensive_test():
    """End-to-end test."""
    # 1. Initialize the system
    coordinator = setup_multi_agent_system("./documents")
    # 2. Test queries
    test_queries = [
        "What is artificial intelligence?",
        "Compare the strengths and weaknesses of machine learning and deep learning",
        "Explain in detail how neural networks work and where they are applied"
    ]
    for query in test_queries:
        print(f"\n{'='*50}")
        print(f"Test query: {query}")
        print(f"{'='*50}")
        # Use the multi-agent system
        result = coordinator.execute_workflow(query)
        print(f"Workflow ID: {result['workflow_id']}")
        print(f"Agent sequence: {result['agent_sequence']}")
        print(f"Processing time: {result['processing_time']:.2f}s")
        print(f"Final answer: {result['final_response'][:200]}...")
        # Show each agent's result
        for agent_name, agent_result in result['intermediate_results'].items():
            status = agent_result.get('status', 'unknown')
            print(f"  {agent_name}: {status}")
    # 3. Show system metrics
    print(f"\n{'='*50}")
    print("System performance metrics:")
    print(f"{'='*50}")
    print(coordinator.get_system_metrics())
