回忆 RAG 关键步骤:
- 文本切割
- 嵌入处理
- 存储向量数据库
嵌入处理,又称之为向量化操作。核心就是将文本转为向量的形式,从而为下一步做数学运算做准备。
"今天的天气真好,万里无云"
[0.3297254741191864, 0.7386181354522705, -3.342341899871826,-0.7811917066574097, -0.08536303788423538, 0.05086381733417511,... 668 more items
]
该操作一般需要依赖专门做嵌入处理的模型。例如:
import { OpenAIEmbeddings } from "@langchain/openai";const embeddings = new OpenAIEmbeddings({apiKey: process.env.OPENAI_API_KEY,model: "text-embedding-3-large", // OpenAI 官方提供的专门用于做嵌入的模型
});const vectors = await embeddings.embedDocuments(documents.map((doc) => doc.pageContent)
);
在上面的代码中,使用的是 OpenAI 官方提供的专用嵌入模型。而 OpenAIEmbeddings 则是 Embeddings 的子类,关于 Embeddings 这个工具类,后面再来介绍。
快速上手
课堂练习:使用 nomic-embed-text 模型做嵌入操作
import { TextLoader } from "langchain/document_loaders/fs/text";
import { RecursiveCharacterTextSplitter } from "langchain/text_splitter";const loader = new TextLoader("data/kong.txt");const docs = await loader.load();const splitter = new RecursiveCharacterTextSplitter({chunkSize: 64,chunkOverlap: 0,
});const splittedDocs = await splitter.splitDocuments(docs);// console.log(splittedDocs)async function getEmbedding(text) {const res = await fetch("http://localhost:11434/api/embeddings", {method: "POST",headers: { "Content-Type": "application/json" },body: JSON.stringify({model: "nomic-embed-text",prompt: text,}),});const result = await res.json();return result.embedding;
}const results = [];
for (const doc of splittedDocs) {const embedding = await getEmbedding(doc.pageContent);results.push({ ...doc, embedding });
}console.log(results);
不过目前这个例子,在进行嵌入操作时花费的时间是比较久的,这里我们可以做一个计时来测量一下花费的时间。
究其原因,是因为现在在做嵌入操作时,采用的是串行的形式。
所以,接下来一个工作重点,支持并发的嵌入操作。
自定义嵌入类
要实现并发的嵌入操作,我们可以自己来自定义一个嵌入类。
不过在此之前,需要先了解 Embeddings 工具类。
Embeddings 是 LangChain 中抽象出来的嵌入操作基类,不同厂商的向量模型,都通过继承该基类实现暴露统一方法,从而能在向量库、检索器等组件里互换使用。
基类提供两组最核心的方法:
embedDocuments(texts: string[]) => Promise<number[][]>:批量嵌入用于索引的文本(返回二维向量数组)。embedQuery(text: string) => Promise<number[]>:为查询文本生成向量(返回一维向量)。
文档地址:https://js.langchain.com/docs/concepts/embedding_models/?utm_source=chatgpt.com
使用并发工具方法:runWithConcurrency
export async function runWithConcurrency(items, worker, maxConcurrency) {if (!items?.length) return;let i = 0;const workers = [];async function spawn() {while (i < items.length) {const idx = i++;await worker(items[idx], idx);}}const n = Math.max(1, Math.min(maxConcurrency, items.length));for (let k = 0; k < n; k++) workers.push(spawn());await Promise.allSettled(workers);
}
课堂练习:自定义嵌入类
import { Embeddings } from "@langchain/core/embeddings";
import { runWithConcurrency } from "./concurrency.js";export class NomicEmbeddings extends Embeddings {constructor(concurrency = 3) {super();this.model = "nomic-embed-text";this.apiUrl = "http://localhost:11434/api/embeddings";this.concurrency = concurrency;}/*** 对单个文本做嵌入操作,这是一个内部方法* @param {*} text 单个文本*/async #fetchEmbedding(text) {const res = await fetch(this.apiUrl, {method: "POST",headers: { "Content-Type": "application/json" },body: JSON.stringify({model: this.model,prompt: text,}),});const result = await res.json();return result.embedding;}/*** 对单个文本做嵌入操作* @param {*} text*/async embedQuery(text) {return await this.#fetchEmbedding(text);}/*** 对一组文本做嵌入操作* @param {*} documents*/async embedDocuments(documents) {const results = Array.from({ length: documents.length }); // 存放结果的数组// 添加一个并发的探针let active = 0; // 并发数let maxActive = 0; // 最大并发数const t0 = performance.now();await runWithConcurrency(documents,async (text, idx) => {// 开始了一个任务,需要对并发数做一个计数active++;maxActive = Math.max(maxActive, active);console.log(`[start] #${idx} +${(performance.now() - t0).toFixed(0)}ms active=${active}`);try {results[idx] = await this.#fetchEmbedding(text);} catch (err) {results[idx] = err;} finally {// 任务结束active--;console.log(`[end ] #${idx} +${(performance.now() - t0).toFixed(0)}ms active=${active}`);}},this.concurrency);return results;}
}
第三方并发库
关于并发的控制,还可以使用一个第三方库:p-limit
该库是一个极小的工具,用来限制并发执行的 Promise 个数,可以用于 Node.js 和浏览器环境。
基本用法:
import pLimit from "p-limit";const limit = pLimit(3); // 同时最多跑 3 个const tasks = urls.map(url =>limit(() => fetchJson(url));
);
const results = await Promise.all(tasks);
下面是一个快速上手示例,添加并发探针,检测并发的数量:
import pLimit from "p-limit";const urls = ["https://jsonplaceholder.typicode.com/todos/1","https://jsonplaceholder.typicode.com/todos/2","https://jsonplaceholder.typicode.com/todos/3","https://jsonplaceholder.typicode.com/todos/4","https://jsonplaceholder.typicode.com/todos/5","https://jsonplaceholder.typicode.com/todos/6",
];// 基础请求函数:拿到 JSON,不是 2xx 则抛错
async function fetchJson(url) {const res = await fetch(url, { headers: { accept: "application/json" } });if (!res.ok) throw new Error(`HTTP ${res.status} for ${url}`);return res.json();
}async function main() {// 同时最多跑 3 个请求const limit = pLimit(3);// 并发探针let active = 0;let maxActive = 0;const t0 = performance.now();const tasks = urls.map((url, idx) =>limit(async () => {// 任务开始,并发探针检测并发数量active++;if (active > limit.concurrency) {console.warn(`并发超限: active=${active} > limit=${limit.concurrency}`);}maxActive = Math.max(maxActive, active);console.log(`[start] #${idx} +${(performance.now() - t0).toFixed(3)}ms active=${active}`);try {return await fetchJson(url);} finally {// 任务结束active--;console.log(`[end ] #${idx} +${(performance.now() - t0).toFixed(3)}ms active=${active}`);}}));try {const results = await Promise.all(tasks);console.log("结果:", results);} catch (err) {console.error("至少有一个请求失败:", err);} finally {console.log(`并发观测:maxActive=${maxActive}, limit=${limit.concurrency}, ` +`activeCount=${limit.activeCount}, pendingCount=${limit.pendingCount}`);}
}main().catch((e) => console.error(e));
课堂练习:使用 p-limit 重构前面自定义的嵌入类。
-EOF-