kafka安装和使用的入门教程

这篇文章简单介绍如何在ubuntu上安装kafka,并使用kafka完成消息的发送和接收。

一、安装kafka

访问kafka官网Apache Kafka,然后点击快速开始

紧接着,点击Download

最后点击下载链接下载安装包

如果下载缓慢,博主已经把安装包上传到百度网盘:

链接:https://pan.baidu.com/s/1nZ1duIt64ZVUsimaQ1meZA?pwd=3aoh
提取码:3aoh
--来自百度网盘超级会员V3的分享

二、启动kafka

经过上一步下载完成后,按照页面的提示启动kafka

1、通过远程连接工具,如finalshell、xshell上传kafka_2.13-3.6.0.tgz到服务器上的usr目录

2、切换到usr目录,解压kafka_2.13-3.6.0.tgz

cd /usrtar -zxzf kafka_2.13-3.6.0.tgz

3、启动zookeeper

修改配置文件confg/zookeeper.properties,修改一下数据目录

dataDir=/usr/local/zookeeper

然后通过以下命令启动kafka自带的zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

4、启动kafka

修改配置文件confg/server.properties,修改一下kafka保存日志的目录

log.dirs=/usr/local/kafka/logs

然后新开一个连接窗口,通过以下命令启动kafka

bin/kafka-server-start.sh config/server.properties

三、kafka发送、接收消息

创建topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092

生产消息

往刚刚创建的topic里发送消息,可以一次性发送多条消息,点击Ctrl+C完成发送

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello

消费消息

消费最新的消息

新开一个连接窗口,在命令行输入以下命令拉取topic为hello上的消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

消费之前的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic hello

指定偏移量消费

 指定从第几条消息开始消费,这里--offset参数设置的偏移量是从0开始的。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 1 --topic hello

消息的分组消费
每个消费者都可以指定一个消费者组, kafka 中的同一条消息,只能被同一个消费者组下的某一个消费 者消费。而不属于同一个消费者组的其他消费者,也可以消费到这一条消息。
通过以下命令在启动消费者时设置分组:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=helloGroup --topic hello

四、Java中使用kafka

通过maven官网搜索kafka的maven依赖版本

https://central.sonatype.com/search?q=kafkaicon-default.png?t=N7T8https://central.sonatype.com/search?q=kafka然后通过IntelliJ IDEA创建一个maven项目kafka,在pom.xml中添加kafka的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>kafka</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>3.6.0</version></dependency></dependencies>
</project>

创建消息生产者

生产者工厂类
package producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/*** 消息生产者工厂类* @author heyunlin* @version 1.0*/
public class MessageProducerFactory {private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";public static Producer<String, String> getProducer() {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<>(props);}}

测试发送消息
package producer;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;/*** @author heyunlin* @version 1.0*/
public class MessageProducer {private static final String TOPIC = "hello";public static void main(String[] args) {ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", "Message From Producer.");Producer<String, String> producer = MessageProducerFactory.getProducer();// 同步发送消息producer.send(record);// 异步发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {String topic = recordMetadata.topic();long offset = recordMetadata.offset();int partition = recordMetadata.partition();String message = recordMetadata.toString();System.out.println("topic = " + topic);System.out.println("offset = " + offset);System.out.println("message = " + message);System.out.println("partition = " + partition);}});// 加上这行代码才会发送消息producer.close();}}

创建消息消费者

消费者工厂类
package consumer;import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;/*** 消息生产者工厂类* @author heyunlin* @version 1.0*/
public class MessageConsumerFactory {private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";public static Consumer<String, String> getConsumer() {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "helloGroup");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<>(props);}}

测试消费消息
package consumer;import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.time.Duration;
import java.util.Collections;/*** @author heyunlin* @version 1.0*/
public class MessageConsumer {private static final String TOPIC = "hello";public static void main(String[] args) {Consumer<String, String> consumer = MessageConsumerFactory.getConsumer();consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key() + ": " + record.value());}// 提交偏移量,避免消息重复推送consumer.commitSync(); // 同步提交// consumer.commitAsync(); // 异步提交}}}

五、springboot整合kafka

开始前的准备工作

然后通过IntelliJ IDEA创建一个springboot项目springboot-kafka,在pom.xml中添加kafka的依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

然后修改application.yml,添加kafka相关配置

spring:kafka:bootstrap-servers: 192.168.254.128:9092producer:acks: 1retries: 3batch-size: 16384properties:linger:ms: 0buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: helloGroupenable-auto-commit: falseauto-commit-interval: 1000auto-offset-reset: latestproperties:request:timeout:ms: 18000session:timeout:ms: 12000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

创建消息生产者

package com.example.springboot.kafka.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;/*** @author heyunlin* @version 1.0*/
@RestController
@RequestMapping(path = "/producer", produces = "application/json;charset=utf-8")
public class KafkaProducer {private final KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic KafkaProducer(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@RequestMapping(value = "/sendMessage", method = RequestMethod.GET)public String sendMessage(String message) {kafkaTemplate.send("hello", message);return "发送成功~";}}

创建消息消费者

package com.example.springboot.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author heyunlin* @version 1.0*/
@Component
public class KafkaConsumer {@KafkaListener(topics = "hello")public void receiveMessage(ConsumerRecord<String, String> record) {String topic = record.topic();long offset = record.offset();int partition = record.partition();System.out.println("topic = " + topic);System.out.println("offset = " + offset);System.out.println("partition = " + partition);}}

然后访问网址http://localhost:8080/producer/sendMessage?message=hello往topic为hello的消息队列发送消息。控制台打印了参数,成功监听到发送的消息。

 

文章涉及的项目已经上传到gitee,按需获取~

Java中操作kafka的基本项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/kafka.git

springboot整合kafka案例项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/springboot-kafka.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/107009.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用匿名函数在Golang中的好处

发挥Golang中无名代码块的潜力 匿名函数&#xff0c;也被称为lambda函数或闭包&#xff0c;是Golang中的一个强大功能&#xff0c;提供了许多好处。这些无名代码块为开发人员在设计和构建其代码时提供了更大的灵活性和模块化。在本节中&#xff0c;我们将探讨使用匿名函数可以…

《Python基础教程》专栏总结篇

大家好&#xff0c;我是爱编程的喵喵。双985硕士毕业&#xff0c;现担任全栈工程师一职&#xff0c;热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。…

【Java基础面试一】、为什么Java代码可以实现一次编写、到处运行?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;为什么Java代码可以实现…

国外的跨境物流有什么学习的地方

国外的跨境物流的学习点可以从以下几个方面入手&#xff1a; 跨境物流的技术与流程&#xff1a;学习国外跨境物流的技术和流程&#xff0c;了解国外先进的物流设备和仓储管理系统。 国际贸易政策和法规&#xff1a;熟悉国际贸易政策和法规&#xff0c;包括出口清关流程、运输协…

python - 内存池的机制

内存池是一中用于管理内存分配的机制&#xff0c;它可以提高内存分配和释放的小路&#xff0c;内存池通常由一块预先分配好的内存区域和一个空闲列表组成&#xff0c;当需要内存分配内存时&#xff0c;内存会重空闲列表中取出来医疗空闲的内存&#xff0c;当释放内存时&#xf…

头部品牌集体扑街!2023年9月京东平板电视TOP10品牌排行榜出炉

鲸参谋监测的京东平台9月份平板电视市场最新销售数据已出炉&#xff01; 根据鲸参谋平台的数据显示&#xff0c;9月份&#xff0c;京东平台大家电品类——平板电视的整体销售呈现下滑。具体地&#xff0c;9月平板电视的销量为62万&#xff0c;环比降低约18%&#xff0c;同比降低…

oracle 同一张表同时insert多条数据 mysql 同一张表同时insert多条数据

oracle 同一张表同时insert多条数据 在Oracle数据库中&#xff0c;你可以使用INSERT ALL语句同时向同一张表插入多条数据。INSERT ALL语句允许你一次执行多个插入操作&#xff0c;可以提高插入的效率和速度。 以下是使用INSERT ALL语句插入多条数据的示例&#xff1a; INSERT…

线程安全问题 的小案例

package Thread_api_test;public class ThreadSafety {//模拟线程安全问题public static void main(String[] args) {//1:创建一个账户对象 代表两个人的共享账户Account accnew Account("ICBC",10000);//创建两个线程 分别两个人 再去同一个账户里取钱10000new Draw…

【Python机器学习】零基础掌握HDBSCAN聚类

如何在大量数据中识别出有用的模式或群体? 在我们日常生活和工作中,数据无处不在。但,在大量数据中找到有用的信息却并非易事。特别是对于那些不具备数据科学背景的人来说,这一任务更加艰巨。那么有没有一种方法,不仅高效还易于理解,能帮助我们从复杂的数据中提取有用的…

细说晶振元件HC49U的功能、应用及性能特点 | 百能云芯

在电子领域&#xff0c;晶振&#xff08;Crystal Oscillator&#xff09;是一种重要的元件&#xff0c;用于产生稳定的时钟信号。HC49U是一种常见的晶振型号&#xff0c;具有广泛的应用范围。云芯将带您深入探讨HC49U晶振的功能、应用领域&#xff0c;以及其性能特点&#xff0…

postman接口测试

HTTP的接口测试工具有很多&#xff0c;可以进行http请求的方式也有很多&#xff0c;但是可以直接拿来就用&#xff0c;而且功能还支持的不错的&#xff0c;我使用过的来讲&#xff0c;还是postman比较上手。 优点&#xff1a; 1、支持用例管理 2、支持get、post、文件上传、响…

安捷伦N8974A分析仪

安捷伦N8974A分析仪 N8974A 是 Agilent 的二手分析仪。分析仪是测试工程、医疗、汽车和技术行业电子设备的关键工具。使用分析仪来监控许多不同类型的电子设备的性能。您可能需要分析仪来测量音频频谱、电压和电流、信号和频率等分量 频率范围&#xff1a;10 MHz 至 6.7 GHz 一…

MTC证书|欧盟与英国金属类产品清关新要求

英国禁止进口俄钢材的通告内容 从2023年 9 月 30 日起&#xff0c;欧盟和英国将对在第三国加工的特定钢铁产品实施新的制裁&#xff0c;这些产品包含俄罗斯原产的钢铁原料。进口商需要在进口时申报进口货物是否合规。 MTC认证 欧盟严抓MTC认证 获悉&#xff0c;从10月1日起&a…

浅谈大数据之Flink

1.3.4 Flink Flink是由德国3所大学发起的学术项目,后来不断发展壮大,并于2014年年末成为Apache顶级项目之一。在德语中,“flink”表示快速、敏捷,以此来表征这款计算框架的特点。 Flink主要面向流处理,如果说Spark是批处理界的“王者”,那么Flink就是流处理领域冉冉升…

本地PHP搭建简单Imagewheel私人云图床,在外远程访问

&#x1f525;博客主页&#xff1a; 小羊失眠啦 &#x1f516;系列专栏&#xff1a; C语言、Linux &#x1f325;️每日语录&#xff1a;追逐影子的人&#xff0c;自己就是影子。 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 1.前言 云存储在前几年风头无两&#xff0c;云存…

Linux下企业级夜莺监控分析工具的远程访问设置【内网穿透】

文章目录 前言1. Linux 部署Nightingale2. 本地访问测试3. Linux 安装cpolar4. 配置Nightingale公网访问地址5. 公网远程访问Nightingale管理界面6. 固定Nightingale公网地址 前言 夜莺监控是一款开源云原生观测分析工具&#xff0c;采用 All-in-One 的设计理念&#xff0c;集…

C++ 之 Vector 和 List

Vector vector 是C STL中最常用的容器&#xff0c;支持存储多种类型的数据。 与数组相比&#xff0c;它的大小是可变的&#xff0c;因此也会被称为动态数组。 使用连续的存储空间&#xff0c;访问速度快&#xff0c;但插入删除慢。 使用它&#xff0c;需要包含头文件&#x…

DDD之领域(Domain)和子域(Subdomain)

领域驱动设计系列文章&#xff0c;点击上方合集↑ 1. 领域 领域&#xff08;Domain&#xff09;是一个组织所做的事情以及其中所包含的一切&#xff0c;领域可以表示整个业务系统。 领域&#xff0c;简单来说&#xff0c;是指一个业务或行业领域&#xff0c;例如电商、社交媒…

从零开始学习调用百度地图网页API:三、鼠标点击绘图功能

目录 代码功能问题注意addEventListenerplot_line 代码 <!DOCTYPE html> <html> <head><meta http-equiv"Content-Type" content"text/html; charsetutf-8" /><meta name"viewport" content"initial-scale1.0,…

MySQL 的下载与安装

MySQL 的下载 https://cdn.mysql.com/archives/mysql-5.7/mysql-5.7.30-1.el7.x86_64.rpm-bundle.tar 将下载的数据包拉到虚拟机的linux系统的主文件夹下,创建一个MySQL文件存放 安装MySQL 1、解压数据包 tar -xvf mysql-5.7.30-1.el7.x86_64.rpm-bundle.tar -x: 表示解压…