arXiv论文管理RAG系统:从零构建生产级AI研究助手

news/2025/11/9 21:47:30/文章来源:https://www.cnblogs.com/qife122/p/19205350

arXiv论文管理RAG系统

一个完整的生产级检索增强生成(RAG)系统,专门用于管理和查询arXiv学术论文。该系统能够自动获取最新的AI研究论文,处理PDF内容,并提供智能问答功能。

功能特性

  • 自动化论文收集: 每日自动从arXiv API获取计算机科学AI领域的最新论文
  • 智能PDF解析: 使用Docling技术解析科学论文PDF,提取结构化内容
  • 混合搜索系统: 结合BM25关键词搜索和向量语义搜索的混合检索
  • 本地LLM集成: 集成Ollama支持本地大语言模型推理
  • 生产级监控: 集成Langfuse进行完整的RAG管道追踪和分析
  • 高性能缓存: Redis缓存实现150-400倍响应速度提升
  • Web界面: Gradio提供的交互式Web界面,支持实时流式响应
  • 工作流编排: Apache Airflow实现自动化数据处理管道

安装指南

系统要求

  • Python 3.12+
  • Docker & Docker Compose
  • 至少8GB可用内存

快速安装

  1. 克隆项目仓库:
git clone <repository-url>
cd arxiv-paper-curator
  1. 设置环境变量:
cp .env.example .env
  1. 启动所有服务:
docker compose up --build -d
  1. 验证服务状态:
curl http://localhost:8000/api/v1/health

依赖服务

系统包含以下核心服务:

  • FastAPI (端口8000): REST API服务
  • PostgreSQL (端口5432): 论文元数据存储
  • OpenSearch (端口9200): 混合搜索引擎
  • Apache Airflow (端口8080): 工作流编排
  • Ollama (端口11434): 本地LLM服务
  • Redis (端口6379): 缓存服务

使用说明

启动Web界面

uv run python gradio_launcher.py

访问 http://localhost:7861 使用交互式界面。

API使用示例

基础问答

import requestsresponse = requests.post("http://localhost:8000/api/v1/ask",json={"query": "什么是机器学习中的注意力机制?","top_k": 3,"use_hybrid": True,"model": "llama3.2:1b"}
)
print(response.json())

流式响应

import requestsresponse = requests.post("http://localhost:8000/api/v1/stream",json={"query": "解释Transformer架构","top_k": 2},stream=True
)for line in response.iter_lines():if line:print(line.decode('utf-8'))

混合搜索

response = requests.post("http://localhost:8000/api/v1/hybrid-search/",json={"query": "神经网络深度学习","size": 10,"categories": ["cs.AI", "cs.LG"],"use_hybrid": True}
)

核心代码

1. 混合搜索服务

class HybridIndexingService:"""服务用于索引论文的分块和嵌入,实现混合搜索"""def __init__(self, chunker: TextChunker, embeddings_client: JinaEmbeddingsClient, opensearch_client: OpenSearchClient):self.chunker = chunkerself.embeddings_client = embeddings_clientself.opensearch_client = opensearch_clientlogger.info("混合索引服务初始化完成")async def index_paper(self, paper_data: Dict) -> Dict[str, int]:"""索引单篇论文,包括分块和嵌入生成"""arxiv_id = paper_data.get("arxiv_id")paper_id = str(paper_data.get("id", ""))if not arxiv_id:logger.error("论文缺少arxiv_id")return {"chunks_created": 0, "chunks_indexed": 0, "embeddings_generated": 0, "errors": 1}try:# 步骤1: 使用混合分段方法对论文进行分块chunks = self.chunker.chunk_paper(title=paper_data.get("title", ""),abstract=paper_data.get("abstract", ""),full_text=paper_data.get("raw_text", paper_data.get("full_text", "")),arxiv_id=arxiv_id,paper_id=paper_id,sections=paper_data.get("sections"))if not chunks:logger.warning(f"论文 {arxiv_id} 无有效分块")return {"chunks_created": 0, "chunks_indexed": 0, "embeddings_generated": 0}# 步骤2: 为分块生成嵌入chunk_texts = [chunk.text for chunk in chunks]embeddings = await self.embeddings_client.embed_passages(chunk_texts)# 步骤3: 准备OpenSearch文档documents = []for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):doc = {"chunk_id": f"{arxiv_id}_chunk_{i}","arxiv_id": arxiv_id,"paper_id": paper_id,"chunk_index": i,"chunk_text": chunk.text,"chunk_word_count": len(chunk.text.split()),"embedding": embedding,"title": paper_data.get("title", ""),"authors": paper_data.get("authors", []),"abstract": paper_data.get("abstract", ""),"categories": paper_data.get("categories", []),"published_date": paper_data.get("published_date"),"section_title": chunk.metadata.section_title,"embedding_model": "jina-embeddings-v3"}documents.append(doc)# 步骤4: 批量索引到OpenSearchsuccess_count = await self.opensearch_client.bulk_index_documents(documents)logger.info(f"成功索引论文 {arxiv_id}: {success_count}/{len(documents)} 分块")return {"chunks_created": len(chunks),"chunks_indexed": success_count,"embeddings_generated": len(embeddings)}except Exception as e:logger.error(f"索引论文 {arxiv_id} 时出错: {e}")return {"chunks_created": 0, "chunks_indexed": 0, "embeddings_generated": 0, "errors": 1}

2. RAG问答端点

@ask_router.post("/ask", response_model=AskResponse)
async def ask_question(request: AskRequest,opensearch_client: OpenSearchDep,embeddings_service: EmbeddingsDep,ollama_client: OllamaDep,langfuse_tracer: LangfuseDep,cache_client: CacheDep,
) -> AskResponse:"""RAG问答端点,支持缓存和追踪"""# 检查缓存if cache_client:cached_response = await cache_client.find_cached_response(request)if cached_response:logger.info("缓存命中,返回缓存响应")return cached_response# 创建追踪器rag_tracer = RAGTracer(langfuse_tracer)with rag_tracer.trace_request(user_id="api_user", query=request.query) as trace:start_time = time.time()try:# 检索相关分块chunks, arxiv_ids, sources = await _prepare_chunks_and_sources(request, opensearch_client, embeddings_service, rag_tracer, trace)if not chunks:return AskResponse(query=request.query,answer="未找到相关论文内容来回答此问题。",sources=[],chunks_used=0,search_mode="none")# 构建提示词with rag_tracer.trace_prompt_construction(trace, chunks) as prompt_span:prompt_builder = RAGPromptBuilder()prompt = prompt_builder.create_rag_prompt(request.query, chunks)rag_tracer.end_prompt(prompt_span, prompt)# 生成回答with rag_tracer.trace_generation(trace, request.model, prompt) as gen_span:response_text = await ollama_client.generate_response(prompt=prompt,model=request.model,system_prompt=prompt_builder.system_prompt)rag_tracer.end_generation(gen_span, response_text, request.model)# 构建响应response = AskResponse(query=request.query,answer=response_text,sources=sources,chunks_used=len(chunks),search_mode="hybrid" if request.use_hybrid else "bm25")# 缓存响应if cache_client:await cache_client.store_response(request, response)total_duration = time.time() - start_timerag_tracer.end_request(trace, response_text, total_duration)logger.info(f"RAG问答完成: {len(chunks)}分块, {total_duration:.2f}秒")return responseexcept Exception as e:logger.error(f"RAG问答错误: {e}")raise HTTPException(status_code=500, detail=f"生成回答时出错: {str(e)}")

3. 文本分块服务

class TextChunker:"""文本分块服务,将文本分割为重叠的段落"""def __init__(self, chunk_size: int = 600, overlap_size: int = 100, min_chunk_size: int = 100):self.chunk_size = chunk_sizeself.overlap_size = overlap_sizeself.min_chunk_size = min_chunk_sizeif overlap_size >= chunk_size:raise ValueError("重叠大小必须小于分块大小")logger.info(f"文本分块器初始化: 分块大小={chunk_size}, 重叠大小={overlap_size}, 最小分块大小={min_chunk_size}")def chunk_paper(self,title: str,abstract: str,full_text: str,arxiv_id: str,paper_id: str,sections: Optional[Union[Dict[str, str], str, list]] = None,) -> List[TextChunk]:"""使用混合分段方法对论文进行分块"""chunks = []# 处理标题和摘要if title and abstract:title_abstract_text = f"标题: {title}\n\n摘要: {abstract}"title_chunks = self._chunk_text(title_abstract_text, arxiv_id, paper_id, "title_abstract")chunks.extend(title_chunks)# 处理正文内容if full_text:if sections and isinstance(sections, list):# 使用分段结构进行智能分块for section in sections:if isinstance(section, dict) and 'title' in section and 'content' in section:section_title = section['title']section_content = section['content']if section_content:section_chunks = self._chunk_text(section_content, arxiv_id, paper_id, f"section_{section_title}")chunks.extend(section_chunks)else:# 回退到基于段落的分块body_chunks = self._chunk_text(full_text, arxiv_id, paper_id, "body")chunks.extend(body_chunks)logger.info(f"为论文 {arxiv_id} 创建了 {len(chunks)} 个分块")return chunksdef _chunk_text(self, text: str, arxiv_id: str, paper_id: str, section_title: str) -> List[TextChunk]:"""将文本分割为重叠的分块"""if not text or len(text.strip()) == 0:return []words = self._split_into_words(text)if len(words) < self.min_chunk_size:return []chunks = []start_idx = 0while start_idx < len(words):end_idx = min(start_idx + self.chunk_size, len(words))# 确保不在单词中间截断while end_idx < len(words) and not text[end_idx-1].isspace():end_idx += 1chunk_words = words[start_idx:end_idx]chunk_text = self._reconstruct_text(chunk_words)# 创建分块元数据metadata = ChunkMetadata(chunk_index=len(chunks),start_char=start_idx,end_char=end_idx,word_count=len(chunk_words),overlap_with_previous=self.overlap_size if start_idx > 0 else 0,overlap_with_next=self.overlap_size if end_idx < len(words) else 0,section_title=section_title)chunk = TextChunk(text=chunk_text,metadata=metadata,arxiv_id=arxiv_id,paper_id=paper_id)chunks.append(chunk)# 移动起始位置,考虑重叠start_idx += (self.chunk_size - self.overlap_size)# 如果剩余文本太少,则停止if len(words) - start_idx < self.min_chunk_size:breakreturn chunks

4. 缓存客户端

class CacheClient:"""基于Redis的精确匹配缓存,用于RAG查询"""def __init__(self, redis_client: redis.Redis, settings: RedisSettings):self.redis = redis_clientself.settings = settingsself.ttl = timedelta(hours=settings.ttl_hours)def _generate_cache_key(self, request: AskRequest) -> str:"""基于请求参数生成精确缓存键"""key_data = {"query": request.query,"model": request.model,"top_k": request.top_k,"use_hybrid": request.use_hybrid,"categories": sorted(request.categories) if request.categories else [],}key_string = json.dumps(key_data, sort_keys=True)key_hash = hashlib.sha256(key_string.encode()).hexdigest()[:16]return f"exact_cache:{key_hash}"async def find_cached_response(self, request: AskRequest) -> Optional[AskResponse]:"""查找精确查询匹配的缓存响应"""try:cache_key = self._generate_cache_key(request)cached_response = self.redis.get(cache_key)if cached_response:try:response_data = json.loads(cached_response)logger.info("精确查询匹配缓存命中")return AskResponse(**response_data)except json.JSONDecodeError as e:logger.warning(f"反序列化缓存响应失败: {e}")return Nonereturn Noneexcept Exception as e:logger.error(f"检查缓存时出错: {e}")return Noneasync def store_response(self, request: AskRequest, response: AskResponse) -> bool:"""存储精确查询匹配的响应"""try:cache_key = self._generate_cache_key(request)success = self.redis.set(cache_key, response.model_dump_json(), ex=self.ttl)if success:logger.info(f"响应已存储到精确缓存,键为 {cache_key[:16]}...")return Trueelse:logger.warning("存储响应到缓存失败")return Falseexcept Exception as e:logger.error(f"存储到缓存时出错: {e}")return False

更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众号(网络安全技术点滴分享)

公众号二维码

公众号二维码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/960762.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

双亲委派模型?就是【Java开发日记】请介绍类加载过程,什么

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

#20232408 2025-2026-1 《网络与系统攻防技术》实验四实验报告 - 20232408

一、实验内容 1.1 实验要求 (1)对恶意代码样本进行识别文件类型、脱壳、字符串提取操作。 (2)使用IDA Pro静态或动态分析所给的exe文件,找到输出成功信息的方法。 (3)分析恶意代码样本并撰写报告,回答问题。 (…

2025.11.10~2025.11.16

周计划 除了当天学习任务之外只搞dp相关练习 提前把练习的内容准备好 模拟赛尽快全部拿到会的分数,只有这一个目标 每天晚上8:30开始总结今天一天的题目收获(做法) 然后补充周计划,NOIP前计划,个人原则 作息规律…

性能学习

1.性能测试理论 01.性能测试理论 02.性能测试指标 03.性能测试流程 2.Jmeter学习 01.jmeter介绍与安装 02.线程组 03.setup线程组 04.tearDown线程组 05.http请求 06.分布式jmeter

实用指南:苹果手机误删照片?别慌,这些找回方法助你找回珍贵回忆

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

前端三剑客——javascript-BOM-DOM-http协议-异步

大纲:1.BOM(浏览器对象模型) window对象 location对象 本地存储2.DOM(文档对象模型)元素操作(直接获取/间接获取)事件操作3.js异步PromiseAsync/await4.HTTP协议URL地址解释HTTP协议概述常见HTTP请求方法请求…

npm: 无法加载文件

在VSCODE终端窗口里运行 编译 TYPESCIPT脚本时(node hello.ts),提示 :npm: 无法加载文件 D:\Program Files\Nodejs\node_global\nmp.ps1, 因为在此系统禁止运行脚本。简单例子: 在VSCODE终端窗口里运行 编译 TYP…

C语言中的算术类型转换

1.寻常算数转换 在C语言中,当不同类型的操作数参与到算术运算时,编译器会将操作数转换成同一类型,再运算。这一过程被称为寻常算术转换,由于这个过程我们程序员看不见,所以它也是一种隐式类型转换(见整型提升) …

OIFHA251108(成都嘉祥)

吐槽 虽然难,但全部都是比较好玩的题目(除了 \(T_1\))。 T1 幸好没做这题(doge)。 其核心思想在于看到有向图以及每条边可以走很多次且只算一次需要很快想到 tarjan,为什么要很快?因为你还要调代码。 然后这是一…

NOIP 模拟赛 4 总结

分数:\(40 + 0 + 0 + 0 = \color{red}{40}\)我恨子任务! 我恨全是坑的贪心! 我很码量超大的数据结构! 我恨 ad-hoc !当然,还是要承认自己很菜,不然分数不可能如此惨淡。 T1 众所周知,贪心本身并不难,难的是这…

2025.11.9——1橙1绿

普及- P14477 图寻中国 Div2月赛T1 普及+/提高 P5687 [CSP-S 2019 江西] 网格图 Kruskal的变形,看了题解才做出来

Python中a = b = 10的底层机制:从名字绑定到引用计数的完整拆解

Python中a = b = 10的底层机制:从名字绑定到引用计数的完整拆解 在Python中,a = b = 10这种“链式赋值”看似是简单的语法糖,但其底层执行逻辑与C语言的同名语法存在本质差异——它不是“先把10赋给b,再把b的值赋给…

Python中“赋值”说法是否规范?与C语言赋值的界限必须划清

Python中“赋值”说法是否规范?与C语言赋值的界限必须划清 在Python语境中,“赋值”是行业内约定俗成的常用说法(如官方文档、教材、社区讨论中频繁出现),但其语义边界必须与C语言的“赋值”严格区分——若直接将…

详细介绍:java-springboot电子商务商品物流跟踪系统 SpringBoot+Java电商订单全程物流可视化平台 基于Java框架的网购商品在途追踪与签收管理系统计算机毕业设计

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

Python中“赋值”说法是否规范?详解`=`的语句属性与无返回值特性

Python中“赋值”说法是否规范?详解=的语句属性与无返回值特性 在Python学习中,“赋值”是描述a = 10这类语句的常用说法,但结合之前讨论的“名字-对象绑定模型”“对象三属性(标识、类型、值)”,很多开发者会疑…

CIO修炼之道读书笔记

目录一个目标三层价值提高运营效率(操作层)​加强运营管控(管理层)防范运营风险(决策层)参考资料 CIO修炼之“一三四六” 一个目标 从只关注技术转变为同时关注企业业务和战略,并努力围绕如何让企业更赚钱这个目…

小题狂练 (K)

solset-K\[\]目录 目录[AGC036F] Square Constraints[AGC036F] Square Constraints 容斥钦定一些下界不满足转为只有上界的问题,困难只在求每个界的排名 . 比 \(n\) 小的部分的上界肯定比所有数都大,所以如果知道钦定…

洛谷 P14461 【MX-S10-T2】『FeOI-4』青年晚报

P14461 【MX-S10-T2】『FeOI-4』青年晚报 空降 很好得诠释了数学和眼神的重要性 感谢奆佬设求导算子为 $ d $ ( 熟悉线代科技的奆佬可以从求导矩阵和单位矩阵结合成分块矩阵形式的角度进行理解),显然对于 $ F $ 的一阶…

Microsoft Agent Framework 接入DeepSeek的优雅姿势

Microsoft Agent Framework 接入DeepSeek的优雅姿势合集 - AI(1)1.Microsoft Agent Framework 接入DeepSeek的优雅姿势11-05收起一、前言 ​ Microsoft Agent Framework 框架发布也有一阵子了,在观望(摸鱼)过后,也…

人工智能团队的示例角色

人工智能团队的示例角色1 软件工程师(设计专业的软件) 2 机器学习工程师(运用人工智能算法到产品软件中) 3 机器学习研究员(负责开发机器学习的前沿技术) 4 应用机器学习科学家(对学术文献或者研究文献,向团队…