kafka入门(一):kafka消息发送与消费

kafka的基础概念

  • Producer (消息生产者)
    向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

  • Consumer (消息消费者)
    订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

  • Consumer Group (消费者组)

每个消费者属于一个特定的消费者群组(可为每个消费者指定group name,若不指定group name则属于默认的group)。

每个消费者群组都有一个唯一的GroupId。

  • Brokers(kafka服务器)

一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

一个broker可以容纳多个topic。每个broker都有各自的broker.id。

  • Topics(主题)

消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。

  • Partition(分区)

主题(Topic)可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,

由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序。

安装kafka,创建 topic:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.1.10.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency>

kafka生产者示例:

按以下步骤发送消息:

  • 设置 broker服务器的ip和端口
  • 生产者初始化
  • 发送消息
public class KafkaDemoProducer {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static void main(String[] args) {//属性配置Properties properties = getProperties(BROKER_LIST);//生产者初始化KafkaProducer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");//发送消息try {producer.send(record);} catch (Exception e) {System.out.println("send error." + e);}producer.close();}private static Properties getProperties(String brokerList) {Properties properties = new Properties();properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("bootstrap.servers", brokerList);return properties;}}

kafka消费者示例:

主要按照以下步骤:

  • 设置 broker服务器的ip和端口, 设置 消费者群组id

  • 初始化消费者

  • 消费者订阅主题

  • 消费者批量拉取消息

public class KafkaDemoConsumer {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {consumerRecord();}public static void consumerRecord() {//属性配置Properties properties = getProperties(BROKER_LIST, GROUP_ID);//消费者初始化KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//消息者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));//循环while (true) {//每次拉取 1千条消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("=============> 消费kafka消息:"+ record.value());}}}public static Properties getProperties(String brokerList, String groupId) {Properties properties = new Properties();//序列化properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//broker服务器的ip和端口,多个用逗号隔开properties.put("bootstrap.servers", brokerList);//消费者群组idproperties.put("group.id", groupId);return properties;}}

观察idea 控制台,可以看到 成功消费了消息:

=============> 消费kafka消息:hello kafka

参考资料:

《深入理解kafka 核心设计与实践原理》
kafka的简单理解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

03 前后端数据交互【小白入门SpringBoot + Vue3】

项目笔记&#xff0c;教学视频来源于B站青戈 https://www.bilibili.com/video/BV1H14y1S7YV 前两个笔记。是把前端页面大致做出来&#xff0c;接下来&#xff0c;把后端项目搞一下。 后端项目&#xff0c;使用IDEA软件、jdk1.8、springboot2.x 。基本上用的是稳定版。 还有My…

【C++】chono库:使用及源码分析

文章目录 0. 概述1. duration1.1 分析std::chrono::duration_cast() 1.2 使用案例std::chrono::duration::count() 1.3 部分源码 2. time_point2.1 分析std::chrono::time_point_cast() 2.2 使用举例std::chrono::time_point::time_since_epoch() 2.3 部分源码 0. 概述 本篇文…

python使用selenium webDriver时 报错

可能原因和解决&#xff1a; 1. python 解释器 ----> 设置 2. 浏览器版本 与 浏览器驱动版本不一致 ----> 安装同一版本的 (下载chromedriver | 谷歌驱动更高版本的测试版) 参考&#xff1a;Python使用Selenium WebDriver的入门介绍及安装教程-CSDN博客 Selenium安…

设计模式-行为型模式-策略模式

一、什么是策略模式 策略模式是一种行为设计模式&#xff0c;它允许在运行时选择算法或行为&#xff0c;并将其封装成独立的对象&#xff0c;使得这些算法或行为可以相互替换&#xff0c;而不影响使用它们的客户端。&#xff08;ChatGPT生成&#xff09; 主要组成部分&#xff…

基于django的在线教育系统

基于python的在线教育系统 摘要 基于Django的在线教育系统是一种利用Django框架开发的现代化教育平台。该系统旨在提供高效、灵活、易用的在线学习体验&#xff0c;满足学生、教师和管理员的需求。系统包括学生管理、课程管理、教师管理、视频课程、在线测验等核心功能。系统采…

docker 部署Redis集群(三主三从,以及扩容、缩容)

1&#xff1a;创建6个redis容器 docker run -d --name redis01 --net host --privilegedtrue -v /opt/redis/redis01:/data redis:6.0.8 --cluster-enabled yes --appendonly yes --port 6381 docker run -d --name redis02 --net host --privilegedtrue -v /opt/redis/redis0…

在线 sha1 加密

ttmd5 http://www.ttmd5.com/hash.php?type5 qqxiuzi https://www.qqxiuzi.cn/bianma/sha-1.htm jb51 http://tools.jb51.net/password/sha_encode

Kubernetes实战(五)-pod之间网络请求实战

1 同namespace内pod网络请求 1.1 创建namespace ygq $ kubectl create namespace ygq namespace/ygq created 1.2 创建svc和deployment 在naemspace ygq下创建两个应用&#xff1a;nginx和nginx-test。 1.2.1 部署应用nginx $ cat nginx-svc.yaml apiVersion: v1 kind: …

立哥国家示范项目-5G智慧文旅

项目总体技术方案&#xff1a; 1、旅游5G专网建设&#xff1a;是基于公网授权频谱&#xff0c;采用专线形式&#xff0c;使用MEC服务器为用户提供服务,边缘计算使用Edge VLAVR支持多类型应用&#xff0c;并通过编排实现边缘业务的构建。解决了信号密度覆盖小、强度弱的问题。 …

代码随想录二刷 | 数组 | 总结篇

代码随想录二刷 &#xff5c; 数组 &#xff5c; 总结篇 基础知识二分查找移除元素有序数组的平方长度最小的数组最小覆盖子串螺旋数组 基础知识 定义&#xff1a;数组是存放在连续内存空间上的相同类型数据的集合 特点&#xff1a; 数组下标从 0 开始数组内存空间的地址是连…

Golang Context 的并发安全性探究

在 Golang 中&#xff0c;Context 是一个用于管理 goroutine 生命周期、传递请求和控制信息的重要机制。然而&#xff0c;当多个 goroutine 同时使用 Context 时&#xff0c;很容易出现并发安全性问题。本文将探讨如何正确使用 Context 并保证其在并发环境下的安全性。 1. Con…

23111707[含文档+PPT+源码等]计算机毕业设计基于javawebmysql的旅游网址前后台-全新项目

文章目录 **软件开发环境及开发工具&#xff1a;****功能介绍&#xff1a;****论文截图&#xff1a;****实现&#xff1a;****代码:** 编程技术交流、源码分享、模板分享、网课教程 &#x1f427;裙&#xff1a;776871563 软件开发环境及开发工具&#xff1a; 前端使用技术&a…

mock测试数据

1.下载一个jar 架包 地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1G5rVF5LlIYpyU-_KHsGjOA?pwdab12 提取码&#xff1a;ab12 2.配置当前电脑java环境变量 3.在同一文件目录下创建json 数据4.在终端切换到当前目录下启动服务&#xff0c; java -jar ./moco-r…

力扣:171. Excel 表列序号(Python3)

题目&#xff1a; 给你一个字符串 columnTitle &#xff0c;表示 Excel 表格中的列名称。返回 该列名称对应的列序号 。 例如&#xff1a; A -> 1 B -> 2 C -> 3 ... Z -> 26 AA -> 27 AB -> 28 ... 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; …

使用百度翻译API或腾讯翻译API做一个小翻译工具

前言 书到用时方恨少&#xff0c;只能临时抱佛脚。英文pdf看不懂&#xff0c;压根看不懂。正好有百度翻译API和腾讯翻译API&#xff0c;就利用两个API自己写一个简单的翻译工具&#xff0c;充分利用资源&#xff0c;用的也放心。 前期准备 关键肯定是两大厂的翻译API&#x…

IDEA 集成 Docker 插件一键部署 SpringBoot 应用

目录 前言IDEA 安装 Docker 插件配置 Docker 远程服务器编写 DockerFileSpringBoot 部署配置SpringBoot 项目部署结语 前言 随着容器化技术的崛起&#xff0c;Docker成为了现代软件开发的关键工具。在Java开发中&#xff0c;Spring Boot是一款备受青睐的框架&#xff0c;然而&…

kubenetes-服务发现和负载均衡

一、服务发布 kubenetes把服务发布至集群内部或者外部&#xff0c;服务的三种不同类型&#xff1a; ClusterlPNodePortLoadBalancer ClusterIP是发布至集群内部的一个虚拟IP,通过负载均衡技术转发到不同的pod中。 NodePort解决的是集群外部访问的问题&#xff0c;用户可能不…

debian 修改镜像源为阿里云【详细步骤】

文章目录 修改步骤第 1 步:安装 vim 软件第 2 步:备份源第 3 步:修改为阿里云镜像参考👉 背景:在 Docker 中安装了 jenkins 容器。查看系统,发现是 debian 11(bullseye)。 👉 目标:修改 debian bullseye 的镜像为阿里云镜像,加速软件安装。 修改步骤 第 1 步:…

限制Domain Admin登录非域控服务器和用户计算机

限制Domain Admin管理员使用敏感管理员帐户(域或林中管理员组、域管理员组和企业管理员组中的成员帐户)登录到信任度较低的服务器和用户端计算机。 此限制可防止管理员通过登录到信任度较低的计算机来无意中增加凭据被盗的风险。 建议采用的策略 建议使用以下策略限制对信任度…

SPASS-偏相关分析

基本概念 偏相关分析的任务就是在研究两个变量之间的线性相关关系时控制可能对其产生影响的变量,这种相关系数称为偏相关系数。偏相关系数的数值和简单相关系数的数值常常是不同的,在计算简单相关系数时,所有其他自变量不予考虑。 统计原理 控制一个变量和控制两个变量的偏…