Day 92:【99天精通Python】终极项目 - AI 聊天机器人 (中) - 知识库与 RAG
前言
欢迎来到第92天!
在昨天的课程中,我们搭建了一个带记忆的流式聊天 API。但是,这个 AI 只能基于它自身的通用知识来回答问题。如果我们想让它成为一个特定领域的专家(比如,回答关于我们公司内部文档的问题),就需要给它挂载"外脑"。
这就是RAG (Retrieval-Augmented Generation)的用武之地。
我们将实现一个文件上传功能,让用户可以上传 PDF,然后 AI 就能基于这个 PDF 的内容来回答问题。
本节内容:
- RAG 流程与 LangChain 实现
- 文件上传 API
- 创建并持久化向量数据库
- 改造聊天 API,支持 RAG
- 实战测试:基于 PDF 的问答
一、文件处理与向量化
我们需要一个 API 来接收用户上传的文件,并将其处理成向量存起来。
1.1 文件上传接口 (app.py)
importosfromflaskimportrequest,jsonifyfromwerkzeug.utilsimportsecure_filenamefromlangchain_community.document_loadersimportPyPDFLoaderfromlangchain_text_splittersimportRecursiveCharacterTextSplitterfromlangchain_openaiimportOpenAIEmbeddingsfromlangchain_community.vectorstoresimportChroma UPLOAD_FOLDER='./uploads'DB_FOLDER='./vector_db'app.config['UPLOAD_FOLDER']=UPLOAD_FOLDERifnotos.path.exists(UPLOAD_FOLDER):os.makedirs(UPLOAD_FOLDER)ifnotos.path.exists(DB_FOLDER):os.makedirs(DB_FOLDER)# 初始化 Embedding 模型 (只需一次)embeddings=OpenAIEmbeddings(api_key="...",base_url="...")@app.route("/api/upload",methods=["POST"])defupload_file():if'file'notinrequest.files:returnjsonify({"error":"缺少文件"}),400file=request.files['file']session_id=request.form.get("session_id","default_session")iffile.filename=='':returnjsonify({"error":"未选择文件"}),400# 1. 保存文件filename=secure_filename(file.filename)filepath=os.path.join(app.config['UPLOAD_FOLDER'],filename)file.save(filepath)# 2. 加载和切分loader=PyPDFLoader(filepath)docs=loader.load()text_splitter=RecursiveCharacterTextSplitter(chunk_size=1000,chunk_overlap=200)splits=text_splitter.split_documents(docs)# 3. 向量化并存入 ChromaDB# 每个 session 有自己独立的数据库db_path=os.path.join(DB_FOLDER,session_id)vectorstore=Chroma.from_documents(documents=splits,embedding=embeddings,persist_directory=db_path)returnjsonify({"message":f"文件 '{filename}' 上传并处理成功"}),200二、改造聊天逻辑:双模式切换
现在,我们的聊天 API 需要能够判断:当前会话是否有关联的知识库?
- 有:走 RAG 流程。
- 没有:走昨天的普通对话流程。
2.1 构建 RAG 链
我们需要一个更复杂的链,它能结合历史记录和知识库。
# app.py 中fromlangchain.chainsimportcreate_history_aware_retriever,create_retrieval_chainfromlangchain.chains.combine_documentsimportcreate_stuff_documents_chainfromlangchain_core.promptsimportChatPromptTemplate,MessagesPlaceholderdefget_rag_chain(session_id):"""根据 session_id 获取 RAG 链"""db_path=os.path.join(DB_FOLDER,session_id)ifnotos.path.exists(db_path):returnNone# 没有知识库# 加载向量数据库vectorstore=Chroma(persist_directory=db_path,embedding_function=embeddings)retriever=vectorstore.as_retriever()llm=ChatOpenAI(api_key="...",base_url="...",streaming=True)# 1. 重构问题 Prompt (让AI把历史和新问题整合成一个独立问题)contextualize_q_prompt=ChatPromptTemplate.from_messages([("system","根据聊天历史和最新问题,生成一个无需历史即可理解的独立问题。"),MessagesPlaceholder("chat_history"),("user","{input}"),])history_aware_retriever=create_history_aware_retriever(llm,retriever,contextualize_q_prompt)# 2. 问答 Promptqa_prompt=ChatPromptTemplate.from_messages([("system","你是一个问答助手。请根据下面的上下文回答问题:\n\n{context}"),MessagesPlaceholder("chat_history"),("user","{input}"),])question_answer_chain=create_stuff_documents_chain(llm,qa_prompt)# 3. 组装最终的 RAG 链rag_chain=create_retrieval_chain(history_aware_retriever,question_answer_chain)returnrag_chain2.2 升级流式生成器
# app.pyfromlangchain_core.messagesimportHumanMessage,AIMessagedefstream_generator(session_id,user_input):# 获取或创建记忆ifsession_idnotinconversations:conversations[session_id]=ConversationBufferMemory(return_messages=True)memory=conversations[session_id]# 尝试获取 RAG 链rag_chain=get_rag_chain(session_id)ifrag_chain:print(f"[{session_id}] 使用 RAG 模式")# RAG 链需要 chat_history 对象chat_history=memory.load_memory_variables({})['history']asyncdefrag_stream():# RAG 链的流式调用asyncforchunkinrag_chain.astream({"input":user_input,"chat_history":chat_history}):if"answer"inchunk:yieldf"data:{json.dumps({'token':chunk['answer']})}\\n\n"# 手动更新记忆memory.save_context({"input":user_input},{"output":"..."})# 这里可以后续优化returnrag_stream()else:print(f"[{session_id}] 使用普通对话模式")# 普通对话链 (同 Day 91)# ... (省略)三、测试 RAG 流程
启动 Flask:
python app.py。上传文件:
curl-X POST -F"file=@/path/to/your.pdf"-F"session_id=pdf_session"http://127.0.0.1:5001/api/upload基于文档提问:
curl-X POST -H"Content-Type: application/json"\-d'{"session_id": "pdf_session", "input": "总结一下这份文档的主要内容"}'\http://127.0.0.1:5001/api/chat你会看到 AI 的回答是基于你上传的 PDF 内容的。
普通聊天:
curl-X POST -H"Content-Type: application/json"\-d'{"session_id": "normal_session", "input": "你好"}'\http://127.0.0.1:5001/api/chat这次 AI 会进行普通回复。
四、常见问题
Q1:create_history_aware_retriever是干嘛的?
如果用户问"它怎么样?“,这个链会结合历史记录(比如上一句是"我正在看《三体》”),把问题重构成"《三体》怎么样?",然后再去知识库里检索。
Q2:Chroma.from_documents很慢怎么办?
Embedding 计算是耗时的。对于大文件,不应该在 HTTP 请求中同步处理。
生产环境方案:文件上传后,把"处理任务"丢给Celery (Day 64)后台 Worker 去慢慢执行。
Q3:如何支持 Word/Txt 文件?
LangChain 提供了各种DocumentLoader:
TextLoaderUnstructuredWordDocumentLoader(需pip install unstructured)
五、小结
关键要点:
- RAG是给 AI 扩展私有知识的"外挂"。
- 向量数据库是 RAG 的核心,负责存储和检索知识片段。
- 通过
session_id可以为每个用户或每个文档创建独立的知识库。
六、课后作业
- 前端进度条:
from_documents过程是耗时的,思考如何在前端展示文件处理的进度?(提示:后端可以返回一个task_id,前端轮询查询任务状态)。 - 多文档支持:允许一个
session_id关联多个上传的文档。 - 元数据过滤 (进阶):在切分文档时,给每个
split添加元数据(如metadata={'source': 'doc1.pdf'}),在检索时可以进行过滤(只在doc1.pdf中搜索)。
下节预告
Day 93:AI 聊天机器人 (下) - 前端界面与部署- 万事俱备,只欠东风!明天我们将编写 HTML 和 JavaScript,打造一个真正的 Web 聊天界面,并完成项目的最终部署。
系列导航:
- 上一篇:Day 91 - AI聊天机器人(上)
- 下一篇:Day 93 - AI聊天机器人(下)(待更新)