【Spring连载】使用Spring Data访问Redis(九)----Redis流 Streams

【Spring连载】使用Spring Data访问Redis(九)----Redis流 Streams

  • 一、追加Appending
  • 二、消费Consuming
    • 2.1 同步接收Synchronous reception
    • 2.2 通过消息监听器容器进行异步接收Asynchronous reception through Message Listener Containers
      • 2.2.1 命令式Imperative StreamMessageListenerContainer
      • 2.2.2 反应式Reactive StreamReceiver
    • 2.3 确认策略Acknowledge strategies
    • 2.4 读取偏移量策略ReadOffset strategies
  • 三、序列化Serialization
  • 四、对象映射Object Mapping
    • 4.1 简单值Simple Values
    • 4.2 复杂值Complex Values

Redis Streams以抽象的方法对日志数据结构进行建模。通常,日志是仅追加(append-only)的数据结构,并且从一开始就在随机位置或通过流式传输新消息来消费。
在 Redis参考文档中了解有关Redis Streams的更多信息。
Redis Streams大致可以分为两个功能领域:

  • 追加记录
  • 消费记录

尽管这种模式与Pub/Sub有相似之处,但主要区别在于消息的持久性以及消息的消费方式。
Pub/Sub依赖于瞬态消息的广播(即,如果你不听,你就会错过消息),而Redis Stream使用了一种持久的、仅追加的数据类型,它会保留消息,直到流被修剪。消费方面的另一个区别是Pub/Sub注册服务器端订阅。Redis将到达的消息推送到客户端,而Redis Streams需要活动轮询(active polling)。
org.springframework.data.redis.connection 和 org.springframework.data.redis.stream包为Redis Streams提供了核心功能。

一、追加Appending

要发送记录,你可以像使用其他操作一样,使用低级(low-level)RedisConnection或高级StreamOperations。这两个实体都提供add (xAdd)方法,该方法接受记录和目标流作为参数。RedisConnection需要原始数据(字节数组),而StreamOperations允许任意对象作为记录传入,如以下示例所示:

// append message through connection
RedisConnection con =byte[] stream =ByteRecord record = StreamRecords.rawBytes().withStreamKey(stream);
con.xAdd(record);// append message through RedisTemplate
RedisTemplate template =StringRecord record = StreamRecords.string().withStreamKey("my-stream");
template.opsForStream().add(record);

流记录携带一个Map,键值元组,作为它们的payload。将记录附加到流中会返回可作为进一步引用的RecordId。

二、消费Consuming

在消费端,你可以消费一个或多个流。Redis Streams提供读取命令,允许从已知流的任意位置(随机访问)消费流和从流的结束消费新的流记录。
在底层,RedisConnection提供了xRead和xReadGroup方法,它们分别映射Redis命令以在消费者组中进行各自读取。请注意,可以将多个流用作参数。
Redis中的订阅命令可能会被阻塞。也就是说,在连接(connection)上调用xRead会导致当前线程在开始等待消息时阻塞。只有当读取命令超时或收到消息时,线程才会被释放。
要消费流消息,可以在应用程序代码中轮询(poll)消息,也可以通过消息监听器容器使用两个异步接收中的一个(2.2章节),命令式或反应式。每次新记录到达时,容器都会通知应用程序代码。

2.1 同步接收Synchronous reception

虽然流消费通常与异步处理相关联,但也可以同步消费消息。重载的StreamOperations.read(…)方法提供了这个功能。在同步接收期间,调用线程可能会阻塞,直到消息可用为止。属性StreamReadOptions.block指定接收者在放弃等待消息之前应该等待多长时间。

// Read message through RedisTemplate
RedisTemplate template =List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),StreamOffset.latest("my-stream"));List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),StreamReadOptions.empty().count(2),StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

2.2 通过消息监听器容器进行异步接收Asynchronous reception through Message Listener Containers

由于其阻塞性,低级别轮询(low-level polling)没有吸引力,因为它需要为每个消费者进行连接和线程管理。为了缓解这个问题,SpringData提供了消息侦听器,它完成了所有繁重的工作。如果您熟悉EJB和JMS,您应该会发现这些概念很熟悉,因为它的设计尽可能接近Spring Framework及其消息驱动的POJO(MDP)中的支持。

Spring Data提供了两种针对所用编程模型量身定制的实现:

StreamMessageListenerContainer充当命令式编程模型的消息侦听器容器。它用于使用Redis流中的记录,并驱动注入其中的StreamListener实例。

StreamReceiver提供了消息侦听器的反应式变体。它用于将Redis流中的消息作为潜在的无限流使用,并通过Flux发出流消息。

StreamMessageListenerContainer和StreamReceiver负责消息接收和调度到侦听器中进行处理的所有线程。消息侦听器容器/接收器是MDP和消息传递提供者之间的中介,负责注册接收消息、资源获取和释放、异常转换等。这使您作为应用程序开发人员能够编写与接收消息(并对其作出反应)相关联的(可能复杂的)业务逻辑,并将Redis基础设施的样板问题委托给框架。

这两个容器都允许更改运行时配置,以便在应用程序运行时添加或删除订阅,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用RedisConnection。如果所有侦听器都被取消订阅,它会自动执行清理,线程就会被释放。

2.2.1 命令式Imperative StreamMessageListenerContainer

2.2.2 反应式Reactive StreamReceiver

2.3 确认策略Acknowledge strategies

2.4 读取偏移量策略ReadOffset strategies

三、序列化Serialization

四、对象映射Object Mapping

4.1 简单值Simple Values

4.2 复杂值Complex Values

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/665664.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【学网攻】 第(20)节 -- 网络端口地址转换NAPT配置

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 例如&#xff1a;第一章 Python 机器学习入门之pandas的使用 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目…

Elasticsearch重建索引-修改索引字段类型

如果不允许修改索引字段类型&#xff0c;只能重建索引 步骤 新建一个索引数据迁移删除旧索引别名引用 目录 1、准备工作1.1、查看版本号1.2、创建旧索引1.3、添加两条数据1.4、查看数据 2、新建一个索引2.1、查看旧索引的mapping2.2、新建索引 3、数据迁移3.1、使用异步任务迁…

C语言应用实例——贪吃蛇

&#xff08;图片由AI生成&#xff09; 0.贪吃蛇游戏背景 贪吃蛇游戏&#xff0c;最早可以追溯到1976年的“Blockade”游戏&#xff0c;是电子游戏历史上的一个经典。在这款游戏中&#xff0c;玩家操作一个不断增长的蛇&#xff0c;目标是吃掉出现在屏幕上的食物&#xff0c…

探究 MySQL 中使用 where 1=1 是否存在性能影响

文章目录 前言聊聊 mybatis 中多条件拼接的两种常规写法where 11使用 <where> 标签 性能影响where 11<where> 标签 总结个人简介 前言 最近在项目中使用 mybatis 写 SQL 使用了 where 11 来简化多条件拼接的写法&#xff0c;案例如下&#xff0c;借此聊聊多条件拼…

CTF(5)

一、[SWPUCTF 2021 新生赛]ez_caesar 1、题目 import base64 def caesar(plaintext):str_list list(plaintext)i 0while i < len(plaintext):if not str_list[i].isalpha():str_list[i] str_list[i]else:a "A" if str_list[i].isupper() else "a"…

C++学习Day01之初识C++ Helloworld

目录 一、程序二、输出三、分析与总结 一、程序 #include <iostream> //标准输入输出流 i - input 输入 o - output 输出 stream 流 相当于 stdio.h using namespace std; //使用 标准 命名空间 //程序入口函数 int main() {// cout 标准输出流对象// <&l…

Java学习-案例-ATM系统

案例ATM系统 大致思路&#xff1a; 实现功能&#xff1a; 案例代码&#xff1a; Account类&#xff1a; packageatmDemo; publicclassAccount{ privateStringcardId; privateStringuserName; privatecharsex; privateStringpassWord; privatedoublemoney; privatedoublelimit; …

ICLR 2024 | MolGen: 化学反馈引导的预训练分子生成

MolGen: 化学反馈引导的预训练分子生成 英文题目&#xff1a;Domain-Agnostic Molecular Generation with Chemical Feedback 发表会议&#xff1a;ICLR 2024 论文链接&#xff1a;https://arxiv.org/abs/2301.11259 代码链接&#xff1a;https://github.com/zjunlp/MolGen 目录…

可解释性AI(XAI):构建透明和值得信赖的决策过程

可解释性AI&#xff08;XAI&#xff09;旨在提高人工智能系统的透明度和可理解性&#xff0c;使人们更好地理解AI的决策过程和原理。随着AI技术的广泛应用&#xff0c;XAI成为了一个备受关注的重要领域。它不仅有助于建立人们对AI的信任&#xff0c;还可以帮助解决AI伦理和偏见…

Python flask 表单详解

文章目录 1 概述1.1 request 对象 2 示例2.1 目录结构2.2 student.html2.3 result.html2.4 app.py 1 概述 1.1 request 对象 作用&#xff1a;来自客户端网页的数据作为全局请求对象发送到服务器request 对象的重要属性如下&#xff1a; 属性解释form字典对象&#xff0c;包…

Android状态栏/通知栏图标白底问题

问题及现象 从android L版本开始&#xff0c;为了统一图标样式&#xff0c;会将通知栏、状态栏等显示图标处统一为白底或黑底&#xff0c;以促使开发人员规范图标设计。 从现象看&#xff0c;状态栏会显示一个白底的方框&#xff1b;下拉通知栏展开时的图标为白底方框加圆框…

HCIP-Datacom(H12-821)91-100题解析

有需要完整题库的同学可以私信博主&#xff0c;博主看到会回复将文件发给你&#xff01;&#xff08;麻烦各位同学给博主推文点赞关注和收藏哦&#xff09; 91、下面关于AS PATH的描述&#xff0c;错误的 A.当路由器中存在两条或者两条以上的到同一目的地的路由时&#xff0c;…

IEC104 S帧超时判定客户与服务端不匹配造成的异常链接问题分析

2、通过ss命令发现确有链接端口变化&#xff0c;与设备约一天一次的重连&#xff0c;通过抓包&#xff08;tcpdump -vvv -nn port 1001 -w 0926.cap&#xff09;分析得以下现象 2.1、异常情况时未对设备的I帧均匀的回S帧进行确认&#xff0c;正常情况时均匀的回S帧进行确认 2.…

优化 React:理解 DOM Diffing 算法及关键的 key 属性

优化 React&#xff1a;理解 DOM Diffing 算法及关键的 key 属性 DOM 的 Diffing 算法和 Key 的作用 在 React 中&#xff0c;DOM 的 Diffing&#xff08;差异比较&#xff09;算法是一种优化手段&#xff0c;用于确定虚拟 DOM 树与实际 DOM 树之间的差异&#xff0c;并仅更新…

酷开科技依托酷开系统新剧热播,引领潮流风向

随着科技的不断发展&#xff0c;智能电视已经成为了家庭娱乐的主流&#xff0c;是消费者居家休闲放松的好帮手。其中&#xff0c;作为国内智能电视操作系统领军者的酷开系统&#xff0c;一直致力于为消费者提供丰富的内容和贴心的体验。近日&#xff0c;酷开系统新剧热播&#…

仰暮计划|“每次他们吃饭,出来散步,都是背着枪,枪都是装满子弹上好膛,时刻准备着作战和反击”

20世纪70年代中叶&#xff0c;越南结束抗美战争、实现国家统一后&#xff0c;把中国视为“头号敌人”&#xff0c;中越关系急剧恶化&#xff0c;中国边疆的和平、安定和人民的生命财产受到严重威胁。在此情况下&#xff0c;1979年2月17日&#xff0c;遵照中央军委命令&#xff…

车载测试Vector工具CANoe——常见问题汇总(中)

车载测试Vector工具CANoe——常见问题汇总(中) 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师(Wechat:gongkenan2013)。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一…

【兼容认证】白鲸开源与银河麒麟高级服务器操作系统成功通过测试

2024年1月2日&#xff0c;北京白鲸开源科技有限公司&#xff08;以下简称"白鲸开源"&#xff09;荣幸宣布&#xff0c;白鲸开源旗下产品 WhaleStudio V2.4 已成功通过与麒麟软件有限公司旗下的银河麒麟高级服务器操作系统产品的兼容性测试。 麒麟软件有限公司的银河麒…

分布式(一)Redis的数据结构

五种数据结构 String 结构 字符串常用操作 SET key value //存入字符串键值对 MSET key value [key value ...] //批量存储字符串键值对 SETNX key value //存入一个不存在的字符串键值对 GET key //获取一个字符串键值 MGET key [key ...] //批量获取字…

计算已知经纬度的两点距离(两种方法GeoTools和Haversine公式)

计算已知经纬度的两点距离&#xff08;两种方法&#xff09; 法一&#xff1a;GeoTools 要使用GeoTools&#xff0c;你需要在Maven项目中添加以下依赖坐标&#xff1a; <dependency><groupId>org.geotools</groupId><artifactId>gt-main</artifa…