徐州网站建设石家庄百度推广家庄网站建设
news/
2025/9/29 3:25:46/
文章来源:
徐州网站建设,石家庄百度推广家庄网站建设,品牌营销方案,企业网站建设分析目录 1.安装kafka
2.安装kafkamanager可视化工具
3.springboot整合kafka
1.pom导包
2.启动类和yml配置
3.代码演示
编写生产者#xff1a;
消费者#xff1a; 1.安装kafka
进入kafka官网下载对应版本kafka
kafka官网地址#xff1a;Apache Kafka
kafka是使用Scal…目录 1.安装kafka
2.安装kafkamanager可视化工具
3.springboot整合kafka
1.pom导包
2.启动类和yml配置
3.代码演示
编写生产者
消费者 1.安装kafka
进入kafka官网下载对应版本kafka
kafka官网地址Apache Kafka
kafka是使用Scala开发所以版本号是由 Scala的版本号和Kafka版本号组成的如kafka_2.12-3.2.0 2.12是scala版本 3.2.0是kafka版本下载完成解压得到kafka目录结构如下 结构介绍 bin kafka的执行脚本 其中包括启动kafka的脚本kafka-server-start.bat 和 zookeeper-server-start.bat 启动zookeeper的脚本(kafka内置有zookeeper) bin/windows 目录中的脚本是针对windows平台。 config : 配置文件目录 包括server.properties kafka的配置 zookeeper.properties zookeeper的配置 producer.properties生产者的配置 consumer.properties 消费者的配置等等。 libs : 依赖的三方jar包 可以进入config文件夹修改kafka和zookeeper配置文件 zookeeper.properties是作为zookeeper的配置文件dataDir为数据目录clientPort为启动端口比如你想修改zookeeper的默认端口通过配置文件修 clientPort2181项即可 如下 server.properties作为kafka的配置文件我们关注下面几个配置你也可以根据情况进行修改 broker.id 0 : 如果是做个多个kafka主机集群那么brocker.id不能重复0 1 2 增长 zookeeper.connect : zookeeper的地址 如果有多个zk就用逗号隔开配置多个地址 num.partions 1 : 默认partions 数量默认为1 log.dirs : 日志目录不建议放到tmp临时目录一定要修改如log.dirsd:/kafka-logs 在安装目录下运行cmd使用命令
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
启动zookeeper 使用命令
.\bin\windows\kafka-server-start.bat .\config\server.properties
启动kafka 2.安装kafkamanager可视化工具
进入git官网下载kafka-manager 项目地址https://github.com/yahoo/kafka-manager
可以直接下载release版本 下载好后需要解压然后对原始文件进行编译编译完成后会的得到一个kafka-manager-1.3.3.23.zip文件解压这个文件之后才能启动manager
这里建议大家直接下载编译好的mangaer地址
链接https://pan.baidu.com/s/1oEC2XlPtlSZmOotPYGjpOQ?pwdjne1 提取码jne1
解压好后得到如下结构 使用bin\kafka-manager命令启动kafka-manager 启动完成之后访问:http://localhost:9000/ 可以看到kafkaManager主页 第一次进入需要新建 Cluster
输入集群的名字如Kafka-Cluster-1和 Zookeeper 服务器地址如localhost:2181选择最接近的Kafka版本如0.8.1.1 注意如果没有在 Kafka 中配置过 JMX_PORT千万不要选择第一个复选框。 Enable JMX Polling 如果选择了该复选框Kafka-manager 可能会无法启动。 以下全使用默认设置 点击进入刚刚创建的集群即可看到如下结构 点击topics可以看到所有创建的topic主题brokers则代表所有集群内的kafka服务有几个服务就会显示几个broker点击topics可以进入查看topic 进入test_topic 相关参数和使用教程文档可以参考这个大佬的文章Kafka可视化管理工具kafka-manager部署安装和使用_kafka manager-CSDN博客
3.springboot整合kafka
1.pom导包
创建一个maven结构的springboot项目首先在pom中导入如下依赖
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.2.5.RELEASE/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.28/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdscopetest/scope/dependency!-- swagger --dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependency/dependencies
2.启动类和yml配置
导入依赖之后需要为SpringBoot创建启动类在启动类中我们通过注解的方式创建一个Topic如下
SpringBootApplication
public class KafKaApplication {private static final String TOPIC_NAME kafka_test_topic;public static void main(String[] args) {SpringApplication.run(KafKaApplication.class);}//通过定义Bean的方式创建TopicBeanpublic NewTopic topicHello(){//创建Topic : topic名字 partition数量 , replicas副本数量return TopicBuilder.name(TOPIC_NAME).build();}
}
在yml中对kafka做一些常规配置如下
server:port: 12012
spring:application:name: application-kafkakafka:bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer #键序列化value-serializer: org.apache.kafka.common.serialization.StringSerializer #值序列化retries: 1 # 消息发送重试次数# acks0 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks1 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应。# acksall 只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功响应。acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 #批量大小properties:linger:ms: 0 #提交延迟consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #键序列化value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #值序列化group-id: test-consumer-group #消费者的ID这个对应 config/consumer.properties中的group.id更详细的全局配置以及说明 spring : kafka : bootstrap-servers : 192.168.10.70 : 9092 producer : # 发生错误后消息重发的次数。 retries : 0 # 当有多个消息需要被发送到同一个分区时生产者会把它们放在同一个批次里。该参数指定了一个批 次可以使用的内存大小按照字节数计算。 batch-size : 16384 # 设置生产者内存缓冲区的大小。 buffer-memory : 33554432 # 键的序列化方式 key-serializer : org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer : org.apache.kafka.common.serialization.StringSerializer # acks0 生产者在成功写入消息之前不会等待任何来自服务器的响应。 # acks1 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应。 # acksall 只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功 响应。 acks : 1 consumer : # 自动提交的时间间隔 在 spring boot 2.X 版本中这里采用的是值的类型为 Duration 需要符合 特定的格式如 1S,1M,2H,5D auto-commit-interval : 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理 # latest 默认值在偏移量无效的情况下消费者将从最新的记录开始读取数据在消费者启动之 后生成的记录 # earliest 在偏移量无效的情况下消费者将从起始位置读取分区的记录 auto-offset-reset : earliest # 是否自动提交偏移量默认值是 true, 为了避免出现重复数据和数据丢失可以把它设置为 false, 然后手动提交偏移量 enable-auto-commit : false # 键的反序列化方式 key-deserializer : org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer : org.apache.kafka.common.serialization.StringDeserializer # 配置使用默认的消费组 ID group-id : defaultConsumerGroup listener : # 在侦听器容器中运行的线程数。 concurrency : 5 #listner 负责 ack 每调用一次就立即 commit ack-mode : manual_immediate missing-topics-fatal : false 3.代码演示
编写生产者
编写生产者案例 Kafka提供了 KafkaTemplate 用来向Kafka发送消息直接在查询中注入即可使用。KafkaTemplate提供了很多个重载的send方法方法返回ListenableFuture对象即发送的结果对象。
同步阻塞
需要特别注意的是 future.get()方法会阻塞他会一直尝试获取发送结果如果Kafka迟迟没有返回发送结果那么程序会阻塞到这里。所以这种发送方式是同步的。
当然如果你的消息不重要允许丢失你也可以直接执行 kafkaTemplate.send 不调用get()方法获取发送结果程序就不会阻塞当然你也就不知道消息到底有没有发送成功。
异步非阻塞
幸好Kafka为 ListenableFuture 提供了Callback异步回调我们可以通过异步回调来接收发送结果
RestController(/producer)
Api(tags 生产者示例接口, description 生产者示例接口 | 消息发送测试接口, hidden false)
public class ProducerContrller {private static final String TOPIC_NAME kafka_test_topic;Autowiredprivate KafkaTemplateObject,Object kafkaTemplate;/*** 同步阻塞消息队列* param msg* return* throws ExecutionException* throws InterruptedException*/PostMapping(/sendSyncMsg/{msg})ApiOperation(value 生产者生成数据, notes 同步阻塞消息队列)ApiImplicitParams({ApiImplicitParam(name msg, value 需要发送的数据, required true, dataType String),})public String sendSyncMsg(PathVariable(msg)String msg) throws ExecutionException, InterruptedException {ListenableFutureSendResultObject, Object future kafkaTemplate.send(TOPIC_NAME, msg);System.out.println(发送结果future.get().toString());return 发送成功;}/*** 异步非阻塞消息队列* param msg* return* throws ExecutionException* throws InterruptedException*/PostMapping(/sendAsyncMsg/{msg})ApiOperation(value 生产者生成数据, notes 异步非阻塞消息队列)ApiImplicitParams({ApiImplicitParam(name msg, value 需要发送的数据, required true, dataType String),})public String sendAsyncMsg(PathVariable(msg)String msg) throws ExecutionException, InterruptedException {ListenableFutureSendResultObject, Object future kafkaTemplate.send(TOPIC_NAME, msg);future.addCallback(new ListenableFutureCallbackSendResultObject, Object() {Overridepublic void onFailure(Throwable ex) {ex.getStackTrace();}Overridepublic void onSuccess(SendResultObject, Object result) {System.out.println(发送结果result);}});return 发送成功;}}
也有原生的使用KafkaProducer的方式创建生产者发送消息这样的好处是可以灵活配置不需要每次对kafka配置修改后就要重启服务
以下是代码示例
/*** 原生构建KafkaProducer的生产者方法接口** param msg* return*/PostMapping(/sendMsgByProducer/{msg})ApiOperation(value 生产者生成数据, notes 原生构建KafkaProducer的生产者方法接口)ApiImplicitParams({ApiImplicitParam(name msg, value 需要发送的数据, required true, dataType String),})public String sendMsgByProducer(PathVariable(msg) String msg){// 创建一个 Map或Properties 对象用于构建 Kafka 生产者的配置信息
// Properties map new Properties();Map map new HashMap();// 这个是kafka的地址,对应你server.properties中配置的map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 键序列化map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 值序列化map.put(ProducerConfig.RETRIES_CONFIG, 1); // 设置重试次数map.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 设置重试间隔if (false) {// 添加ssl认证String userName ;String passWord ;map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);map.put(SaslConfigs.SASL_MECHANISM, PLAIN);map.put(SaslConfigs.SASL_JAAS_CONFIG,org.apache.kafka.common.security.plain.PlainLoginModule required username\ userName \ password\ passWord \;);}// 创建 KafkaProducer 对象 kafkaProducer并传入配置信息 mapKafkaProducer kafkaProducer new KafkaProducer(map);// 创建要发送的消息 ProducerRecord同时订阅主题和需要发送的内容ProducerRecordString, String record new ProducerRecord(TOPIC_NAME, msg);// 1.不需要回调的消息发送方式kafkaProducer.send(record);// 创建一个 CompletableFuture 对象 future 用于异步处理发送消息的结果CompletableFutureObject future new CompletableFuture();try {// 2.需要回调的消息发送方式kafkaProducer.send(record, (data, exception) - {if (exception null) {System.out.println(String.format(Message sent successfully! Topic: {} Partition: {} Offset: {}, data.topic(), data.partition(), data.offset()));future.complete(消息投递成功无异常); // 成功时完成future} else {System.out.println(String.format(Error sending message: exception.getMessage(), exception));future.completeExceptionally(exception); // 错误时传递异常}});}catch (Exception e){e.printStackTrace();} finally {// 关闭生产者通道释放资源kafkaProducer.flush();kafkaProducer.close();}return 发送成功;}
消费者
使用KafkaListener注释来接收消息用法比较简单实例如下
Component
public class HelloConsumer {KafkaListener(topics kafka_test_topic)public void handle(ConsumerRecord consumerRecord) {System.out.println(消费者消费消息 consumerRecord);System.out.println(String.format(消费者收到消息,topic:%s,partition:%s, consumerRecord.topic(), consumerRecord.partition()));System.out.println(消费内容 consumerRecord.value());}//消费消息的时候给方法添加 Acknowledgment 参数用来签收消息KafkaListener(topics kafka_test_topic, containerFactory kafkaManualAckListenerContainerFactory)public void handler(String message, Acknowledgment ack){System.out.println(收到消息message);//确认收到消息ack.acknowledge();}
}
也有原生的使用KafkaProducer的方式创建生产者发送消息的示例
使用while循环来保证达到与注解的方式相同的实时接收消息的相同的功能,这样的好处是可以灵活配置可以每次订阅多个不同的topic不使用的topic可以直接释放掉
RestController
Api(tags 消费者示例接口, description 消费者示例接口 | 消费消息测试接口, hidden false)
public class ConsumerContrller {PostMapping(/useMsg)ApiOperation(value 消费者消费数据, notes 消费者消费消息数据)public String useMsg(){Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);props.put(ConsumerConfig.GROUP_ID_CONFIG, test-group);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Collections.singletonList(kafka_test_topic));try {while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());}}} catch (WakeupException e) {// Ignore exception for shutdown} finally {consumer.close();}return 消费成功;}
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/921394.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!