redis stream介绍

news/2025/11/12 17:44:32/文章来源:https://www.cnblogs.com/hekk/p/19212822

介绍

redis stream是一种类似日志追加的数据结构。可用来记录和实时处理事件。适用场景:

  • 事件溯源
  • 传感器监控
  • 通知

性能

新增 O(1)
访问单个节点是O(n),n是ID的长度
redis stream使用radix trees实现

基础

XADD

新增条目使用XADD

> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
"1692632147973-0"

一个条目由一个或多个key-value组成,类似于字典。
以上命令往stream race:france中添加了一个条目:rider:Castilla, speed:29.9, position:1, location_id:2,并且自动生成id,其id为1692632147973-0*代表自动生成ID。每一个新生成的ID是递增的。

Entry IDs

ID的组成

<millisecondsTime>-<sequenceNumber>

从Streams获取数据

读取数据有多种模式:

  1. 多个客户端读取一个Stream的新数据,类似 tail -f 命令
  2. 按时间查询,查询历史数据,遍历数据
  3. 同一个消费组种的消费者消费一个Stream中的不同消息,类似Kafka

范围查询XRANGE和XREVRANGE

特殊的ID
-表示最小的id
+表示最大的id
示例:
获取最靠前的2个节点

XRANGE race:france - + COUNT 2

查询ID从1692632094485-0开始的2个元素,不包括1692632094485-0本身,‘(’符号表示不包含。

XRANGE race:france (1692632094485-0 + COUNT 2

XREVRANGE命令与XRANGE相等,只是返回的元素的顺序相反。

使用XREAD监听新元素

  1. 非阻塞式读取:
XREAD COUNT 2 STREAMS race:france 0

以上命令返回2个ID大于0的数据。
2. 阻塞读取

XREAD BLOCK 0 STREAMS race:france $

BLOCK 0的效果是一直阻塞,$表示当前stream中最大的ID值。这个命令实现了tail -f的效果。

消费组

消费组的概念与Kafka中的消费组类型,但实现上与kafka无关。

消费者组就像一个从一个流中获取数据的伪消费者,它实际上服务于多个消费者,提供了以下保证:

  1. 一条消息只能发送给一个消费者。
  2. 在一个消费组中,不同客户端通过一个名称来区分,由客户端提供唯一标识符。
  3. 每个使用者组都有从未使用过的第一个 ID 的概念,因此,当使用者请求新消息时,它可以只提供以前未传递的消息。
  4. 使用消息需要使用特定命令进行显式确认。Redis 将确认解释为:此消息已正确处理,因此可以将其从使用者组中逐出。
  5. 消费组跟踪当前挂起的所有消息,即已传递给使用者组的某个使用者但尚未确认为已处理的消息。由于此功能,在访问流的消息历史记录时,每个消费者将只看到已传递到它的消息。

创建消费者

> XGROUP CREATE race:france france_riders $
OK

在这行命令中,为流race:france创建了一个france_riders消费组,并指定id为$。$表示消费组读取最新的消息。消费者只会读取比指定ID大的消息。如果ID指定为0表示读取流中所有的数据。

自动创建流

> XGROUP CREATE race:italy italy_riders $ MKSTREAM
OK

消费者读取

> XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy >
1) 1) "race:italy"2) 1) 1) "1692632639151-0"2) 1) "rider"2) "Castilla"

XREADGROUP指令需要指定GROUP <group-name> <consumer-name>

  • ID指定为>,这意味着消费者只希望接收从未传递给任何其他消费者的消息。这意味着,给我新的信息。
  • 任何其他 ID,即0或任何其他有效ID或不完整ID(仅仅毫秒时间部分),将返回为发送命令的使用者返回待处理的条目,其 ID 大于所提供的 ID。因此,基本上,如果 ID 不为>,那么该命令将只允许客户端访问其待处理条目:那些发送给它,但尚未确认的消息。请注意,在这种情况下,BLOCK 和 NOACK 都被忽略。

XREADGROUP 是一个写入命令, 因为即使它从流中读取数据,读取操作也会产生副作用,导致消费者组被修改,因此它只能在主实例上调用。
一个使用ruby实现的消费者代码例子:

require 'redis'if ARGV.length == 0puts "Please specify a consumer name"exit 1
endConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.newdef process_message(id,msg)puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end$lastid = '0-0'puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true# Pick the ID based on the iteration: the first time we want to# read our pending messages, in case we crashed and are recovering.# Once we consumed our history, we can start getting new messages.if check_backlogmyid = $lastidelsemyid = '>'enditems = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)if items == nilputs "Timeout!"nextend# If we receive an empty reply, it means we were consuming our history# and that the history is now empty. Let's start to consume new messages.check_backlog = false if items[0][1].length == 0items[0][1].each{|i|id,fields = i# Process the messageprocess_message(id,fields)# Acknowledge the message as processedr.xack(:my_stream_key,GroupName,id)$lastid = id}
end

故障恢复

使用XPENDINGXCLAIM命令从故障进行恢复。
Redis 消费者者组提供了一项功能,用于在这些情况下使用该功能来声明给定消费者的待处理消息,以便此类消息将更改所有权并重新分配给不同的消费者。消费者必须检查挂起的消息列表,并且必须使用特殊命令声明特定消息,否则服务器将使消息永远处于挂起状态并分配给旧的消费者。

XPENDING

> XPENDING race:italy italy_riders
1) (integer) 2
2) "1692632647899-0"
3) "1692632662819-0"
4) 1) 1) "Bob"2) "2"

以这种方式调用时,该命令会输出消费者组中待处理消息的总数(在本例中为两个),待处理消息中较低和较高的消息 ID,最后输出消费者列表和它们拥有的待处理消息数。
也可以指定开始和结束的ID

> XPENDING race:italy italy_riders - + 10
1) 1) "1692632647899-0"2) "Bob"3) (integer) 746424) (integer) 1
2) 1) "1692632662819-0"2) "Bob"3) (integer) 746424) (integer) 1

该命令返回了消息的详细信息,ID、使用者名称、空闲时间(以毫秒为单位),即自上次将消息传递给某个使用者以来已经过去了多少毫秒,最后是给定消息传递的次数。
使用XCLAIM指令来更改消息的所有权

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

Claiming and the delivery counter

delivery counter在2种情况下会递增:

  1. 当一个消息被成功claim时
  2. 使用XREADGROUP去读取历史消息
    当出现故障时,消息会多次传递是正常的,但最终它们通常会被处理和确认。
    因此,一旦传递计数器达到您选择的给定大数字,将此类消息放在另一个流中并向系统管理员发送通知可能更明智。这基本上是 Redis Streams 实现死信概念的方式。

特殊的ID符号

-可能的最小id(0-1)
+可能的最大id(18446744073709551615-18446744073709551615),类似Integer.MAX_VALUE
$表示当前stream中最大的那个ID
>表示消费组中最后一个已发送的ID

持久化,复制和消息安全性

Stream数据结构像其他Redis数据结构一样异步地同步到副本节点以及持久化到AOF和RDB文件中。消费组的状态会保存在副本、AOF、RDB中。在重启后,通过AOF文件可恢复消费组的状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/963653.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 线性表、栈、队列和优先队列

Java 线性表、栈、队列和优先队列 选择合适的数据结构和算法是开发高性能软件的关键。数据结构是按特定形式组织数据的集合,不仅存储数据,还支持数据的访问与处理操作。 在面向对象思想中,数据结构被视为容器或容器…

2025/11/11

2025/11/11全流程开发逻辑 从前端表单设计、后端 Servlet 处理、DAO 层数据库操作,到 MySQL 表结构设计,完整覆盖了 “用户交互 -> 业务逻辑 -> 数据存储” 的 Web 应用开发流程。 问题排查方法 面对代码报错,…

植物大战僵尸修改器下载教程:图文详解与实用技巧

前言: 在塔防游戏还没有被各类快节奏手游占据之前,《植物大战僵尸》几乎是每位玩家电脑中必装的一款经典作品。即便十多年过去,它依然凭借简单的机制与极高的策略深度拥有极强的生命力。 对于不少老玩家来说,玩到中…

微服务——注册中心

常见的注册中心:eureka、nacos、zookeeper 服务注册和发现是什么意思?Spring Cloud是如何实现服务注册发现? 服务注册:服务提供者需要把自己的信息注册到eureka,由eureka来保存这些信息,比如服务名称、IP、端口等…

【深度学习计算机视觉】13:实战Kaggle比赛:图像分类 (CIFAR-10) - 指南

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

fabricjs 整合 vue3-sketch-ruler 实现标尺功能

版本信息 fabricjs版本为6.7.1 ------ vue3-sketch-ruler的版本为1.3.15 引入标尺的部分代码<!-- 画布区域 --><div id="workspace" style="width: 100%;height: 100%; position: relative; …

2025年真空耙式干燥机定做厂家权威推荐榜单:真空单锥螺带干燥机/沸腾床干燥机/闪蒸干燥机源头厂家精选

在精细化工与制药行业对热敏性物料干燥要求日益严格的背景下,一台高性能的真空耙式干燥机已成为保障产品质量、提升生产效率的关键装备。 据干燥设备行业数据显示,2024年中国真空干燥设备市场规模达到87亿元,年增长…

基础查找算法(三)二分查找

基础查找算法(三)二分查找一 定义 二分查找(Binary Search)是一种基于分治策略的高效查找算法,专用于有序数据集合。它通过不断将搜索范围减半来快速定位目标元素,具有对数时间复杂度,适合处理大规模静态数据。…

2025年软像套电缆订做厂家权威推荐榜单:补偿电缆/矿物质电缆/电力电缆源头厂家精选

在工业自动化与智能制造浪潮中,一根高品质的软像套电缆不仅是电力与信号的传输载体,更是保障设备稳定运行的关键。 据行业数据显示,全球工业电缆市场规模预计到2031年将达到千亿级别,年复合增长率稳定提升。软像套…

2025年济南统招专升本学校权威推荐榜单:专升本机构报名/全日制专升本/专升本考试培训学校精选

在山东省专升本录取率持续走低的背景下,选择一家优质的统招专升本培训机构已成为考生成功升本的关键因素。 根据山东省教育招生考试院公布的数据,2024年山东专升本报考人数已突破17万,而总招生计划仅为7.6万人左右,…

一些水题

https://www.luogu.com.cn/problem/CF374B点击查看代码 #include <bits/stdc++.h> using namespace std;/*CF374B - Inna and Nine思路:- 找到字符串中所有 maximal 的“相邻两位和为9”的连续段(段内任意相邻…

(3)Bug篇 - 详解

(3)Bug篇 - 详解2025-11-12 17:29 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-fam…

西林瓶灌装轧盖机:黔东南折旧年限与成本解析

在黔东南地区,随着生物医药、民族医药及大健康产业的快速发展,对西林瓶灌装设备的需求持续增长。无论是本地药企扩产,还是新兴诊断试剂企业建线,西林瓶灌装轧盖机作为核心装备之一,其采购决策不仅关乎生产效率,更…

list对象 集合 和 String 互转

list<对象> 集合 和 String 互转maven 项目 首先导入alibaba的jar包<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.54</v…

碎碎念(二四)

awa跃龙门 鲤鱼跃龙门,长久以来津津乐道的话题。 指鲤鱼回游逆流而上时成功跃过激流,化龙的故事。 之前我认为它指春节时热闹喜庆(毕竟春节都是锦鲤和金龙),或者进士中举时用来庆祝。 大家都在对跃龙门的鲤鱼表示…

西林瓶灌装加塞机:驻马店适用,低噪运行约65分贝

在制药与生物制剂行业中,西林瓶灌装设备的运行稳定性、精度以及环境友好性始终是用户关注的核心指标。其中,噪音水平作为衡量设备运行品质的重要参数之一,直接影响操作人员的工作舒适度及车间整体声学环境。根据行业…

高精度除法模板(p1480)

P1480 A/B Problem 题目描述 输入两个整数 \(a,b\),输出它们的商。 输入格式 两行,第一行是被除数,第二行是除数。 输出格式 一行,商的整数部分。 输入输出样例 #1 输入 #1 10 2输出 #1 5说明/提示 \(0\le a\le 10…

如何完成一个简单的rust WebAssembly调用

1、安装wasm编译目标 rustup target add wasm32-unknown-unknownwasm32-unknown-unknown 是 WebAssembly (Wasm) 的一个编译目标,表示一个独立于特定环境(如浏览器或Node.js)的 32 位 WebAssembly 二进制文件。它是…

焊接工业机器人节气装置

在现代制造业如飞驰列车般迅猛发展的当下,机器人焊接技术俨然成为提升生产效率、铸就高品质产品的中流砥柱。但在这看似一切顺遂的背后,焊接过程中保护气体的消耗问题,却似一座横亘于制造成本之途的大山,令人颇为挠…

详细介绍:考研408--组成原理--day1

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …