kafka复习:(20):消费者拦截器的使用

一、定义消费者拦截器(只消费含"sister"的消息)

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;import java.util.*;public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {Map<TopicPartition,List<ConsumerRecord<String,String>>> finalResult=new HashMap<>();Set<TopicPartition> partitionSet = records.partitions();for(TopicPartition topicPartition: partitionSet){List<ConsumerRecord<String,String>> partitionRecordList=records.records(topicPartition);List<ConsumerRecord<String,String>> newPartitionRecordList=new LinkedList<>();for(ConsumerRecord<String,String> record: partitionRecordList){if(record.value().contains("sister")){newPartitionRecordList.add(record);}}finalResult.put(topicPartition,newPartitionRecordList);}return new ConsumerRecords<>(finalResult);}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp,meta) -> {System.out.println("消费者拦截器:"+tp.topic()+":"+meta.offset());});}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

二、定义消费者,配置消费者拦截器

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class ConsumerInterceptorTest  {public static void main(String[] args) {String topic="testTopic2";String server="xx.xx.xx.xx:9092";Properties properties=new Properties();properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroupTest4");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,server);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,MyConsumerInterceptor.class.getName());KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(properties);myConsumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records=myConsumer.poll(Duration.ofMillis(2000));for(ConsumerRecord consumerRecord: records){System.out.println(consumerRecord.value());}//myConsumer.commitSync();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/51825.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

0103水平分片-jdbc-shardingsphere-中间件

文章目录 1 准备服务器1.1 创建server-order0容器1.2 创建server-order1容器 2、基本水平分片2.1、基本配置2.2、数据源配置2.3、标椎分片表配置2.4、行表达式2.5、分片算法配置2.6、分布式序列算法 3、多表关联3.1、创建关联表3.2、创建实体类3.3、创建Mapper3.4、配置关联表3…

【C++设计模式】用动画片《少年骇客》(Ben10)来解释策略模式

2023年8月25日&#xff0c;周五上午 今天上午学习设计模式中的策略模式时&#xff0c;发现这个有点像很多卡通片里面的变身器... #include<iostream>//alien hero是外星英雄的意思 //在《少年骇客》中&#xff0c;主角可以通过变身器变成10种外星英雄 class AlienHero{ …

手机盖板IR油墨透光率检测仪T03

手机盖板作为手机最外层玻璃面板&#xff0c;其加工一般有落料、倒边、抛光、镀膜、丝印等多道加工工序组成&#xff0c;其中任何一个工序出现差错&#xff0c;都有可能导致手机盖板产生缺陷&#xff0c;例如漏油、透光、IR孔不良、视窗划伤、油墨区划伤、內污、边花等&#xf…

CSS中如何实现元素之间的间距(Margin)合并效果?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 外边距合并的示例&#xff1a;⭐ 如何控制外边距合并&#xff1a;⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff…

C语言实现状态机

关于状态机&#xff0c;基础的知识点可以自行理解&#xff0c;讲解的很多&#xff0c;这里主要是想写一个有限状态机FSM通用的写法&#xff0c;目的在于更好理解&#xff0c;移植&#xff0c;节省代码阅读与调试时间&#xff0c;体现出编程之美。 传统的实现方案 if...else : …

【Spring Boot】什么是深度优先遍历与广度优先遍历?用Spring Boot项目举例说明。

深度优先遍历&#xff08;Depth First Search&#xff0c;DFS&#xff09;和广度优先遍历&#xff08;Breadth First Search&#xff0c;BFS&#xff09;是图的遍历算法。其中&#xff0c;深度优先遍历从某个起始点开始&#xff0c;先访问一个节点&#xff0c;然后跳到它的一个…

mysql 通过 group by 分组查询最大时间的一条数据

SELECT* FROMdemo_table lRIGHT JOIN ( SELECT table_id, MAX( create_time ) AS create_time FROM demo_table GROUP BY table_id ) t ON t.create_time l.create_time AND t.table_id l.table_id

基于GEWE框架实现微信关键字回复

友情链接 geweapi.com 点击即可访问 发送app类型消息 小提示&#xff1a; 发送一些特殊的消息类型注意参数 请求URL&#xff1a; http://域名地址/api/message/sendapp 请求方式&#xff1a; POST 请求头&#xff1a; Content-Type&#xff1a;application/json X-GEWE…

多页面应用,vue cli 配置不生成 html 文件

目录 已解决1&#xff0c;需求2&#xff0c;解决方案1&#xff0c;保持现状2&#xff0c;不生成 html 文件3&#xff0c;将生成的 html 文件放到其他目录。 3&#xff0c;实现1&#xff0c;项目结构2&#xff0c;vue.config.js 核心配置3&#xff0c;打包结果4&#xff0c;vue.…

Nginx入门——Nginx的docker版本和windows版本安装和使用 代理的概念 负载分配策略

目录 引出nginx是啥正向代理和反向代理正向代理反向代理 nginx的安装使用Docker版本的nginx安装下载创建挂载文件获取配置文件创建docker容器拷贝容器中的配置文件删除容器 创建运行容器开放端口进行代理和测试 Windows版本的使用反向代理多个端口运行日志查看启动关闭重启 负载…

【BASH】回顾与知识点梳理(三十八)

【BASH】回顾与知识点梳理 三十八 三十八. 源码概念及简单编译38.1 开放源码的软件安装与升级简介什么是开放源码、编译程序与可执行文件什么是函式库什么是 make 与 configure什么是 Tarball 的软件如何安装与升级软件 38.2 使用传统程序语言进行编译的简单范例单一程序&#…

leetcode304. 二维区域和检索 - 矩阵不可变(java)

前缀和数组 二维区域和检索 - 矩阵不可变题目描述前缀和代码演示 一维数组前缀和 二维区域和检索 - 矩阵不可变 难度 - 中等 原题链接 - 二维区域和检索 - 矩阵不可变 题目描述 给定一个二维矩阵 matrix&#xff0c;以下类型的多个请求&#xff1a; 计算其子矩形范围内元素的总…

python3对接godaddy API,实现自动更改域名解析(DDNS)

python3对接godaddy API&#xff0c;实现自动更改域名解析&#xff08;DDNS&#xff09; 文章开始前&#xff0c;先解释下如下问题&#xff1a; ①什么是域名解析&#xff1f; 域名解析一般是指通过一个域名指向IP地址&#xff08;A解析&#xff09;&#xff0c;然后我们访问…

SpringBoot-1-Spring Boot实战:快速搭建你的第一个应用,以及了解原理

SpringBoot-1-Spring Boot实战&#xff1a;快速搭建你的第一个应用&#xff0c;以及了解原理 今日内容 SpringBootWeb入门 前言 我们在之前介绍Spring的时候&#xff0c;已经说过Spring官方(Spring官方)提供很多开源项目&#xff0c;点击projects&#xff0c;看到spring家族…

【前端】深入解析CSS:选择器、显示模式、背景属性和特征剖析

目录 一、前言二、CSS的复合选择器1、后代选择器①、语法②、注意事项 2、子选择器①、语法②、注意事项 3、并集选择器①、语法②、注意事项 4、链接伪类选择器①、语法②、注意事项 三、CSS元素显示模式转换1、转换为块元素display:block2、转换为行内元素display:inline3、转…

带您解读DeepBook经济原理

DeepBook是Sui上的第一个原生流动性层&#xff0c;通过Sui可预测且低廉的gas费&#xff0c;将促进DeepBook上的大规模交易活动。鉴于DeepBook的中央限价订单簿&#xff08;Central Limit Order Book&#xff0c;CLOB&#xff09;架构&#xff0c;交易量越大&#xff0c;资产价格…

java gradle 项目 在idea上 搭建一个简单的thrift实例

前言 Thrift是RPC通信的一种方式&#xff0c;可以通过跨语言进行通信&#xff0c;最近项目需要进行跨语言的通信&#xff0c;因此首先尝试搭建了一个简单的thrift框架&#xff0c;因为网上的实例大都参差不全&#xff0c;通过gpt查询得到的结果对我帮助更大一点&#xff0c;但…

通信原理 | 窗函数 | 矩形窗 | 汉宁窗 | 汉明窗 | 布莱克曼窗 | 补零对频谱的影响

文章目录 矩形窗矩形窗的时域表达式N=32的时域图N=32的频域图时域补零后的时域序列时域补零后的频域序列时域补零到序列长度为4096,对应的频域序列纵轴取对数汉宁窗N=32的情况下的时域序列N=32的频域图时域补零后的时域序列和对应频域序列时域补零到序列长度为4096,对应的频域…

CentOS KVM虚拟安装和开机启动

1. 配置系统 关闭SELinux setenforce 0持久化关闭配置 vi /etc/selinux/config2. 安装虚拟化软件 安装 KVM、QEMU等虚拟化软件。 yum install qemu-kvm qemu-img virt-manager libvirt virt-install virt-viewer 检查LVM模块是否已经加载 lsmod |grep kvm设置开机启动 s…

回归预测 | MATLAB实现GWO-BP灰狼算法优化BP神经网络多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现GWO-BP灰狼算法优化BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现GWO-BP灰狼算法优化BP神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本介绍程序…