lettuce利用stream实现消息推送

1、消息推送

/*** @Auther: pshdhx* @Date: 2023/02/22/10:38* @Description: 往同一个stream队列里边塞值,同一队列的所有消费者组,都会收到消息* 模拟 消息推送到服务器*/
public class TestPubStream {public static void main(String[] args) {// 创建 Redis 连接RedisURI redisURI = RedisURI.Builder.redis("xxxxxx", 6379).build();redisURI.setPassword("xxxxxx?");RedisClient redisClient = RedisClient.create(redisURI);StatefulRedisConnection<String, String> connection = redisClient.connect();try {// 获取同步命令对象RedisCommands<String, String> syncCommands = connection.sync();// 创建消费者组String streamKey = "mystream"; // Stream名称String groupName = "myConsumer"; // 消费者组名称//检查 groupName 是否存在boolean groupExists = false;List<Object> result = syncCommands.xinfoGroups(streamKey);for (Object obj : result) {ArrayList objList = (java.util.ArrayList) obj;if((objList.get(1)+"").equals(groupName)){groupExists = true;break;}}System.out.println("groupExists = " + groupExists);// 如果 groupName 不存在,则创建if (!groupExists) {syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(streamKey, "0-0"), groupName, XGroupCreateArgs.Builder.mkstream());}// 发布消息syncCommands.xadd(streamKey, "key1", "val1");} finally {// 关闭连接connection.close();redisClient.shutdown();}}
}

2、实现消息订阅

/*** @Auther: pshdhx* @Date: 2023/02/22/10:40* @Description: 模拟 tomcat1 使用消费者组1,消费者1向redis 服务器 订阅推送的消息*/public class TestSub1_tomcat1 {public static void main(String[] args) {// 创建 Redis 连接RedisURI redisURI = RedisURI.Builder.redis("xxxxxx", 6379).build();redisURI.setPassword("xxxxxxxxxx");RedisClient redisClient = RedisClient.create(redisURI);StatefulRedisConnection<String, String> connection = redisClient.connect();try {// 从消费者组中获取消息String consumerName = "tomcat1_consumer_name"; // 消费者名称RedisCommands<String, String> streamCommands = connection.sync();while (true) {List<StreamMessage<String, String>> messages = streamCommands.xreadgroup(Consumer.from(groupName, consumerName),XReadArgs.Builder.block(Duration.ofSeconds(5)), // 阻塞5秒钟等待新消息XReadArgs.StreamOffset.lastConsumed(streamKey));if (messages.isEmpty()) {continue; // 在没有新消息时继续轮询}for (StreamMessage<String, String> message : messages) {System.out.println("tomcat 1 Received message: " + message.getBody());// 手动确认消息已被处理syncCommands.xack(streamKey, groupName, message.getId());}}} finally {// 关闭连接connection.close();redisClient.shutdown();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/85096.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#60个常见的问题和答案

在本文中,我将帮助你准备好在下一次面试中解决这些与C# 编程语言相关的问题。同时,你可能想练习一些C# 项目。这 60 个基本的 C#面试问题和答案将帮助你了解该语言的技术概念。 目录 什么是 C#? 1.什么是类? 2.面向对象编程的主要概念是什么?

Postman应用——控制台调试

当你在测试脚本中遇到错误或意外行为时&#xff0c;Postman控制台可以帮助你识别&#xff0c;通过将console.log调试语句与你的测试断言相结合&#xff0c;你可以检查http请求和响应的内容&#xff0c;以及变量之类的。 通常可以使用控制台日志来标记代码执行&#xff0c;有时…

网络安全日报 2023年09月21日

1、研究人员披露基于ERMAC木马的Hook家族银行木马 https://research.nccgroup.com/2023/09/11/from-ermac-to-hook-investigating-the-technical-differences-between-two-android-malware-variants/ 研究人员发现 ERMAC 源代码被用作 Hook 的基础。恶意软件操作者可以发送到…

Visual Studio将C#项目编译成EXE可执行程序

经常看文章时会收获不少实用工具&#xff0c;有的在github上是编译好的&#xff0c;有的则是未编译的项目文件。所以经常会使用Visual Studio编译项目文件成exe可执行程序&#xff0c;以下为编译的流程。 第一步&#xff0c;从github上下载项目文件&#xff0c;举个例子&#…

Golang 中空的切片转化成 JSON 后变为 null 的问题如何解决?

问题 在 Golang 中&#xff0c;经常需要将其他类型&#xff08;例如 slice、map、struct 等类型&#xff09;的数据转化为 JSON 格式。有时候转化的结果并不是预期中的&#xff0c;例如将一个空的切片转化为 JSON 时&#xff0c;会变成"null"&#xff0c;而并非预期…

小米手机安装面具教程(Xiaomi手机获取root权限)

文章目录 1.Magisk中文网&#xff1a;2.某呼&#xff1a;3.最后一步打开cmd命令行输入的时候:4.Flash Boot 通包-Magisk&#xff08;Flash Boot通刷包&#xff09;5.小米Rom下载&#xff08;官方刷机包&#xff09;6.Magisk最新版本国内源下载 1.Magisk中文网&#xff1a; htt…

【深度学习实验】前馈神经网络(七):批量加载数据(直接加载数据→定义类封装数据)

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. 直接加载鸢尾花数据集 a. 加载数据集 b. 数据归一化 c. 洗牌操作 d. 打印数据 2. 定义类封装数据 a. __init__(构造函数&#xff1a;用于初始化数据集对象) b.…

华为OD机试 - 构成正方形的数量 - 数据结构map(Java 2023 B卷 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、Java算法源码五、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#xff09;》。 …

mysql 半同步复制模式使用详解

目录 一、前言 二、mysql主从架构简介 2.1 mysql主从复制架构概述 2.2 为什么使用主从架构 2.2.1 提高数据可用性 2.2.2 提高数据可靠性 2.2.3 提升数据读写性能 2.3 主从架构原理 2.4 主从架构扩展 2.4.1 双机热备&#xff08;AB复制&#xff09; 2.4.2 级联复制 2…

GIT使用(踩坑)

1、关于远程路径的设置&#xff1a; 双反斜线 和 单斜线 的问题。 这样写可以&#xff1a; git remote add origin L:/BottleCapDetection这样写也可以&#xff1a; git remote add origin L:\\BottleCapDetection但是这样写&#xff0c;就不对&#xff1a; git remote add…

Qt核心:元对象系统、属性系统、对象树、信号槽

一、元对象系统 1、Qt 的元对象系统提供的功能有&#xff1a;对象间通信的信号和槽机制、运行时类型信息和动态属性系统等。 2、元对象系统是 Qt 对原有的 C进行的一些扩展&#xff0c;主要是为实现信号和槽机制而引入的&#xff0c; 信号和槽机制是 Qt 的核心特征。 3、要使…

当网络设置为自动获取dns时而实际nds是8.8.8.8,1.1.1.1的解决方法

笔记本换网络环境后&#xff0c;网络设置的是自动获取IP和自动获取dns。但使用命令&#xff1a;config/all命令时发现dns总是8.8.8.8,1.1.1.1。导致csdn上不了。 8.8.8.8,1.1.1.1&#xff1a;是谷歌的dns。 解决办法&#xff1a; 在支行中输入regedit打开注册表后&#xff0…

清华博士面试的准备(已通过)

内修&#xff08;30%&#xff09; 不管如何 任何人都不能影响你的心态。因为冷静、理性&#xff0c;才能处理好95%以上的问题。剩下的5%我可以不拥有。不能既要、又要、还要。尊重客观规律。放下我执。 价值导向、解决问题为导向。 允许一切事情的发生&#xff0c;是我们最大的…

windows下载虚拟机virtualBox

链接&#xff1a;Downloads – Oracle VM VirtualBox 进入链接这样点击&#xff1a; 直接下载即可

听GPT 讲Istio源代码--pkg(4)

File: istio/pkg/test/framework/components/cluster/topology.go istio/pkg/test/framework/components/cluster/topology.go文件是Istio项目中的一个测试框架组件&#xff0c;用于定义和管理测试环境中的集群拓扑结构。 其中&#xff0c;knownClusterNames是一个全局变量&…

Java“牵手”速卖通商品列表页数据采集+速卖通商品价格数据排序,速卖通API接口申请指南

速卖通是阿里巴巴旗下的面向国际市场打造的跨境电商平台&#xff0c;被称为国际版淘宝&#xff0c;速卖通面向海外买家客户&#xff0c;通过支付宝国际账户进行担保交易&#xff0c;并使用国际物流渠道运输发货&#xff0c;是全球第三大英文在线购物网站。 速卖通商品列表数据…

关于IDEA没有显示日志输出?IDEA控制台没有显示Tomcat Localhost Log和Catalina Log 怎么办?

问题描述&#xff1a; 原因是;CATALINA_BASE里面没有相关的文件配置。而之前学习IDEA的时候&#xff0c;把这个文件的位置改变了。导致&#xff0c;最后输出IDEA的时候&#xff0c;不会把日志也打印出来。 检查IDEA配置; D:\work_soft\tomcat_user\Tomcat10.0\bin 在此目录下&…

如何在没有第三方.NET库源码的情况,调试第三库代码?

大家好&#xff0c;我是沙漠尽头的狼。 本方首发于Dotnet9&#xff0c;介绍使用dnSpy调试第三方.NET库源码&#xff0c;行文目录&#xff1a; 安装dnSpy编写示例程序调试示例程序调试.NET库原生方法总结 1. 安装dnSpy dnSpy是一款功能强大的.NET程序反编译工具&#xff0c;…

STM32 Cubemx 通用定时器 General-Purpose Timers同步

文章目录 前言简介cubemx配置 前言 持续学习stm32中… 简介 通用定时器是一个16位的计数器&#xff0c;支持向上up、向下down与中心对称up-down三种模式。可以用于测量信号脉宽&#xff08;输入捕捉&#xff09;&#xff0c;输出一定的波形&#xff08;比较输出与PWM输出&am…

activemq部署

目录 1.下载 2.java环境 3.解压启动 4.访问测试 5.问题记录 5.1.无法启动成功问题 5.2.其他服务器无法访问 1.下载 ActiveMQ 2.java环境 需要注意要求的jdk版本&#xff0c;否则启动不会成功 3.解压启动 tar -zxvf apache-activemq-5.18.2-bin.tar.gz 进入到目录下执行…