RabbitMQ项目实战(二)

文章目录

  • 项目改造
    • 实现步骤

项目改造

以前把任务提交到线程池,然后在线程池提交中编写处理程序的代码,线程池内排队。
如果程序中断了,任务就没了,就丢了。
改造后的流程:

  1. 把任务提交改为向队列发送消息
  2. 写一个专门接收消息的程序,处理任务
  3. 如果程序中断了,消息未被确认,还会重发
  4. 现在,消息全部集中发送到消息队列,你可以部署多个后端,都从同一个地方取任务,从而实现了分布式负载均衡

实现步骤

1)创建交换机和队列
2)将线程池中的执行代码移到消费者类中
3)根据消费者的需求来确认消息的格式(chartId)
4)将提交线程池改造为发送消息到队列
注意:如果程序中断了,没有ack,也没有nack(服务中断,没有任何响应),那么这条消息会被重新放到消息队列中,从而实现了每个任务都会执行
记得在项目启动前创建队列和交换机

package com.yupi.springbootinit.bimq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)*/
public class BiMqInitMain {public static void main(String[] args) {try{ConnectionFactory factory  = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(BiMqConstant.BI_EXCHANGE_NAME,"direct");String queueName = BiMqConstant.BI_QUEUE_NAME;channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,BiMqConstant.BI_EXCHANGE_NAME,BiMqConstant.BI_ROUTING_KEY);}catch (Exception e){}}
}

生产者:

package com.yupi.springbootinit.bimq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class BiMessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message){// 使用rabbitTemplate的convertAndSend方法将消息发送到指定的交换机和路由键rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME,BiMqConstant.BI_ROUTING_KEY,message);}
}

消费者:

package com.yupi.springbootinit.bimq;import cn.hutool.core.text.StrBuilder;
import com.rabbitmq.client.Channel;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.constant.CommonConstant;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.manager.AiManager;
import com.yupi.springbootinit.model.entity.Chart;
import com.yupi.springbootinit.service.ChartService;
import com.yupi.springbootinit.utils.ExcelUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.RegEx;
import javax.annotation.Resource;@Component
@Slf4j
public class BiMessageConsumer {@Resourceprivate ChartService chartService;@Resourceprivate AiManager aiManager;//使用@SneakyThrows注解简化异常处理//使得你可以在不声明抛出异常的方法中抛出受检异常,而无需捕获它们。这在一些特定情况下可能会很有用,但通常不建议频繁使用,因为它可能会破坏代码的可读性和健壮性。@SneakyThrows//使用@RabbitListener注解指定要监听的队列名称为"code_queue",并设置消息的确认机制为手动确认@RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME},ackMode = "MANUAL")// // 在RabbitMQ中,每条消息都会被分配一个唯一的投递标签,用于标识该消息在通道中的投递状态和顺序。通过使用@Header(AmqpHeaders.DELIVERY_TAG)注解,可以从消息头中提取出该投递标签,并将其赋值给long deliveryTag参数。public void reciveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverttag){log.info("receive message = {}" + message);if (StringUtils.isBlank(message)) {//拒绝消息channel.basicNack(deliverttag,false,false);throw new BusinessException(ErrorCode.SYSTEM_ERROR,"消息为空");}long chartId = Long.parseLong(message);Chart chart = chartService.getById(chartId);if(chart == null){channel.basicNack(deliverttag,false,false);throw new BusinessException(ErrorCode.NOT_FOUND_ERROR,"图表为空");}Chart updateChart = new Chart();updateChart.setId(chart.getId());updateChart.setStatus("running");boolean b = chartService.updateById(updateChart);if(!b){handleChartUpdateError(chart.getId(),"更新图表执行中状态失败");return;}String result = aiManager.doChat(CommonConstant.BI_MODEL_ID, getUserInput(chart));String[] splits = result.split("【【【【【");if(splits.length < 3){handleChartUpdateError(chart.getId(),"AI生成错误");return;}String genChart = splits[1];String genResult = splits[2];Chart updateChart2 = new Chart();updateChart2.setId(chart.getId());updateChart2.setStatus("succeed");updateChart2.setGenChart(genChart);updateChart2.setGenResult(genResult);boolean b1 = chartService.updateById(updateChart2);if(!b1){handleChartUpdateError(chart.getId(),"更新图表成功状态失败");return;}//确认消息channel.basicAck(deliverttag,false);}/*** 根据chart获取用户的输入* @param chart* @param*/public String getUserInput(Chart chart){String goal = chart.getGoal();String chartType = chart.getChartType();String CSVData = chart.getChartData();StrBuilder userInput = new StrBuilder();userInput.append("分析需求:").append("\n");String userGoal = goal;if(StringUtils.isNotBlank(chartType)){//指定了图表类型,就在目标上拼接请使用,图表类型userGoal += "请使用,"  + chartType;}userInput.append(CSVData).append("\n");return userInput.toString();}public void handleChartUpdateError(long chartId,String execMessage){Chart chart = new Chart();chart.setId(chartId);chart.setStatus("failed");chart.setExecMessage(execMessage);boolean b = chartService.updateById(chart);if(!b){log.error("更新图表失败状态失败" + chartId + "," + execMessage);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/825852.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3D模型格式转换工具HOOPS Exchange:3D CAD数据的快速导入与导出

在当今的工程设计领域中&#xff0c;快速且可靠地处理3D CAD数据是至关重要的。HOOPS Exchange SDK通过提供一组C软件库&#xff0c;为开发团队提供了实现这一目标的有效工具。 什么是HOOPS Exchange&#xff1f; HOOPS Exchange是一组C软件库&#xff0c;旨在为开发团队提供…

YOLOv1精读笔记

YOLO系列 摘要1. 将目标检测视为一个回归问题2. 定位准确率不如 SOTA&#xff0c;但背景错误率更低3. 泛化能力强 1.引言1.1 YOLO 速度很快1.2 全局推理 2. Unified Detection2.1 网络设计2.2 训练YOLOv1模型损失函数的选择和其潜在的问题YOLOv1模型如何改进其损失函数来更好地…

【史上最全】带你全方位了解containerd 的几种插件扩展模式

除了 snapshotter&#xff0c;containerd 的扩展机制你还了解哪些&#xff1f; 本文内容节选自 《containerd 原理剖析与实战》&#xff0c;本书正参加限时优惠内购&#xff0c;限时 69.9 元购买。 进入正题之前先看一下 containerd 的整体架构 1. containerd 架构 图 contain…

UE4_常见动画节点学习_Two Bone IK双骨骼IK

学习资料&#xff0c;仅供参考&#xff01; Two Bone IK 控制器将逆运动&#xff08;IK&#xff09;解算器应用于到如角色四肢等3关节链。 变量&#xff08; HandIKWeight &#xff09;被用于在角色的 hand_l 和 hand_r 控制器上驱动 关节目标位置&#xff08;Joint Target Lo…

Navicat连接postgresql时出现‘datlastsysoid does not exist‘报错的问题

连接报错 解决方案 解决方法1&#xff1a;升级navicat 解决方法2&#xff1a;降级pgsql 解决方法3&#xff1a;修改dll 使用3解决 实操演示 1、 打开 Navicat 安装目录&#xff0c;找到libcc.dll文件 2、备份libcc.dll文件&#xff0c;将其复制并粘贴或者修改副本为任何其他名…

【C++杂项】cin的详细用法

cin详细用法 1. cin简介2. cin的常用读取方法2.1 cin>>的用法2.2 cin.get的用法2.3 cin.getline的用法 3. cin清空输入缓冲区4. 其它方法4.1 getline()读取一行 1. cin简介 cin是C中的标准输入流对象&#xff0c;即istream类的对象。cin主要用于从标准输入读取数据&…

DNS服务器的管理与配置

目录 一、相关知识 域名空间 DNS服务器分类 域名解析过程 资源记录 二、安装DNS服务 安装bind软件包 DNS服务的启动与停止 配置主要名称服务器 主配置文件 从例子学起&#xff1a; &#xff08;1&#xff09;建立主配置文件named.conf &#xff08;2&#xff09;…

Windows10安装Docker Desktop(大妈看了都会)

目录 Windows10安装Docker Desktop&#xff08;大妈看了都会&#xff09; 1.前言 1.1 为什么要在Windows10上安装Docker 1.2 Docker Desktop介绍 2.下载Docker Desktop 3.启用Hyper-V以在 Windows 10上创建虚拟机 4.安装Docker Desktop 5.运行Docker Desktop 6.Docker…

阿里云图片处理之 缩放

文档 : https://help.aliyun.com/zh/oss/user-guide/resize-images-4?spma2c4g.11186623.0.0.61cd2759v4jkhX 需求 : 图片过大, 导致加载过慢, 需对图片进行压缩 <image :src"imgUrl ?x-oss-processimage/resize,h_700,m_lfit"></image>Ps : 题外话…

如何下载省,市,区县行政区Shp数据

摘要&#xff1a;一般非专业的GIS应用通常会用到省市等行政区区划边界空间数据做分析&#xff0c;本文简单介绍了如何在互联网上下载省&#xff0c;市&#xff0c;区县的shp格式空间边界数据&#xff0c;并介绍了一个好用的在线数据转换工具&#xff0c;并且开源。 目录 一、下…

深度 | 践行绿色健康可持续发展,这家企业提供了价值范本

文 | 螳螂观察 作者 | 余一 近段时间以来&#xff0c;小米SU7热度一直不减&#xff0c;在展露小米强大品牌号召力的同时&#xff0c;也侧面体现出了当前消费者对于新能源汽车的喜爱。 而消费者选择新能源汽车时&#xff0c;环保因素也起到了至关重要的作用。像前几日&#x…

数据结构-上三角矩阵存储方式[0知识掌握]

目标&#xff1a;看完本文章你将会了解上三角矩阵的存储方式以及矩阵中数据的位置索引号如何求 难点&#xff1a;上三角矩阵的公式推导&#xff0c;上三角任意位置对应的存储位置。 一、准备知识 1.求和公式 前n项和&#xff1a;Sn n(a1an)/2 公差&#xff1a;d后项-前项…

【JavaEE多线程】线程安全、锁机制及线程间通信

目录 线程安全线程安全问题的原因 synchronized 关键字-监视器锁monitor locksynchronized的特性互斥刷新内存可重入 synchronized使用范例 volatilevolatile能保证内存可见性volatile不保证原子性synchronized 也能保证内存可见性 wait 和 notifywait()方法notify()方法notify…

拖拽式工作流有哪几个优势?

在信息技术迅猛发展的今天&#xff0c;如何助力中小型企业在数字化转型的过程中平稳过渡&#xff1f;又是如何让中小型企业摆脱数据孤岛、成本投入高等各种瓶颈和难题&#xff1f;低代码技术平台是近些年较为理想的平台产品&#xff0c;其中拖拽式工作流优势特点突出&#xff0…

地埋电缆故障检测方法有哪些?地埋电缆故障检测费用是多少?

地埋电缆故障检测方法主要涵盖脉冲反射法、桥接法、高压闪络法和声波定位法等多种方法。选择适当的方法取决于故障类型、电缆类型和实际现场条件。至于地埋电缆故障检测费用则受到多个因素的影响&#xff0c;包括故障类型、检测方法的复杂性、检测设备的先进程度以及所处地区的…

从零开始搭建社交圈子系统:充实人脉的最佳路径

线上交友圈&#xff1a;拓展社交网络的新时代 线上交友圈是社交网络的新引擎&#xff0c;提供了更广泛的社交机会&#xff0c;注重共同兴趣的连接&#xff0c;强调多样性的社交形式&#xff0c;更真实地展示自己&#xff0c;让朋友更全面地了解我们的生活状态。虽然虚拟交往存在…

SD-WAN解决电商企业海外业务网络难题

全球化背景下&#xff0c;众多国内企业都涉及到海外贸易业务&#xff0c;尤其是出海电商得到蓬勃发展。企业做出海电商&#xff0c;需要访问国外网页、社交平台&#xff0c;如亚马逊、TikTok、Facebook、YouTube等与客户沟通互动&#xff0c;SD-WAN的发展正好为解决国际网络访问…

14 Php学习:表单

表单 PHP 表单是用于收集用户输入的工具&#xff0c;通常用于网站开发。PHP 可以与 HTML 表单一起使用&#xff0c;用于处理用户提交的数据。通过 PHP 表单&#xff0c;您可以创建各种类型的表单&#xff0c;包括文本输入框、复选框、下拉菜单等&#xff0c;以便用户可以填写和…

主存储器与CPU之间的连接(会画图)

位扩展 字扩展 由于只有A13&#xff0c; A14 连到了译码器上&#xff0c;以、因此该译码器是一个 2/4 译码器&#xff0c;对应的选片有四种。选中第一个选片&#xff0c;就是把译码器“0口置0&#xff0c; 1~3口置1”&#xff0c;因为CS有非号&#xff0c;因此&#xff0c;低电…

【C++】string的使用

目录 1、为什么学习string类&#xff1f; 2、标准库中的string类 2.1 string类 2.2 string类的常见接口声明 2.2.1 string类的常见构造 ​编辑 2.2.2 string类对象的访问及遍历操作 2.2.3 string类对象的容量操作 2.2.4 string类对象的修改操作 ​编辑 1、为什么学习s…