徐州网站建设石家庄百度推广家庄网站建设

news/2025/9/29 3:25:46/文章来源:
徐州网站建设,石家庄百度推广家庄网站建设,品牌营销方案,企业网站建设分析目录 1.安装kafka 2.安装kafkamanager可视化工具 3.springboot整合kafka 1.pom导包 2.启动类和yml配置 3.代码演示 编写生产者#xff1a; 消费者#xff1a; 1.安装kafka 进入kafka官网下载对应版本kafka kafka官网地址#xff1a;Apache Kafka kafka是使用Scal…目录 1.安装kafka 2.安装kafkamanager可视化工具 3.springboot整合kafka 1.pom导包 2.启动类和yml配置 3.代码演示  编写生产者 消费者 1.安装kafka 进入kafka官网下载对应版本kafka kafka官网地址Apache Kafka kafka是使用Scala开发所以版本号是由 Scala的版本号和Kafka版本号组成的如kafka_2.12-3.2.0 2.12是scala版本 3.2.0是kafka版本下载完成解压得到kafka目录结构如下 结构介绍   bin kafka的执行脚本 其中包括启动kafka的脚本kafka-server-start.bat 和 zookeeper-server-start.bat 启动zookeeper的脚本(kafka内置有zookeeper) bin/windows 目录中的脚本是针对windows平台。 config : 配置文件目录 包括server.properties kafka的配置 zookeeper.properties zookeeper的配置 producer.properties生产者的配置 consumer.properties 消费者的配置等等。 libs : 依赖的三方jar包 可以进入config文件夹修改kafka和zookeeper配置文件 zookeeper.properties是作为zookeeper的配置文件dataDir为数据目录clientPort为启动端口比如你想修改zookeeper的默认端口通过配置文件修 clientPort2181项即可 如下 server.properties作为kafka的配置文件我们关注下面几个配置你也可以根据情况进行修改 broker.id 0 : 如果是做个多个kafka主机集群那么brocker.id不能重复0 1 2 增长 zookeeper.connect : zookeeper的地址 如果有多个zk就用逗号隔开配置多个地址 num.partions 1 : 默认partions 数量默认为1 log.dirs : 日志目录不建议放到tmp临时目录一定要修改如log.dirsd:/kafka-logs 在安装目录下运行cmd使用命令 .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties 启动zookeeper 使用命令 .\bin\windows\kafka-server-start.bat .\config\server.properties 启动kafka 2.安装kafkamanager可视化工具 进入git官网下载kafka-manager 项目地址https://github.com/yahoo/kafka-manager 可以直接下载release版本 下载好后需要解压然后对原始文件进行编译编译完成后会的得到一个kafka-manager-1.3.3.23.zip文件解压这个文件之后才能启动manager 这里建议大家直接下载编译好的mangaer地址 链接https://pan.baidu.com/s/1oEC2XlPtlSZmOotPYGjpOQ?pwdjne1  提取码jne1 解压好后得到如下结构 使用bin\kafka-manager命令启动kafka-manager 启动完成之后访问:http://localhost:9000/ 可以看到kafkaManager主页 第一次进入需要新建 Cluster 输入集群的名字如Kafka-Cluster-1和 Zookeeper 服务器地址如localhost:2181选择最接近的Kafka版本如0.8.1.1 注意如果没有在 Kafka 中配置过 JMX_PORT千万不要选择第一个复选框。 Enable JMX Polling 如果选择了该复选框Kafka-manager 可能会无法启动。 以下全使用默认设置 点击进入刚刚创建的集群即可看到如下结构 点击topics可以看到所有创建的topic主题brokers则代表所有集群内的kafka服务有几个服务就会显示几个broker点击topics可以进入查看topic 进入test_topic 相关参数和使用教程文档可以参考这个大佬的文章Kafka可视化管理工具kafka-manager部署安装和使用_kafka manager-CSDN博客 3.springboot整合kafka 1.pom导包 创建一个maven结构的springboot项目首先在pom中导入如下依赖 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.2.5.RELEASE/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.28/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdscopetest/scope/dependency!-- swagger --dependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependency/dependencies 2.启动类和yml配置 导入依赖之后需要为SpringBoot创建启动类在启动类中我们通过注解的方式创建一个Topic如下 SpringBootApplication public class KafKaApplication {private static final String TOPIC_NAME kafka_test_topic;public static void main(String[] args) {SpringApplication.run(KafKaApplication.class);}//通过定义Bean的方式创建TopicBeanpublic NewTopic topicHello(){//创建Topic : topic名字 partition数量 , replicas副本数量return TopicBuilder.name(TOPIC_NAME).build();} } 在yml中对kafka做一些常规配置如下 server:port: 12012 spring:application:name: application-kafkakafka:bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer #键序列化value-serializer: org.apache.kafka.common.serialization.StringSerializer #值序列化retries: 1 # 消息发送重试次数# acks0 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks1 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应。# acksall 只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功响应。acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 #批量大小properties:linger:ms: 0 #提交延迟consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #键序列化value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #值序列化group-id: test-consumer-group #消费者的ID这个对应 config/consumer.properties中的group.id更详细的全局配置以及说明 spring : kafka : bootstrap-servers : 192.168.10.70 : 9092 producer : # 发生错误后消息重发的次数。 retries : 0 # 当有多个消息需要被发送到同一个分区时生产者会把它们放在同一个批次里。该参数指定了一个批 次可以使用的内存大小按照字节数计算。 batch-size : 16384 # 设置生产者内存缓冲区的大小。 buffer-memory : 33554432 # 键的序列化方式 key-serializer : org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer : org.apache.kafka.common.serialization.StringSerializer # acks0 生产者在成功写入消息之前不会等待任何来自服务器的响应。 # acks1 只要集群的首领节点收到消息生产者就会收到一个来自服务器成功响应。 # acksall 只有当所有参与复制的节点全部收到消息时生产者才会收到一个来自服务器的成功 响应。 acks : 1 consumer : # 自动提交的时间间隔 在 spring boot 2.X 版本中这里采用的是值的类型为 Duration 需要符合 特定的格式如 1S,1M,2H,5D auto-commit-interval : 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理 # latest 默认值在偏移量无效的情况下消费者将从最新的记录开始读取数据在消费者启动之 后生成的记录 # earliest 在偏移量无效的情况下消费者将从起始位置读取分区的记录 auto-offset-reset : earliest # 是否自动提交偏移量默认值是 true, 为了避免出现重复数据和数据丢失可以把它设置为 false, 然后手动提交偏移量 enable-auto-commit : false # 键的反序列化方式 key-deserializer : org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer : org.apache.kafka.common.serialization.StringDeserializer # 配置使用默认的消费组 ID group-id : defaultConsumerGroup listener : # 在侦听器容器中运行的线程数。 concurrency : 5 #listner 负责 ack 每调用一次就立即 commit ack-mode : manual_immediate missing-topics-fatal : false 3.代码演示  编写生产者 编写生产者案例 Kafka提供了 KafkaTemplate 用来向Kafka发送消息直接在查询中注入即可使用。KafkaTemplate提供了很多个重载的send方法方法返回ListenableFuture对象即发送的结果对象。 同步阻塞 需要特别注意的是 future.get()方法会阻塞他会一直尝试获取发送结果如果Kafka迟迟没有返回发送结果那么程序会阻塞到这里。所以这种发送方式是同步的。 当然如果你的消息不重要允许丢失你也可以直接执行 kafkaTemplate.send 不调用get()方法获取发送结果程序就不会阻塞当然你也就不知道消息到底有没有发送成功。 异步非阻塞 幸好Kafka为 ListenableFuture 提供了Callback异步回调我们可以通过异步回调来接收发送结果 RestController(/producer) Api(tags 生产者示例接口, description 生产者示例接口 | 消息发送测试接口, hidden false) public class ProducerContrller {private static final String TOPIC_NAME kafka_test_topic;Autowiredprivate KafkaTemplateObject,Object kafkaTemplate;/*** 同步阻塞消息队列* param msg* return* throws ExecutionException* throws InterruptedException*/PostMapping(/sendSyncMsg/{msg})ApiOperation(value 生产者生成数据, notes 同步阻塞消息队列)ApiImplicitParams({ApiImplicitParam(name msg, value 需要发送的数据, required true, dataType String),})public String sendSyncMsg(PathVariable(msg)String msg) throws ExecutionException, InterruptedException {ListenableFutureSendResultObject, Object future kafkaTemplate.send(TOPIC_NAME, msg);System.out.println(发送结果future.get().toString());return 发送成功;}/*** 异步非阻塞消息队列* param msg* return* throws ExecutionException* throws InterruptedException*/PostMapping(/sendAsyncMsg/{msg})ApiOperation(value 生产者生成数据, notes 异步非阻塞消息队列)ApiImplicitParams({ApiImplicitParam(name msg, value 需要发送的数据, required true, dataType String),})public String sendAsyncMsg(PathVariable(msg)String msg) throws ExecutionException, InterruptedException {ListenableFutureSendResultObject, Object future kafkaTemplate.send(TOPIC_NAME, msg);future.addCallback(new ListenableFutureCallbackSendResultObject, Object() {Overridepublic void onFailure(Throwable ex) {ex.getStackTrace();}Overridepublic void onSuccess(SendResultObject, Object result) {System.out.println(发送结果result);}});return 发送成功;}} 也有原生的使用KafkaProducer的方式创建生产者发送消息这样的好处是可以灵活配置不需要每次对kafka配置修改后就要重启服务 以下是代码示例 /*** 原生构建KafkaProducer的生产者方法接口** param msg* return*/PostMapping(/sendMsgByProducer/{msg})ApiOperation(value 生产者生成数据, notes 原生构建KafkaProducer的生产者方法接口)ApiImplicitParams({ApiImplicitParam(name msg, value 需要发送的数据, required true, dataType String),})public String sendMsgByProducer(PathVariable(msg) String msg){// 创建一个 Map或Properties 对象用于构建 Kafka 生产者的配置信息 // Properties map new Properties();Map map new HashMap();// 这个是kafka的地址,对应你server.properties中配置的map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 键序列化map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 值序列化map.put(ProducerConfig.RETRIES_CONFIG, 1); // 设置重试次数map.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 设置重试间隔if (false) {// 添加ssl认证String userName ;String passWord ;map.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT);map.put(SaslConfigs.SASL_MECHANISM, PLAIN);map.put(SaslConfigs.SASL_JAAS_CONFIG,org.apache.kafka.common.security.plain.PlainLoginModule required username\ userName \ password\ passWord \;);}// 创建 KafkaProducer 对象 kafkaProducer并传入配置信息 mapKafkaProducer kafkaProducer new KafkaProducer(map);// 创建要发送的消息 ProducerRecord同时订阅主题和需要发送的内容ProducerRecordString, String record new ProducerRecord(TOPIC_NAME, msg);// 1.不需要回调的消息发送方式kafkaProducer.send(record);// 创建一个 CompletableFuture 对象 future 用于异步处理发送消息的结果CompletableFutureObject future new CompletableFuture();try {// 2.需要回调的消息发送方式kafkaProducer.send(record, (data, exception) - {if (exception null) {System.out.println(String.format(Message sent successfully! Topic: {} Partition: {} Offset: {}, data.topic(), data.partition(), data.offset()));future.complete(消息投递成功无异常); // 成功时完成future} else {System.out.println(String.format(Error sending message: exception.getMessage(), exception));future.completeExceptionally(exception); // 错误时传递异常}});}catch (Exception e){e.printStackTrace();} finally {// 关闭生产者通道释放资源kafkaProducer.flush();kafkaProducer.close();}return 发送成功;} 消费者 使用KafkaListener注释来接收消息用法比较简单实例如下 Component public class HelloConsumer {KafkaListener(topics kafka_test_topic)public void handle(ConsumerRecord consumerRecord) {System.out.println(消费者消费消息 consumerRecord);System.out.println(String.format(消费者收到消息,topic:%s,partition:%s, consumerRecord.topic(), consumerRecord.partition()));System.out.println(消费内容 consumerRecord.value());}//消费消息的时候给方法添加 Acknowledgment 参数用来签收消息KafkaListener(topics kafka_test_topic, containerFactory kafkaManualAckListenerContainerFactory)public void handler(String message, Acknowledgment ack){System.out.println(收到消息message);//确认收到消息ack.acknowledge();} } 也有原生的使用KafkaProducer的方式创建生产者发送消息的示例 使用while循环来保证达到与注解的方式相同的实时接收消息的相同的功能,这样的好处是可以灵活配置可以每次订阅多个不同的topic不使用的topic可以直接释放掉 RestController Api(tags 消费者示例接口, description 消费者示例接口 | 消费消息测试接口, hidden false) public class ConsumerContrller {PostMapping(/useMsg)ApiOperation(value 消费者消费数据, notes 消费者消费消息数据)public String useMsg(){Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);props.put(ConsumerConfig.GROUP_ID_CONFIG, test-group);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Collections.singletonList(kafka_test_topic));try {while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());}}} catch (WakeupException e) {// Ignore exception for shutdown} finally {consumer.close();}return 消费成功;} }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/921394.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

观澜做网站网站建设 团队

侃侃尔雅您无需成为系统应用程序。首先,com.android.internal.telephony在您的项目中创建包,并将其放入名为“ ITelephony.aidl” 的文件中:package com.android.internal.telephony; interface ITelephony { boolean endCall(); vo…

网站备案号怎么查询东道设计考研

项目的大致需求就是做一个App,里面集成各种功能供用户使用,其中涉及到很多Vue的使用方法,单独总结太麻烦,所以通过这几篇笔记来梳理一下。原型图如下:路由配置主界面会用到一些原生App方法,比如验证用户身份…

网站seo分析常用的工具是南宁网站推广费用

大家好,我是若川。今天分享这篇,相信读完会有些收获。本文经作者授权转载,原文链接:https://juejin.cn/post/6980671091526074404个人简介19年底12月进入字节实习, 第二年7月毕业转正。到前几天正好全职一周年。进入公…

网站策划与建设阶段dedecms网站地图模板

CALDERA是一个由python语言编写的红蓝对抗工具(攻击模拟工具)。它是MITRE公司发起的一个研究项目,该工具的攻击流程是建立在ATT&CK攻击行为模型和知识库之上的,能够较真实地APT攻击行为模式。 通过CALDERA工具,安全…

北京网站建设公司哪个最好网站开发的最后五个阶段

问题来源 本周在实际项目中发现无法自定义的log4j-dev配置的error日志级别文件无法生效,项目启动后仍然采用默认的info级别日志进行打印。之所以自定义名称,是为了减少隔离不同环境的日志级别,比如开发dev环境使用debug、info级别&#xff0…

MySQL数据误删或者误更新如何恢复25-9-29

目录本篇文章适用场景一、下载MyFlash工具二、误删数据恢复先检查MySQL有没有开启binlog日志演示误删除数据利用MyFlash工具 反写SQL利用mysqlbinlog 执行反写的sql二进制文件恢复完成三、误更新数据恢复演示误更新数据…

东莞做网站 信科网络医院网站建设企业

Oracle Data Guard 参数介绍Data Guard作为Oracle提供的一个高可用及灾备解决方案,理解并可以实施它对于DBA来说是非常重要套的技能上节介绍了有关Data Guard的概念,这节将介绍相关的一些参数有的参数是做为备库角色时才生效的,会单独说出来1. DB_NAME该…

怎么注销网站备案上海网站seo排名优化

A 题:园区微电网风光储协调优化配置 摘要 在全球范围内,气候变化和环境污染问题日益严重,减少碳排放和实现可持续发展成为各国的共同目标。新能源,尤其是风能和光伏发电,因其清洁、可再生的特性,正在全球范…

网站群建设目标百度链接提交收录入口

伴随着电脑游戏和图形处理的需求不断增加,很多笔记本电脑使用者开始考虑是否能够通过外接显卡来提升性能。然而,外接显卡对于笔记本电脑是否可行,以及如何连接外接显卡,对于很多人来说仍然是一个迷。本文将为您揭秘外接显卡的奥秘…

使用 logwatch 监控系统日志

配置好 postfix。安装 logwatch: sudo apt install logwatch配置 Logwatch: sudoedit /usr/share/logwatch/default.conf/logwatch.confMailTo = example@gmail.com # 改为你 postfix 配置的发件人 MailFrom = ex…

织梦做的网站打开慢模板之家下载

人情世故是我们日常生活中积累的约定俗成的行为规则,属于社会知识的范畴。这些知识大半来源于与不同人群的社会交际,也来源于社会冲突与社会发展。在有专业知识与技能的情况下,人情世故能够帮助我们个人缓和与其他人之间的紧张度,…

西双版纳网站制作公司wordpress 汽车租赁

流程图制作在云上 : https://www.processon.com/ 转载于:https://www.cnblogs.com/hopesun/p/4661429.html

网站核验点全屋定制设计培训学校

目录 问题情况1情况2 问题 如果我们在开发过程中,存在一些验证性的提交或者失误性的提交,那么这些提交我们不想要了,怎么办? 情况1 如果是想要删除某个commitid之后的所有提交 那么git reset 可以满足你 git reset --hard 你要…

现在手机网站设计上海市各区建设局网站

信息技术应用创新产业,即信创产业,是信息化建设的核心,它涵盖了从硬件到软件的一系列关键技术。信创产业的目标是通过自主研发,减少对外部技术的依赖,增强信息安全,并提升国内产业的全球竞争力。该产业主要…

做网站界面的软件赣州小程序推荐

音视频编解码的一些源代码 (转)资料名称:音视频编解码的一些源代码 资料成文时间:不详 语言:英文 页数:很多 何人所著(来源): 文件格式:原代码 开发工具:vc 说…

如皋做网站沈阳工程信息造价网

网易2018数据分析(20道单选3道问答) 好评率是会员对平台评价的重要指标。现在需要统计2018年1月1日到2018年1月31日,用户’小明’提交的母婴类目"花王"品牌的好评率(好评率“好评”评价量/总评价量): 用户评…

鹏鸿生态板官方网站开发区代理做电商网站需要会些什么条件

点击上方蓝色字体,关注我们上传文件是互联网中应用的场景之一,最典型的情况就是上传头像。文件上传主要是将文件通过IO流传输到服务器的某一个特定的文件夹下。Why->MultipartFile?解析源码:public interface MultipartFile extends InputStreamSour…

长安网站建设详细教程oss做静态网站

1、问题: 2、解决办法: 先执行下面命令看exit在哪个头文件下面 man exit 效果如下图 加上头文件编译,问题就解决了 3、 总结 如果看到编译的时候提示wall,我们首先是找到报警搞的函数,再用man 命令来 man 函数,然后找…

建设银行网站登录没反应盐城做网站哪家好

Scala第十七章节 scala总目录 文档资料下载 章节目标 了解集合的相关概念掌握Traversable集合的用法掌握随机学生序列案例 1. 集合 1.1 概述 但凡了解过编程的人都知道程序 算法 数据结构这句话, 它是由著名的瑞士计算机科学家尼古拉斯沃斯提出来的, 而他也是1984年图灵…

爱站网 关键词挖掘工具站长工具wordpress博客数据放在哪里的

微信小程序开发入门之共享账本(十四)(备注:微信小程序的wxml文件相当于HTML文件,wxss文件相当于CSS文件,js文件就是JavaScript文件,数据库为NoSQL数据库,数据库脚本语言也同NoSQL,因为是运行在微信内&#…