redis streams_初步了解Redis Streams以及如何在Java中使用它们

redis streams

自今年年初以来,Redis Streams已进入Redis的unstable分支,并且第一个客户开始采用Redis Streams API。 因此,这是一个绝佳的时间,可以从客户端的角度看一下Redis Streams提供的功能以及如何使用它们。

免责声明:Redis Streams作为初稿提供,尚未成为稳定版本的一部分。 API可能会更改。

什么是Redis Stream?

Redis流是一种类似于日志/日志的数据结构,它按顺序表示事件日志。 消息(事件)可以附加到流中。 然后可以以独立方式或通过在消费者组中阅读来使用这些消息。 使用者组是一个概念,其中可以将多个使用者(例如应用程序实例)分组为一个使用者组,其流偏移量(读取进度)保留在服务器端。 由于不需要在用户端保留流偏移,因此此概念简化了构建客户端的过程。

流消息由Redis在提交时生成的消息ID和表示为哈希(映射)的正文组成-基本上是一组键和值。
流本身由一个密钥标识,并保存零到许多流消息以及一些元数据,例如消费者组。

Redis Stream API

到目前为止,所有流命令都以X为前缀。 流允许使用添加,读取,自省和维护命令。 在下一部分中,您将看到最常见的命令:

  • XADD key * field1 value1 [field2 value2] [fieldN valueN] :将消息附加(提交)到Redis流。
  • XREAD [BLOCK timeout] [COUNT n] STREAMS key1 [keyN] offset1 [offsetN] :从Redis流中读取消息。
  • XRANGE key from to [COUNT n] :扫描( XRANGE key from to [COUNT n] )Redis流中的消息

此外,在使用使用者组时,还有其他命令在起作用:

  • XREADGROUP GROUP name consumer [BLOCK timeout] [COUNT n] [NOACK] STREAMS key1 [keyN] offset1 [offsetN] :在使用者及其组的上下文中从Redis流中读取消息。
  • XACK key group messageId1 [messageId2] [messageIdN] :在使用者的上下文中读取后确认消息。
  • XPENDING key group [from to COUNT n] :枚举未决(未确认的消息)。
  • XGROUP和子命令:用于创建和删除使用者组的API。

注意:为简洁起见,以上命令被截断了。 有关所有可能选项和组合的说明,请参见Redis Streams文档 。

使用Redis流

让我们来看一下如何通过redis-cli应用我们之前看到的命令来使用Redis Stream。 让我们向新流添加(并最初创建流)消息。

127.0.0.1:6379> XADD my-stream * key value
1527062149743-0

我们正在使用XADD通过键值元组向流my-stream添加新消息。 注意* (星号)? 这是用于控制ID生成的字段。 如果要由服务器生成消息ID(在99.5%的用例中都是如此,除非您是要复制的Redis服务器),请始终在此放置* 。 Redis回复消息ID 1527062149743-0

现在,我们的信息流包含一条消息。 让我们用XREAD阅读它。

127.0.0.1:6379>  XREAD COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"2) 1) 1) 1527062149743-02) 1) "key"2) "value"

我们现在已经阅读了该消息,并沿着读取的内容检索了正文。 读取消息会将消息保留在流中。 我们可以使用XRANGE验证这XRANGE

127.0.0.1:6379> XRANGE my-stream - +
1) 1) 1527068644230-02) 1) "key"2) "value"

发出具有相同流偏移量的后续读取将返回相同的消息。 您有不同的选择来避免此行为:

  1. 在客户端跟踪消息ID
  2. 阻止读取
  3. 从信息流中删除消息
  4. 限制流大小
  5. 使用消费者群体

让我们仔细看看这些选项。

MessageId追踪

每个读取操作都将返回消息ID和流消息。 如果您只有一个客户端(没有并发读取),则可以在应用程序中保留最新消息ID的引用,并在后续的读取调用中重用该消息ID。 让我们针对我们之前看到的1527068644230-0的消息ID进行此1527068644230-0

127.0.0.1:6379> XADD my-stream * key value
1527069672240-0
127.0.0.1:6379>  XREAD COUNT 1 STREAMS my-stream 1527068644230-0
1) 1) "my-stream"2) 1) 1) 1527069672240-02) 1) "key"2) "value"

我们使用1527068644230-0作为流偏移并接收下一条添加的消息。 这种方法允许恢复读取较旧的(可能已经消耗的消息),但是需要在客户端进行一些协调,以免读取重复的消息。

如果您不想跟踪消息ID,而仅对最新消息感兴趣,则可以使用阻塞读取。

阻止读取

通过XREAD读取允许以阻塞方式从流读取。 XREADBLPOPBRPOP操作的行为类似,在BLPOPBRPOP操作中,您指定超时,并且如果消息可用或读取超时,则调用将返回。 但是,Stream API允许更多选项。 对于此示例,我们需要两个单独的参与方:生产者和消费者。 如果您从头开始阅读,您将看到使用单个客户端执行的示例。 我们首先从消费者开始,否则产生的消息将到达流中而没有机会通知正在等待的消费者。

消费者

我们正在将XREADBLOCK 10000一起使用以等待10000毫秒(10秒)。 请注意,我们使用的符号流偏移量$指向流的开头。

127.0.0.1:6379> XREAD COUNT 1 BLOCK 10000 STREAMS my-stream $

使用者现在被阻止,等待消息到达。

制片人

127.0.0.1:6379> XADD my-stream * key value
1527070630698-0

Redis将消息写入流中。 现在,让我们切换回消费者。

消费者

消息写入我们的流之后,消费者收到一条消息并再次被解除阻止。 您可以开始处理该消息,并可能发出其他读取。

1) 1) "my-stream"2) 1) 1) 1527070630698-02) 1) "key"2) "value"
(1.88s)

使用流偏移量$发出另一个读取将再次等待到达该流的下一条消息。 但是,使用$会给我们提供一段时间,在此期间我们不会消耗其他消息。 为了避免这些漏洞,您应该跟踪上一次阅读的消息ID,并将其重新用于下一个XREAD调用。
还要注意ist并发性。 我们已经看到一个单个消费者的例子。 如果增加消费者数量怎么办?

在这种情况下,例如,如果您有两个使用方发出阻塞读取,那么两个使用方都会收到同一条消息,这又使我们不得不承担协调读取的任务,因此流消息不会被多次处理。

从流中删除消息

可以从流中删除消息,但是不建议这样做。 我们还没有看到XDEL ,但是从名称上可以明显看出我们可以从流中删除消息:

127.0.0.1:6379> XDEL my-stream 1527070789716-0
(integer) 1

该消息现在消失了。 不建议删除,因为操作成本很高:流使用带有宏节点的基数树。 删除是一种安全的操作,但是在与多个使用者一起使用一条消息时,您需要同步访问权限,因为删除不会阻止多次读取消息。

限制流大小

将消息追加到流时,可以指定最大流大小。 发出XADD命令时,使用MAXLEN选项会发生这种情况。

127.0.0.1:6379> XADD my-stream MAXLEN 4 * key value
1527071269045-0

消息将添加到流中,并且将尽最大努力将流修剪到大小限制。 这也意味着较旧的消息将被修剪并且不再可读。

消费群体

解决重复消息处理的最后一种方法是利用使用者组。 消费者群体的想法是跟踪确认。 确认允许将消息标记为消费者确认。 XACK命令返回是否已确认该消息或先前的使用者是否已确认该消息。

要使用消费者组,我们需要首先创建一个消费者组。 请注意,自撰写本文时起,必须先存在一个流,然后才能创建消费者组。 这个问题可能将通过https://github.com/antirez/redis/issues/4824解决。

到目前为止,如果您遵循前面的示例,我们可以重用我们的流my-stream

我们正在创建一个名为my-group的使用者组,它仅对流my-stream有效。 请注意,最后一个参数是用于跟踪读取进度的流偏移量。 我们使用$指向流头。

127.0.0.1:6379> XGROUP CREATE my-stream my-group $
OK

现在,向流中添加一条消息:

127.0.0.1:6379> XADD my-stream * key value
1527072009813-0

并通过XREADGROUP发出非阻塞读取:

127.0.0.1:6379> XREADGROUP GROUP my-group c1 COUNT 1 STREAMS my-stream >
1) 1) "my-stream"2) 1) 1) 1527072009813-02) 1) "key"2) "value"

XREADGROUP接受组名和使用者名来跟踪阅读进度。 另请注意,流偏移量> 。 此符号流偏移量指向使用者组my-group读取的最新消息ID。
您可能已经注意到该组中有一个消费者名称。 消费者群体旨在跟踪消息传递并区分消费者。 如果您还记得上面的阻塞阅读示例,您已经看到两个使用者同时收到一条消息。 要更改(或保留)此行为,可以指定使用者名称:

  1. 具有相同使用者名称的读取可以多次接收相同的消息。
  2. 具有不同使用者名称的读取将阻止多次接收同一条消息。

根据您使用消息的方式,您可能想重新启动处理或使用多个客户端使用消息,而无需建立自己的同步机制。 Redis流允许您通过确认消息来做到这一点。 默认情况下, XREADGROUP确认消息,表明该消息已被处理并且可以被逐出。 您可以指定NOACK在阅读消息时不确认消息。 处理完消息后,确认消息发出XACK 。 根据返回的命令,您可以查看您是确认消息的对象还是其他客户端已经确认的消息。

现在让我们在这里暂停,不要再讨论恢复和更高级的主题。 Redis网站在https://redis.io/topics/streams-intro提供了有关Redis Streams的完整文档。

使用Java消耗Redis流

注意:在编写本文时,唯一支持Redis Streams的Java客户端是Lettuce预览版本5.1.0.M1。

Redis Streams带有新的服务器端API,该API也需要在客户端采用。 让我们使用Java客户端重播以上示例。

首先,我们需要一个客户端实例来准备连接。 我们将使用同步API。 但是,异步和React式API也支持Redis Stream API。

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStreamCommands<String, String> streamCommands = connection.sync();

Lettuce引入了一个新的命令接口RedisStreamCommands ,该接口声明Redis Stream API方法及其各种类型(例如StreamOffsetConsumer和命令参数对象)。

我们要做的第一件事是向流中添加新消息:

Map<String, String> body =  Collections.singletonMap("key", "value");
String messageId = streamCommands.xadd("my-stream", body);

本示例使用UTF-8编码的字符串表示键和值。 主体本身作为Map传输,并发出命令XADD my-stream * key value

现在,让我们使用与XREAD COUNT 1 STREAMS my-stream 0相对应的命令从流中读取一条消息:

List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1), StreamOffset.from("my-stream", "0"));if(messages.size() == 1) { // a message was read} else { // no message was read}

所述xread(…)方法接受XReadArgsStreamOffset并返回的列表StreamMessage<K, V>包含消息ID与主体一起对象。 现在可以处理消息了,随后的读取将包括最后一个messageId以读取新消息:

StreamMessage<String, String> message = …;
List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1), StreamOffset.from("my-stream", message.getId()));if(messages.size() == 1) { // a message was read} else { // no message was read}

阻塞读取需要将额外的持续时间传递到参数对象中。 添加BLOCK选项会将非阻塞调用(从Redis的角度来看)变成阻塞调用:

List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1).block(Duration.ofSeconds(10)), StreamOffset.from("my-stream", "0"));

在最后一个示例中,让我们看一下消费者群体。 RedisStreamCommands提供了用于创建使用者的方法-截至撰写本文时,Redis中尚未实现删除使用者和使用者组的方法。

streamCommands.xadd("my-stream", Collections.singletonMap("key", "value")); // add a message to create the stream data structurestreamCommands.xgroupCreate("my-stream", "my-group", "$"); // add a group pointing to the stream headList<StreamMessage<String, String>> messages = streamCommands.xreadgroup(Consumer.from("my-group", "c1"),StreamOffset.lastConsumed("my-stream"));

使用使用者组my-group和使用者c1my-stream中读取消息。 使用者组和使用者名称是字节安全编码的,因此在使用ASCII或UTF-8字符串时区分大小写。

结论

这篇博客文章概述了Redis 5附带的Redis Streams的初步外观,以及如何在Lettuce Redis客户端上使用Stream API。 该API尚未完全实现,因此我们应该期待更改。

翻译自: https://www.javacodegeeks.com/2018/05/a-first-look-at-redis-streams-and-how-to-use-them-with-java.html

redis streams

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/333283.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IntelliJ IDEA for Mac 在MacOS模式的重构快捷键(Refactoring Shortcut)

快捷键快捷键符号英文名称功能说明F5Copy复制文件到指定目录F6Move移动文件到指定目录Command Delete⌘⌫Safe Delete安全删除ShiftF6⇧F6Rename重命名&#xff0c;批量修改相同引用变量的名称。例如&#xff0c;变量 i &#xff0c;在代码中多个地方有使用到&#xff0c;那么…

java中无法推断类型参数_Java 10中的本地类型推断,或者如果它像鸭子一样嘎嘎叫...

java中无法推断类型参数最近&#xff0c;Oracle采用了一种新策略&#xff0c;即每六个月发布一次新的语言版本。 该策略假定只有每个第3版都将提供长期支持或LTS。 快速说明&#xff1a; 具有LTS的当前版本是Java 8&#xff1b; Java 9仅应在2018年3月之前获得支持&#xff0…

搭建IC设计EDA虚拟机服务器,忆往昔--集成门控时钟技术的前世--分离门控时钟技术...

本文转自&#xff1a;自己的微信公众号《集成电路设计及EDA教程》《忆往昔--集成门控时钟技术的前世--分离门控时钟技术》多种门控时钟实现方案&#xff1a;Design Compiler中已经集成了可以进行低功耗设计的Power Compiler&#xff0c;它有多种门控时钟实现方案(分离门控时钟、…

IntelliJ IDEA for Mac 在MacOS模式下的注释快捷键(Comment Shortcut)

快捷键快捷键符号英文名称功能说明Command/⌘/Comment/uncomment with line comment单行注释Command Option /⌘⌥/Comment/uncomment with block comment块注释&#xff0c;多行注释Shift Control /⇧⌃/Comment/uncomment with block comment块注释&#xff0c;多行注释/…

云服务器的协议,云服务器协议

云服务器协议 内容精选换一换当创建文件系统后&#xff0c;您需要使用云服务器来挂载该文件系统&#xff0c;以实现多个云服务器共享使用文件系统的目的。本章节以Windows 2012版本操作系统为例进行CIFS类型的文件系统的挂载。同一SFS容量型文件系统不能同时支持NFS协议和CIFS协…

java 类名重复_更快地重复访问Java的Java类名?

java 类名重复Claes Redestad已在core-libs-dev邮件列表上发布了消息“ RRF&#xff1a;8187123 &#xff1a;&#xff08;反映&#xff09;Class&#xff03;getCanonicalName和Class&#xff03;getSimpleName是性能问题的一部分 ”&#xff0c;他在邮件列表中要求审查建议的…

IntelliJ IDEA for Mac在MacOS模式下的编辑快捷键(Editing Shortcut)

快捷键快捷键符号英文名称功能说明Control Space⌃SpaceBasic code completion基本的代码补全&#xff08;补全任何类、方法、变量&#xff09;&#xff0c;代码智能提示&#xff0c;因为和切换输入法快捷键冲突&#xff0c;所以基本改成Alt/Shift Command Enter⌘⇧↩Compl…

网站服务器被访问 io,服务器端被客户端访问完以后出现java.io.ioexception,socket问题。...

异常为&#xff1a;java.io.ioexception客户端代码如下&#xff1a;package package1.socket;import java.net.*;import java.io.*;import java.util.*;public class C_Scoket {Socket C_S;BufferedReader br;PrintWriter pw;String receive;public static void main(String[] …

layui绑定json_认识定制:JSON绑定概述系列

layui绑定json让我们看一下自定义JSON绑定序列化和反序列化过程时 &#xff0c; 注释模型和运行时配置如何工作。 本系列的下一篇文章介绍JSON-B如何处理自定义对象的创建。 注释方法 使用注释方法&#xff0c;可以通过注释字段&#xff0c;JavaBean方法和类来定制默认的序列…

IntelliJ IDEA for Mac在MacOS模式下的调试快捷键(Debugging Shortcut)

快捷键快捷键符号英文名称功能说明F8F8Step Over进入下一步&#xff0c;如果当前行断点是一个方法&#xff0c;不会进入当前方法体内。逐行执行程序F7F7Step Into进入下一步&#xff0c;如果当前行断点是一个方法&#xff0c;则进入当前方法体内&#xff0c;如果该方法体还有方…

jakarta ee_Jakarta EE,EE4J和Java EE之间的关系

jakarta eeJakarta EE的名称已经存在了一个多月&#xff0c;即使Mike Milinkovich在他的博客文章“ The Name Is…”中很好地解释了名称和概念&#xff0c;但对于它们之间的关系仍然有些困惑&#xff0c;我也提出了疑问只要有话题就可以围绕它。 我试图在这里总结一下。 希望能…

无线网服务器有辐射吗,WiFi对孕妇有辐射吗

针对无线WiFi上网产生的辐射是否会对人体有伤害已有实验研究。实验结果已经显示&#xff0c;现在包括由无线路由器发射的WiFi在内的无线网络频率虽然较高&#xff0c;但是功率都不大&#xff0c;对人体的辐射影响可以说微乎其微。那么对于孕妇来说&#xff0c;辐射也是非常小的…

IntelliJ IDEA for Mac在MacOS模式下的选择快捷键(Select Shortcut)

快捷键快捷键符号英文名称功能说明Shift Command 8按列选择模式。按下此组合键&#xff0c;再按鼠标左键拖动选择矩形区域&#xff0c;输入新的内容&#xff0c;可以替换被选择的所有行的内容OptionCommand↑⌥⌘↑Extend selection扩展选定范围&#xff0c;和系统快捷键有冲…

jvm虚拟机内存结构_JVM体系结构101:了解您的虚拟机

jvm虚拟机内存结构Java虚拟机&#xff08;JVM&#xff09;架构和Java字节码101的初学者速成班 Java应用程序无处不在&#xff0c;它们在我们的手机&#xff0c;平板电脑和计算机上。 在许多编程语言中&#xff0c;这意味着多次编译代码以使其在不同的OS上运行。 对于我们作为开…

IntelliJ IDEA for Mac在MacOS模式下的搜索/查询/查找快捷键(Search Shortcut)

文章目录根据内容查找文件根据名称查找类文件&#xff08;源代码文件&#xff09;根据名称查找任何类型文件查找字段名称、方法名、类名声明的地方查找所有的内容查找任何动作快捷键快捷键符号英文名称功能说明Double ShiftDouble ⇧Search everywhere查询任何东西。建议改成 S…

json解析适配模板_认识适配器:JSON绑定概述系列

json解析适配模板适配器通过实现JsonbAdapter接口来配置自定义对象的创建和序列化。 方法AdaptToJson&#xff08;&#xff09;和AdaptFromJson&#xff08;&#xff09;被执行序列化和反序列化操作的逻辑覆盖。 下一篇文章介绍了使用自定义序列化器和反序列化器自定义JSON-B的…

IntelliJ IDEA for Mac在MacOS模式下的替换快捷键(Replace Shortcut)

快捷键快捷键符号英文名称功能说明Command R⌘RReplace选择要被替换的内容&#xff0c;按下此组合键&#xff0c;然后会出现替换内容输入框&#xff0c;输入替换内容&#xff0c;按下回车一次&#xff0c;替换一个&#xff0c;继续按回车再替换一个Shift Command R⇧⌘RRepla…

震惊kafka_5个令人震惊的统计数据证明日志不足

震惊kafka事实证明&#xff0c;我们都犯有记录不良的罪名。 不相信我们吗&#xff1f; 这些统计数据可能会改变您的想法 当人们提出带有明显答案的问题时&#xff0c;这非常令人不快&#xff0c;因此&#xff0c;我不会坐在这里问您和您的团队是否使用日志文件来监视预生产和生…

IntelliJ IDEA for Mac在MacOS模式下的动态代码模板快捷键(Live Templates Shortcut)

快捷键快捷键符号英文名称功能说明Option Command J⌥ ⌘ JSelect Template弹出模板选择窗口&#xff0c;将选定的代码使用动态模板包住Option Command T⌥ ⌘ TSurround with Live Template弹出模板选择窗口&#xff0c;将选定的代码使用动态模板包住&#xff0c;和上面那个…

jdk 8 时区 转换_JDK 8 BigInteger精确缩小转换方法

jdk 8 时区 转换在博客文章“ Java中Long到Int的精确转换 ”中&#xff0c;我讨论了使用Math.toIntExact&#xff08;Long&#xff09;将Long精确地转换为int或者如果无法缩小转换范围则抛出ArithmeticException 。 JDK 8引入了该方法&#xff0c; JDK 8还向BigInteger类引入了…