1、消息推送
/*** @Auther: pshdhx* @Date: 2023/02/22/10:38* @Description: 往同一个stream队列里边塞值,同一队列的所有消费者组,都会收到消息* 模拟 消息推送到服务器*/
public class TestPubStream {public static void main(String[] args) {// 创建 Redis 连接RedisURI redisURI = RedisURI.Builder.redis("xxxxxx", 6379).build();redisURI.setPassword("xxxxxx?");RedisClient redisClient = RedisClient.create(redisURI);StatefulRedisConnection<String, String> connection = redisClient.connect();try {// 获取同步命令对象RedisCommands<String, String> syncCommands = connection.sync();// 创建消费者组String streamKey = "mystream"; // Stream名称String groupName = "myConsumer"; // 消费者组名称//检查 groupName 是否存在boolean groupExists = false;List<Object> result = syncCommands.xinfoGroups(streamKey);for (Object obj : result) {ArrayList objList = (java.util.ArrayList) obj;if((objList.get(1)+"").equals(groupName)){groupExists = true;break;}}System.out.println("groupExists = " + groupExists);// 如果 groupName 不存在,则创建if (!groupExists) {syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(streamKey, "0-0"), groupName, XGroupCreateArgs.Builder.mkstream());}// 发布消息syncCommands.xadd(streamKey, "key1", "val1");} finally {// 关闭连接connection.close();redisClient.shutdown();}}
}
2、实现消息订阅
/*** @Auther: pshdhx* @Date: 2023/02/22/10:40* @Description: 模拟 tomcat1 使用消费者组1,消费者1向redis 服务器 订阅推送的消息*/public class TestSub1_tomcat1 {public static void main(String[] args) {// 创建 Redis 连接RedisURI redisURI = RedisURI.Builder.redis("xxxxxx", 6379).build();redisURI.setPassword("xxxxxxxxxx");RedisClient redisClient = RedisClient.create(redisURI);StatefulRedisConnection<String, String> connection = redisClient.connect();try {// 从消费者组中获取消息String consumerName = "tomcat1_consumer_name"; // 消费者名称RedisCommands<String, String> streamCommands = connection.sync();while (true) {List<StreamMessage<String, String>> messages = streamCommands.xreadgroup(Consumer.from(groupName, consumerName),XReadArgs.Builder.block(Duration.ofSeconds(5)), // 阻塞5秒钟等待新消息XReadArgs.StreamOffset.lastConsumed(streamKey));if (messages.isEmpty()) {continue; // 在没有新消息时继续轮询}for (StreamMessage<String, String> message : messages) {System.out.println("tomcat 1 Received message: " + message.getBody());// 手动确认消息已被处理syncCommands.xack(streamKey, groupName, message.getId());}}} finally {// 关闭连接connection.close();redisClient.shutdown();}}
}