使用 NV‑Ingest、Unstructured 和 Elasticsearch 处理非结构化数据

作者:来自 Elastic Ajay Krishnan Gopalan

了解如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 为 RAG 应用构建可扩展的非结构化文档数据管道。

Elasticsearch 原生集成了行业领先的生成式 AI 工具和提供商。查看我们的网络研讨会,了解如何超越 RAG 基础,或使用 Elastic 向量数据库构建可投入生产的应用。

为了为你的用例构建最佳搜索解决方案,现在就开始免费云试用,或在本地机器上试用 Elastic。


在这篇博客中,我们将讨论如何使用 NV-Ingest、Unstructured Platform 和 Elasticsearch 实现一个可扩展的数据处理流水线。该流水线将来自数据源的非结构化数据转换为结构化、可搜索的内容,为下游的 AI 应用(如 RAG)做好准备。检索增强生成(RAG)是一种 AI 技术,它为大语言模型(LLMs)提供外部知识,以生成对用户查询的响应。这使得 LLM 的回答能够根据特定上下文进行定制,从而使答案更准确、更相关。

在开始之前,让我们先了解一下实现该流水线的关键组件,以及它们各自的作用。

流水线组件

NV-Ingest 是一组微服务,用于将非结构化文档转换为结构化内容和元数据。它可以大规模处理文档解析、视觉结构识别和 OCR 处理。

Unstructured 是一个 ETL+ 平台,用于协调整个非结构化数据处理流程:从从多个数据源中摄取非结构化数据、通过可配置的工作流引擎将原始非结构化文件转换为结构化数据、使用附加转换丰富数据,一直到将结果上传到向量存储、数据库和搜索引擎。它提供了可视化 UI、API 和可扩展的后端基础设施,在一个工作流中协调文档解析、数据丰富和嵌入处理。

Elasticsearch 是业界领先的搜索和分析引擎,现在具备原生的向量搜索能力。它既可以作为传统的文本数据库,也可以作为向量数据库,支持像 k-NN 相似度搜索这样的功能,实现大规模语义搜索。

现在我们已经介绍了核心组件,接下来让我们看看它们在典型工作流程中是如何协同工作的,然后再深入了解具体实现。

使用 NV-Ingest - Unstructured - Elasticsearch 实现 RAG

虽然这里我们只提供关键要点,你可以在此处查看完整的 notebook。

本博客分为三个部分:

  1. 设置源和目标连接器

  2. 使用 Unstructured API 设置工作流

  3. 基于处理后的数据进行 RAG

Unstructured 的工作流以 DAG(Directed Acyclic Graph - 有向无环图)的形式表示,节点称为连接器,用于控制数据的摄取来源以及处理结果的上传目标。这些节点在任何工作流中都是必需的。源连接器配置原始数据从数据源的摄取,目标连接器配置处理后数据上传到向量存储、搜索引擎或数据库。

在本博客中,我们将研究论文存储在 Amazon S3 中,并希望将处理后的数据传送到 Elasticsearch 用于下游用途。这意味着,在构建数据处理工作流之前,我们需要通过 Unstructured API 创建一个 Amazon S3 的源连接器和一个 Elasticsearch 的目标连接器。

步骤 1:设置 S3 源连接器

在创建源连接器时,你需要为其指定一个唯一名称,明确其类型(例如 S3 或 Google Drive),并提供配置,通常包括你要连接的数据源的位置(例如 S3 bucket 的 URI 或 Google Drive 文件夹)以及身份验证信息。

source_connector_response = unstructured_client.sources.create_source(request=CreateSourceRequest(create_source_connector=CreateSourceConnector(name="demo_source1",type=SourceConnectorType.S3,config=S3SourceConnectorConfigInput(key=os.environ['S3_AWS_KEY'],secret=os.environ['S3_AWS_SECRET'],remote_url=os.environ["S3_REMOTE_URL"],recursive=False #True/False)))
)pretty_print_model(source_connector_response.source_connector_information)

步骤 2:设置 Elasticsearch 目标连接器

接下来,我们来设置 Elasticsearch 目标连接器。你使用的 Elasticsearch 索引必须具有与 Unstructured 为你生成的文档架构兼容的架构 —— 你可以在文档中找到所有详细信息。

destination_connector_response = unstructured_client.destinations.create_destination(request=CreateDestinationRequest(create_destination_connector=CreateDestinationConnector(name="demo_dest-3",type=DestinationConnectorType.ELASTICSEARCH,config=ElasticsearchConnectorConfigInput(hosts=[os.environ['es_host']],es_api_key=os.environ['es_api_key'],index_name="demo-index")))
)

步骤 3:使用 Unstructured 创建工作流

一旦你拥有了源连接器和目标连接器,就可以创建一个新的数据处理工作流。我们将通过以下节点构建工作流 DAG:

  • NV-Ingest 用于文档分区

  • Unstructured 的 Image Summarizer、Table Summarizer 和 Named Entity Recognition 节点用于内容丰富

  • Chunker 和 Embedder 节点用于使内容准备好进行相似性搜索

from unstructured_client.models.shared import (WorkflowNode,WorkflowNodeType,WorkflowType,Schedule
)# Partition the content by using NV-Ingest
parition_node = WorkflowNode(name="Ingest",subtype="nvingest",type="partition",settings={"nvingest_host":  userdata.get('NV-Ingest-host-address')},)# Summarize each detected image.
image_summarizer_node = WorkflowNode(name="Image summarizer",subtype="openai_image_description",type=WorkflowNodeType.PROMPTER,settings={}
)# Summarize each detected table.
table_summarizer_node = WorkflowNode(name="Table summarizer",subtype="anthropic_table_description",type=WorkflowNodeType.PROMPTER,settings={}
)# Label each recognized named entity.
named_entity_recognizer_node = WorkflowNode(name="Named entity recognizer",subtype="openai_ner",type=WorkflowNodeType.PROMPTER,settings={"prompt_interface_overrides": None}
)# Chunk the partitioned content.
chunk_node = WorkflowNode(name="Chunker",subtype="chunk_by_title",type=WorkflowNodeType.CHUNK,settings={"unstructured_api_url": None,"unstructured_api_key": None,"multipage_sections": False,"combine_text_under_n_chars": 0,"include_orig_elements": True,"max_characters": 1537,"overlap": 160,"overlap_all": False,"contextual_chunking_strategy": None}
)# Generate vector embeddings.
embed_node = WorkflowNode(name="Embedder",subtype="azure_openai",type=WorkflowNodeType.EMBED,settings={"model_name": "text-embedding-3-large"}
)response = unstructured_client.workflows.create_workflow(request={"create_workflow": {"name": f"s3-to-es-NV-Ingest-custom-workflow","source_id": source_connector_response.source_connector_information.id,"destination_id": "a72838a4-bb72-4e93-972d-22dc0403ae9e","workflow_type": WorkflowType.CUSTOM,"workflow_nodes": [parition_node,image_summarizer_node,table_summarizer_node,named_entity_recognizer_node,chunk_node,embed_node],}}
)workflow_id = response.workflow_information.id
pretty_print_model(response.workflow_information)job = unstructured_client.workflows.run_workflow(request={"workflow_id": workflow_id,}
)pretty_print_model(job.job_information)

一旦这个工作流的任务完成,数据将被上传到 Elasticsearch,我们就可以继续构建一个基础的 RAG 应用程序。

步骤 4:RAG 设置

让我们继续设置一个简单的检索器,它将连接到数据,接收用户查询,使用与原始数据嵌入相同的模型对其进行嵌入,并计算余弦相似度以检索前 3 个文档。

from langchain_elasticsearch import ElasticsearchStore
from langchain.embeddings import OpenAIEmbeddings
import osembeddings = OpenAIEmbeddings(model="text-embedding-3-large",openai_api_key=os.environ['OPENAI_API_KEY'])vector_store = ElasticsearchStore(es_url=os.environ['es_host'],index_name="demo-index",embedding=embeddings,es_api_key=os.environ['es_api_key'],query_field="text",vector_query_field="embeddings",distance_strategy="COSINE"
)retriever = vector_store.as_retriever(search_type="similarity",search_kwargs={"k": 3}  # Number of results to return
)

然后,让我们设置一个工作流来接收用户查询,从 Elasticsearch 中获取相似文档,并使用这些文档作为上下文来回答用户的问题。

from openai import OpenAIclient = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))def generate_answer(question: str, documents: str):prompt = """You are an assistant that can answer user questions given provided context.Your answer should be thorough and technical.If you don't know the answer, or no documents are provided, say 'I do not have enough context to answer the question.'"""augmented_prompt = (f"{prompt}"f"User question: {question}\n\n"f"{documents}")response = client.chat.completions.create(messages=[{'role': 'system', 'content': 'You answer users questions.'},{'role': 'user', 'content': augmented_prompt},],model="gpt-4o-2024-11-20",temperature=0,)return response.choices[0].message.contentdef format_docs(docs):seen_texts = set()useful_content = [doc.page_content for doc in docs]return  "\nRetrieved documents:\n" + "".join([f"\n\n===== Document {str(i)} =====\n" + docfor i, doc in enumerate(useful_content)])
def rag(query):docs = retriever.invoke(query)documents = format_docs(docs)answer = generate_answer(query, documents)return documents, answer

将所有内容组合在一起,我们得到:

query = "How did the response lengths change with training?"docs, answer = rag(query)print(answer)

和一个响应:

Based on the provided context, the response lengths during training for the DeepSeek-R1-Zero model showed a clear trend of increasing as the number of training steps progressed. This is evident from the graphs described in Document 0 and Document 1, which both depict the "average length per response" on the y-axis and training steps on the x-axis.### Key Observations:
1. **Increasing Trend**: The average response length consistently increased as training steps advanced. This suggests that the model naturally learned to allocate more "thinking time" (i.e., generate longer responses) as it improved its reasoning capabilities during the reinforcement learning (RL) process.2. **Variability**: Both graphs include a shaded area around the average response length, indicating some variability in response lengths during training. However, the overall trend remained upward.3. **Quantitative Range**: The y-axis for response length ranged from 0 to 12,000 tokens, and the graphs show a steady increase in the average response length over the course of training, though specific numerical values at different steps are not provided in the descriptions.### Implications:
The increase in response length aligns with the model's goal of solving reasoning tasks more effectively. Longer responses likely reflect the model's ability to provide more detailed and comprehensive reasoning, which is critical for tasks requiring complex problem-solving.In summary, the response lengths increased during training, indicating that the model adapted to allocate more resources (in terms of response length) to improve its reasoning performance.

Elasticsearch 提供了多种增强搜索的策略,包括混合搜索,这是近似语义搜索和基于关键字的搜索的结合。

这种方法可以提高作为上下文使用的 RAG 架构中的 top 文档的相关性。要启用此功能,您需要按照以下方式修改 vector_store 初始化:

from langchain_elasticsearch import DenseVectorStrategyvector_store = ElasticsearchStore(es_url=os.environ['es_host'],index_name="demo-index",embedding=embeddings,es_api_key=os.environ['es_api_key'],query_field="text",vector_query_field="embeddings",strategy=DenseVectorStrategy(hybrid=True) // <-- here the change
)

结论

良好的 RAG 从准备充分的数据开始,而 Unstructured 简化了这一关键的第一步。通过 NV-Ingest 启用文档分区、对非结构化数据进行元数据丰富并高效地将其摄取到 Elasticsearch,它确保了您的 RAG 管道建立在坚实的基础上,为所有下游任务释放其全部潜力。

原文:Unstructured data processing with NV‑Ingest, Unstructured, and Elasticsearch - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/82549.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android 13 使能user版本进recovery

在 debug 版本上&#xff0c;可以在关机状态下&#xff0c;同时按 电源键 和 音量加键 进 recovery 。 user 版本上不行。 参考 使用 build 变体 debug 版本和 user 版本的差别之一就是 ro.debuggable 属性不同。 顺着这个思路追踪&#xff0c;找到 bootable/recovery/reco…

每日算法刷题计划

这是我每天坚持刷算法题的仓库&#xff0c;每天刷1-3道&#xff0c;时间30-40min&#xff0c;加油! 目前考虑leetcode洛谷形式&#xff0c;c和python3语言&#xff0c;leetcode主要学核心思想&#xff0c;洛谷学会输入输出格式 每日打卡:markdowncsdn打卡 刷题策略: 按分类刷…

红黑树():

1. 红黑树&#xff1a; 红黑树从根节点开始的最长的路径不会超过最短路径的2倍。 红黑树的话&#xff0c;他的结点的分布没有我们的AVL树的结点的分布均衡&#xff0c;但是效率也不错&#xff0c;AVL树的结点分布的那么均匀&#xff0c;其实也是在进行了旋转&#xff0c;付出了…

【AI智能推荐系统】第六篇:隐私保护与联邦学习在推荐系统中的平衡之道

第六篇:隐私保护与联邦学习在推荐系统中的平衡之道 提示语:🔥 “数据不出域,推荐更精准!深度揭秘腾讯、蚂蚁集团如何用联邦学习打造合规推荐系统,隐私计算技术全景解析与工业级实现方案!” 目录 隐私保护的行业挑战隐私计算技术体系 2.1 联邦学习基础架构2.2 差分隐私…

【Qt/C++】深入理解 Lambda 表达式与 `mutable` 关键字的使用

【Qt/C】深入理解 Lambda 表达式与 mutable 关键字的使用 在 Qt 开发中&#xff0c;我们常常会用到 lambda 表达式来编写简洁的槽函数。今天通过一个实际代码示例&#xff0c;详细讲解 lambda 的语法、变量捕获方式&#xff0c;特别是 mutable 的作用。 示例代码 QPushButto…

记录 ubuntu 安装中文语言出现 software database is broken

搜索出来的结果是 sudo apt-get install language-pack-zh-han* 然而,无效,最后手动安装如下 apt install language-pack-zh-hans apt install language-pack-zh-hans-base apt install language-pack-gnome-zh-hans apt install fonts-arphic-uming apt install libreoffic…

[虚幻官方教程学习笔记]深入理解实时渲染(An In-Depth Look at Real-Time Rendering)

原英文教程地址深入理解实时渲染&#xff08;An In-Depth Look at Real-Time Rendering&#xff09; 文章目录 1.Intro to An In-Depth Look at Real-Time RenderingCPU VS GPUDeferred VS Forward 2. Before Rendering and OcclusionCulling计算的步骤使用console command:fre…

Linux进程间信号

目录 信号入门 生活角度中的信号 技术应用角度的信号 信号的发送与记录 信号处理常见方式概述 产生信号 通过终端按键产生 通过系统函数向进程发信号 由软件条件产生信号 由硬件异常产生信号 阻塞信号 信号其他相关常见概念 在内核中的表示 sigset_t 信号集操作…

Git简介和发展

Git 简介 Git是一个开源的分布式版本控制系统&#xff0c;跨平台&#xff0c;支持Windows、Linux、MacOS。主要是用于项目的版本管理&#xff0c;是由林纳斯托瓦兹(Linux Torvalds)在2005年为Linux内核开发而创建。 起因 在2002年至2005年间&#xff0c;Linux内核开发团队使…

Perspective,数据可视化的超级引擎!

Perspective 是一个强大的交互式数据分析和可视化库&#xff0c;它允许你创建高度可配置的报告、仪表板、笔记本和应用程序。给用户提供了一个新的视角来看待数据。 Stars 数9125Forks 数1217 主要特点 高效流式查询引擎&#xff1a;Perspective使用C编写&#xff0c;并编译为…

MySQL COUNT(*) 查询优化详解!

目录 前言1. COUNT(*) 为什么慢&#xff1f;—— InnoDB 的“计数烦恼” &#x1f914;2. MySQL 执行 COUNT(*) 的方式 (InnoDB)3. COUNT(*) 优化策略&#xff1a;快&#xff01;准&#xff01;狠&#xff01;策略一&#xff1a;利用索引优化带 WHERE 子句的 COUNT(*) (最常见且…

如何在postman使用时间戳

1. 使用 Pre-request Script 动态转换​ 在发送请求前&#xff0c;将日期字符串转为时间戳并存储为环境变量/全局变量。 ​示例代码​ // 将日期字符串&#xff08;如 "2023-10-01"&#xff09;转为时间戳&#xff08;毫秒&#xff09; const dateString "2…

嵌入式学习笔记 - 运算放大器的共模抑制比

一 定义 共模抑制比&#xff08;Common Mode Rejection Ratio, ‌CMRR‌&#xff09;是衡量差分放大器&#xff08;或差分电路&#xff09;抑制共模信号能力的关键指标。它在电子工程中尤为重要&#xff0c;特别是在需要处理微弱信号或对抗环境噪声的场景中。 核心概念 ‌共…

成龙电影中的三菱汽车

帕杰罗、 Lancer Evolution、 3000GT Mitsubishi Lancer Evo ll 1995 附录 Mercedes-Benz 280SL&#xff08;W113&#xff09;&#xff0c;俗称“Pagoda”&#xff08;帕格达&#xff09;

Spring 项目无法连接 MySQL:Nacos 配置误区排查与解决

在开发过程中&#xff0c;我们使用 Nacos 来管理 Spring Boot 项目的配置&#xff0c;其中包括数据库连接配置。然而&#xff0c;在实际操作中&#xff0c;由于一些概念的混淆&#xff0c;我们遇到了一些连接问题。本文将分享我的故障排查过程&#xff0c;帮助大家避免类似的错…

LabVIEW与 IMAQ Vision 机器视觉应用

在工业生产及诸多领域&#xff0c;精确高效的检测至关重要。基于 LabVIEW 与 IMAQ Vision 的机器视觉应用&#xff0c;深入剖析其原理、系统构成、软件设计及优势&#xff0c;为相关领域工程师提供全面技术参考。 ​ 一、技术原理 &#xff08;一&#xff09;机器视觉技术基础…

【STM32 学习笔记】USART串口

注意&#xff1a;在串口助手的接收模式中有文本模式和HEX模式两种模式&#xff0c;那么它们有什么区别&#xff1f;   文本模式和Hex模式是两种不同的文件编辑或浏览模式&#xff0c;不是完全相同的概念。文本模式通常是指以ASCII编码格式表示文本文件的编辑或浏览模式。在文…

【WPS】怎么解决“word的复制表格”粘贴到“excel的单元格”变多行单元格的问题

把 word文档复制表格到这个excel表格上面的话&#xff0c;会出现由单个单元格变成多行单元格的情况。 现在&#xff0c;就这个问题怎么解决&#xff0c;提出了一个方案&#xff0c;就是先查找是什么导致了这个换行&#xff0c;然后再将换行的这个字符进行一个整体的替换&#x…

嵌入式开发面试题详解:STM32 与嵌入式开发核心知识全面解析

一、STM32 共有几种基本时钟信号&#xff1f; 题目 STM32 共有几种基本时钟信号&#xff1f; 解答 STM32 包含 4 种基本时钟信号&#xff0c;分别为 HSI&#xff08;内部高速时钟&#xff09;、HSE&#xff08;外部高速时钟&#xff09;、LSI&#xff08;内部低速时钟&…

华为策略路由

路由策略&#xff1a;是对路由条目进行控制&#xff0c;通告控制路由条目影响报文的转发路径。路由策略为控制平面。 策略路由&#xff1a;是根据报文特征&#xff0c;认为的控制报文从某个即可转发出去&#xff0c;不修改路由表。即策略路由为在转发平面。 路由策略 策略路由…