在linux上安装kafka,并使用kafka-clients构建消费者

news/2025/10/23 14:12:56/文章来源:https://www.cnblogs.com/slgkaifa/p/19160443

在linux上安装kafka,并使用kafka-clients构建消费者

1.安装 java 环境

Kafka依赖 Java运行环境(JDK8或更高版本)

# 安装OpenJDK(推荐)
yum install openjdk-11-jdk
# 验证安装
java -version

如果已经安装Java环境,可通过如下方式进行检查

# 检查当前 JAVA_HOME
echo $JAVA_HOME
# 查找Java安装路径
update-alternatives --config java
# 输出示例:/usr/lib/jvm/java-11-openjdk-amd64/bin/java
# 设置JAVA_HOME(替换为你的路径)
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.18.0.10-1.el7_9.x86_64
export PATH=$JAVA_HOME/bin:$PATH
#永久修复(所有终端生效):
# /etc/profile
vim /etc/profile
# 添加以下内容(替换为你的路径)
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.18.0.10-1.el7_9.x86_64
export PATH=$JAVA_HOME/bin:$PATH
# 使配置生效
source /etc/profile

2.下载和解压Kafka

从官网下载最新版Kafka(以3.7.0为例)

​wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
mv kafka_2.13-3.7.0 /opt/kafka  #移动到/opt目录
cd /opt/kafka

3.配置Kafka

修改配置文件 config/server.properties:

vim config/server.properties

关键配置项:

listeners=PLAINTEXT://0.0.0.0:9092
# 允许外部访问(替换为你的服务器IP或保持localhost。如果需要其他机器能访问你的kafka,就配置为服务器IP)
advertised.listeners=PLAINTEXT://<服务器IP>:9092# 日志存储目录(确保目录存在且可写)log.dirs=/tmp/kafka-logs

4.启动Zookeeper和Kafka

Kafka依赖ZooKeeper协调服务,新版本内置了ZooKeeper(等zookeeper启动好了再启动kafka)
如果启动时内存不足,可以修改zookeeper和kafka的启动脚本,把内存缩小一些,我是缩小了一倍:

#启动ZooKeeper(后台运行)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
#启动Kafka(后台运行)
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

5.测试Kafka

5.1创建topic(localhost或你的服务器IP)

bin/kafka-topics.sh --create --topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1

5.2启动生产者

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092

5.3启动消费者(新终端)

bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning

6.停止服务

# 停止Kafka
bin/kafka-server-stop.sh
# 停止ZooKeeper
bin/zookeeper-server-stop.sh

7.常见问题

端口冲突:确保9092(Kafka)和2181(ZooKeeper)端口未被占用。

防火墙:开放端口或关闭防火墙:
日志目录权限:确保Kafka进程有权限写入log.dirs配置的目录。

8.常用命令

#启动ZooKeeper(后台运行)
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
#启动Kafka(后台运行)
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
#创建测试用的 topic
bin/kafka-topics.sh --create --topic lzq-topic --bootstrap-server 123.249.124.105:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --create --topic lzq-topic \
--bootstrap-server 123.249.124.105:9092 \
--partitions 1 --replication-factor 1
#创建生产者
bin/kafka-console-producer.sh --topic lzq-topic --bootstrap-server 123.249.124.105:9092
#创建消费者
bin/kafka-console-consumer.sh --topic lzq-topic --bootstrap-server 123.249.124.105:9092 --from-beginning
# 停止Kafka
bin/kafka-server-stop.sh
# 停止ZooKeeper
bin/zookeeper-server-stop.sh
# 3. 列出topic
bin/kafka-topics.sh --list --bootstrap-server 123.249.124.105:9092
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
ps -ef|grep kafka
ps -ef|grep zook
# 列出所有活跃的消费者组
bin/kafka-consumer-groups.sh --bootstrap-server 123.249.124.105:9092 --list
# 查看特定组的详情
bin/kafka-consumer-groups.sh --bootstrap-server 123.249.124.105:9092 --group app-system-group --describe

9.pom引入依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.1</version>
<scope>compile</scope>
</dependency>

10.创建配置类

package cn.newdt.monitor.custom.config;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka配置类
*/
@Configuration
public class KafkaConfig {
@Value("${kafka.bootstrap.servers:123.249.124.105:9092}")
private String bootstrapServers;
@Value("${kafka.group.id:app-system-group}")
private String groupId;
@Value("${kafka.enable.auto.commit:true}")
private Boolean enableAutoCommit;
@Value("${kafka.auto.commit.interval:5000}")
private Integer autoCommitInterval;
@Value("${kafka.auto.offset.reset:latest}")
private String autoOffsetReset;
@Value("${kafka.fetch.max.wait:500}")
private Integer fetchMaxWait;
@Value("${kafka.fetch.min.size:1}")
private Integer fetchMinSize;
@Value("${kafka.heartbeat.interval:3000}")
private Integer heartbeatInterval;
@Value("${kafka.max.poll.records:500}")
private Integer maxPollRecords;
@Bean
public KafkaConsumer kafkaConsumer() {
Map<String, Object> configs = new HashMap<>();// kafka服务端的IP和端口,格式:(ip:port)configs.put("bootstrap.servers", bootstrapServers);// 指定消费组(自己起的组名,启动时会自动把这个组名注册到kafka)configs.put("group.id", groupId);// 开启consumer的偏移量(offset)自动提交到Kafkaconfigs.put("enable.auto.commit", enableAutoCommit);//consumer的偏移量(offset) 自动提交的时间间隔,单位毫秒configs.put("auto.commit.interval.ms", autoCommitInterval);/*偏移量重置策略(在Kafka中没有初始化偏移量或者当前偏移量不存在情况)earliest, 在偏移量无效的情况下, 自动重置为最早的偏移量latest, 在偏移量无效的情况下, 自动重置为最新的偏移量none, 在偏移量无效的情况下, 抛出异常.*/configs.put("auto.offset.reset", autoOffsetReset);// 请求阻塞最大时间configs.put("fetch.max.wait.ms", fetchMaxWait);// 请求应答最小字节数configs.put("fetch.min.bytes", fetchMinSize);// 心跳间隔时间configs.put("heartbeat.interval.ms", heartbeatInterval);// 单次poll最大记录数configs.put("max.poll.records", maxPollRecords);// Key和Value反序列化器Deserializer<String> keyDeserializer = new StringDeserializer();Deserializer<String> valueDeserializer = new StringDeserializer();// 创建Kafka消费者return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer);}}

10.创建消费者

package cn.newdt.monitor.custom;
import cn.hutool.core.thread.ThreadUtil;
import cn.newdt.base.cmdb.domain.AppSystem;
import cn.newdt.monitor.custom.dao.config.CustomAppSystemMapper;
import cn.newdt.monitor.utils.CommonUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 监听来自 kafka 的应用系统消息
*/
@Component
@Slf4j
public class AppSystemKafkaConsumer implements InitializingBean, DisposableBean {
@Autowired
private KafkaConsumer kafkaConsumer;
@Autowired
private CustomAppSystemMapper customAppSystemMapper;
@Value("${kafka.topic.name:lzq-topic}")
private String topicName;
@Value("${kafka.poll.timeout:1000}")
private Long pollTimeout;
private volatile Thread consumerThread;
private final AtomicBoolean running = new AtomicBoolean(false);
private static final String LOG_PREFIX = "[kafka同步应用系统]";
@Override
public void afterPropertiesSet() {
consumerThread = new Thread(this::consumeMessages, "kafka-consumer-thread");
consumerThread.setDaemon(false);
consumerThread.start();
}
private void consumeMessages() {
try {
log.info("{}启动线程监听Topic: {}", LOG_PREFIX, topicName);
ThreadUtil.sleep(1000);
kafkaConsumer.subscribe(Collections.singletonList(topicName));
running.set(true);
while (running.get()) {
try {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(pollTimeout));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {processMessage(consumerRecord);}} catch (Exception e) {log.error("{}Kafka消费过程中发生异常", LOG_PREFIX, e);ThreadUtil.sleep(5000); // 发生异常时短暂休眠避免频繁重试}}} catch (Exception e) {log.error("{}Kafka消费者线程执行异常", LOG_PREFIX, e);} finally {log.info("{}Kafka消费者线程已停止", LOG_PREFIX);}}private void processMessage(ConsumerRecord<String, String> consumerRecord) {try {String originalMsg = consumerRecord.value();log.info("{}消费消息 - Topic: {}, Partition: {}, Offset: {}, Value: {}",LOG_PREFIX,consumerRecord.topic(),consumerRecord.partition(),consumerRecord.offset(),originalMsg);// todo: 在这里添加具体的业务逻辑处理} catch (Exception e) {log.error("{}处理Kafka消息时发生异常: {}", LOG_PREFIX, consumerRecord.value(), e);}}@Overridepublic void destroy() {log.info("开始关闭Kafka消费者");running.set(false);if (consumerThread != null) {consumerThread.interrupt();try {consumerThread.join(5000); // 等待最多5秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}}if (kafkaConsumer != null) {try {kafkaConsumer.close();log.info("Kafka消费者已关闭");} catch (Exception e) {log.error("关闭Kafka消费者时发生异常", e);}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/944241.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年10月国内防水耳机生产厂家全景解析报告,基于专业测评的技术、性能及市场优势深度分析

随着消费者对运动与户外活动需求的增加,防水耳机市场迎来快速增长。根据行业数据显示,2025年全球防水耳机市场规模预计突破50亿美元,年复合增长率达12%以上。本报告基于专业市场调研与产品测评数据,从技术研发、性…

Oracle故障处理:ORA-00600: internal error code, arguments: [3020], [5], [13], [20971533]

我们的文章会在微信公众号IT民工的龙马人生和博客网站( www.htz.pw )同步更新 ,欢迎关注收藏,也欢迎大家转载,但是请在文章开始地方标注文章出处,谢谢! 由于博客中有大量代码,通过页面浏览效果更佳。Oracle故障处…

2025年10月国内环保悬浮拼装地板生产厂家全景解析报告,基于专业测评的技术、性能及市场优势深度分析

随着环保意识的提升和体育设施建设的快速发展,环保悬浮拼装地板作为一种可回收、耐用的地面材料,在校园、体育馆和公共场所的应用日益广泛。其市场需求逐年增长,据行业数据显示,2024年全球悬浮拼装地板市场规模预计…

批量跑脚本后自定义消息内容发送至钉钉--自定义发送到钉钉的消息内容

import requestsimport jsonimport osimport timeimport sysdef send_dingtalk_message(webhook_url, success_count, fail_count, fail_module, job_name, build_number, build_user, build_url): ""&qu…

2025年10月国内四氟电加热器生产厂家全景解析报告,基于专业测评的技术、性能及市场优势深度分析

在工业加热设备领域,四氟电加热器因耐腐蚀、防粘附、高温稳定性等特性,已成为化工、医药、食品等高标准行业的首选设备。据《2025中国工业加热设备技术白皮书》统计,四氟电加热器国内市场年增长率达12.8%,其中具备…

Python编程:类型转换与输入输出

Python编程:类型转换与输入输出在Python编程中,类型转换和输入输出是基础且重要的概念。下面我将详细介绍这些内容,并提供一个综合示例。 输入输出基础 输入函数:input() input() 函数用于从用户获取输入,返回的是…

088_尚硅谷_switch使用细节(2)

088_尚硅谷_switch使用细节(2)1.switch 后也可以不带表达式, 类似多个if--else分支使用_案例演示1 2.switch 后也可以不带表达式, 类似多个if--else分支使用_案例演示2 3.switch 后也可以直接声明或定义一个变量,分号…

微软修复CVSS 10.0分高危Entra ID漏洞CVE-2025-55241 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

iText Core生成pdf的一个简单示例

使用pdfwriter创建writer对象,使用document创建文档:// 创建新的PDF文档PdfWriter writer = new PdfWriter("order_report.pdf");PdfDocument pdfDoc = new PdfDocument(writer);Document document = new …

在IntelliJ IDEA中采用Git

在IntelliJ IDEA中采用Gitpre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", &…

国产化Excel开发组件Spire.XLS教程:在Python中将Pandas DataFrame导出到Excel的详细教程

在 Python 开发中,处理表格数据是非常常见的任务,而 Pandas 是最常用的数据处理和分析库。开发者经常需要将 Pandas DataFrame 导出到 Excel,以便进行报告、团队协作或进一步的数据分析。本教程介绍如何在Python中使…

2025 年化工塑料桶生产厂家最新推荐榜:聚焦企业专利技术、品质管控及知名客户合作案例的权威解析

在化工、食品、医药等工业领域,塑料包装容器的质量直接关系到物料储存安全与运输效率。根据2025年最新行业调研数据,化工塑料桶市场规模已达187亿元,年均增长率5.8%。本文基于对企业产能规模、工艺专利数量、质检合…

主流CI/CD工具选型指南:助力企业实现高效交付

DevOps工具选型指南:Gitee CI/CD如何助力企业实现高效交付 在数字化转型浪潮中,DevOps已成为企业提升软件交付效率的核心方法论。作为DevOps实践的关键环节,持续集成与持续交付(CI/CD)工具的选择直接影响着企业的…

通过openwrt唤醒pc电脑

参考 https://todesk.com/helpcenter/questions-86.html 重点 网卡设置 在网卡的电源设置中,最好不要勾选准许计算机关闭此设备以节约电源 ARP绑定 局域网唤醒如果是局域网唤醒pc,完全不需要ARP绑定 比如通过openwrt…

图表控件Aspose.Diagram教程:在C#中将VSD转换为PDF

Microsoft Visio 使用VSD文件来保存专业图表、流程图和业务流程模型。但是,如果收件人未安装 Microsoft Visio,则共享 VSD 格式的 Visio 图表可能会效率低下。一种通用且便携的共享图表的方法,同时保持格式和布局不…

Gitee:中国开发者生态的数字化转型引擎

Gitee:中国开发者生态的数字化转型引擎 在中国数字经济高速发展的背景下,本土化代码托管平台正成为企业数字化转型的关键基础设施。Gitee作为国内领先的一站式DevOps平台,凭借其深度本地化服务、全流程开发工具链和…

Windows 11 24H2 堆栈防护:从功能解析到实战配置

在现代11 24H2版本中,"内核模式硬件强制堆栈保护"作为一项关键安全特性,为系统内核筑起了一道硬件级防线。本文将系统讲解该功能的防护场景、技术细节及配置方法,助你快速掌握这一安全工具。 一、防护场景…

2025年陕西省基本农田调整技术服务品牌排名前十权威解析

摘要 随着国家土地管理政策的不断完善,基本农田调整技术服务行业在2024年迎来快速发展,陕西省作为农业大省,相关服务需求持续增长。本文基于市场调研和用户反馈,整理出2025年陕西省基本农田调整技术服务品牌排名前…

2025年陕西省基本农田调整技术服务公司排名前十权威解析

摘要 随着国家土地管理政策的不断完善,基本农田调整技术服务行业在2024年迎来快速发展,尤其在陕西省,市场需求持续增长。本文基于行业数据分析和用户反馈,为您呈现2025年陕西省基本农田调整技术服务公司排名前十的…

2025.10.23 VP Record

Public NOIP Round #8 (Div. 1, 提高)从这场开始正式进入高中的 OI 训练了,回家记得写一篇 Rules 用于训练和比赛。 QOJ 7717 T1,感觉 *2000 左右,很有启发性一个题。首先暴力模拟显然是假的,这个 bitset 也就是个…