介绍
redis stream是一种类似日志追加的数据结构。可用来记录和实时处理事件。适用场景:
- 事件溯源
- 传感器监控
- 通知
性能
新增 O(1)
访问单个节点是O(n),n是ID的长度
redis stream使用radix trees实现
基础
XADD
新增条目使用XADD
> XADD race:france * rider Castilla speed 29.9 position 1 location_id 2
"1692632147973-0"
一个条目由一个或多个key-value组成,类似于字典。
以上命令往stream race:france中添加了一个条目:rider:Castilla, speed:29.9, position:1, location_id:2,并且自动生成id,其id为1692632147973-0。*代表自动生成ID。每一个新生成的ID是递增的。
Entry IDs
ID的组成
<millisecondsTime>-<sequenceNumber>
从Streams获取数据
读取数据有多种模式:
- 多个客户端读取一个Stream的新数据,类似 tail -f 命令
- 按时间查询,查询历史数据,遍历数据
- 同一个消费组种的消费者消费一个Stream中的不同消息,类似Kafka
范围查询XRANGE和XREVRANGE
特殊的ID
-表示最小的id
+表示最大的id
示例:
获取最靠前的2个节点
XRANGE race:france - + COUNT 2
查询ID从1692632094485-0开始的2个元素,不包括1692632094485-0本身,‘(’符号表示不包含。
XRANGE race:france (1692632094485-0 + COUNT 2
XREVRANGE命令与XRANGE相等,只是返回的元素的顺序相反。
使用XREAD监听新元素
- 非阻塞式读取:
XREAD COUNT 2 STREAMS race:france 0
以上命令返回2个ID大于0的数据。
2. 阻塞读取
XREAD BLOCK 0 STREAMS race:france $
BLOCK 0的效果是一直阻塞,$表示当前stream中最大的ID值。这个命令实现了tail -f的效果。
消费组
消费组的概念与Kafka中的消费组类型,但实现上与kafka无关。
消费者组就像一个从一个流中获取数据的伪消费者,它实际上服务于多个消费者,提供了以下保证:
- 一条消息只能发送给一个消费者。
- 在一个消费组中,不同客户端通过一个名称来区分,由客户端提供唯一标识符。
- 每个使用者组都有从未使用过的第一个 ID 的概念,因此,当使用者请求新消息时,它可以只提供以前未传递的消息。
- 使用消息需要使用特定命令进行显式确认。Redis 将确认解释为:此消息已正确处理,因此可以将其从使用者组中逐出。
- 消费组跟踪当前挂起的所有消息,即已传递给使用者组的某个使用者但尚未确认为已处理的消息。由于此功能,在访问流的消息历史记录时,每个消费者将只看到已传递到它的消息。
创建消费者
> XGROUP CREATE race:france france_riders $
OK
在这行命令中,为流race:france创建了一个france_riders消费组,并指定id为$。$表示消费组读取最新的消息。消费者只会读取比指定ID大的消息。如果ID指定为0表示读取流中所有的数据。
自动创建流
> XGROUP CREATE race:italy italy_riders $ MKSTREAM
OK
消费者读取
> XREADGROUP GROUP italy_riders Alice COUNT 1 STREAMS race:italy >
1) 1) "race:italy"2) 1) 1) "1692632639151-0"2) 1) "rider"2) "Castilla"
XREADGROUP指令需要指定GROUP <group-name> <consumer-name>
- ID指定为
>,这意味着消费者只希望接收从未传递给任何其他消费者的消息。这意味着,给我新的信息。 - 任何其他 ID,即0或任何其他有效ID或不完整ID(仅仅毫秒时间部分),将返回为发送命令的使用者返回待处理的条目,其 ID 大于所提供的 ID。因此,基本上,如果 ID 不为
>,那么该命令将只允许客户端访问其待处理条目:那些发送给它,但尚未确认的消息。请注意,在这种情况下,BLOCK 和 NOACK 都被忽略。
XREADGROUP 是一个写入命令, 因为即使它从流中读取数据,读取操作也会产生副作用,导致消费者组被修改,因此它只能在主实例上调用。
一个使用ruby实现的消费者代码例子:
require 'redis'if ARGV.length == 0puts "Please specify a consumer name"exit 1
endConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.newdef process_message(id,msg)puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end$lastid = '0-0'puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true# Pick the ID based on the iteration: the first time we want to# read our pending messages, in case we crashed and are recovering.# Once we consumed our history, we can start getting new messages.if check_backlogmyid = $lastidelsemyid = '>'enditems = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)if items == nilputs "Timeout!"nextend# If we receive an empty reply, it means we were consuming our history# and that the history is now empty. Let's start to consume new messages.check_backlog = false if items[0][1].length == 0items[0][1].each{|i|id,fields = i# Process the messageprocess_message(id,fields)# Acknowledge the message as processedr.xack(:my_stream_key,GroupName,id)$lastid = id}
end
故障恢复
使用XPENDING和XCLAIM命令从故障进行恢复。
Redis 消费者者组提供了一项功能,用于在这些情况下使用该功能来声明给定消费者的待处理消息,以便此类消息将更改所有权并重新分配给不同的消费者。消费者必须检查挂起的消息列表,并且必须使用特殊命令声明特定消息,否则服务器将使消息永远处于挂起状态并分配给旧的消费者。
XPENDING
> XPENDING race:italy italy_riders
1) (integer) 2
2) "1692632647899-0"
3) "1692632662819-0"
4) 1) 1) "Bob"2) "2"
以这种方式调用时,该命令会输出消费者组中待处理消息的总数(在本例中为两个),待处理消息中较低和较高的消息 ID,最后输出消费者列表和它们拥有的待处理消息数。
也可以指定开始和结束的ID
> XPENDING race:italy italy_riders - + 10
1) 1) "1692632647899-0"2) "Bob"3) (integer) 746424) (integer) 1
2) 1) "1692632662819-0"2) "Bob"3) (integer) 746424) (integer) 1
该命令返回了消息的详细信息,ID、使用者名称、空闲时间(以毫秒为单位),即自上次将消息传递给某个使用者以来已经过去了多少毫秒,最后是给定消息传递的次数。
使用XCLAIM指令来更改消息的所有权
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
Claiming and the delivery counter
delivery counter在2种情况下会递增:
- 当一个消息被成功claim时
- 使用
XREADGROUP去读取历史消息
当出现故障时,消息会多次传递是正常的,但最终它们通常会被处理和确认。
因此,一旦传递计数器达到您选择的给定大数字,将此类消息放在另一个流中并向系统管理员发送通知可能更明智。这基本上是 Redis Streams 实现死信概念的方式。
特殊的ID符号
-可能的最小id(0-1)
+可能的最大id(18446744073709551615-18446744073709551615),类似Integer.MAX_VALUE
$表示当前stream中最大的那个ID
>表示消费组中最后一个已发送的ID
持久化,复制和消息安全性
Stream数据结构像其他Redis数据结构一样异步地同步到副本节点以及持久化到AOF和RDB文件中。消费组的状态会保存在副本、AOF、RDB中。在重启后,通过AOF文件可恢复消费组的状态。