基于论文的大模型应用:基于SmartETL的arXiv论文数据接入与预处理(四)

上一篇介绍了基于SmartETL框架实现arxiv采集处理的基本流程,通过少量的组件定制开发,配合yaml流程配置,实现了复杂的arxiv采集处理。

由于其业务流程复杂,在实际应用中还存在一些不足需要优化。

5. 基于Kafka的任务解耦设计

5.1.存在问题

由于arXiv论文数量庞大、解析处理流程复杂,实际应用中可能会遇到不少问题。主要包括:

  1. 持续采集问题。数据集需要定期下载更新,每篇新论文需要进行完整处理。
  2. 采集处理性能问题。数据量大,以及下载、解析、向量化、入库等过程都会比较耗时,需要尽可能提升处理性能。
  3. 异常中断问题。过程中出现问题,如程序BUG,可能导致全部数据都要重新处理,应该尽量避免影响已处理的部分。

应对这些问题的一个比较好的办法就是引入消息中间件技术,基于生产者-消费者模式,将不同处理环节进行解耦,既可以避免相互影响,而且可以针对不同处理环节的计算复杂度进行独立设计以提高整体吞吐。

5.2.Kafka队列设计

引入kafka消息中间件,设计多个消息队列,将不同环节作为独立的生产者/消费者,实现流程解耦。如下图所示:
在这里插入图片描述
设计消息队列包括:

  • 采集任务队列(arxiv_task):arxiv采集处理任务队列,即kaggle数据集排序后的论文元数据。
  • 采集结果队列(arxiv_html):待处理的html解析任务。
  • 解析结果队列(arxiv_parsed):解析后待入库的论文信息。

5.3.子流程设计

流程解耦后的每个环节作为一个子流程,与Kafka进行读取或写入。具体包括包括:

  • 任务载入子流程:将下载或订阅的最新论文元数据写入arxiv_task队列。
  • 采集子流程:基于提供的arxiv论文元数据,进行HTML(或/和PDF文件)采集。HTML/PDF保存在本地或MinIO系统。完成后,在arxiv_html队列中插入一条数据,可以是输入的论文元数据,也可以附加html文件路径信息。
  • 解析子流程:对提供的html文件信息进行解析,解析结果写入arxiv_parsed队列中。
  • 入库索引子流程:对解析结果写入ESQdrant数据库中。(可能还需要考虑到与已有数据的融合或消歧,本文暂不考虑)

上述每个子流程都运行为一个SmartETL实例。根据速度要求,对部分子流程可以启动多个SmartETL实例。

6.新流程设计

如前所述,基于kafka解耦的流程将分成4个子流程,通过kafka消息队列数据进行驱动。

6.1.任务载入子流程

此子流程较为简单,读取kaggle数据集文件(JSON格式),写入Kafka即可。按需运行。

name: json数据写入kafka
consts:kafka_config:kafka_ip: 10.60.1.148:9092kafka_topic:  arxiv_taskloader: JsonLine('data/sorted_arxiv.json')
nodes:writer: database.kafka.KafkaWriter(**kafka_config, buffer_size=50)processor: writer

6.2.采集子流程

基于前文流程进行简单改造即可。

name: arxiv论文下载consts:consumer:kafka_ip: 10.60.1.148:9092kafka_topic: arxiv_taskgroup_id: arxiv_crawlerproducer:kafka_ip: 10.60.1.148:9092kafka_topic: arxiv_htmlloader: database.kafka_v2.KafkaConsumer(**consumer)
nodes:#拼接html下载urlmk_url: Map('gestata.arxiv.url4html', key='id', target_key='url_html')#下载html内容download: Map('util.http.content', key='url_html', target_key='content', most_times=3, ignore_error=True)#保存html文件save: WriteFiles('data/arxiv_html', name_key='id', suffix='.html')#元数据集写入kafkawrite: database.kafka.KafkaWriter(**producer, buffer_size=1)processor: Chain(Print("id"), mk_url, download, save, write)

6.3.解析子流程

name: arxiv论文解析consts:consumer:kafka_ip: 10.60.1.148:9092kafka_topic: arxiv_htmlgroup_id: arxiv_htmlproducer:kafka_ip: 10.60.1.148:9092kafka_topic: arxiv_parseloader: database.kafka_v2.KafkaConsumer(**consumer)
nodes:#论文解析parse: Map('gestata.arxiv.extract')#元数据集写入kafkawrite: database.kafka.KafkaWriter(**producer, buffer_size=1)processor: Chain(parse, write)

6.4.入库索引子流程

name: arxiv论文入库ES和Qdrantconsts:bge_large: http://10.208.63.29:8001/embedqd_config:host: '10.60.1.145'es_config:host: '10.208.61.117'port: 9200index: doc_arxivbuffer_size: 3consumer:kafka_ip: 10.60.1.148:9092kafka_topic: arxiv_parsegroup_id: arxiv_parseloader: database.kafka_v2.KafkaConsumer(**consumer)
nodes:# 入库处理write: gestata.arxiv.ArxivProcess(bge_large, qd_config, es_config)processor: write

这里出于效率考虑,将论文正文chunk拆分、向量化、入向量库的工作集中在一个组件中实现。核心代码如下:

abstract = paper.get('abstract')
self.embed_and_write2(index=1, _id=_id, content=abstract, collection='chunk_arxiv_abstract2505')
sections = paper.get('sections')
for index, section in enumerate(sections,start=1):figures = section.get('figures')if figures:self.image_embed_and_write(_id=_id, index=index, figures=figures)title = section['title'].lower()content = section['content']collection = 'chunk_arxiv_discusss2505'if 'introduction' in title:collection = 'chunk_arxiv_introduction2505'elif 'method' in title:collection = 'chunk_arxiv_method2505'elif 'experiment' in title:collection = 'chunk_arxiv_experiment2505'self.embed_and_write2(index=index, _id=_id, content=content, collection=collection)self.ESWriter.index_name = self.es_indexself.ESWriter.write_batch(rows=[paper])

7.总结

本系列文章讨论了arXiv论文数据采集和预处理的相关技术,实现了对arXiv论文内容抽取以及建立向量化索引,通过Kafka消息中间件实现了各处理环节的解耦,更加方便实际业务中使用。整个流程基于SmartETL框架进行开发,同时也对SmartETL框架进行了完善。

相关代码已经全部推送到 SmartETL项目,具体流程定义在 这里,欢迎下载体验

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/82193.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Fiori学习专题三十五:Device Adaptation

由于在类似于手机的小面板上显示时&#xff0c;我们为了留出更多空间展示数据&#xff0c;可以将一些控件折叠。 1.修改HelloPanel.view.xml&#xff0c;加入expandable“{device>/system/phone}” expanded"{ !${device>/system/phone} <mvc:ViewcontrollerNam…

【记录】HunyuanVideo 文生视频工作流

HunyuanVideo 文生视频工作流指南 概述 本指南详细介绍如何在ComfyUI中使用腾讯混元HunyuanVideo模型进行文本到视频生成的全流程操作&#xff0c;包含环境配置、模型安装和工作流使用说明。 参考&#xff1a;https://comfyui-wiki.com/zh/install/install-comfyui/install-c…

统一返回JsonResult踩坑

定义了一个统一返回类&#xff0c;但是没有给Data 导致没有get/set方法&#xff0c;请求一直报错 public class JsonResult<T> {private int code;private String message;private T data;public JsonResult() {}public JsonResult(int code, String message, T data) {…

dubbo-token验证

服务提供者过滤器 import java.util.Map; import java.util.Objects;/*** title ProviderTokenFilter* description 服务提供者 token 验证* author zzw* version 1.0.0* create 2025/5/7 22:17**/ Activate(group CommonConstants.PROVIDER) public class ProviderTokenFilt…

沃伦森电气高压动态无功补偿装置助力企业电能优化

在工业生产的复杂电能环境中&#xff0c;电能质量直接影响企业的生产效率和运营成本。XX光伏科技有限公司作为一家快速发展的制造企业&#xff0c;随着生产规模的不断扩大&#xff0c;其内部电网面临功率因数过低、电压波动频繁等问题&#xff0c;导致供电部门罚款增加、设备故…

基于EFISH-SCB-RK3576工控机/SAIL-RK3576核心板的网络安全防火墙技术方案‌(国产化替代J1900的全栈技术解析)

‌基于EFISH-SCB-RK3576/SAIL-RK3576的网络安全防火墙技术方案‌ &#xff08;国产化替代J1900的全栈技术解析&#xff09; ‌一、硬件架构设计‌ ‌流量处理核心模块‌ ‌多核异构架构‌&#xff1a; ‌四核Cortex-A72&#xff08;2.3GHz&#xff09;‌&#xff1a;处理深度…

Maven 动态版本与SNAPSHOT机制详解

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

趣味编程:答案之书

概述&#xff1a;该篇博客主要介绍的是曾经一度风靡全网的答案之书小程序。 目录 1. 效果展示 2. 源码展示 3. 代码逻辑详解 3.1 头文件与全局变量 3.2 main函数 3.3 主循环 3. 4 绘制界面 4. 运行问题 5.小结 1. 效果展示 该小程序是动态的效果&#xff0c; 因此实…

多线程初阶(2)

说到多线程编程&#xff0c;一定少不了线程安全这个话题。我们前面了解了线程的原理以及线程与进程的关系。线程之间共享资源&#xff0c;这就代表了在多线程编程中一定会产生冲突&#xff0c;所以我们需要在敲代码时保证线程安全&#xff0c;避免这样的问题发生。 我们先看一…

【Ubuntu】安裝向日葵远程控制

前言 在Ubuntu 24.04.2下安装向日葵远程控制出错&#xff0c;少了一些依赖&#xff0c;需要安装一些依赖。 1.安装gconf2-common wget http://mirrors.kernel.org/ubuntu/pool/universe/g/gconf/gconf2-common_3.2.6-6ubuntu1_all.deb sudo dpkg -i gconf2-common_3.2.6-6ub…

【Python开源】深度解析:一款高效音频封面批量删除工具的设计与实现

&#x1f3b5; 【Python开源】深度解析&#xff1a;一款高效音频封面批量删除工具的设计与实现 &#x1f308; 个人主页&#xff1a;创客白泽 - CSDN博客 &#x1f525; 系列专栏&#xff1a;&#x1f40d;《Python开源项目实战》 &#x1f4a1; 热爱不止于代码&#xff0c;热情…

JAVA房屋租售管理系统房屋出租出售平台房屋销售房屋租赁房屋交易信息管理源码

一、源码描述 这是一套房屋租售管理源码&#xff0c;基于SpringBootVue框架&#xff0c;后端采用JAVA开发&#xff0c;源码功能完善&#xff0c;涵盖了房屋租赁、房屋销售、房屋交易等业务。 二、源码截图

一篇文章讲清楚mysql的聚簇索引、非聚簇索引、辅助索引

聚簇索引与非聚簇索引最大的区别就是&#xff1a; 聚簇索引的索引和数据是存放在一起的&#xff0c;都是在叶子结点&#xff1b; 非聚簇索引的索引和数据是分开存储的&#xff0c;叶子节点存放的是索引和指向数据文件的地址&#xff0c;通过叶子节点找到索引&#xff0c;再通…

使用ESPHome烧录固件到ESP32-C3并接入HomeAssistant

文章目录 一、安装ESPHome二、配置ESP32-C3控制灯1.主配置文件esp32c3-luat.yaml2.基础通用配置base.yaml3.密码文件secret.yaml4.围栏灯four_light.yaml5.彩灯rgb_light.yaml6.左右柱灯left_right_light.yaml 三、安装固件四、HomeAssistant配置ESPHome1.直接访问2.配置ESPHom…

什么是变量提升?

变量提升&#xff08;Hoisting&#xff09; 是 JavaScript 引擎在代码执行前的一个特殊行为&#xff0c;它会将变量声明和函数声明自动移动到当前作用域的顶部。但需要注意的是&#xff0c;只有声明会被提升&#xff0c;赋值操作不会提升。 ​​核心概念​​ 变量声明提升&…

【万字长文】深入浅出 LlamaIndex 和 LangChain:从RAG到智能体,轻松驾驭LLM应用开发

Langchain系列文章目录 01-玩转LangChain&#xff1a;从模型调用到Prompt模板与输出解析的完整指南 02-玩转 LangChain Memory 模块&#xff1a;四种记忆类型详解及应用场景全覆盖 03-全面掌握 LangChain&#xff1a;从核心链条构建到动态任务分配的实战指南 04-玩转 LangChai…

2025 后端自学UNIAPP【项目实战:旅游项目】3、API接口请求封装,封装后的简单测试以及实际使用

一、创建请求封装目录 选中自己的项目&#xff0c;右键鼠标---->新建---->目录---->名字自定义【我的是api】 二、创建两个js封装文件 选中封装的目录&#xff0c;右键鼠标---->新建---->js文件---->名字自定义【我的两个js文件分别是my_http和my_api】 三…

autojs和冰狐智能辅助该怎么选择?

最近打算做自动化脚本&#xff0c;在autojs和冰狐智能辅助中做选择&#xff0c;不知道该怎么选。没办法只能花费大量时间仔细研究了autojs和冰狐智能辅助&#xff0c;综合考虑功能需求、开发复杂度、编程经验及项目规模等因素。以下是两者的核心对比及选择建议&#xff0c;仅供…

python24-匿名函数

课程&#xff1a;B站大学 记录python学习&#xff0c;直到学会基本的爬虫&#xff0c;使用python搭建接口自动化测试就算学会了&#xff0c;在进阶webui自动化&#xff0c;app自动化 匿名函数 匿名函数实践是检验真理的唯一标准 匿名函数 匿名函数是指没有名字的函数&#xff…

Android 查看 Logcat (可纯手机方式 无需电脑)

安装 Logcat Reader Github Google Play 如果有电脑 使用其ADB方式可执行如下命令 后续无需安装Termux # 使用 ADB 授予 android.permission.READ_LOGS 权限给 Logcat Reader adb shell "pm grant com.dp.logcatapp android.permission.READ_LOGS && am force-…