卡夫卡如何分区_通过分区在卡夫卡实现订单担保人

卡夫卡如何分区

Kafka最重要的功能之一是实现消息的负载平衡,并保证分布式集群中的排序,否则在传统队列中是不可能的。

首先让我们尝试了解问题陈述

让我们假设我们有一个主题,其中发送消息,并且有一个消费者正在使用这些消息。
如果只有一个使用者,它将按消息在队列中的顺序或发送的顺序接收消息。

现在,为了获得更高的性能,我们需要更快地处理消息,因此我们引入了消费者应用程序的多个实例。

如果消息包含任何状态,则将导致问题。

让我们尝试通过一个例子来理解这一点:

如果对于特定的消息ID,我们有3个事件:
第一:创建
第二:更新 第三:删除 我们要求仅在消息的“创建”事件之后才处理消息的“更新”或“删除”事件。 现在,如果两个单独的实例几乎同时获得同一消息的“ CREATE”和“ UPDATE”,则即使另一个实例完成“ CREATE”消息之前,带有“ UPDATE”消息的实例仍有机会尝试对其进行处理。 。 这可能是一个问题,因为使用者将尝试更新尚未创建的消息,并且将引发异常,并且此“更新”可能会丢失。

可能的解决方案

我想到的第一个解决方案是数据库上的乐观锁,这可以防止这种情况,但是随后需要适应异常情况。 这不是一个非常简单的方法,可能涉及更多的锁定和要处理的并发问题。

另一个更简单的解决方案是,如果特定ID的消息/事件总是转到特定实例,因此它们将是有序的。 在这种情况下,CREATE将始终在UPDATE之前执行,因为这是发送它们的原始顺序。

这就是卡夫卡派上用场的地方。

Kafka在主题中具有“分区”的概念,该概念既可以提供订购保证,又可以在整个消费者流程中提供负载平衡。

每个分区都是有序的,不可变的消息序列,这些消息连续地附加到提交日志中。 分区中的每个消息均分配有一个顺序ID号,称为偏移量,该ID唯一地标识分区中的每个消息。

因此,一个主题将具有多个分区,每个分区保持各自的偏移量。
现在,要确保将具有特定id的事件始终转到特定实例,可以执行以下操作:如果我们将每个使用者与特定分区绑定在一起,然后确保具有特定id的所有事件和消息始终转到特定实例,特定分区,因此它们始终由同一使用者实例使用。

为了实现此分区,Kafka客户端API为我们提供了两种方法:
1)定义用于分区的键,该键将用作默认分区逻辑的键。
2)编写一个Partitioning类来定义我们自己的分区逻辑。

让我们探索第一个:

默认分区逻辑

默认的分区策略是hash(key)%numPartitions 。 如果键为null,则选择一个随机分区。 所以,如果我们要为分区键是一个特定属性,我们需要将它传递在ProducerRecord构造而从发送消息Producer

让我们来看一个例子:

注意:要运行此示例,我们需要具备以下条件:
1.运行Zookeeper(在localhost:2181)
2.运行Kafka(位于localhost:9092) 3.创建一个带有3个分区的名为“ TRADING-INFO”的主题。(为简单起见,我们可以只有一个代理。) 要完成以上三个步骤,请遵循此处的文档。

假设我们正在发送有关“ TRADING-INFO”主题的交易信息,该信息由消费者消费。

1.贸易舱

(注意:我在这里使用过Lombok )

@Data
@Builder
public class Trade {private String id;private String securityId;private String fundShortName;private String value;
}

2. Kafka客户端依赖

为了制作一个Kafka Producer,我们需要包含Kafka依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.10.0.0</version></dependency>

卡夫卡制片人

public class Producer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties());Runnable task1 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ABCD", 1, 5);Runnable task2 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "PQ1234@1211111111111", 6, 10);Runnable task3 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ZX12345OOO", 11, 15);ExecutorService executorService = Executors.newFixedThreadPool(3);executorService.submit(task1);executorService.submit(task2);executorService.submit(task3);executorService.shutdown();}private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) {for (int i = idStart; i <= idEnd; i++) {Trade trade = Trade.builder().id(i).securityId(securityId).value("abcd").build();try {String s = new ObjectMapper().writeValueAsString(trade);kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s));System.out.println("Sending to " + topic + "msg : " + s);} catch (JsonProcessingException e) {e.printStackTrace();}}}private static Properties getProducerProperties() {Properties props = new Properties();String KAFKA_SERVER_IP = "localhost:9092";props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return props;}}

消费者

public class TConsumer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";final String CONSUMER_GROUP_ID = "consumer-group";KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getConsumerProperties(CONSUMER_GROUP_ID));kafkaConsumer.subscribe(Arrays.asList(TOPIC));while(true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);consumerRecords.forEach(e -> {System.out.println(e.value());});}}private static Properties getConsumerProperties(String consumerGroupId) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", consumerGroupId);props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());return props;}
}

由于我们有3个分区,因此我们将运行3个Consumer实例。

现在,当我们使用不同的线程运行生产者时,生成具有3种“安全类型”消息的消息,这是我们的关键。 我们将看到,特定的实例总是迎合特定的“安全类型”,因此将能够按顺序处理消息。

产出

消费者1:

{"id":1,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":2,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":3,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":4,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":5,"securityId":"ABCD","fundShortName":null,"value":"abcd"}

消费者2:

{"id":6,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":7,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":8,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":9,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":10,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}

消费者3:

{"id":11,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":12,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":13,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":14,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":15,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}

因此,这里的3种类型的“ securityIds”生成了不同的哈希值,因此被分配到了不同的分区中,从而确保一种交易总是去往特定的实例。

现在,如果我们不想使用默认的分区逻辑并且我们的场景更加复杂,我们将需要实现自己的Partitioner,在下一个博客中,我将解释如何使用它以及它如何工作。

翻译自: https://www.javacodegeeks.com/2016/08/achieving-order-guarnetee-kafka-partitioning.html

卡夫卡如何分区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335702.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全景视觉空间直线检测_视觉SLAM深度解读

近年来&#xff0c;SLAM技术取得了惊人的发展&#xff0c;领先一步的激光SLAM已成熟的应用于各大场景中&#xff0c;视觉SLAM虽在落地应用上不及激光SLAM&#xff0c;但也是目前研究的一大热点&#xff0c;今天我们就来详细聊聊视觉SLAM的那些事儿。视觉SLAM是什么&#xff1f;…

HTTP网页请求响应的状态码/状态代码

文章目录1开头 询问客户端是否还有请求消息2开头 &#xff08;请求成功&#xff09;表示成功处理了请求的状态代码3开头 &#xff08;请求被重定向&#xff09;表示要完成请求&#xff0c;需要进一步操作。 通常&#xff0c;这些状态代码用来重定向4开头 &#xff08;请求错误&…

某人想在h小时内钓到_为某人命名以重新连接到您的服务器

某人想在h小时内钓到在进行测试自动化时&#xff0c;通常需要知道当前计算机的名称&#xff0c;以提示另一台计算机连接到它&#xff0c;特别是在并行运行测试的情况下。 本周&#xff0c;我试图对服务器进行测试&#xff0c;以使其连接回在从属测试计算机上运行的WireMock服务…

对象必须实现 iconvertible。_java面向对象最全入门笔记(通俗易懂,适合初学者)...

前言:面向对象的三大特征封装 (Encapsulation)继承 (Inheritance)多态 (Polymorphism)编程思想&#xff1a;面向过程&#xff1a;做某件事情都需要自己亲历亲为&#xff0c;按照步骤去完成面向对象&#xff1a;做某件事情不需要自己亲历亲为&#xff0c;只需指定特定的对象去完…

C++ 【随想录】(一)模拟矩阵相乘

#include<iostream> using namespace std; const int N 105; int R1[N][N], R2[N][N], ans[N][N];int main() {int m, p, n;cin >> m >> p >> n;for (int i 0; i < m; i) {//录入第一个矩阵for (int j 0; j < p; j) {cin >> R1[i][j];}…

HH SaaS电商系统的结算系统设计

文章目录一、第三方卖家销售货款结算流程说明&#xff1a;销售货款结算逻辑流程图预期收入如何统计二、第三方卖家推广分销佣金结算佣金结算逻辑流程图逾期佣金收入如何统计三、采购货款结算租户采购货款结算流程说明商城采购货款结算流程说明店铺商家的采购货款结算流程说明四…

java8并行流_Java 8:CompletableFuture与并行流

java8并行流这篇文章展示了Java 8的CompletableFuture在执行异步计算时如何与并行流进行比较。 我们将使用以下类对长时间运行的任务进行建模&#xff1a; class MyTask {private final int duration;public MyTask(int duration) {this.duration duration;}public int calc…

postgresql删除索引_PostgreSQL 13 发布,索引和查找有重大改进

9月24日&#xff0c;PostgreSQL全球开发组宣布PostgreSQL 13正式发布&#xff0c;作为世界上使用最多的开源数据库之一&#xff0c;PostgresSQL 13是目前的最新版本。PostgreSQL 13 在索引和查找方面进行了重大改进&#xff0c;有利于大型数据库系统&#xff0c;改进包括索引的…

【WebRTC---源码篇】(十六)WebRTC/NetEQ

WebRTC【4096版本】 NetEQ的作用 进行抖动控制和丢包隐藏,通过该技术可以让音频更平滑 NetEQ插入packet数据 int NetEqImpl::InsertPacketInternal(const RTPHeader& rtp_header,rtc::ArrayView<const uint8_t> payload) {//如果有效荷载payload为空if (paylo…

HH SaaS电商系统的销售订单设计

文章目录订单销售类型订单优惠优惠方式子订单优惠金额订单拆单订单发货销售订单拆单逻辑图销售订单的信息结构相关实体订单运营类型&#xff08;作废&#xff09;售后截止时间订单状态状态机的设计不同属性组合下的订单状态组合1&#xff1a;实物线上非预售非定制非拼单快递组合…

kafka netty_惠而浦:使用Netty和Kafka的微服务

kafka netty介绍 在上一个博客中 &#xff0c;我介绍了Netty用作Web服务器。 该示例运行良好……只要需要广播服务器即可。 大多数情况下不是很有用。 更有可能的是&#xff0c;每个客户端仅接收针对其的数据&#xff0c;并保留了特殊情况下的广播&#xff0c;例如“服务器在1…

HH SaaS电商系统的拼团系统设计

文章目录拼团业务流程拼团单实体拼团单的状态拼团型订单的销售状态注意点拼团业务流程 创建活动 管理后台创建拼团活动&#xff0c;设置好活动有效期、成团人数、成团时效、限购数量、活动对象、添加活动商品&#xff0c;设置团长价和团员价&#xff0c;活动开始后活动对象在买…

统信uos系统考试题_离Windows更近一步!微信Linux原生版上线:国产统信UOS系统已适配...

就在本月11号&#xff0c;国产操作系统-统信 UOS发布了专业版 V20(1030)&#xff0c;功能更强大&#xff0c;同时性能和安全性均有所提升。而距离这个好消息过去没多久&#xff0c;统信软件官方再次发布了一个好消息&#xff1a;那就是微信桌面客户端(统信 UOS 版)研发完成&…

npm 引用子项目模块_Java / Web项目中的NPM模块Browser-Sync

npm 引用子项目模块Browser-Sync是一个方便的基于Node.js的NPM模块&#xff0c;可用于更快的Web开发。 浏览器同步可在许多设备之间同步文件更改和交互。 最重要的功能是实时重新加载。 我们也可以在Java / Web项目中使用Browser-Sync。 Cagatay Civici创造了一个伟大的 视频教…

电商系统的自提订单,提货流程如何设计

文章目录产生自提订单的场景非O2O模式平台的自提点和O2O模式平台的自提点区别提货核销二维码核销提货码核销产生自提订单的场景 自提订单通常在O2O场景下会涉及到&#xff0c;所以通常是在O2O店铺会产生这样的订单&#xff0c;当然B2C模式的平台也会产生自提订单&#xff0c;例…

xrd精修教程_XRD精修系列干货 | 带你领略晶体之美

1. XRD精修教程(一)——XRD精修基本原理与GSAS软件简介(附GSAS软件下载链接)点击上图即可查看全文X射线衍射分析(XRD)在研究材料的相结构、相成分等多个方面有广泛的应用&#xff0c;但最常用的多晶衍射法有个缺点&#xff1a;得到的谱峰重叠严重&#xff0c;从而造成大量材料结…

【WebRTC---源码篇】(十八)GoogREMB算法

GoogREMB是基于延时的接收端拥塞控制算法,主要包括以下四个部分: 1.RemoteBitrate Estimator ,是接收端延时拥塞控制算法的管理模块。一方面与外面模块打交道,从网络收/发模块获取RTP包的传输信息用于拥塞评估,或将内部评估的下一时刻的发送码率(大小)输出给网络收/发模…

电商系统的O2O业务模式设计

O2O商业模式简述 O2O模式的平台才会产生配送订单&#xff08;即外卖订单&#xff09;&#xff0c;配送和快递的业务意义不同&#xff0c;配送强调的是“短距离”的派送服务&#xff0c;这正好符合O2O商业模式的核心诉求。 O2O服务的是周边用户人群&#xff0c;满足客户付款后…

cloud foundry_Cloud Foundry Java客户端–流事件

cloud foundryCloud Foundry Java客户端提供了基于Java的绑定&#xff0c;用于与正在运行的Cloud Foundry实例进行交互。 该项目的一件整洁的事情是&#xff0c;它已经接受了基于Reactive Stream的API的方法签名&#xff0c;特别是使用Reactor实现&#xff0c;这在使用流数据时…