项目实战:基于Spring WebFlux与LangChain4j实现大语言模型流式输出

一、背景

        在大语言模型(LLM)应用场景中,GPT-4等模型的响应生成往往需要数秒至数十秒的等待时间。传统同步请求会导致用户面对空白页面等待,体验较差。本文通过Spring WebFlux响应式编程SSE服务器推送技术,实现类似打印机的逐字流式输出效果,同时结合LangChain4j框架进行AI能力集成,有效提升用户体验。

二、技术选型

  1. Spring WebFlux:基于 Reactor 的异步非阻塞 Web 框架
  2. SSE(Server-Sent Events):轻量级服务器推送技术
  3. LLM框架:LangChain4j 
  4. 大模型 API:以 OpenAI 的 GPT-4 (实际大模型是deepseek)
  5. 开发工具:IntelliJ IDEA + JDK 17

三、Spring WebFlux介绍

Spring Webflux 教程 - spring 中文网

这里就不多介绍了,网上教程很多

四、整体方案

五、实现步骤

1、pom依赖

        <dependency><groupId>io.milvus</groupId><artifactId>milvus-sdk-java</artifactId><version>2.5.1</version></dependency><dependency><groupId>dev.langchain4j</groupId><artifactId>langchain4j-milvus</artifactId><version>0.36.2</version></dependency><dependency><groupId>dev.langchain4j</groupId><artifactId>langchain4j-embeddings-all-minilm-l6-v2</artifactId><version>0.36.2</version></dependency><dependency><groupId>dev.langchain4j</groupId><artifactId>langchain4j-open-ai</artifactId><version>0.36.2</version></dependency><dependency><groupId>dev.langchain4j</groupId><artifactId>langchain4j-open-ai-spring-boot-starter</artifactId><version>0.36.2</version></dependency><dependency><groupId>dev.langchain4j</groupId><artifactId>langchain4j-reactor</artifactId><version>0.36.2</version></dependency>

2、controller层

content-type= text/event-stream

@ApiOperation(value = "流式对话")
@PostMapping(value = "", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<String> chat(@RequestBody @Validated ChatReq chatReq) {log.info("--流式对话 chat request: {}--", chatReq);return chatService.chat(chatReq);
}
@ApiModel(value = "对话请求")
public class ChatReq {@ApiModelProperty(value = "对话id")private Long chatId;@ApiModelProperty(value = "对话类型")private Integer type;@ApiModelProperty(value = "提问")private String question;@ApiModelProperty(value = "外部id")private List<Long> externalIds;@ApiModelProperty(value = "向量检索阈值", example = "0.5")@Min(value = 0)@Max(value = 1)private Double retrievalThreshold;@ApiModelProperty(value = "向量匹配结果数", example = "5")@Min(value = 1)private Integer topK;....}

3、service层

1)主体请求

public Flux<String> chat(ChatReq chatReq) {// Create a Sink that will emit items to FluxSinks.Many<ApiResponse<String>> sink = Sinks.many().multicast().onBackpressureBuffer();// 用于控制数据生成逻辑的标志AtomicBoolean isCancelled = new AtomicBoolean(false);ChatStreamingResponseHandler chatStreamingResponseHandler = new ChatStreamingResponseHandler();// 判断新旧对话if (isNewChat(chatReq.getChatId())) { // 新对话,涉及业务略过chatReq.setHasHistory(false);chatModelHandle(chatReq);} else { // 旧对话// 根据chatId查询对话类型和对话历史chatReq.setHasHistory(true);chatModelHandle(chatReq);}return sink.asFlux().doOnCancel(() -> {log.info("停止流处理");isCancelled.set(true); // 设置取消标志sink.tryEmitComplete(); // 停止流});}

2)构建请求参数

有会话历史,获取会话历史(请求回答和回答)

封装成ChatMessages(question存UserMessage、answer存AiMessage)

​
private void chatModelHandle(ChatReq chatReq){List<ChatMessage> history = new ArrayList<>();if (chatReq.getHasHistory()) {// 组装对话历史,获取question和answer分别存UserMessage和AiMessagehistory = getHistory(chatReq.getChatId());}Integer chatType = chatReq.getType();//依赖文本List<Long> externalIds = chatReq.getExternalIds();// 判断对话类型if (ChatType.NORMAL.getCode().equals(chatType)) { // 普通对话if (chatReq.getHasHistory()) {history.add(UserMessage.from(chatReq.getQuestion()));}chatStreamingResponseHandler = new ChatStreamingResponseHandler(sink, chatReq, isCancelled);ChatModelClient.getStreamingChatLanguageModel(chatReq.getTemperature()).generate(chatReq.getHasHistory() ? history : chatReq.getQuestion(), chatStreamingResponseHandler);} else if (ChatType.DOCUMENT_DB.getCode().equals(chatType)) { // 文本对话Prompt prompt = geneRagPrompt(chatReq);if (chatReq.getHasHistory()) {history.add(UserMessage.from(prompt.text()));}chatStreamingResponseHandler = new ChatStreamingResponseHandler(sink, chatReq, isCancelled);ChatModelClient.getStreamingChatLanguageModel(chatReq.getTemperature()).generate(chatReq.getHasHistory() ? history : prompt.text(), chatStreamingResponseHandler);} else {throw new BizException("功能待开发");}}​

3)如果有参考文本,获取参考文本

在向量库中,根据参考文本id和向量检索阈值,查看参考文本topN

    private List<PPid> search(ChatReq chatReq, MilvusClientV2 client, MilvusConfig config, EmbeddingModel model) {//使用文本id进行查询TextSegment segment = TextSegment.from(chatReq.getQuestion());Embedding queryEmbedding = model.embed(segment).content();SearchResp searchResp = client.search(SearchReq.builder().collectionName(config.getCollectionName()).data(Collections.singletonList(new FloatVec(queryEmbedding.vector()))).filter(String.format("ARRAY_CONTAINS(documentIdList, %s)", chatReq.getExternalIds())).topK(chatReq.getTopK() == null ? config.getTopK() : chatReq.getTopK()).outputFields(Arrays.asList("pid", "documentId")).build());// 过滤掉分数低于阈值的结果List<SearchResp.SearchResult> searchResults = searchResp.getSearchResults().get(0);Double minScore = chatReq.getRetrievalThreshold() == null ? config.getMinScore() : chatReq.getRetrievalThreshold();return searchResults.stream().filter(item -> item.getScore() >= minScore).sorted((item1, item2) -> Double.compare(item2.getScore(), item1.getScore())).map(item -> new PPid((Long) item.getEntity().get("documentId"),(Long) item.getEntity().get("pid"))).toList();}

获取参考文本id后,获取文本,再封装请求模版

​
private Prompt genePrompt(String context) {...
}​

4)连接大模型客户端

public static StreamingChatLanguageModel getStreamingChatLanguageModel() {ChatModelConfig config = ChatConfig.getInstance().getChatModelConfig();return OpenAiStreamingChatModel.builder().baseUrl(config.getBaseUrl()).modelName(config.getModelName()).apiKey(config.getApiKey()).maxTokens(config.getMaxTokens()).timeout(Duration.ofSeconds(config.getTimeout())).build();
}

5)大模型输出处理


@Slf4j
@Data
@NoArgsConstructor
public class ChatStreamingResponseHandler implements StreamingResponseHandler<AiMessage> {private Sinks.Many<ApiResponse<String>> sink;private ChatReq chatReq;private AtomicBoolean isCancelled;public ChatStreamingResponseHandler(Sinks.Many<ApiResponse<String>> sink, ChatReq chatReq, AtomicBoolean isCancelled) {this.sink = sink;this.chatReq = chatReq;this.isCancelled = isCancelled;}@Overridepublic void onNext(String answer) {//取消不输出if (isCancelled.get()) {return;}sink.tryEmitNext(BaseController.success(answer));}@Overridepublic void onComplete(Response<AiMessage> response) {if (!isCancelled.get()) {sink.tryEmitNext("结束标识");sink.tryEmitComplete();}// 业务处理}@Overridepublic void onError(Throwable error) {if (!isCancelled.get()) {sink.tryEmitError(error);}// 业务处理}}

六、效果呈现

七、结尾

上面简要列一下实现步骤,可以留言深入讨论。

有许多体验还需要完善,以参考豆包举例

1、实现手动停止响应

2、刷新或者页面关闭自动停止流式输出,重连后流式输出继续

3、将多个Token打包发送,减少SSE帧数量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/74332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Go语言入门经典:数组与切片详解

Go语言入门经典&#xff1a;数组与切片详解 数组和切片是Go语言中两种重要的数据结构。数组是一种固定长度的集合&#xff0c;而切片则是一种灵活的动态集合。本章将详细讲解数组和切片的定义、初始化、访问元素、动态操作等内容&#xff0c;帮助读者全面掌握这两种数据结构。…

uniapp中如何用iconfont来管理图标集成到我们开发的项目中

我们在开发不管小程序还是APP的过程中都会用到图标这个东西,那么iconfont提供了对应的功能,怎么才能方便的集成到我们的小程序或者APP项目中,目标是方便调用并且方便管理。 首先注册ICONFONT账号 www.iconfont.cn中去注册即可选择我们需要的图标如下 我们搜索我们需要的图…

从实用的角度聊聊Linux下文本编辑器VIM

本文从实用的角度聊聊Vim的常用命令。何为实用&#xff1f;我举个不实用的例子大家就明白了&#xff0c;用vim写代码。;) “vim是从 vi 发展出来的一个文本编辑器。代码补全、编译及错误跳转等方便编程的功能特别丰富&#xff0c;在程序员中被广泛使用&#xff0c;和Emacs并列成…

优化程序命名:提升专业感与用户体验

在软件开发的广阔天地中&#xff0c;程序命名这一环节常常被开发者们忽视。不少程序沿用着简单直白、缺乏雕琢的名字&#xff0c;如同素面朝天的璞玉&#xff0c;虽不影响其核心功能的发挥&#xff0c;但却在无形之中错失了许多提升用户印象与拓展应用场景的机会。今天&#xf…

LeetCode BFS解决最短路问题

广度优先搜索(BFS, Breadth-First Search)是一种用于图和树结构的遍历算法&#xff0c;特别适合解决无权图的最短路径问题。 算法思想&#xff1a; BFS从起始节点开始&#xff0c;按照"广度优先"的原则&#xff0c;逐层向外扩展搜索&#xff1a; 先访问起始节点的…

[物联网iot]对比WIFI、MQTT、TCP、UDP通信协议

第一步&#xff1a;先理解最基础的关系&#xff08;类比快递&#xff09; 假设你要给朋友寄快递&#xff1a; Wi-Fi&#xff1a;相当于“公路和卡车”&#xff0c;负责把包裹从你家运到快递站。 TCP/UDP&#xff1a;相当于“快递公司的运输规则”。 TCP&#xff1a;顺丰快递&…

基于python的电影数据分析及可视化系统

一、项目背景 随着电影行业的快速发展&#xff0c;电影数据日益丰富&#xff0c;如何有效地分析和可视化这些数据成为行业内的一个重要课题。本系统旨在利用Python编程语言&#xff0c;结合数据分析与可视化技术&#xff0c;为电影行业从业者、研究者及爱好者提供一个便捷的电…

Java8 到 Java21 系列之 Lambda 表达式:函数式编程的开端(Java 8)

Java8 到 Java21 系列之 Lambda 表达式&#xff1a;函数式编程的开端&#xff08;Java 8&#xff09; 系列目录 Java8 到 Java21 系列之 Lambda 表达式&#xff1a;函数式编程的开端&#xff08;Java 8&#xff09;Java 8 到 Java 21 系列之 Stream API&#xff1a;数据处理的…

②EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关

型号 协议转换通信网关 EtherCAT 转 Modbus TCP 配置说明 网线连接电脑到模块上的 WEB 网页设置网口&#xff0c;电脑所连网口的网段设置成 192.168.1.X&#xff08;X 是除 8 外的任一数值&#xff09;后&#xff0c;打开浏览器&#xff0c;地址栏输入 192.168.1.8 &#xff…

机器视觉--python基础语法

Python基础语法 1. Python标识符 在 Python 里&#xff0c;标识符由字母、数字、下划线组成。 在 Python 中&#xff0c;所有标识符可以包括英文、数字以及下划线(_)&#xff0c;但不能以数字开头。 Python 中的标识符是区分大小写的。 以下划线开头的标识符是有特殊意义的…

算法日常记录

1. 链表 1.1 删除链表的倒数第 N 个结点 问题描述&#xff1a;给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5] 思路&#xff1a;先让fast跑n步&#xff0c;然后…

14使用按钮实现helloworld(1)

目录 还可以通过按钮的方式来创建 hello world 涉及Qt 中的信号槽机制本质就是给按钮的点击操作,关联上一个处理函数当用户点击的时候 就会执行这个处理函数 connect&#xff08;谁发的信号&#xff0c; 信号类型&#xff0c; 谁来处理这个信息&#xff0c; 怎么处理的&…

【Golang】泛型与类型约束

文章目录 一、环境二、没有泛型的Go三、泛型的优点四、理解泛型&#xff08;一&#xff09;泛型函数&#xff08;Generic function&#xff09;1&#xff09;定义2&#xff09;调用 &#xff08;二&#xff09;类型约束&#xff08;Type constraint&#xff09;1&#xff09;接…

k8s常用总结

1. Kubernetes 架构概览 主节点&#xff08;Master&#xff09;&#xff1a; 负责集群管理&#xff0c;包括 API Server、Controller Manager、Scheduler 和 etcd 存储。 工作节点&#xff08;Node&#xff09;&#xff1a; 运行 Pod 和容器&#xff0c;包含 kubelet、kube-pr…

Android 单例模式全解析:从基础实现到最佳实践

单例模式&#xff08;Singleton Pattern&#xff09;是软件开发中常用的设计模式&#xff0c;其核心是确保一个类在全局范围内只有一个实例&#xff0c;并提供全局访问点。在 Android 开发中&#xff0c;单例模式常用于管理全局资源&#xff08;如网络管理器、数据库助手、配置…

ffmpeg滤镜使用

ffmpeg实现画中画效果 FFmpeg中&#xff0c;可以通过overlay将多个视频流、多个多媒体采集设备、多个视频文件合并到一个界面中&#xff0c;生成画中画的效果 FFmpeg 滤镜 overlay 基本参数 x和y x坐标和Y坐标 eof action 遇到 eof表示时的处理方式&#xff0c;默认为重复。…

OpenAI即将开源!DeepSeek“逼宫”下,AI争夺战将走向何方?

OpenAI 终于要 Open 了。 北京时间 4 月 1 日凌晨&#xff0c;OpenAI 正式宣布&#xff1a;将在未来几个月内开源一款具备推理能力的语言模型&#xff0c;并开放训练权重参数。这是自 2019 年 GPT-2 部分开源以来&#xff0c;OpenAI 首次向公众开放核心模型技术。 【图片来源于…

贪心算法,其优缺点是什么?

什么是贪心算法&#xff1f; 贪心算法(Greedy Algorithm)是一种在每一步选择中都采取在当前状态下最优(局部最优)的选择&#xff0c;从而希望导致全局最优解的算法策略。 它不像动态规划那样考虑所有可能的子问题&#xff0c;而是做出局部最优选择&#xff0c;依赖这些选择来…

python string 类型字符拼接 +=的缺点,以及取代方法

在Python中&#xff0c;使用进行字符串拼接虽然语法简单&#xff0c;但在性能和代码维护方面存在明显缺陷。以下是详细分析及替代方案&#xff1a; 一、的缺点 性能低下 内存分配问题&#xff1a;字符串在Python中不可变&#xff0c;每次操作会创建新字符串对象&#xff0c;导…

web前端开发-JS

web前端开发-JS 什么是JavaScript Web标准也称网页标准,由一系列的标准组成,大部分由W3C(World Wide Web Consortium,万维网联盟)负责制定。三个组成部分: HTML:负责网页的结构(页面元素和内容)。CSS:负责网页的表现(页面元素的外观、位置等页面样式,如:颜色、大小等)。JavaS…