聊聊Redis消息队列stream

前言

本期和大家一起探讨了如何基于 redis 实现消息队列,其中实现方案包括三类:

  • redis list:最简单粗暴的实现,存在问题包括:不支持发布/订阅模式、消费端缺少 ack 机制
  • redis pub/sub:支持发布/订阅模式,有较高的丢数据风险,消费端同样不支持 ack 机制
  • redis stream:趋近于成熟的 mq 实现方式. 支持发布/订阅模式,消费端能支持 ack 机制. 但是受限于 redis 自身的特性,仍无法杜绝丢失数据的可能性(本文只聊这个) 

Redis Stream 简介

Redis Stream 是 Redis 5.0 版本中引入的一种数据结构,用于存储和处理消息流。它类似于消息队列,但具有更高的性能和更丰富的特性。ACK 操作用于确认消息已经被消费,从而避免重复消费。

Redis作为消息队列的优缺点

优点

  1. 简单轻量:Redis是一个内存中的数据存储系统,具有轻量级和简单的特点。相比较专门的消息队列系统,使用Redis作为消息队列不需要引入额外的组件和依赖,可以减少系统的复杂性。
  2. 速度快:由于Redis存储在内存中,它具有非常高的读写性能。这对于需要低延迟的应用程序非常有优势。
  3. 多种数据结构支持:Redis提供了丰富的数据结构,如列表、发布/订阅、有序集合等。这使得Redis在处理不同类型的消息和任务时更加灵活。
  4. 数据持久化:Redis可以通过将数据持久化到磁盘来提供数据的持久性。这意味着即使Redis重启,之前的消息也不会丢失。
  5. 广泛的应用场景:Redis不仅可以用作消息队列,还可以用作缓存、数据库、分布式锁等多种用途。如果你的应用程序已经使用了Redis,那么使用Redis作为消息队列可以减少技术栈的复杂性。

缺点

  1. 缺少一些高级特性:相对于专门的消息队列系统,Redis在消息队列方面的功能可能相对简单。例如,它可能缺乏一些高级消息传递功能,如消息重试、消息路由、持久化消息等。
  2. 可靠性和一致性:Redis的主要设计目标是提供高性能和低延迟,而不是强一致性和高可靠性。在某些情况下,Redis可能会丢失消息,或者在出现故障时可能无法提供持久性保证。

推荐应用场景

适用于简单的中小型项目 如果功能简单,访问量并不大可以考虑 如果你的应用程序对可靠性和高级功能有严格要求,并且需要处理大量的消息和复杂的消息路由,那么使用专门的消息队列系统可能更合适。

实战

maven依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置StreamConfig(监听)

import com.lasse.mq.redis.listener.AutoAckStreamConsumeListener;
import com.lasse.mq.redis.listener.BasicAckStreamConsumeListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.util.ErrorHandler;import javax.annotation.PostConstruct;
import java.time.Duration;@Configuration
public class RedisStreamConfig {@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate AutoAckStreamConsumeListener autoAckStreamConsumeListener;@Autowiredprivate BasicAckStreamConsumeListener basicAckStreamConsumeListener;@PostConstructpublic void initializeStream() {StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();// 创建一个流try {streamOperations.createGroup("stream_queue_key",  AutoAckStreamConsumeListener.GROUP);streamOperations.createGroup("stream_queue_key", BasicAckStreamConsumeListener.GROUP);} catch (RedisSystemException e) {}}@Beanpublic StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {// 用于配置消息监听容器的选项。在这个方法中,通过设置不同的选项,如轮询超时时间和消息的目标类型,可以对消息监听容器进行个性化的配置。StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// Stream 中没有消息时,阻塞多长时间,需要比 `spring.redis.timeout` 的时间小.pollTimeout(Duration.ofSeconds(3))// 获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理.errorHandler(new ErrorHandler() {@Overridepublic void handleError(Throwable t) {System.out.println("出现异常就来这里了" + t);}})// 指定了消息的目标类型为 String。这意味着容器会将接收到的消息转换为 String 类型,以便在后续的处理中使用。.build();// 创建一个可用于监听Redis流的消息监听容器。StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer =StreamMessageListenerContainer.create(redisConnectionFactory, options);// 方法配置了容器来接收来自特定消费者组和消费者名称的消息。它还指定了要读取消息的起始偏移量,以确定从哪里开始读取消息。// 消费组A,自动ack// 从消费组中没有分配给消费者的消息开始消费streamMessageListenerContainer.receiveAutoAck(Consumer.from(AutoAckStreamConsumeListener.GROUP, "AutoAckConsumer"),StreamOffset.create("stream_queue_key", ReadOffset.lastConsumed()), autoAckStreamConsumeListener);// 消费组B,不自动ackstreamMessageListenerContainer.receive(Consumer.from(BasicAckStreamConsumeListener.GROUP, "BasicAckConsumer"),StreamOffset.create("stream_queue_key", ReadOffset.lastConsumed()), basicAckStreamConsumeListener);streamMessageListenerContainer.start();return streamMessageListenerContainer;}}

配置消费者

自动ack

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.Map;@Slf4j
@Component
public class AutoAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> {//分组名public static final String GROUP = "autoack_stream";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {String stream = message.getStream();RecordId id = message.getId();Map<String, String> map = message.getValue();log.info("[自动ACK]接收到一个消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);redisTemplate.opsForStream().delete(GROUP, id.getValue());}
}

 手动ack

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.Map;@Slf4j
@Component
public class BasicAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> {//分组名public static final String GROUP = "basicack_stream";@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(MapRecord<String, String, String> message) {String stream = message.getStream();RecordId id = message.getId();Map<String, String> map = message.getValue();log.info("[手动ACK]接收到一个消息 stream:[{}],id:[{}],value:[{}]", stream, id, map);redisTemplate.opsForStream().acknowledge(stream, GROUP, id.getValue());//消费完毕删除该条消息redisTemplate.opsForStream().delete(GROUP, id.getValue());}
}

配置生产者

    @Autowiredprivate RedisTemplate redisTemplate;@RequestMapping(value = "/stream/produce", method = RequestMethod.GET)public void streamProduce() {Map<String, String> map = new HashMap<>();map.put("name", "大家好我是周杰伦");map.put("time", DateUtil.now());redisTemplate.opsForStream().add("stream_queue_key", map).getValue();}

 实现效果

INFO 17560 --- [cTaskExecutor-1] c.l.m.r.l.AutoAckStreamConsumeListener   : [自动ACK]接收到一个消息 stream:[stream_queue_key],id:[1712733195117-0],value:[{name="大家好我是周杰伦", time="2024-04-10 15:13:15"}]
INFO 17560 --- [cTaskExecutor-2] c.l.m.r.l.BasicAckStreamConsumeListener  : [手动ACK]接收到一个消息 stream:[stream_queue_key],id:[1712733195117-0],value:[{name="大家好我是周杰伦", time="2024-04-10 15:13:15"}]

常见问题

Redis消息队列的注意事项

一、消息丢失

问题:

当使用Redis作为消息队列时,如果Redis服务器关闭或发生故障,所有未处理的消息都将被删除,这将导致消息丢失。

解决方法:

因此,应考虑使用Redis持久化功能和备份策略,以确保消息不会丢失。

Redis提供了两种持久化选项:RDB和AOF。RDB(Redis database backup)是将整个Redis数据集在指定时间间隔内写入磁盘的快照;AOF(Append-only file)则是记录Redis服务器执行的写命令,以便在服务器重新启动时重新执行这些命令。

可以通过设置更改持久化选项的自动与手动触发,以适应您的应用程序的需求。此外,在主Redis实例故障后,备份Redis实例可以被用作备份。

二、消息可靠性

问题:

当Redis消息队列中的消息被消费者处理后,我们无法保证消费者已经成功处理了消息。如果发生故障或异常,消息将被重新处理,这可能导致问题,尤其在需要保证消息处理顺序和应用程序的幂等性时。

解决方法:

我们可以使用ACK(应答)机制,当消费者成功处理消息后,应在Redis中发送ACK。当Redis收到ACK时,将从队列中移除处理过的消息,并继续处理下一个消息。

另一种方法是使用双重消费者模式。这种模式中,每条消息都有两个消费者处理。当第一个消费者将消息标记为已完成时,Redis将消息发送给第二个消费者,如果第二个消费者没有收到消息,则原始消费者将重新处理消息。虽然这种方法会增加系统的复杂性,但可以保证消息的可靠性。

三、单点故障

问题:

使用单个Redis实例作为消息队列存在单点故障的风险。如果Redis服务器宕机,所有消息都无法处理。

解决方法:

使用Redis Sentinel,由多个Redis Sentinel实例组成的集群,可以管理和监视Redis服务器集群。当主Redis实例宕机时,Sentinel会自动将从实例提升为主实例,并通知客户端。

四、性能问题

问题:

当消息队列中的消息数量非常大时,Redis的性能可能会受到影响。在Redis中实现异步调用的最佳方法是使用发布-订阅模式,因为该模式可以处理大量的并发请求。

解决方法:

在发布-订阅模式下,发布者发布消息并将其发送到Redis频道,订阅者订阅Redis频道并处理消息。这种模式最大的优点是扩展性好,在处理大量消息的情况下可以提供可靠的性能。

Redis Stream ACK 会删除消息吗?

不会!!!

关于为什么 Redis Streams 中的消息不自动删除,这主要是基于 Stream 的设计和使用场景。以下是几个原因:

  1. 持久化:Streams 是为持久化消息日志而设计的。这意味着消息被保存在磁盘上,以便在 Redis 服务器重启或故障后能够恢复。自动删除消息会破坏这种持久化特性。
  2. 消费者进度跟踪:在发布/订阅模型中,当消息被发布后,消费者通常需要确认它们已经处理了这些消息。Streams 通过消费者组(Consumer Groups)和消费者偏移量(Consumer Offsets)来跟踪消费者的进度。如果消息被自动删除,那么跟踪消费者的进度就会变得困难。
  3. 手动管理:Redis 提供了 XTRIM 命令,允许用户手动删除旧的或不再需要的消息。这种手动管理的方式给了用户更多的灵活性,可以根据实际需求来决定何时删除消息。
  4. 资源消耗:自动删除消息可能会消耗大量的系统资源,特别是在高吞吐量的场景中。由用户根据需要手动删除消息可以更好地控制资源的使用。

需要注意的是,虽然 Streams 中的消息不会自动删除,但 Redis 会对 Streams 进行一些内部优化,例如压缩旧的消息或合并小的文件,以节省磁盘空间。这些优化操作是透明的,对用户来说是无感知的。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/807645.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无“相关性”是指商品与分享内容无相关性,下列哪个行为不属于(无)相关性”违规?()

需要查看更多试题和答案&#xff0c;可以前往&#xff08;题海舟试题答案&#xff09;进行搜题查看。可以搜“题干关键词”。 无“相关性”是指商品与分享内容无相关性&#xff0c;下列哪个行为不属于(无)相关性”违规?() A.篮球比赛直播&#xff0c;售卖球衣、球鞋、球类与球…

从挑战到机遇:HubSpot如何帮助企业化解出海过程中的难题

企业出海挑战与对策 随着全球化的加速推进&#xff0c;越来越多的企业开始将目光投向海外市场&#xff0c;以寻求更广阔的发展空间。然而&#xff0c;在出海的过程中&#xff0c;企业往往面临着诸多挑战&#xff0c;其中文化差异、法律限制等问题尤为突出。今天运营坛将对这些…

快速开始vue3

版本 node (20.11.1)vue3 (3.4.21)脚手架创建项目并运行 安装脚手架并创建项目npm create vue@latest这一指令将会安装并执行 create-vue,它是 Vue 官方的项目脚手架工具 2) 安装以下进行选择 ## 配置项目名称 √ Project name: vue3_test ## 是否添加TypeScript支持 √ Add…

创意解决方案:如何将作品集视频集中于一个二维码或链接中?

引言&#xff1a;随着面试环节的进一步数字化&#xff0c;展示自己的作品集成为了求职过程中的重要一环。但除了使用传统的方式&#xff0c;如百度网盘或直接发送多个视频链接&#xff0c;有没有更便捷的方法将作品集的多个视频放在一个链接中呢? 本文将介绍一种创意解决方案…

如何使用try-with-resources关闭非自己创建的InputStream

如何使用try-with-resources关闭非自己创建的InputStream 在Java中&#xff0c;不论InputStream是自己创建的还是由外部提供的&#xff0c;只要它是AutoCloseable的实例&#xff0c;你都可以使用try-with-resources语句来确保它在不再需要时被自动关闭。 try-with-resources语…

RocketMQ 之 IoT 消息解析:物联网需要什么样的消息技术?

作者&#xff1a;林清山&#xff08;隆基&#xff09; 前言&#xff1a; 从初代开源消息队列崛起&#xff0c;到 PC 互联网、移动互联网爆发式发展&#xff0c;再到如今 IoT、云计算、云原生引领了新的技术趋势&#xff0c;消息中间件的发展已经走过了 30 多个年头。 目前&a…

数据库被rmallox勒索病毒加密,如何还原?

近年来&#xff0c;网络安全问题日益严峻&#xff0c;勒索病毒作为其中的一种恶意软件&#xff0c;已成为网络安全领域的一大难题。其中&#xff0c;rmallox勒索病毒以其高度的隐蔽性和破坏性&#xff0c;给不少企业和个人带来了严重损失。本文将从rmallox勒索病毒的特点、传播…

一个简单的UI自动化框架应用介绍

项目框架介绍 该数据自动校验小程序采用POM模型&#xff0c;基于Javaseleniumtestngextentsreportexcel POI开发。 框架核心功能 基于PMO模型将页面封装成java对象&#xff0c;并通过selenuim驱动浏览器进行操作。通过excel POI对excel文件进行操作&#xff0c;通过对比导出…

Docker搭建CouchPotato

使用 CouchPotato Docker 镜像搭建电影下载管理器 CouchPotato 是一个电影下载管理器&#xff0c;它可以帮助用户自动搜索、下载和管理电影。通过 Docker 镜像&#xff0c;可以在服务器上轻松部署 CouchPotato&#xff0c;并让其运行在容器中&#xff0c;以便在任何设备上访问…

通过网络api获取日期对应的节假日信息

网络接口获取链接&#xff1a;免费节假日API_原百度节假日API HolidayJudge.h #pragma once#include <QtWidgets/QWidget> #include "ui_HolidayJudge.h"enum DATESTATE {WORK0,//工作日DAYOFF,//休息日HOLIDAY//节假日 };class HolidayJudge : public QWidg…

Linux之线程互斥与同步

1.线程互斥相关概念 临界资源&#xff1a;多线程执行流共享的资源就叫做临界资源 。 临界区&#xff1a;每个线程内部&#xff0c;访问临界自娱的代码&#xff0c;就叫做临界区。 互斥&#xff1a;任何时刻&#xff0c;互斥保证有且只有一个执行流进入临界区&#xff0c;访问临…

关于本博客作者的声明

鉴于鸭某兽公司的恶意推测、试图抹黑本人及本人所在公司&#xff0c;臆测本人及本人所在公司对本人博客名称进行模仿。为了对相关情况进行澄清&#xff0c;现本人声明如下&#xff1a; 本博客&#xff08;名称&#xff1a;小飞鱼通达二开&#xff09;&#xff0c; 网址为&…

【Unity】如何让GameObject的长宽自适应屏幕分辨率

【背景】 用一个长方形的GameObject代表电影屏幕,希望这个GameObject能够随着当前屏幕分辨率的大小适当变化,Texture会呈现当前屏幕的桌面画面,如果不一致会比例失调。 【分析】 Awake函数中就完成处理。获得当前屏幕分辨率,用适当倍数计算后付给GameObject的Transform下…

谷歌查问题

1&#xff0c;打开 it工具箱-里面啥都有 2&#xff0c;找到谷歌 3&#xff0c;访问gpt

E. Yet Another Walking Robot 又一个行走的机器人(map详解代码)

坐标平面上有一个机器人。最初&#xff0c;机器人位于该点&#xff08;0,0&#xff09; .它的路径被描述为字符串s长度n由字符“L”、“R”、“U”、“D”组成。这些字符中的每一个都对应着一些动作&#xff1a; ‘L’&#xff08;左&#xff09;&#xff1a;表示机器人从该点移…

VSCODE自动更新无法连接远程服务器报错“waiting for server log...“的解决方法

问题描述 一觉醒来打开vscode发现连接远程服务器显示无法连接&#xff0c;终端一直报错“waiting for server log…"&#xff0c;经查是因为vscode自动更新到了1.86&#xff0c;对于远程服务器的linux版本要求较高。这里记录下解决方法。 解决方法 1. 下载vscode便携版…

Vue3 v-bind绑定css中的var变量实现动态样式

在日常的开发中&#xff0c;我们常常遇到这样的需求&#xff1a;点击一个button改变页面中某个元素的样式&#xff0c;在这样的场景中&#xff0c;我们可以使用v-bind绑定css中的var变量&#xff0c;来动态的切换元素的样式 一个小栗子&#xff0c;在setup语法糖环境下&#xf…

敏捷开发是什么?敏捷开发的流程有什么?

敏捷开发是什么&#xff1f; 敏捷开发是一种灵活且迭代的软件开发方法论&#xff0c;它强调快速响应变化、高效协作、持续交付价值以及高度关注业务目标与客户满意度。敏捷开发采用短周期&#xff08;称为“迭代”或“冲刺”&#xff09;来开发、测试和交付可用的产品增量&…

日志埋点功能

前言 开发中经常会有日志埋点需求, 用于统计接口的请求量、处理速度等等,为此本篇幅从一下几个维度进行分析,从零到有搭建。 技术架构解析 实现日志埋点功能,从字面意思就可以想到功能大致分为两个方向: 1、 埋点功能(logback + 封装通用SDK方法 + 共享文件夹(如果是多…

Sketch3D:用于草图到3D生成的样式一致性指南

Sketch3D: Style-Consistent Guidance for Sketch-to-3D Generation Sketch3D&#xff1a;用于草图到3D生成的样式一致性指南 Wangguandong Zheng 重试 错误原因 Southeast UniversityChina 重试 错误原因 wgdzhengseu.edu.cnHaifeng Xia 重试 错误原因 Southeast Universit…