快速掌握大语言模型+向量数据库_RAG实现

一、前言

结合前面掌握的vLLM部署Qwen7B模型、通过Embedding模型(bdg-large-zh模型)提取高质量作文内容并预先存储到Milvus向量数据库中,我们很容易实现RAG方案进一步提高写作内容的生成质量。

本篇要实现的目标是:通过FlaskAPI对上游业务系统提供一个开放接口,调用该接口将返回写作内容(非流式)。 该接口优先通过dbg-large-zh模型生成ebedding向量,然后从Milvus向量数据库中查询和向量相似度最高的1条数据,若查到数据且相似度达到指定阈值(如0.75或0.9,具体数值由业务决定),则直接返回Milvus中存储的内容,否则调用Qwen2-7B-Instruct模型生成写作内容。

要实现的架构为

往期文章:

快速掌握大语言模型-Qwen2-7B-Instruct落地1-CSDN博客

快速掌握大语言模型-Qwen2-7B-Instruct落地2-CSDN博客

快速掌握向量数据库-Milvus探索1-CSDN博客

快速掌握向量数据库-Milvus探索2_集成Embedding模型-CSDN博客

二、术语

2.1 vLLM

vLLM采用连续批处理,吞吐量相比传统框架(如Hugging Face Transformers)可提升5-10倍;

分布式推理支持允许在多GPU环境下并行处理;

通过PagedAttention技术,vLLM将注意力计算的键值(KV Cache)分页存储在显存中,动态分配空间以减少碎片;

内置了对 OpenAI API 的兼容支持,可直接启动符合 OpenAI 标准的 API 服务;

2.2 FastApi

基于Python的Web框架,主要用于构建高性能、易维护的API和Web应用程序,通过FastApi可以对上游业务系统暴露API,上游业务系统通过http请求即调用相关接口。

2.3 Milvus

Milvus是一个专为处理高维向量数据设计的开源向量数据库,支持数百亿级数据规模,在多数开源向量数据库中综合表现突出(一般是其他的2~5倍)。

提供三种部署方式:本地调试Milvus Lite、企业级小规模数据的Milvus Standalone(一亿以内向量)、企业级大规模数据的Milvus Distributed (数百亿向量)。

2.4 bge-large-zh

bge-large-zh模型是专为中文文本设计的,同尺寸模型中性能优异的开源Embedding模型,凭借其中文优化、高效检索、长文本支持和低资源消耗等特性,成为中文Embedding领域的标杆模型。

2.5 Qwen2-7B-Instruct

Qwen2-7B-Instruct 是阿里云推出的开源指令微调大语言模型,属于 Qwen2 系列中的 70 亿参数版本。该模型基于 Transformer 架构,通过大规模预训练和指令优化,展现出强大的语言理解、生成及多任务处理能力

三、代码

3.1 代码目录

  • vllm.py :提供调用Qwen7B模型的方法;
  • app.py :提供通过FastAPI暴露给上游业务系统调用的API;
  • embedding.py :提供调用bge-large-zh模型生成内容的embedding向量的方法 ;
  • milvus_server.py :提供查询Milvus向量数据库的方法;
  • generate_text_request.py : 模拟上游业务系统通过http调用app.py提供的生成写作内容的API

3.2 完整代码

3.2.1 安装依赖

# vllm需要的依赖
pip install vllm# milvus需要的依赖
python -m pip install -U pymilvus

 3.2.2 app.py

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import logging
from vllm import Model
from embedding import search_writing_embeddings# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
app = FastAPI()
model = Model()# 流式响应
@app.get("/chat")
async def chat(prompt: str):return StreamingResponse(model.generate_stream(prompt), media_type="text/plain")# 非流式响应(RAG增强检索)
@app.get("/generate")
async def generate(prompt: str):# 优先通过embedding向量从milvus中搜索相似度最高的1条数据,如果相似度达到90%以上,则直接返回该数据的content_full字段作为结果,否则调用llm模型生成# 这里的90%是一个阈值,可以根据实际情况调整res = search_writing_embeddings(prompt)print(f"search_writing_embeddings:{res}")threshold = 0.75if res and res[0] and res[0][0]['distance'] >= threshold:print("使用milvus中的数据")result = res[0][0]['entity']['content_full']else:print("使用llm模型生成")result = await model.generate_text(prompt)return resultif __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)

3.2.3 vllm.py

# 大语言模型LLM文件import logging
from openai import OpenAI
from pydantic_core import Url# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
model_path = "/mnt/workspace/models/Qwen2-7B-Instruct"  
base_url = "http://localhost:9000/v1"class Model:def __init__(self):# 创建 OpenAI 客户端实例,这里的密钥可以随意填写,vLLM 不需要验证self.client = OpenAI(api_key="EMPTY", base_url= base_url)   # 流式处理async def generate_stream(self, prompt: str):try:logging.info(f"开始处理提示: {prompt}")# 使用新客户端调用 API# 确保使用类的实例属性 self.clientchat_response = self.client.chat.completions.create(model= model_path,messages=[{"role": "user", "content": prompt}],stream=True,max_tokens=512,temperature=0.7,top_p=0.9)for chunk in chat_response:msg = chunk.choices[0].delta.contentif msg is not None:logging.info(f"生成文本块: {msg}")yield msglogging.info("流式输出结束")except Exception as e:logging.error(f"处理提示时出错: {e}", exc_info=True)# 非流式处理async def generate_text(self, prompt: str):try:logging.info(f"开始处理提示: {prompt}")# 使用新客户端调用 APIresponse = self.client.chat.completions.create(model= model_path,messages=[{"role": "user", "content": prompt}],stream=False,max_tokens=512,temperature=0.7,top_p=0.9)# 获取完整响应content = response.choices[0].message.contentlogging.info(f"文本生成完成:{content}")return contentexcept Exception as e:logging.error(f"处理提示时出错: {e}", exc_info=True)return ""

3.2.4 embedding.py

from transformers import AutoTokenizer, AutoModel
import torch
import random
from milvus_server import WritingDTO, insert_data_to_milvus,search_data_from_milvus# 加载模型和分词器
model_path = "/mnt/workspace/models/bge-large-zh"  # 根据实际情况修改
model = AutoModel.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
# 推理模式
model.eval()  # 写作生成合集名称
writing_collection_name = "writing"def get_embedding_list(text_list):"""输入文本列表,返回文本的embedding向量:param text_list: 待获取embedding的文本集合:return: ebedding向量集合"""# 编码输入(自动截断和填充)encoded_input = tokenizer(text_list, padding=True, truncation=True, return_tensors='pt')# 调用大模型得到文本的embeddingwith torch.no_grad():model_output = model(**encoded_input)sentence_embeddings = model_output[0][:, 0] # bdge-large-zh 模型的输出是一个元组,第一个元素是句子的嵌入向量,1024维print(f"sentence_embeddings:{ len(sentence_embeddings[0])}")# 归一化处理:可以提高模型的稳定性和收敛速度,尤其在处理向量相似度计算时非常有用sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)# 转换嵌入向量为列表embeddings_list = sentence_embeddings.tolist()return embeddings_listdef insert_writing_embeddings(text_list):"""获取写作生成内容的文本的embedding向量,并写入到milvus中:param text_list: 待获取embedding的写作生成内容集合:return: 写入到到milvus结果,ebedding向量集合"""# 获取embedding embeddings_list = get_embedding_list(text_list)print(f"embeddings_list:{len(embeddings_list)}")# 写作记录合集writing_list = []for i, embedding in enumerate(embeddings_list):writingDTO = WritingDTO(embedding, text_list[i], random.randint(500,9999))  #组装对象数据,其中biz_id是业务ID,这里这里方便说明暂设为随机数字writing_list.append(writingDTO.to_dict()) # 将对象转换为字典,并添加到集合中# 插入数据到 Milvusres = insert_data_to_milvus(writing_collection_name,writing_list)return res,embeddings_listdef search_writing_embeddings(text):"""获取写作内容的embedding向量,并从milvus中搜索"""# 获取embeddingembeddings_list = get_embedding_list([text])# 搜索数据res = search_data_from_milvus(writing_collection_name,embeddings_list,["id","content_full","biz_id"],1)return resif __name__ == "__main__":# 输入文本content = "推荐广州有哪些好玩的地方"# 将文本转eembedding向量并写入到milvus中res = search_writing_embeddings(content)print("Search result:", res)

3.2.5 milvus_server.py

from pymilvus import MilvusClient, db
import numpy as np
from pymilvus.orm import collection
from typing import Iterable# 定义 Milvus 服务的主机地址
host = "阿里云公网IP"# 创建一个 Milvus 客户端实例,连接到指定的 Milvus 服务
client = MilvusClient(uri=f"http://{host}:19530",db_name="db001") # 连接到 Milvus 服务并选择数据库 "db001"
# collection_name = "writing" # 指定要连接的集合名称 "writing"# 写作生成对象
class WritingDTO:def __init__(self, content_vector, content_full, biz_id):self.content_vector = content_vectorself.content_full = content_fullself.biz_id = biz_iddef to_dict(self):"""将 WritingDTO 对象转换为字典。:return: 包含 WritingDTO 对象属性的字典"""return {"content_vector": self.content_vector,"content_full": self.content_full,"biz_id": self.biz_id}def insert_data_to_milvus(collection_name,data):"""将对象集合中的数据插入到 Milvus 集合中。:param data: 字典对象集合:return: 插入操作的结果"""print(f"insert_data_to_milvus:{collection_name}")print(f"insert_data_to_milvus:{len(data)}")res = client.insert(collection_name=collection_name,data=data)return resdef search_data_from_milvus(collection_name,query_vector,output_fields, top_k=10):"""从 Milvus 集合中搜索与查询向量最相似的向量。:param query_vector: 查询向量:param top_k: 返回的最相似向量的数量:return: 搜索结果"""res = client.search(collection_name= collection_name, # 合集名称data=query_vector, # 查询向量search_params={"metric_type": "COSINE", # 向量相似性度量方式,COSINE 表示余弦相似度(适用于文本/语义相似性场景); 可选 IP/COSINE/L2 "params": {"level":1}, }, # 搜索参数limit=top_k, # 查询结果数量output_fields= output_fields, # 查询结果需要返回的字段consistency_level="Bounded" # 数据一致性级别,Bounded允许在有限时间窗口内读取旧数据,相比强一致性(STRONG)提升 20 倍查询性能,适合高吞吐场景; )return res

3.2.6 generate_text_request.py

import requestsurl = "http://0.0.0.0:8000/generate?prompt=广州有什么好玩的"
response = requests.get(url, stream=True)
print(response.text)

四、调用结果

4.1 启动vllm

python -m vllm.entrypoints.openai.api_server  --model  /mnt/workspace/models/Qwen2-7B-Instruct  --swap-space 10 --disable-log-requests --max-num-seqs 256 --host 0.0.0.0 --port 9000  --dtype float16 --max-parallel-loading-workers 1  --max-model-len 10240 --enforce-eager

4.2 启动FastAPI

4.3 上游业务系统调用

情况1:向量查询结果相似度低于阈值(90%相似度)的情况 ==> 通过Qwen7B模型生成内容

情况2:向量查询结果相似度高于于阈值(75%相似度)的情况 ==> 直接返回向量查询结果的内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/79071.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【FreeRTOS-列表和列表项】

参照正点原子以及以下gitee笔记整理本博客,并将实验结果附在文末。 https://gitee.com/xrbin/FreeRTOS_learning/tree/master 一、列表和列表项的简介(熟悉) 1、什么是列表 答:列表是FreeRTOS中的一个数据结构,概念上和链表有点类似&#…

【c++】【STL】queue详解

目录 queue的作用什么是容器适配器queue的接口构造函数emptysizefrontback queue类的实现 queue的作用 queue是stl库提供的一种容器适配器,也就是我们数据结构中学到的队列,是非常常用的数据结构,特点是遵循LILO(last in last ou…

【一】 基本概念与应用领域【数字图像处理】

考纲 文章目录 1 概念2005甄题【名词解释】2008、2012甄题【名词解释】可考题【简答题】可考题【简答题】 2 应用领域【了解】2.1 伽马射线成像【核医学影像】☆2.2 X射线成像2.3 紫外波段成像2.4 可见光和红外波段成像2.5 微波波段成像2.6 无线电波段成像2.7 电子显微镜成像2…

RAG技术完全指南(一):检索增强生成原理与LLM对比分析

RAG技术完全指南(一):检索增强生成原理与LLM对比分析 文章目录 RAG技术完全指南(一):检索增强生成原理与LLM对比分析1. RAG 简介2. 核心思想3. 工作流程3.1 数据预处理(索引构建)3.2…

对计网考研中的信道、传输时延、传播时延的理解

对计网考研中的信道、传输时延、传播时延的理解 在学习数据链路层流量控制和可靠传输那一节的三个协议的最大信道利用率时产生的疑惑 情景: 假如A主机和B主机通过集线器连接,A和集线器是光纤连接,B和集线器也是光纤连接,A给B发…

【2025五一数学建模竞赛C题】社交媒体平台用户分析问题|建模过程+完整代码论文全解全析

你是否在寻找数学建模比赛的突破点?数学建模进阶思路! 作为经验丰富的美赛O奖、国赛国一的数学建模团队,我们将为你带来本次数学建模竞赛的全面解析。这个解决方案包不仅包括完整的代码实现,还有详尽的建模过程和解析&#xff0c…

使用 Spring Boot Actuator 实现应用实时监控

1. 引言 1.1 什么是 Spring Boot Actuator Spring Boot Actuator 是 Spring Boot 提供的一组生产级功能模块,用于帮助开发者对 Spring Boot 应用进行监控和管理。它提供了一系列 REST API 端点(Endpoints),可以获取应用程序的运行状态、健康检查、度量指标等信息。 这些…

2025MathorCup数学应用挑战赛B题

目录 模型建立与求解 1.问题一的模型建立与求解 1.1 搬迁补偿模型设计 1.2 住户是否搬迁的应对策略与分析 1.3 定量讨论 2.问题二的模型建立与求解 2.1 搬迁方案模型的优化介绍 2.2 模型的评估 2.3 模型结果 3.问题三的模型建立与求解 3.1 拐点存在性分析模型的建立 3.2 模型的…

西门子数字化研发设计制造一体化规划案例P87(87页PPT)(文末有下载方式)

资料解读:《西门子数字化研发设计制造一体化规划案例》 详细资料请看本解读文章的最后内容。 该文档围绕西门子为企业打造的智能化制造研发工艺生产一体化平台规划方案展开,全面阐述了从业务现状分析到项目实施及案例分享的整个过程。 业务现状与需求分析…

stm32基础001(串口)

文章目录 通信的基本概念串行通信和并行通信单工,半双工和全双工串口的硬件连接 stm32的串口原理图CPU的芯片手册stm32串口的库函数实现通过串口实现printf函数使用中断实现串口的接收 通信的基本概念 串行通信和并行通信 串行通信一个方向只有一个数据通道&#x…

【验证技能】文档要求和好文档注意点

项目文档 产品场景分析; 产品规格需求:OR; 项目设计需求:DR; 业务文档:学发材料; 计划 项目执行计划,设计计划,验证计划,一~四级计划; 一级计…

使用 CarrierWave 通过 AWS S3上传文件到阿里云 OSS

虽然阿里云 OSS 与 AWS S3 兼容,但需要使用阿里云的特定端点进行配置。CarrierWave 是一个流行的 Ruby 文件上传库,可以方便地与 AWS S3 集成。以下是配置和使用方法: 1. 安装必要的 gem 首先,在 Gemfile 中添加以下 gem&#x…

上位机知识篇---流水线执行

文章目录 前言前言 本文简单介绍了流水线. 基本概念 流水线(Pipeline) 是一种通过将任务分解为多个子任务(阶段),并让不同子任务并行执行以提高效率的技术。其灵感来源于工业流水线,每个阶段专注于特定操作,多任务在不同阶段重叠执行,从而提升整体吞吐率(Throughput)…

第三部分:赋予网页灵魂 —— JavaScript(下)

目录 7 DOM 操作:控制网页的"智能面板7.1 小例子:点击按钮时改变段落文字,根据用户输入改变图片7.2 练习:实现一个简单的 Tab 切换效果 8 事件处理:响应用户的"指令"8.1 小例子:实现点击按钮…

芯片软错误概率探究:基于汽车芯片安全设计视角

摘要: 本文深入剖析了芯片软错误概率问题,结合 AEC-Q100 与 IEC61508 标准,以 130 纳米工艺 1Mbit RAM 芯片为例阐述其软错误概率,探讨汽车芯片安全等级划分及软错误对汽车关键系统的影响,分析先进工艺下软错误变化趋势…

嵌入式AI还是一片蓝海

发现其实还是挺多人关注嵌入式和人工智能交叉领域的,随便一个问题,浏览量就27万了,但是这方面的内容确实少得可怜……所以干脆我自己来补点干货。 推荐一本最近很热门的新书——《边缘人工智能:用嵌入式机器学习解决现实问题》。 …

Linux 怎么安装 Oracle Java 8

在 Linux 系统上安装 Oracle Java 8 的步骤如下: 1. 下载 Oracle Java 8 访问 Oracle 官方网站的 Java 下载页面: 下载链接:Oracle Java 8 下载页面选择适合 Linux x64 的安装包(通常是 .tar.gz 格式)。需要登录 Or…

nginx配置集群服务器中的tcp负载均衡器

文章目录 前言1. Ubuntu下nginx安装2. nginx的tcp负载配置 前言 假设一台机器支持两万的并发量,现在我们需要保证八万的并发量。首先想到的是升级服务器的配置,比如提高 CPU 执行频率,加大内存等提高机器的物理性能来解决此问题。但是单台机…

【音视频】RTMP流媒体服务器搭建、推流拉流

服务器:SRS(Simple RTMP Server,⽀持RTMP、HTTP-FLV,HLS) 推流端:ffmpeg OBS 拉流端:ffplay VLC srs播放器 1 安装和测试srs流媒体服务器 1.1 安装srs流媒体服务器 srs官⽹:https://github.com/ossrs/…

数据治理与数据管理:定义之辩和责任外包的边界

数据治理与数据管理:定义之辩和责任外包的边界 最近,在数据领域的技术交流中,一位朋友探讨了两个很有意思的问题。这两个问题非常典型,也反映了大家在实际工作和学习中常会遇到的困惑:一是关于“数据管理”和“数据治…