消费可靠性投递-confirm确认模式

package com.java1234.producer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {/*** direct交换机名称*/public static final String DIRECT_EXCHANGE="directExchange";/*** direct交换机名称1*/public static final String DIRECT_EXCHANGE1="directExchange1";/*** fanout交换机名称*/public static final String FANOUT_EXCHANGE="fanoutExchange";/*** direct队列名称*/public static final String DIRECT_QUEUE="directQueue";/*** direct1队列名称*/public static final String DIRECT_QUEUE1="directQueue1";/*** direct2队列名称*/public static final String DIRECT_QUEUE2="directQueue2";/*** 订阅队列1名称*/public static final String SUB_QUEUE1="subQueue1";/*** 订阅队列2名称*/public static final String SUB_QUEUE2="subQueue2";/*** direct路由Key*/public static final String DIRECT_ROUTINGKEY="directRoutingKey";/*** topic队列名称1**/public static final String TOPIC_QUEUE1="topicQueue1";/*** topic队列名称2**/public static final String TOPIC_QUEUE2="topicQueue2";/*** direct交换机名称*/public static final String TOPIC_EXCHANGE="topicExchange";/*** 定义一个direct交换机* @return*/@Beanpublic DirectExchange directExchange(){return new DirectExchange(DIRECT_EXCHANGE);}/*** 定义一个direct交换机1* @return*/@Beanpublic DirectExchange directExchange1(){return new DirectExchange(DIRECT_EXCHANGE1);}/*** 定义一个direct交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}/*** 定义一个direct队列* @return*/@Beanpublic Queue directQueue(){return new Queue(DIRECT_QUEUE);}/*** 定义一个direct1队列* @return*/@Beanpublic Queue directQueue1(){return new Queue(DIRECT_QUEUE1);}/*** 定义一个direct2队列* @return*/@Beanpublic Queue directQueue2(){return new Queue(DIRECT_QUEUE2);}/*** 定义一个订阅队列1* @return*/@Beanpublic Queue subQueue1(){return new Queue(SUB_QUEUE1);}/*** 定义一个订阅队列2* @return*/@Beanpublic Queue subQueue2(){return new Queue(SUB_QUEUE2);}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding(){return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding fanoutBinding1(){return BindingBuilder.bind(subQueue1()).to(fanoutExchange());}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(subQueue2()).to(fanoutExchange());}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding1(){return BindingBuilder.bind(directQueue1()).to(directExchange1()).with("error");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding2(){return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("info");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding3(){return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("error");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding directBinding4(){return BindingBuilder.bind(directQueue2()).to(directExchange1()).with("warning");}/*** 定义一个topic队列1*/@Beanpublic Queue topicQueue1(){return new Queue(TOPIC_QUEUE1);}/*** 定义一个topic队列2*/@Beanpublic Queue topicQueue2(){return new Queue(TOPIC_QUEUE2);}/*** 定义一个direct交换机* @return*/@Beanpublic TopicExchange topicExchange(){return new TopicExchange(TOPIC_EXCHANGE);}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding topicBinding1(){return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding topicBinding2(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");}/*** 定义一个队列和交换机的绑定* @return*/@Beanpublic Binding topicBinding3(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");}
}
package com.java1234.consumer.service.impl;import com.java1234.consumer.service.RabbitMqService;
import com.java1234.producer.config.RabbitMQConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService {@Autowiredprivate AmqpTemplate amqpTemplate;@Overridepublic void receiveMessage() {String message=(String) amqpTemplate.receiveAndConvert(RabbitMQConfig.DIRECT_QUEUE);System.out.println("接受到的mq消息:"+message);}@Override@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE1})public void receiveMessage2(String message) {
//        System.out.println("消费者1:接收到的mq消息:"+message);System.out.println("队列1接收日志消息:"+message);}@Override@RabbitListener(queues = {RabbitMQConfig.TOPIC_QUEUE2})public void receiveMessage3(String message) {
//        System.out.println("消费者2:接收到的mq消息:"+message);System.out.println("队列2接收日志消息:"+message);}@Override@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE1})public void receiveSubMessage1(String message) {System.out.println("订阅者1:接收到的mq消息:"+message);}@Override@RabbitListener(queues = {RabbitMQConfig.SUB_QUEUE2})public void receiveSubMessage2(String message) {System.out.println("订阅者2:接收到的mq消息:"+message);}}
package com.java1234.consumer.service;public interface RabbitMqService {/*** 接受消息*/public void receiveMessage();/*** 接受消息*/public void receiveMessage2(String message);/*** 接受消息*/public void receiveMessage3(String message);/*** 接受订阅消息1*/public void receiveSubMessage1(String message);/*** 接受订阅消息2*/public void receiveSubMessage2(String message);
}
package com.java1234.consumer;import com.java1234.consumer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {ApplicationContext ac = SpringApplication.run(ConsumerApplication.class,args);
//        RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
//        rabbitMqService.receiveMessage();}
}
package com.java1234.producer.service.impl;import com.java1234.producer.config.RabbitMQConfig;
import com.java1234.producer.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;@Service("rabbitmqService")
public class RabbitMqServiceImpl implements RabbitMqService, RabbitTemplate.ConfirmCallback {@Autowiredprivate AmqpTemplate amqpTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}/*** String exchange 交换机名称* String routingKey 路由Key* Object object 具体发送的消息* @param message*/@Overridepublic void sendMessage(String message) {
//        amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message);CorrelationData correlationData=new CorrelationData("3453");rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,RabbitMQConfig.DIRECT_ROUTINGKEY,message,correlationData);}@Overridepublic void sendFanoutMessage(String message) {amqpTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,"",message);}@Overridepublic void sendRoutingMessage() {amqpTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE1,"warning2","发送warning2级别的消息");}@Overridepublic void sendTopicMessage() {
//        amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.rabbit","飞快的橘色兔子");
//                amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.orange.elephant","慢腾腾的橘色大象");
//        amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.orange.fox","quick.orange.fox");
//        amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"lazy.brown.fox","lazy.brown.fox");amqpTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,"quick.brown.fox","quick.brown.fox");}/**** @param correlationData 消息唯一标识* @param ack 交换机是否成功收到消息 true成功 false失败* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm方法被执行了..."+correlationData);if(ack){System.out.println("交换机,消息接收成功"+cause);}else{System.out.println("交换机,消息接收失败"+cause);//我们这里要做一些消息补发的措施System.out.println("id="+correlationData.getId());}}
}
package com.java1234.producer.service;public interface RabbitMqService {/*** 发送消息* @param message*/public void sendMessage(String message);/*** 发送消息* @param message*/public void sendFanoutMessage(String message);/*** 发送路由模式消息*/public void sendRoutingMessage();/*** 发送Topic模式消息*/public void sendTopicMessage();
}
package com.java1234.producer;import com.java1234.producer.service.RabbitMqService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {ApplicationContext ac = SpringApplication.run(ProducerApplication.class, args);RabbitMqService rabbitMqService=(RabbitMqService) ac.getBean("rabbitmqService");
//        rabbitMqService.sendRoutingMessage();
//        rabbitMqService.sendTopicMessage();rabbitMqService.sendMessage("confirm确认测试消息");//        for(int i=0;i < 10;i++){rabbitMqService.sendMessage("RabbitMQ大爷你好!!!"+i);
//             rabbitMqService.sendFanoutMessage(i+"用户欠费了");
//        }}
}
server:port: 80
spring:rabbitmq:host: 192.168.30.113port: 5672username: pzypassword: 123456virtual-host: /publisher-confirm-type: correlated

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/605229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

KY23 最小花费 DP

DP&#xff0c;比较恼的是题中没说明a、b的范围&#xff0c;不敢轻易用双循环 添加链接描述 #include<bits/stdc.h>using namespace std;#define ll long longconst int INF 1e9 10;int l1, l2, l3, c1, c2, c3, a, b, n;int pri(int d){if(d < l1) return c1;else…

ROS-urdf集成gazebo

文章目录 一、URDF与Gazebo基本集成流程二、URDF集成Gazebo相关设置三、URDF集成Gazebo实操四、Gazebo仿真环境搭建 一、URDF与Gazebo基本集成流程 1.创建功能包 创建新功能包&#xff0c;导入依赖包: urdf、xacro、gazebo_ros、gazebo_ros_control、gazebo_plugins 2.编写URD…

HTML5 Video(视频)

视频格式与浏览器的支持 当前&#xff0c; <video> 元素支持三种视频格式&#xff1a; MP4, WebM, 和 Ogg: 浏览器MP4WebMOggInternet ExplorerYESNONOChromeYESYESYESFirefoxYESYESYESSafariYESNONOOperaYES (从 Opera 25 起)YESYES MP4 带有 H.264 视频编码和 AAC …

HarmonyOS状态管理概述

状态管理概述 在前文的描述中&#xff0c;我们构建的页面多为静态界面。如果希望构建一个动态的、有交互的界面&#xff0c;就需要引入“状态”的概念。 图1 效果图 上面的示例中&#xff0c;用户与应用程序的交互触发了文本状态变更&#xff0c;状态变更引起了UI渲染&…

C#实现个人账本管理系统

git地址&#xff1a;https://gitee.com/myshort-term/personal-ledger-management-system 1.系统简介 LedgerManagementSystem是一个小型的个人账本管理系统&#xff0c;可对收支项目进行增加、删除、修改、查询以及导入和导出。可对每日的各类收支项目进行汇总并查看和修改收…

STM32 基础知识(探索者开发板)--146讲 IIC

IIC特点&#xff1a; 同步串行半双工通信总线 IIC有一个弱上拉电阻&#xff0c;在主机和从机都没有传输数据下拉时&#xff0c;总线会自动上拉 SCL在低电平期间&#xff0c;改变SDA的值来上传数据&#xff0c;方便SCL电平上升时进行数据读取 SCL在高电平期间&#xff0c;不能…

【蓝桥杯软件赛 零基础备赛20周】第7周——二叉树

文章目录 1 二叉树概念2 二叉树的存储和编码2.1 二叉树的存储方法2.2 二叉树存储的编码实现2.3 二叉树的极简存储方法 3 例题4 习题 前面介绍的数据结构数组、队列、栈&#xff0c;都是线性的&#xff0c;它们存储数据的方式是把相同类型的数据按顺序一个接一个串在一起。简单的…

[Flutter]WindowsPlatform上运行遇到的问题总结

目录 写在开头 正文 Q1、file_version_info.dart Q2、flutter clean &#xff0c;无法删除build文件夹 其他 写在结尾 写在开头 Flutter项目已能在移动端完美使用后&#xff0c;想看看在桌面端等使用情况 基于Flutter3.0后已支持Windows/MacOS/Web等桌面端&#xff0c;不…

【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax快速入门

【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax概述 【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax快速入门 【大数据进阶第三阶段之Datax学习笔记】阿里云开源离线同步工具Datax类图 【大数据进阶第三阶段之Datax学习笔记】使用…

剑指offer面试题3 二维数组中的查找

考察点&#xff1a; 考察数据结构二维数组知识点&#xff1a; 1.java中的数据类型分为基本类型和引用类型&#xff0c;数组属于引用类型&#xff0c;引用类型的变量中存储的是地址&#xff0c;该地址指向内存中的某个对象&#xff0c;参考c中的指针。2.一维数组定义&#xff0c…

flutter中枚举的使用

枚举支持成员属性、实现和定义方法&#xff0c;使用时需要注意的地方&#xff1a; 枚举的所有实例都必须在声明的开头声明&#xff0c;并且必须至少声明一个实例最后一个枚举元素以 ; 结尾&#xff0c;其余的枚举元素以 ,结尾。枚举的构造函数 一定要是 const 构造枚举中的三个…

看直播怎么录屏?精选工具助你轻松录制!

随着网络直播平台的兴起&#xff0c;观看直播已经成为人们日常生活的一部分。然而&#xff0c;有时我们可能想要保存直播内容以便日后回顾或分享。可是看直播怎么录屏呢&#xff1f;在本文中&#xff0c;我们将介绍两种录制直播的方法&#xff0c;通过这些步骤&#xff0c;你可…

Verilog学习记录

目录 一、Verilog简介 &#xff08;一&#xff09;Verilog 的主要特性 &#xff08;二&#xff09;Verilog的主要应用 &#xff08;三&#xff09;Verilog设计方法 二、Verilog基础语法 &#xff08;一&#xff09;标识符和关键字 &#xff08;二&#xff09;Verilog数据…

内外网文件交换系统实用技巧揭秘:安全、效率、便捷一个不少

内外网文件交换系统是一种专门设计用于在企业内部网络&#xff08;内网&#xff09;与外部网络&#xff08;外网&#xff09;之间安全传输文件的技术解决方案。在企业环境中&#xff0c;出于安全考虑&#xff0c;内部网络通常与外部网络隔离&#xff0c;以防止未经授权的访问和…

shp文件与数据库(创建表)

前言 第三方库准备 shp文件是什么&#xff1f;笔者就不多做解释。后面将使用python的一些第三方库 1、sqlalchemy 2、pyshp 3、geoalchemy2 4、geopandas 这四个是主要的库&#xff0c;具体怎么使用可以参考相关教程&#xff0c;当然还有其他库&#xff0c;后面在介绍。…

信源编码与信道转移矩阵

目录 一. 信息论模型 二. 点对点通信模型 三. 信源编码 四. 信道转移矩阵 4.1 二进制对称信道 4.2 二进制擦除信道 五. 小结 &#xff08;1&#xff09;信道直射与反射 &#xff08;2&#xff09;信道散射 &#xff08;3&#xff09; 信道时变性 一. 信息论模型 194…

【AI视野·今日Robot 机器人论文速览 第七十一期】Fri, 5 Jan 2024

AI视野今日CS.Robotics 机器人学论文速览 Fri, 5 Jan 2024 Totally 11 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Robotics Papers Machine Learning in Robotic Ultrasound Imaging: Challenges and Perspectives Authors Yuan Bi, Zhongliang Jiang, Felix D…

redis可视化工具 RedisInsight

redis可视化工具 RedisInsight 1、RedisInsight是什么2、下载RedisInsight3、使用RedisInsight4、其他redsi可视化工具 1、RedisInsight是什么 RedisInsight 是一个用于管理和监控 Redis 数据库的图形用户界面&#xff08;GUI&#xff09;工具。它是由 Redis Labs 开发的&…

idea使用ssh连接docker,并通过Dockerfile文件,直接在idea中启动docker应用,并进行远程debug

idea使用ssh连接docker&#xff0c;并通过Dockerfile文件&#xff0c;直接在idea中启动docker应用&#xff0c;并进行远程debug 第一步: idea通过ssh连接docker第二步&#xff1a;使用Dockerfile文件在远程启动应用第三步: 远程debug 容器运行的好处是减轻本地运行的负担(本地电…