kafka的简单使用

Kafka是一个分布式的流处理平台,主要用于处理高吞吐量的实时数据流。在Kafka中,有一些重要的概念需要了解,包括以下几个:

  1. Broker:Kafka集群中的每个服务器节点被称为Broker。每个Broker负责处理一部分的消息流量,并且可以与其他Broker协作以实现高可用性和可扩展性。

  2. Topic:Topic是Kafka中消息的逻辑分类单位,类似于消息队列中的队列。每个Topic可以被分成多个Partition,每个Partition都是一个有序的消息队列。

  3. Partition:Partition是Topic的一个分区,每个Partition都是一个有序的消息队列。在Kafka中,每个Partition都被分配到一个Broker上,这个Broker就是该Partition的Leader Broker,其他的Broker则是该Partition的Follower Broker。

  4. Producer:Producer是向Kafka中发送消息的客户端。Producer可以将消息发送到指定的Topic的一个Partition中,也可以让Kafka根据一定的策略自动选择Partition。

  5. Consumer:Consumer是从Kafka中读取消息的客户端。Consumer可以订阅一个或多个Topic,并从指定的Partition中读取消息。

  6. Consumer Group:Consumer Group是一组共同消费一个或多个Topic的Consumer的集合。每个Consumer Group中的Consumer可以消费不同的Partition,但同一个Partition只能被同一个Consumer Group中的一个Consumer消费。

  7. Offset:Offset是Kafka中每个Partition中消息的唯一标识符,用于标识Consumer已经消费到哪个位置。Kafka中的Consumer可以通过指定Offset来读取指定位置的消息,也可以通过自动提交Offset来实现自动恢复。

简单的Spring Boot项目中使用Kafka的示例代码1:

  1. 引入Kafka依赖

在项目的pom.xml文件中添加Kafka依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.2</version>
</dependency>
  1. 配置Kafka

在application.properties文件中配置Kafka相关参数:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  1. 创建Kafka生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private static final String TOPIC = "my-topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);}
}
  1. 创建Kafka消费者
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void consume(String message) {System.out.println("Received message: " + message);}
}
  1. 测试Kafka

在Controller中注入KafkaProducer,发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MyController {@Autowiredprivate KafkaProducer kafkaProducer;@GetMapping("/send")public String sendMessage() {kafkaProducer.sendMessage("Hello, Kafka!");return "Message sent";}
}

启动项目,访问http://localhost:8080/send,可以在控制台看到消费者接收到的消息。

以上是一个简单的Spring Boot项目中使用Kafka的示例代码,实际使用中还需要考虑Kafka的高可用、消息序列化方式等问题。



示例代码2:

在Spring Boot项目中使用Kafka,需要引入Kafka客户端依赖,如下所示:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.1</version>
</dependency>

接下来,我们可以创建一个Kafka生产者和一个Kafka消费者。生产者用于发送消息到Kafka,消费者用于从Kafka订阅消息。以下是一个完整的Kafka示例代码:

@Configuration
@EnableKafka
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}
}

在上面的代码中,我们使用了Spring Kafka提供的一些配置类和注解,包括@EnableKafka、ProducerFactory、KafkaTemplate、ConsumerFactory和ConcurrentKafkaListenerContainerFactory等。

在生产者中,我们创建了一个ProducerFactory对象,用于创建KafkaProducer实例。然后,我们创建了一个KafkaTemplate对象,用于发送消息到Kafka。

在消费者中,我们创建了一个ConsumerFactory对象,用于创建KafkaConsumer实例。然后,我们创建了一个ConcurrentKafkaListenerContainerFactory对象,用于订阅Kafka消息并处理消息。

下面是一个使用Kafka的完整示例代码:

@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {kafkaTemplate.send("test-topic", message);return "Message sent successfully";}@KafkaListener(topics = "test-topic", groupId = "group-id")public void consumeMessage(String message) {System.out.println("Received message: " + message);}
}

在上面的代码中,我们创建了一个RestController,其中包含一个发送消息的接口和一个消费消息的方法。在发送消息的接口中,我们使用KafkaTemplate对象发送消息到名为test-topic的Kafka主题。在消费消息的方法中,我们使用@KafkaListener注解订阅名为test-topic的Kafka主题,并处理接收到的消息。

总之,Kafka是一个高吞吐量的分布式发布订阅消息系统,可以处理大量的实时数据流。在Spring Boot项目中使用Kafka,需要引入Kafka客户端依赖,并创建一个Kafka生产者和一个Kafka消费者。在生产者中,我们使用KafkaTemplate对象发送消息到Kafka。在消费者中,我们使用@KafkaListener注解订阅Kafka主题,并处理接收到的消息。

示例代码3:
以下是一个简单的Spring Boot项目中使用Kafka的示例代码:

  1. 配置Kafka

在Spring Boot项目中,我们需要在application.properties文件中配置Kafka的相关信息,如下所示:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

这里配置了Kafka的服务器地址、消费者组ID、自动偏移重置以及生产者的序列化器。

  1. 创建Kafka生产者

在Spring Boot项目中,我们可以使用KafkaTemplate来创建Kafka生产者,如下所示:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("my-topic", message);
}

这里使用@Autowired注解来注入KafkaTemplate,然后使用send方法来发送消息到指定的主题。

  1. 创建Kafka消费者

在Spring Boot项目中,我们可以使用@KafkaListener注解来创建Kafka消费者,如下所示:

@KafkaListener(topics = "my-topic", groupId = "my-group")
public void receiveMessage(String message) {System.out.println("Received message: " + message);
}

这里使用@KafkaListener注解来监听指定的主题和消费者组,然后使用receiveMessage方法来处理接收到的消息。

以上就是一个简单的Spring Boot项目中使用Kafka的示例代码。Kafka是一个分布式消息队列系统,可以用于实现高吞吐量、低延迟的消息传递。在Spring Boot项目中,我们可以使用KafkaTemplate来创建Kafka生产者,使用@KafkaListener注解来创建Kafka消费者,从而实现消息的生产和消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/834973.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Photoshop中选区工具的应用

Photoshop中选区工具的应用 前言Photoshop中选区工具的基本操作创建选区的工具及方法选择、取消、隐藏选区选区的增加、减少选区的应用变换扩大选取与选取相似 Photoshop中采用快速选择工具来创建选区Photoshop中采用色彩范围命令来创建选区Photoshop中采用快速蒙版来创建选区P…

安全加固

目录 1.文件锁定管理 2.设置用户账户有效期 3.查看并清除命令历史记录 4.设置用户超时登出时间 5.用户切换 6.用户提权 7.禁用重启热键CtrlAltDel 8.设置单用户模式密码 9.调整BIOS引导设置 10.禁止root用户从本地登录&#xff1a; 11.禁止root用户通过ss…

大数据------JavaWeb------Tomcat(完整知识点汇总)

Web服务器——Tomcat Web服务器定义 它是一个应用程序&#xff08;软件&#xff09;&#xff0c;对HTTP协议的操作进行封装&#xff0c;使得程序员不必直接对协议进行操作&#xff0c;让Web开发更便捷 Web服务器主要功能 封装HTTP协议操作&#xff0c;简化开发将Web项目部署到…

如何免费获得进仓数据库专家认证(帮你省50块钱)

这篇文章分三个部分 50块钱解决&#xff08;全靠自己钱可能打水漂考试只有三次机会&#xff09;50块钱解决&#xff08;全靠自己考试只有三次机会。&#xff09;30块钱解决&#xff08;考试靠我&#xff0c;报名费帮你0元处理&#xff0c;要求只有在线大学生。能力有限只能考K…

春秋云镜 CVE-2022-4230

靶标介绍&#xff1a; WP Statistics WordPress 插件13.2.9之前的版本不会转义参数&#xff0c;这可能允许经过身份验证的用户执行 SQL 注入攻击。默认情况下&#xff0c;具有管理选项功能 (admin) 的用户可以使用受影响的功能&#xff0c;但是该插件有一个设置允许低权限用户…

多标签分割

https://github.com/PaddlePaddle/PaddleSeg/blob/release/2.9/configs/multilabelseg/README_cn.md

Adobe Photoshop PS 25.6.0 解锁版 (最流行的图像设计软件)

前言 Adobe Photoshop 是一款专业强大的图片处理工具&#xff0c;从照片编辑和合成到数字绘画、动画和图形设计&#xff0c;一流的图像处理和图形设计应用程序是几乎每个创意项目的核心所在。利用 Photoshop 在桌面上的强大功能&#xff0c;您可以在灵感来袭时随时随地进行创作…

3分钟快速了解VR全景编辑器

说到VR全景&#xff0c;想必大多数人都见过那种可以360旋转拖动观看的图片。虽然这种技术已经不算新鲜&#xff0c;如果你以为这就是VR全景的全部&#xff0c;那就大错特错了&#xff01; 上面看到的这种形式&#xff0c;只能算VR全景的第一层形态。现在的VR全景已经发展成为了…

Lobe Chat–在线AI对话聊天机器人,一键部署,免费开源

Lobe Chat 现代化设计的开源 ChatGPT/LLMs 聊天应用与开发框架 支持语音合成、多模态、可扩展的&#xff08;function call&#xff09;插件系统 一键免费拥有你自己的 ChatGPT/Gemini/Claude/Ollama 应用 项目演示 支持多种模型接口 支持语音输入输出 支持云端同步 丰富多彩非…

如何在电脑桌面显示此电脑

如何在电脑桌面显示此电脑 鼠标在桌面空白处点击右键展示个性化 选择主题 选择桌面图标设置

Sqli-labs第五,六关

目录 首先找到他们的闭合方式 操作 总结&#xff1a; 第五关根据页面结果得知是字符型但是和前面四关还是不一样是因为页面虽然有东西。但是只有对于请求对错出现不一样页面其余的就没有了。这个时候我们用联合注入就没有用&#xff0c;因为联合注入是需要页面有回显位。如果…

OpenCompass笔记

假设一个模型&#xff0c;被2bit量化&#xff0c;然后一直瞎说话&#xff0c;怎么办&#xff1f;你是不是应该评估一下这个模型的效果&#xff1f; 但是&#xff0c;大模型的评估是很复杂的&#xff0c;如果说小模型的测试就像体检&#xff0c;指标明确&#xff0c;那么大模型…

数据结构_顺序表中基本操作的实现_代码

学习笔记&#xff0c;仅供参考 1.头文件 2.初始化 3.增加值 4.根据下标取值 5.查找 6.插入 7.删除 8.动态增加数组的长度 9.所有代码 10.运行结果 1.头文件 //顺序表的实现——动态分配 #include<stdio.h> #include<stdlib.h> #define InitSize 10 type…

国产银河麒麟V10SP1系统下搭建TiDB数据库操作步骤图文

开发目的&#xff1a;在国产银河麒麟系统中搭建TiDB数据库运行环境。 开发工具&#xff1a;银河麒麟系统V10SP1TiDBMySql数据库8.0。 具体步骤&#xff1a; 1、在VmWare虚拟机中安装好国产银河麒麟V10Sp1操作系统。 2、打开终端命令&#xff0c;安装TiDB相关软件&#xff1…

LearnOpenGL(十一)之光源

一、投光物 将光投射(Cast)到物体的光源叫做投光物(Light Caster)。 二、平行光 当一个光源处于很远的地方时&#xff0c;来自光源的每条光线就会近似于互相平行&#xff0c;我们可以称这些光为平行光。当我们使用一个假设光源处于无限远处的模型时&#xff0c;它就被称为定向…

开源AlphaFold3来啦!快来亲自尝试预测蛋白质结构!

引言 随着AlphaFold2的显著成就&#xff0c;DeepMind的AlphaFold3引发了科学界的广泛关注。尽管官方尚未开源AlphaFold3的代码&#xff0c;一些社区开发者已开始基于现有的科学论文尝试复现。本文将介绍如何使用一个名为AlphaFold3复现项目的GitHub代码仓库来进行蛋白质结构预…

CTF—AWD防御起手式

前言 AWD (Attack With Defence)&#xff0c;比赛中每个队伍维护多台服务器&#xff0c;服务器中存在多个漏洞&#xff0c;利用漏洞攻击其他队伍可以进行得分&#xff0c;修复漏洞可以避免被其他队伍攻击失分。 改SSH密码 官方在给出服务器密码时&#xff0c;很有可能是默认…

.NET WebService \ WCF \ WebAPI 部署总结 以及 window 服务 调试

一、webservice 部署只能部署IIS上&#xff0c; 比较简单&#xff0c;就不做说明了 二、 WCF 部署 1 部署到IIS 跟部署 webservice 部署方法一样的 wcf 部署2 部署到控制台 要以管理员运行vs&#xff0c;或者 管理员运行 控制台的exe 在控制器项目中 创建IUserInfoService 接口…

资源管理游戏模版进入The Sandbox

我们非常高兴地向您介绍 Game Maker 的最新模板&#xff1a;资源管理游戏&#xff01; 这一全新的模板让您能够深入身临其境的游戏体验中&#xff0c;同时掌握令人兴奋的新机制。通过揭开模板的神秘面纱&#xff0c;您可以锤炼您的游戏设计技能。 什么是资源管理游戏&#xff1…

Hive Aggregation 聚合函数

Hive Aggregation 聚合函数 基础聚合 增强聚合