kafak消费者从头开始消费(消费者组)

【README】

本文主要用于描述 kafka 消费者如何从头开始消费;


【1】从头开始消费

1)从头开始消费,需要满足两个条件, 如下:

  • 条件1, 使用一个全新的消费者组id;
  • 条件2,指定 auto.offset.reset 为 earliest ;

2)代码如下:

public static void main(String[] args) {/* 1.创建消费者配置信息 */Properties props = new Properties();/*2.给配置信息赋值*//*2.1连接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2开启自动提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自动提交的延时*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消费者组 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan2"); // group.id /*2.6 重置消费者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest /* 创建消费者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 订阅主题 */consumer.subscribe(Arrays.asList("third", "second"));/* 循环拉取 */ int i =0;while(true) {if (i++ > 10) break; // 只消费10条数据 /* 消费消息-获取数据 */ConsumerRecords<String, String> consumerRds  = consumer.poll(100);/* 解析并打印 ConsumerRecords  *//* 遍历 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消费者] " + rd.key() + "--" + rd.value()); }} /* 关闭消费者 */ consumer.close(); }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329830.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jenkins安装与配置windows_Windows下Scoop安装、配置与使用

Scoop简介Scoop是Windows的命令行安装程序&#xff0c;是一个强大的包管理工具。可以在github上找到其项目的相关信息&#xff0c;项目网址。安装的起因&#xff1a;在平常生活中如果要安装像gcc、git等一些需要手动配置相关参数的工具&#xff0c;需要先去官网下载安装程序&am…

对于线程安全的集合类(例如Vector)的任何操作是不是都能保证线程安全

转载自 对于线程安全的集合类&#xff08;例如Vector&#xff09;的任何操作是不是都能保证线程安全之前在公众号中问了这个问题&#xff1a;对于线程安全的集合类&#xff08;例如Vector&#xff09;的任何操作是不是都能保证线程安全&#xff1f; 三天之内收到120回复&#x…

ASP.NET Core 1.0中的管道-中间件模式

ASP.NET Core 1.0借鉴了Katana项目的管道设计(Pipeline)。日志记录、用户认证、MVC等模块都以中间件(Middleware)的方式注册在管道中。显而易见这样的设计非常松耦合并且非常灵活&#xff0c;你可以自己定义任意功能的Middleware注册在管道中。这一设计非常适用于“请求-响应”…

怎么样安装Ubuntu系统,一文告诉你

前言 额滴神呐/(ㄒoㄒ)/~~&#xff0c;用惯了windows开发&#xff0c;初上手Linux桌面开发真的是举步维艰&#xff08;内心ps&#xff1a;谁让你立这个标题的&#xff0c;现在后悔了吧… 你自己想办法 怎么把这个标题栏目圆过去&#xff09; 经过跟内心戏反复的都在&#xff0…

(转) SpringBoot接入两套kafka集群

转自&#xff1a; SpringBoot接入两套kafka集群 - 风小雅 - 博客园引入依赖 compile org.springframework.kafka:spring-kafka 第一套kafka配置 package myapp.kafka; importhttps://www.cnblogs.com/ylty/p/13673357.html 引入依赖 compile org.springframework.kafka:spring…

idea tomcat部署web项目_项目开发之部署帆软到Tomcat服务一

书接上回上一篇文章介绍了两种图表取数的方式&#xff0c;新增数据库查询和通过存储过程取数&#xff0c;其他的内置数据集&#xff0c;文件数据集和关联数据集等方式暂时还没有用到&#xff0c;先暂时不介绍了&#xff0c;等之后用到了或者等小编有时间试过之后再来做个简单的…

C#工业物联网和集成系统解决方案的技术路线

前言 2000年以后&#xff0c;互联网在中国的大地上如火如荼的发展&#xff0c;在这个行业竞争中比的是加速度。我清晰的记得《世界是平的》中有这样一段话&#xff1a;在非洲&#xff0c;羚羊每天早上醒来时&#xff0c;它知道自己必须跑得比最快的狮子还快&#xff0c;否则就会…

转:Kafka事务使用和编程示例/实例

Kafka事务使用和编程示例/实例_JobShow裁员加班实况-微信小程序-CSDN博客一、概述​ Kafka事务特性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中&#xff0c;或者说是一个原子操作&#xff0c;生产消息和提交偏移量同时成功或者失败。注意&#xff1a;kafk…

[初级]Java中的switch对整型、字符型、字符串的具体实现细节

转载自 [初级]Java中的switch对整型、字符型、字符串的具体实现细节Java 7中&#xff0c;switch的参数可以是String类型了&#xff0c;这对我们来说是一个很方便的改进。到目前为止switch支持这样几种数据类型&#xff1a;byteshort int char String 。但是&#xff0c;作为一个…

SpringBoot-Cache整合redis

前言 SpringBoot的众多Starter有两个很重要的缓存Starter&#xff0c;其中一个是我们经常用到的Redis&#xff08;spring-boot-starter-data-redis&#xff09;还有一个是 spring-boot-starter-cache。 今天主要是简单介绍一个如何整合这两个组件&#xff0c;达到相互合作的关系…

C#跨平台物联网通讯框架ServerSuperIO(SSIO)

一.SSIO的特点 轻型高性能通信框架&#xff0c;适用于多种应用场&#xff0c;轮询模式、自控模式、并发模式和单例模式。设备驱动、IO通道、控制模式场景协调统一。设备驱动内轩命令驱动器、命令缓存器、自定义参数和实时数据元素。框架平台支持按设备命令优先级别进行调度&…

spring boot 单元测试_spring-boot-plus1.2.0-RELEASE发布-快速打包-极速部署-在线演示

spring-boot-plusspring-boot-plus集成spring boot常用开发组件的后台快速开发脚手架Purpose每个人都可以独立、快速、高效地开发项目&#xff01;Everyone can develop projects independently, quickly and efficiently&#xff01;官网地址&#xff1a;springboot.plusGITHU…

在Java中如何高效的判断数组中是否包含某个元素

转载自 在Java中如何高效的判断数组中是否包含某个元素如何检查一个数组(无序)是否包含一个特定的值&#xff1f;这是一个在Java中经常用到的并且非常有用的操作。同时&#xff0c;这个问题在Stack Overflow中也是一个非常热门的问题。在投票比较高的几个答案中给出了几种不同的…

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍

【README】 0&#xff0c;为啥要看 DefaultKafkaProducerFactory&#xff1f; 最近在基于 springboot 开发kafka模块&#xff0c;发现 kafakTemplate构造器传入了 DefaultKafkaProducerFactory实例&#xff0c; kafkaTemplate内部使用了 很多 DefaultKafkaProducerFactory的方…

【SpringSecurity】【JJWT】JJWT踩坑LocalDateTime

前言 最近自己又在开始闲搞&#xff0c;主要原因还是下山无望&#xff08;买显卡&#xff09;。只能晚上下班找点事情做啦~~ 环境 版本请根据实际情况参考JJWT官网选择使用&#xff0c;这里只说明一下问题大概思路&#xff01; <!-- 增加token生成依赖 --> <depen…

针对Linux ASP.NET MVC网站中 httpHandlers配置无效的解决方案

近期有Linux ASP.NET用户反映&#xff0c;在MVC网站的Web.config中添加 httpHandlers 配置用于处理自定义类型&#xff0c;但是在运行中并没有产生预期的效果&#xff0c;服务器返回了404&#xff08;找不到网页&#xff09;错误。经我亲自测试&#xff0c;在WebForm网站中&…

简单介绍Java中Comparable和Comparator

转载自 简单介绍Java中Comparable和ComparatorComparable 和 Comparator是Java核心API提供的两个接口&#xff0c;从它们的名字中&#xff0c;我们大致可以猜到它们用来做对象之间的比较的。但它们到底怎么用&#xff0c;它们之间有又哪些差别呢&#xff1f;下面有两个例子可以…

spring-kafka整合:KafkaTemplate-kafka模板类介绍

【README】 1&#xff0c;本文主要关注 KafkaTemplate的重点方法&#xff0c;并非全部方法&#xff1b; 2&#xff0c;KafkaTemplate 底层依赖于 DefaultKafkaProducerFactory &#xff0c; 关于 DefaultKafkaProducerFactory 的介绍&#xff0c;refer2 spring-kafka整合:…

安卓 on a null object reference_详解Object.prototype.__proto__

Object.prototype 的 __proto__ 属性是一个访问器属性(一个getter函数和一个setter函数), 暴露了通过它访问的对象的内部[[Prototype]] (一个对象或 null)。使用__proto__是有争议的&#xff0c;也不鼓励使用它。因为它从来没有被包括在EcmaScript语言规范中&#xff0c;但是现…

【SpringBoot】服务器JVM远程调试

目的 当系统部署到测试环境服务器时&#xff0c;难免会遇到bug。这个时候如果能远程调试&#xff0c;那么能够大大提高我们的生产效率&#xff0c;快速完成服务调试&#xff0c;最快发布生产环境。&#xff08;领导好评不就到手了&#xff09; 准备 Idea&#xff08;Java最好…