Agentic RAG in Practice: From a Basic Implementation to Production Deployment
Based on the survey paper "Agentic Retrieval-Augmented Generation: A Survey on Agentic RAG", this article provides complete code implementations and a hands-on guide to help developers quickly build an agentic retrieval-augmented generation system.

System Architecture Overview
Agentic RAG introduces autonomous agents and workflow management to significantly improve the adaptability and reasoning capability of traditional RAG systems. This article starts from a basic implementation and incrementally builds a complete Agentic RAG system.
Technical Evolution Path
- Stage 1: Basic RAG implementation - establish core retrieval and generation capabilities
- Stage 2: Workflow enhancement - add adaptive query processing
- Stage 3: Multi-agent system - implement specialized division of labor and collaboration
- Stage 4: Production deployment - build a scalable service architecture
Environment Setup and Dependency Installation
1. Base Environment Configuration
# Create a Python environment
python -m venv agentic_rag_env
source agentic_rag_env/bin/activate   # Linux/Mac
# agentic_rag_env\Scripts\activate    # Windows
# Install core dependencies
pip install langchain openai chromadb tiktoken
pip install fastapi uvicorn pydantic        # API service
pip install sqlalchemy psycopg2-binary      # Database support
pip install "unstructured[all]"             # Document parsing
2. Environment Variable Configuration
Create a .env file:
OPENAI_API_KEY=your_openai_api_key_here
DATABASE_URL=postgresql://user:password@localhost/rag_db
VECTOR_STORE_PATH=./vector_store
LOG_LEVEL=INFO
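The application code below reads these values from the process environment. A minimal sketch of loading them at startup, assuming the python-dotenv package (pip install python-dotenv) is used; the module name config.py is an illustrative choice, not part of the original setup:

# config.py - load environment variables from .env (assumes python-dotenv is installed)
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
DATABASE_URL = os.getenv("DATABASE_URL")
VECTOR_STORE_PATH = os.getenv("VECTOR_STORE_PATH", "./vector_store")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set")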
Stage 1: Basic RAG Implementation
1.1 Data Preprocessing Pipeline
import os
import re
from typing import List, Dict, Any
from langchain.document_loaders import DirectoryLoader, TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

class DataProcessor:
    """Data preprocessing pipeline"""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )

    def load_documents(self, directory: str, file_pattern: str = "**/*.pdf") -> List[Document]:
        """Load documents"""
        loader = DirectoryLoader(
            directory,
            glob=file_pattern,
            loader_cls=PyPDFLoader,  # adjust to match the file type
            show_progress=True
        )
        return loader.load()

    def process_documents(self, documents: List[Document]) -> List[Document]:
        """Process documents: clean, split, and enrich metadata"""
        processed_docs = []
        for doc in documents:
            # Basic text cleaning
            cleaned_content = self.clean_text(doc.page_content)
            # Text splitting
            chunks = self.text_splitter.split_text(cleaned_content)
            # Create a Document for each chunk and enrich its metadata
            for i, chunk in enumerate(chunks):
                enhanced_metadata = doc.metadata.copy()
                enhanced_metadata.update({
                    "chunk_id": i,
                    "total_chunks": len(chunks),
                    "source_file": os.path.basename(doc.metadata.get("source", "unknown")),
                    "chunk_size": len(chunk)
                })
                processed_docs.append(Document(page_content=chunk, metadata=enhanced_metadata))
        return processed_docs

    def clean_text(self, text: str) -> str:
        """Clean text"""
        # Collapse redundant whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters (adjust as needed)
        text = re.sub(r'[^\w\s.,!?;:()\-]', '', text)
        return text.strip()
1.2 Vector Store Initialization
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

class VectorStoreManager:
    """Vector store manager"""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.persist_directory = persist_directory
        self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
        self.vectorstore = None
        self.retriever = None

    def initialize_vectorstore(self, documents: List[Document]):
        """Initialize the vector store"""
        self.vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=self.embeddings,
            persist_directory=self.persist_directory
        )
        print(f"Vector store initialized with {len(documents)} document chunks")

    def setup_retriever(self, search_type: str = "similarity", k: int = 5):
        """Set up the retriever"""
        if not self.vectorstore:
            raise ValueError("Vector store is not initialized")
        base_retriever = self.vectorstore.as_retriever(
            search_type=search_type,
            search_kwargs={"k": k}
        )
        # Add contextual compression
        from langchain.chat_models import ChatOpenAI
        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        compressor = LLMChainExtractor.from_llm(llm)
        self.retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=base_retriever
        )
        return self.retriever

    def similarity_search(self, query: str, k: int = 5) -> List[Document]:
        """Similarity search"""
        if not self.vectorstore:
            raise ValueError("Vector store is not initialized")
        return self.vectorstore.similarity_search(query, k=k)
1.3 Basic RAG Workflow
from langchain.chat_models import ChatOpenAI
from langchain.schema import BaseOutputParser
import json

class BasicRAGWorkflow:
    """Basic RAG workflow implementation"""

    def __init__(self, embedding_model: str = "text-embedding-ada-002",
                 llm_model: str = "gpt-3.5-turbo"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.llm = ChatOpenAI(model=llm_model, temperature=0)
        self.vectorstore_manager = VectorStoreManager()
        self.retriever = None

    def initialize_system(self, documents_directory: str):
        """Initialize the system"""
        # 1. Data preprocessing
        processor = DataProcessor()
        raw_documents = processor.load_documents(documents_directory)
        processed_documents = processor.process_documents(raw_documents)
        # 2. Vector store initialization
        self.vectorstore_manager.initialize_vectorstore(processed_documents)
        self.retriever = self.vectorstore_manager.setup_retriever()
        print(f"System initialized with {len(processed_documents)} document chunks")

    def query(self, query: str, include_sources: bool = True) -> Dict[str, Any]:
        """Run a query"""
        if not self.retriever:
            raise ValueError("System not initialized; call initialize_system() first")
        # 1. Retrieval stage
        relevant_docs = self.retriever.get_relevant_documents(query)
        # 2. Context construction
        context = self._build_context(relevant_docs)
        # 3. Generation stage
        prompt = self._build_prompt(query, context)
        response = self.llm.predict(prompt)
        result = {
            "query": query,
            "response": response,
            "context_used": context
        }
        if include_sources:
            result["sources"] = [
                {
                    "content": doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content,
                    "metadata": doc.metadata
                }
                for doc in relevant_docs
            ]
        return result

    def _build_context(self, documents: List[Document]) -> str:
        """Build the context"""
        context_parts = []
        for i, doc in enumerate(documents):
            source_info = doc.metadata.get("source_file", "unknown")
            context_parts.append(f"[Document {i+1} - source: {source_info}]\n{doc.page_content}")
        return "\n\n".join(context_parts)

    def _build_prompt(self, query: str, context: str) -> str:
        """Build the prompt"""
        return f"""Answer the question using the context provided below. If the context does not contain enough information to answer, say so honestly.

Context:
{context}

Question: {query}

Provide an accurate and complete answer based on the context:"""
1.4 Testing the Basic Functionality
def test_basic_rag():
    """Test the basic RAG functionality"""
    rag = BasicRAGWorkflow()
    # Initialize the system (assumes documents live in ./documents)
    rag.initialize_system("./documents")
    # Test queries
    test_queries = [
        "What is machine learning?",
        "What is the difference between supervised and unsupervised learning?",
        "Summarize the deep learning application scenarios mentioned in the documents"
    ]
    for query in test_queries:
        print(f"\nQuestion: {query}")
        result = rag.query(query)
        print(f"Answer: {result['response']}")
        print(f"Referenced documents: {len(result.get('sources', []))}")
Stage 2: Adaptive Workflow Enhancement
2.1 Query Complexity Classification
from enum import Enum
from typing import Optional

class QueryComplexity(Enum):
    STRAIGHTFORWARD = "straightforward"  # can be answered directly
    SIMPLE = "simple"                    # needs a single retrieval step
    COMPLEX = "complex"                  # needs multi-step retrieval

class QueryClassifier:
    """Query classifier"""

    def __init__(self, llm):
        self.llm = llm

    def classify(self, query: str) -> QueryComplexity:
        """Classify query complexity"""
        classification_prompt = f"""
Analyze the complexity of the following query and return the matching category.

Categories:
- "straightforward": a simple factual question that can be answered directly from general knowledge
- "simple": a question that requires retrieving specific documents but only a single processing step
- "complex": a question that requires multi-step reasoning, multiple retrievals, or synthesis

Query: {query}

Return exactly one of: straightforward, simple, complex
"""
        response = self.llm.predict(classification_prompt).strip().lower()
        if "straightforward" in response:
            return QueryComplexity.STRAIGHTFORWARD
        elif "simple" in response:
            return QueryComplexity.SIMPLE
        else:
            return QueryComplexity.COMPLEX
2.2 Adaptive RAG Workflow
import time
from datetime import datetime

class AdaptiveRAGWorkflow:
    """Adaptive RAG workflow"""

    def __init__(self, basic_rag: BasicRAGWorkflow):
        self.basic_rag = basic_rag
        self.classifier = QueryClassifier(basic_rag.llm)
        self.metrics = {
            "total_queries": 0,
            # use .value keys so the metrics dict stays JSON-serializable
            "complexity_distribution": {c.value: 0 for c in QueryComplexity},
            "avg_processing_time": 0.0
        }

    def execute_query(self, query: str) -> Dict[str, Any]:
        """Execute an adaptive query"""
        start_time = time.time()
        self.metrics["total_queries"] += 1
        # 1. Complexity classification
        complexity = self.classifier.classify(query)
        self.metrics["complexity_distribution"][complexity.value] += 1
        # 2. Choose a workflow based on complexity
        if complexity == QueryComplexity.STRAIGHTFORWARD:
            result = self._direct_generation_workflow(query)
        elif complexity == QueryComplexity.SIMPLE:
            result = self._single_step_workflow(query)
        else:
            result = self._multi_step_workflow(query)
        # 3. Record metrics
        processing_time = time.time() - start_time
        self.metrics["avg_processing_time"] = (
            (self.metrics["avg_processing_time"] * (self.metrics["total_queries"] - 1) + processing_time)
            / self.metrics["total_queries"]
        )
        result.update({
            "complexity": complexity.value,
            "processing_time_seconds": round(processing_time, 2),
            "timestamp": datetime.now().isoformat()
        })
        return result

    def _direct_generation_workflow(self, query: str) -> Dict[str, Any]:
        """Direct generation workflow"""
        response = self.basic_rag.llm.predict(query)
        return {
            "workflow_type": "direct_generation",
            "query": query,
            "response": response,
            "sources": [],
            "iterations": 1
        }

    def _single_step_workflow(self, query: str) -> Dict[str, Any]:
        """Single-step retrieval workflow"""
        return self.basic_rag.query(query)

    def _multi_step_workflow(self, query: str, max_iterations: int = 3) -> Dict[str, Any]:
        """Multi-step retrieval workflow"""
        iterations = []
        current_query = query
        for iteration in range(max_iterations):
            # Run retrieval
            result = self.basic_rag.query(current_query)
            iterations.append({
                "iteration": iteration + 1,
                "query": current_query,
                "response": result["response"],
                "sources_count": len(result.get("sources", []))
            })
            # Decide whether further refinement is needed
            if iteration < max_iterations - 1:
                needs_refinement = self._check_refinement_need(query, result["response"])
                if not needs_refinement:
                    break
                # Refine the query
                current_query = self._optimize_query(query, result["response"], iterations)
            else:
                # Last iteration: run a final optimization pass
                current_query = self._final_optimization(query, iterations)
                final_result = self.basic_rag.query(current_query)
                iterations.append({
                    "iteration": iteration + 2,
                    "query": current_query,
                    "response": final_result["response"],
                    "sources_count": len(final_result.get("sources", []))
                })
        return {
            "workflow_type": "multi_step_retrieval",
            "original_query": query,
            "final_response": iterations[-1]["response"],
            "iterations": iterations,
            "total_iterations": len(iterations)
        }

    def _check_refinement_need(self, original_query: str, current_response: str) -> bool:
        """Check whether further refinement is needed"""
        refinement_prompt = f"""
Original question: {original_query}
Current answer: {current_response}

Assess whether the current answer sufficiently resolves the original question, considering:
1. Does the answer cover every aspect of the question?
2. Are there vague or inaccurate parts?
3. Are more details or concrete examples needed?

If the answer is already sufficient, reply "NO". If it needs improvement, reply "YES" with a brief reason.
"""
        response = self.basic_rag.llm.predict(refinement_prompt)
        return "YES" in response.upper()

    def _optimize_query(self, original_query: str, current_response: str, iterations: List[Dict]) -> str:
        """Refine the query"""
        iteration_summary = "\n".join([
            f"Round {it['iteration']}: {it['query']} -> found {it['sources_count']} relevant documents"
            for it in iterations
        ])
        optimization_prompt = f"""
Based on the retrieval history so far, refine the query to obtain more precise results.

Original question: {original_query}
Current answer: {current_response}
Retrieval history:
{iteration_summary}

Generate a more precise query, focusing on:
1. More specific keywords
2. A clearer search direction
3. Aspects missed so far

New query:
"""
        return self.basic_rag.llm.predict(optimization_prompt)

    def _final_optimization(self, original_query: str, iterations: List[Dict]) -> str:
        """Final optimization"""
        optimization_prompt = f"""
Based on the experience of the previous retrieval rounds, generate the most precise final query for the question below.

Original question: {original_query}
Retrieval rounds: {len(iterations)}

Taking the full retrieval history into account, generate the query most likely to obtain comprehensive and accurate information:
"""
        return self.basic_rag.llm.predict(optimization_prompt)

    def get_metrics(self) -> Dict[str, Any]:
        """Get system metrics"""
        return self.metrics.copy()
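A short usage sketch wiring the adaptive workflow on top of the basic one; the document directory and query text are illustrative assumptions:

# Illustrative usage of the adaptive workflow
basic = BasicRAGWorkflow()
basic.initialize_system("./documents")
adaptive = AdaptiveRAGWorkflow(basic)

result = adaptive.execute_query("Compare supervised and unsupervised learning")
print(result["complexity"], result["processing_time_seconds"])
# multi-step runs expose "final_response" instead of "response"
print(result.get("response") or result.get("final_response"))
print(adaptive.get_metrics())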
Stage 3: Multi-Agent System Implementation
3.1 Base Agent Framework
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
import json

class BaseAgent(ABC):
    """Abstract base agent"""

    def __init__(self, name: str, description: str, llm):
        self.name = name
        self.description = description
        self.llm = llm
        self.performance_metrics = {
            "requests_processed": 0,
            "success_rate": 0,
            "avg_processing_time": 0.0
        }

    @abstractmethod
    def process(self, query: str, context: Dict = None) -> Dict[str, Any]:
        """Abstract query-processing method"""
        pass

    def update_metrics(self, success: bool, processing_time: float):
        """Update performance metrics"""
        self.performance_metrics["requests_processed"] += 1
        if success:
            current_success = self.performance_metrics["success_rate"] * (self.performance_metrics["requests_processed"] - 1)
            self.performance_metrics["success_rate"] = (current_success + 1) / self.performance_metrics["requests_processed"]
        current_avg = self.performance_metrics["avg_processing_time"] * (self.performance_metrics["requests_processed"] - 1)
        self.performance_metrics["avg_processing_time"] = (current_avg + processing_time) / self.performance_metrics["requests_processed"]

class SemanticSearchAgent(BaseAgent):
    """Semantic search agent"""

    def __init__(self, llm, vectorstore_manager: VectorStoreManager):
        super().__init__(
            name="SemanticSearchAgent",
            description="Handles semantic document search and relevance retrieval",
            llm=llm
        )
        self.vectorstore_manager = vectorstore_manager

    def process(self, query: str, context: Dict = None) -> Dict[str, Any]:
        start_time = time.time()
        try:
            # Run the semantic search
            docs = self.vectorstore_manager.similarity_search(query, k=5)
            # Assess relevance
            relevance_scores = self._evaluate_relevance(query, docs)
            # Drop low-relevance documents, keeping each document paired with its score
            filtered = [
                (doc, score) for doc, score in zip(docs, relevance_scores)
                if score >= 0.6  # relevance threshold
            ]
            result = {
                "agent": self.name,
                "query": query,
                "documents_found": len(filtered),
                "documents": [
                    {
                        "content": doc.page_content[:300] + "..." if len(doc.page_content) > 300 else doc.page_content,
                        "metadata": doc.metadata,
                        "relevance_score": score
                    }
                    for doc, score in filtered
                ],
                "status": "success"
            }
            self.update_metrics(True, time.time() - start_time)
            return result
        except Exception as e:
            self.update_metrics(False, time.time() - start_time)
            return {
                "agent": self.name,
                "query": query,
                "error": str(e),
                "status": "error"
            }

    def _evaluate_relevance(self, query: str, docs: List[Document]) -> List[float]:
        """Assess document relevance"""
        scores = []
        for doc in docs:
            evaluation_prompt = f"""
Query: {query}
Document content: {doc.page_content[:500]}

Rate the relevance of the document to the query as a score between 0.0 and 1.0:
"""
            try:
                score_text = self.llm.predict(evaluation_prompt)
                score = float(score_text.strip())
                scores.append(min(max(score, 0.0), 1.0))
            except Exception:
                scores.append(0.5)  # default score
        return scores

class QueryAnalysisAgent(BaseAgent):
    """Query analysis agent"""

    def __init__(self, llm):
        super().__init__(
            name="QueryAnalysisAgent",
            description="Analyzes query intent, complexity, and information needs",
            llm=llm
        )

    def process(self, query: str, context: Dict = None) -> Dict[str, Any]:
        start_time = time.time()
        try:
            analysis_prompt = f"""
Analyze the following query and provide a structured analysis.

Query: {query}

Return the analysis as JSON with the following fields:
- "intent": the primary intent of the query (information lookup, comparison, step-by-step guidance, etc.)
- "complexity": complexity (low, medium, high)
- "information_needs": a list of the types of information required
- "suggested_approach": the recommended processing approach
- "potential_challenges": challenges that may arise
"""
            analysis_text = self.llm.predict(analysis_prompt)
            analysis_result = json.loads(analysis_text)
            result = {
                "agent": self.name,
                "query": query,
                "analysis": analysis_result,
                "status": "success"
            }
            self.update_metrics(True, time.time() - start_time)
            return result
        except Exception as e:
            self.update_metrics(False, time.time() - start_time)
            return {
                "agent": self.name,
                "query": query,
                "error": str(e),
                "status": "error"
            }
3.2 Coordinator Agent and Workflow Management
class CoordinatorAgent:
    """Coordinator agent"""

    def __init__(self, llm):
        self.llm = llm
        self.agents = {}
        self.workflow_history = []

    def register_agent(self, agent: BaseAgent):
        """Register an agent"""
        self.agents[agent.name] = agent

    def plan_workflow(self, query: str, context: Dict = None) -> List[str]:
        """Plan the workflow"""
        available_agents = list(self.agents.keys())
        planning_prompt = f"""
Plan a processing workflow for the query below using the available agents.

Query: {query}
Available agents: {', '.join(available_agents)}
Agent descriptions:
{chr(10).join([f'- {agent.name}: {agent.description}' for agent in self.agents.values()])}

Return the workflow plan as JSON with the fields:
- "agent_sequence": the sequence of agents to execute
- "reasoning": the rationale for the plan
"""
        plan_text = self.llm.predict(planning_prompt)
        plan = json.loads(plan_text)
        return plan.get("agent_sequence", [])

    def execute_workflow(self, query: str) -> Dict[str, Any]:
        """Execute the workflow"""
        workflow_start = time.time()
        workflow_id = f"workflow_{len(self.workflow_history) + 1}_{int(time.time())}"
        # 1. Workflow planning
        agent_sequence = self.plan_workflow(query)
        # 2. Execute the agents in sequence
        intermediate_results = {}
        context = {"original_query": query}
        for agent_name in agent_sequence:
            if agent_name in self.agents:
                agent = self.agents[agent_name]
                print(f"Running agent: {agent_name}")
                result = agent.process(query, context)
                intermediate_results[agent_name] = result
                context.update({f"{agent_name}_result": result})
                if result.get("status") == "error":
                    print(f"Agent {agent_name} failed: {result.get('error')}")
                    # Depending on the error type, you may decide whether to continue
        # 3. Result integration
        integrated_response = self._integrate_results(query, intermediate_results)
        # 4. Record workflow history
        workflow_result = {
            "workflow_id": workflow_id,
            "timestamp": datetime.now().isoformat(),
            "query": query,
            "agent_sequence": agent_sequence,
            "intermediate_results": intermediate_results,
            "final_response": integrated_response,
            "processing_time": time.time() - workflow_start
        }
        self.workflow_history.append(workflow_result)
        return workflow_result

    def _integrate_results(self, query: str, agent_results: Dict[str, Any]) -> str:
        """Integrate the multi-agent results"""
        # Build a summary of the results
        results_summary = []
        for agent_name, result in agent_results.items():
            if result.get("status") == "success":
                if "documents" in result:
                    summary = f"{agent_name}: found {result['documents_found']} relevant documents"
                elif "analysis" in result:
                    summary = f"{agent_name}: analysis complete - {result['analysis'].get('intent', 'N/A')}"
                else:
                    summary = f"{agent_name}: processing complete"
                results_summary.append(summary)
        integration_prompt = f"""
Generate a consolidated answer based on the multi-agent results below.

Original question: {query}
Processing steps:
{chr(10).join(results_summary)}

Details:
{json.dumps(agent_results, indent=2, ensure_ascii=False)}

Produce a comprehensive, accurate answer that integrates all relevant information:
"""
        return self.llm.predict(integration_prompt)

    def get_workflow_history(self, limit: int = 10) -> List[Dict]:
        """Get workflow history"""
        return self.workflow_history[-limit:]

    def get_system_metrics(self) -> Dict[str, Any]:
        """Get system metrics"""
        metrics = {}
        for agent_name, agent in self.agents.items():
            metrics[agent_name] = agent.performance_metrics
        total_workflows = len(self.workflow_history)
        successful_workflows = len([w for w in self.workflow_history if all(
            r.get("status") == "success" for r in w["intermediate_results"].values()
        )])
        metrics["coordinator"] = {
            "total_workflows": total_workflows,
            "success_rate": successful_workflows / total_workflows if total_workflows > 0 else 0,
            "avg_workflow_time": sum(w["processing_time"] for w in self.workflow_history) / total_workflows if total_workflows > 0 else 0
        }
        return metrics
3.3 Multi-Agent System Initialization
def setup_multi_agent_system(documents_directory: str) -> CoordinatorAgent:
    """Set up the multi-agent system"""
    from langchain.chat_models import ChatOpenAI
    # Initialize the LLM
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
    # Create the coordinator agent
    coordinator = CoordinatorAgent(llm)
    # Initialize the base components
    basic_rag = BasicRAGWorkflow()
    basic_rag.initialize_system(documents_directory)
    # Create and register the individual agents
    semantic_agent = SemanticSearchAgent(llm, basic_rag.vectorstore_manager)
    query_analysis_agent = QueryAnalysisAgent(llm)
    coordinator.register_agent(semantic_agent)
    coordinator.register_agent(query_analysis_agent)
    print("Multi-agent system initialized")
    return coordinator
Stage 4: Production Deployment
4.1 FastAPI Service
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional, Dict
import uuid
import asyncio

app = FastAPI(title="Agentic RAG API", version="1.0.0")

# Global system instances
rag_system = None
adaptive_system = None
coordinator_system = None

class QueryRequest(BaseModel):
    query: str
    workflow_type: str = "adaptive"  # basic, adaptive, multi_agent
    include_sources: bool = True

class QueryResponse(BaseModel):
    response_id: str
    query: str
    response: str
    workflow_type: str
    processing_time: float
    sources: Optional[List[Dict]] = None
    metadata: Optional[Dict] = None

class SystemMetrics(BaseModel):
    basic_rag: Dict
    adaptive_rag: Dict
    multi_agent: Dict
    timestamp: str

@app.on_event("startup")
async def startup_event():
    """Initialize the systems at startup"""
    global rag_system, adaptive_system, coordinator_system
    # Initialize the basic RAG system
    rag_system = BasicRAGWorkflow()
    rag_system.initialize_system("./documents")
    # Initialize the adaptive RAG workflow
    adaptive_system = AdaptiveRAGWorkflow(rag_system)
    # Initialize the multi-agent system
    coordinator_system = setup_multi_agent_system("./documents")
    print("Agentic RAG system started")

def _extract_sources_from_workflow(workflow_result: Dict) -> List[Dict]:
    """Extract source documents from a workflow result"""
    sources = []
    for agent_name, result in workflow_result["intermediate_results"].items():
        if "documents" in result:
            sources.extend(result["documents"])
    return sources

@app.post("/query", response_model=QueryResponse)
async def handle_query(request: QueryRequest):
    """Handle a query request"""
    start_time = time.time()
    response_id = str(uuid.uuid4())
    try:
        if request.workflow_type == "basic":
            result = rag_system.query(request.query, request.include_sources)
            workflow_metadata = {"type": "basic"}
        elif request.workflow_type == "adaptive":
            result = adaptive_system.execute_query(request.query)
            # Multi-step runs return "final_response" instead of "response"
            result.setdefault("response", result.get("final_response", ""))
            workflow_metadata = {
                "type": "adaptive",
                "complexity": result.get("complexity"),
                "iterations": result.get("total_iterations", 1)
            }
        elif request.workflow_type == "multi_agent":
            workflow_result = coordinator_system.execute_workflow(request.query)
            result = {
                "response": workflow_result["final_response"],
                "sources": _extract_sources_from_workflow(workflow_result)
            }
            workflow_metadata = {
                "type": "multi_agent",
                "agent_sequence": workflow_result["agent_sequence"],
                "workflow_id": workflow_result["workflow_id"]
            }
        else:
            raise HTTPException(status_code=400, detail="Unsupported workflow_type")
        processing_time = time.time() - start_time
        return QueryResponse(
            response_id=response_id,
            query=request.query,
            response=result["response"],
            workflow_type=request.workflow_type,
            processing_time=round(processing_time, 2),
            sources=result.get("sources"),
            metadata=workflow_metadata
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error while handling the query: {str(e)}")

@app.get("/metrics")
async def get_system_metrics():
    """Get system metrics"""
    basic_metrics = rag_system.metrics if hasattr(rag_system, 'metrics') else {}
    adaptive_metrics = adaptive_system.get_metrics() if adaptive_system else {}
    multi_agent_metrics = coordinator_system.get_system_metrics() if coordinator_system else {}
    return SystemMetrics(
        basic_rag=basic_metrics,
        adaptive_rag=adaptive_metrics,
        multi_agent=multi_agent_metrics,
        timestamp=datetime.now().isoformat()
    )

@app.get("/health")
async def health_check():
    """Health check"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "system": "Agentic RAG API"
    }
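Once the service is running (for example via uvicorn main:app, assuming the module is named main), it can be exercised over HTTP. A minimal client sketch using the requests library; the URL, port, and query text are illustrative assumptions:

import requests

payload = {
    "query": "What is machine learning?",
    "workflow_type": "adaptive",
    "include_sources": True
}
resp = requests.post("http://localhost:8000/query", json=payload, timeout=120)
resp.raise_for_status()
data = resp.json()
print(data["response_id"], data["processing_time"])
print(data["response"])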
4.2 Docker Deployment Configuration
Create a Dockerfile:
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the documents directory
RUN mkdir -p documents

# Expose the port
EXPOSE 8000

# Start command
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
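The Dockerfile copies a requirements.txt that is not shown above. A minimal sketch matching the packages installed in the environment setup section; the unpinned versions and the python-dotenv entry are assumptions to be adjusted for your environment:

# requirements.txt
langchain
openai
chromadb
tiktoken
fastapi
uvicorn
pydantic
sqlalchemy
psycopg2-binary
unstructured[all]
python-dotenv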
Create a docker-compose.yml:
version: '3.8'

services:
  agentic-rag:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=postgresql://user:password@db:5432/rag_db
      - VECTOR_STORE_PATH=/app/vector_store
    volumes:
      - ./documents:/app/documents
      - ./vector_store:/app/vector_store
    depends_on:
      - db

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=rag_db
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
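With both files in place, the stack can be built and started as sketched below; OPENAI_API_KEY is expected in the shell environment or a .env file next to docker-compose.yml:

# Build the image and start the API together with PostgreSQL
docker compose up --build -d
# Tail the application logs
docker compose logs -f agentic-rag
# Verify the service is healthy
curl http://localhost:8000/health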
4.3 Monitoring and Logging Configuration
import logging
from logging.handlers import RotatingFileHandler
import sys

def setup_logging():
    """Configure logging"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            RotatingFileHandler('agentic_rag.log', maxBytes=10485760, backupCount=5),
            logging.StreamHandler(sys.stdout)
        ]
    )

class PerformanceMonitor:
    """Performance monitor"""

    def __init__(self):
        self.metrics = {
            "query_latency": [],
            "error_rates": {},
            "resource_usage": {}
        }

    def record_query(self, workflow_type: str, processing_time: float, success: bool):
        """Record query performance"""
        self.metrics["query_latency"].append({
            "workflow_type": workflow_type,
            "processing_time": processing_time,
            "timestamp": datetime.now().isoformat()
        })
        # Keep only the most recent 1000 records
        if len(self.metrics["query_latency"]) > 1000:
            self.metrics["query_latency"] = self.metrics["query_latency"][-1000:]

    def get_performance_report(self) -> Dict[str, Any]:
        """Generate a performance report"""
        latencies = [q["processing_time"] for q in self.metrics["query_latency"]]
        return {
            "total_queries": len(latencies),
            "avg_latency": sum(latencies) / len(latencies) if latencies else 0,
            "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0,
            "max_latency": max(latencies) if latencies else 0
        }
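A brief usage sketch wiring the monitor into a query path; it assumes the monitoring module above plus an already initialized adaptive workflow, and the workflow type and query are illustrative:

setup_logging()
logger = logging.getLogger("agentic_rag")
monitor = PerformanceMonitor()

start = time.time()
try:
    # result = adaptive_system.execute_query("What is machine learning?")
    success = True
except Exception:
    logger.exception("Query failed")
    success = False
monitor.record_query("adaptive", time.time() - start, success)
logger.info("Performance report: %s", monitor.get_performance_report())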
Complete Test Example
def comprehensive_test():
    """End-to-end test"""
    # 1. Initialize the system
    coordinator = setup_multi_agent_system("./documents")
    # 2. Test queries
    test_queries = [
        "What is artificial intelligence?",
        "Compare the advantages and disadvantages of machine learning and deep learning",
        "Explain in detail how neural networks work and where they are applied"
    ]
    for query in test_queries:
        print(f"\n{'='*50}")
        print(f"Test query: {query}")
        print(f"{'='*50}")
        # Use the multi-agent system
        result = coordinator.execute_workflow(query)
        print(f"Workflow ID: {result['workflow_id']}")
        print(f"Agent sequence: {result['agent_sequence']}")
        print(f"Processing time: {result['processing_time']:.2f}s")
        print(f"Final answer: {result['final_response'][:200]}...")
        # Show per-agent results
        for agent_name, agent_result in result['intermediate_results'].items():
            status = agent_result.get('status', 'unknown')
            print(f"  {agent_name}: {status}")
    # 3. Show system metrics
    print(f"\n{'='*50}")
    print("System performance metrics:")
    print(f"{'='*50}")