通过分区在Kafka中实现订单保证人

Kafka最重要的功能之一是实现消息的负载平衡,并保证分布式集群中的排序,否则传统队列中将无法实现。

首先让我们尝试了解问题陈述

让我们假设我们有一个主题,其中发送消息,并且有一个消费者正在使用这些消息。
如果只有一个使用者,它将按消息在队列中的顺序或发送的顺序接收消息。

现在,为了获得更高的性能,我们需要更快地处理消息,因此我们引入了消费者应用程序的多个实例。

如果消息包含任何状态,则将导致问题。

让我们尝试通过一个例子来理解这一点:

如果对于特定的消息ID,我们有3个事件:
第一:创建
第二:更新 第三:删除 我们要求仅在消息的“创建”事件之后才处理消息的“更新”或“删除”事件。 现在,如果两个单独的实例几乎同时获得相同消息的“ CREATE”和“ UPDATE”,则即使另一个实例完成“ CREATE”消息之前,带有“ UPDATE”消息的实例仍有机会尝试对其进行处理。 。 这可能是一个问题,因为使用者将尝试更新尚未创建的消息,并且将引发异常,并且此“更新”可能会丢失。

可能的解决方案

我想到的第一个解决方案是对数据库的乐观锁定,这可以防止这种情况,但是随后需要适应异常情况。 这不是一个非常简单的方法,可能涉及更多的锁定和要处理的并发问题。

另一个更简单的解决方案是,如果特定ID的消息/事件总是转到特定实例,因此它们将是有序的。 在这种情况下,CREATE将始终在UPDATE之前执行,因为这是发送它们的原始顺序。

这就是卡夫卡派上用场的地方。

Kafka在主题内具有“分区”的概念,该概念既可以提供订购保证,又可以在整个消费者流程中提供负载平衡。

每个分区都是有序的,不可变的消息序列,该消息序列被连续附加到提交日志中。 分区中的每个消息均分配有一个顺序ID号,称为偏移量,它唯一地标识分区中的每个消息。

因此,一个主题将具有多个分区,每个分区都保持自己的偏移量。
现在,要确保将具有特定id的事件始终转到特定实例,可以执行以下操作:如果将每个使用者与特定分区绑定,然后确保具有特定id的所有事件和消息始终转到特定实例,则可以完成此操作。特定分区,因此它们始终由同一使用者实例使用。

为了实现此分区,Kafka客户端API为我们提供了两种方法:
1)定义用于分区的键,该键将用作默认分区逻辑的键。
2)编写一个Partitioning类来定义我们自己的分区逻辑。

让我们探索第一个:

默认分区逻辑

默认的分区策略是hash(key)%numPartitions 。 如果键为null,则选择一个随机分区。 所以,如果我们要为分区键是一个特定属性,我们需要将它传递在ProducerRecord构造而从发送消息Producer

让我们来看一个例子:

注意:要运行此示例,我们需要具备以下条件:
1.运行Zookeeper(在localhost:2181)
2.运行Kafka(位于localhost:9092) 3.创建一个带有3个分区的名为“ TRADING-INFO”的主题。(为简单起见,我们可以只有一个代理。) 要完成以上三个步骤,请遵循此处的文档。

假设我们正在发送有关“ TRADING-INFO”主题的交易信息,该信息由消费者消费。

1.贸易舱

(注意:我在这里使用过Lombok )

@Data
@Builder
public class Trade {private String id;private String securityId;private String fundShortName;private String value;
}

2. Kafka客户端依赖

为了制作一个Kafka Producer,我们需要包含Kafka依赖项:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.10.0.0</version></dependency>

卡夫卡制片人

public class Producer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";KafkaProducer kafkaProducer = new KafkaProducer(getProducerProperties());Runnable task1 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ABCD", 1, 5);Runnable task2 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "PQ1234@1211111111111", 6, 10);Runnable task3 = () -> sendTradeToTopic(TOPIC, kafkaProducer, "ZX12345OOO", 11, 15);ExecutorService executorService = Executors.newFixedThreadPool(3);executorService.submit(task1);executorService.submit(task2);executorService.submit(task3);executorService.shutdown();}private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) {for (int i = idStart; i <= idEnd; i++) {Trade trade = Trade.builder().id(i).securityId(securityId).value("abcd").build();try {String s = new ObjectMapper().writeValueAsString(trade);kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s));System.out.println("Sending to " + topic + "msg : " + s);} catch (JsonProcessingException e) {e.printStackTrace();}}}private static Properties getProducerProperties() {Properties props = new Properties();String KAFKA_SERVER_IP = "localhost:9092";props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return props;}}

消费者

public class TConsumer {public static void main(String[] args) {final String TOPIC = "TRADING-INFO";final String CONSUMER_GROUP_ID = "consumer-group";KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(getConsumerProperties(CONSUMER_GROUP_ID));kafkaConsumer.subscribe(Arrays.asList(TOPIC));while(true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);consumerRecords.forEach(e -> {System.out.println(e.value());});}}private static Properties getConsumerProperties(String consumerGroupId) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", consumerGroupId);props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());return props;}
}

因为我们有3个分区,所以我们将运行3个Consumer实例。

现在,当我们使用不同的线程运行生产者时,生成具有3种“安全类型”消息的消息,这是我们的关键。 我们将看到,特定的实例总是迎合特定的“安全类型”,因此将能够按顺序处理消息。

产出

消费者1:

{"id":1,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":2,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":3,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":4,"securityId":"ABCD","fundShortName":null,"value":"abcd"}
{"id":5,"securityId":"ABCD","fundShortName":null,"value":"abcd"}

消费者2:

{"id":6,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":7,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":8,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":9,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}
{"id":10,"securityId":"PQ1234@1211111111111","fundShortName":null,"value":"abcd"}

消费者3:

{"id":11,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":12,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":13,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":14,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}
{"id":15,"securityId":"ZX12345OOO","fundShortName":null,"value":"abcd"}

因此,这里的3种类型的“ securityIds”生成了不同的哈希值,因此被分配到不同的分区中,从而确保一种交易类型始终用于特定实例。

现在,如果我们不想使用默认的分区逻辑,并且我们的场景更加复杂,我们将需要实现自己的Partitioner,在下一个博客中,我将解释如何使用它以及它如何工作。

翻译自: https://www.javacodegeeks.com/2016/08/achieving-order-guarnetee-kafka-partitioning.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/351209.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

破解栅栏密码python脚本

今天遇到一个要破解的栅栏密码&#xff0c;写了个通用的脚本 1 #!/usr/bin/env python2 # -*- coding: gbk -*-3 # -*- coding: utf_8 -*-4 # Author: 蔚蓝行5 # http://www.cnblogs.com/duanv6 e raw_input(请输入要解密的字符串\n)7 elen len(e)8 field[]9 for i in range(…

水稻已知os基因号,利用DAVIA进行GO功能富集分析

第1-5步&#xff1a; 已知水稻的基因&#xff08;os&#xff09;&#xff0c;进行功能注释 第6步 第七步&#xff1a; 第八步&#xff1a; 第九步&#xff1a; 第十步&#xff1a; 第十一步&#xff1a;

二,八,十,十六进制之间转换的相应方法

int num1 Integer.valueOf(n,16); //16进制转换成10进制 Integer.toHexString(Integer i); //10进制转换成16进制 补充&#xff1a;Integer.toHexString(Integer i);该方法得出的字符默认为小写&#xff0c;如果想得到大写结果&#xff0c;则变为Integer.toHexString(Integer i…

IDF实验室-图片里的英语

原题&#xff1a; 一恒河沙中有三千世界&#xff0c;一张图里也可以有很多东西。 不多说了&#xff0c;答案是这个图片包含的那句英文的所有单词的首字母。 首字母中的首字母要大写&#xff0c;答案格式是wctf{一坨首字母} 加油吧少年&#xff01;看好你哦~ writeup&#xff…

linux 终端调用MATLAB程序

linux 终端调用MATLAB程序 路径&#xff1a;/A/B/C/ 程序名称&#xff1a;xxx.m linux 终端调用MATLAB函数方法 cd /A/B/C/ matlab -nodisplay -nosplash -nodesktop -r "xxx;exit;"

2018-11-02 在代码中进行中文命名实践的短期目标

对中文命名的意义不再赘述, 请参看之前的对在代码中使用中文命名的质疑与回应. 去年中文命名实践的阻力和应对之后, 在一些小项目中继续实践了中文命名(Java/JS/Python等, 详见之前的专栏文章), 涉及领域不少但尚未形成明确的重点项目. 发现了一些在业务相关代码使用中文命名的…

Wireshark 命令行捕获数据

在 Wireshark 程序目录中&#xff0c;包含两个命令行捕获工具。这两个工具分别是 Dumpcap 和 Tshark。当不能以图形界面方式捕获数据时&#xff0c;可以在命令行使用 dumpcap 或 tshark 程序实施捕获。 一、使用 Dumpcap 捕获数据 执行 dumpcap -h 可以查看参数详情。 1、执行 …

zk ui_高级ZK:异步UI更新和后台处理–第2部分

zk ui介绍 在第1部分中&#xff0c;我展示了如何在ZK应用程序中使用服务器推送和线程来执行后台任务。 但是&#xff0c;这个简单的示例具有一个重大缺陷&#xff0c;这使其对于实际应用程序而言是一种不好的方法&#xff1a;它为每个后台任务启动了一个新线程。 JDK5引入了E…

学生管理系统 数据库版结果 查询student表中所有学生信息

1.创建school_java数据库 CREATE DATABASE schooljava; USE schooljava; CREATE TABLE student ( id INT(11), name VARCHAR(25), tel INT(11), sex VARCHAR(6) ); DESC student; java代码 package Mysql; import java.sql.Connection; import java.sql.DriverManager; imp…

如何查看思科交换机的出厂时间?

1.在交换机命令行运行show version 查看交换机的sn码 System serial number : FOC1723W0VP 2.SN码取出第四位至七位 以 FOC1723W0VP 为例 第四和第五位代表年份&#xff0c;第六和第七位代表当年的第XX周 &#xff08;范围是01至52周&#xff09; 1719962013 ; 23周 (注&…

Linux系统电脑非正常关机之后可能出现在登录界面循环的情况

Linux系统电脑非正常关机之后可能出现在登录界面循环的情况 例如&#xff1a; Ubuntu 18.04 有时会出现在登录界面循环&#xff0c;你输入密码&#xff0c;回车后又回到输入密码界面 &#xff0c;遇到这样的解决办法是直接进入命令行模式&#xff0c;然后看一下home文件夹&…

使用Spring Cloud Stream与RabbitMQ集成

在我以前的文章中&#xff0c;我写了两个系统之间非常简单的集成场景-一个生成一个工作单元&#xff0c;另一个处理该工作单元&#xff0c;以及Spring Integration如何使这种集成非常容易。 在这里&#xff0c;我将演示如何使用Spring Cloud Stream进一步简化此集成方案 我在…

ubuntu18.0.4 不能下载 libgd2-dev(ubuntu 20.04 安装perl 中GD 模块失败的解决办法)

ubuntu18.0.4 不能下载 libgd2-dev 一、错误信息&#xff1a; Unable to locate package libgd2-dev二、原因 没有对应源 到 https://packages.ubuntu.com/找对应名称 三、解决 18.04之后没有libgd2-dev sudo apt-get install libgd-dev参考&#xff1a;https://www.cnblo…

开课博客

自我介绍 对于自我介绍这篇我还是很意外的&#xff0c;个人信息不说了&#xff0c;说说自己的情况吧&#xff0c;当时大一的时候&#xff0c;感觉还是很喜欢编程的&#xff0c;个人感觉老师也挺喜欢我&#xff0c;可能大一下的时候心思多了点&#xff0c;慢慢的就没放多少心思在…

抓到一只苍蝇 writeup

题目在 http://ctf.idf.cn/index.php?ggame&marticle&aindex&id57 下载到的文件是misc_fly.pcapng&#xff0c;使用wireshark打开&#xff0c;能看到一堆tcp、http和dns协议混合的数据包&#xff0c;在上面的框里面输入http&#xff0c;让它只显示http协议的数据包…

ubuntu 20.04 安装circos

不需要自己手动按照&#xff0c;调配置 直接 sudo apt install circos一步到位

java环境变量的配置和使用

在downloads中选择JAVA DOWNLOAD进入Java下载列表 点选Accept License Agreement&#xff0c;选择Windows这一栏下载安装包 进入文件夹&#xff0c;双击应用程序根据提示进行安装&#xff0c;直至安装完成。 测试jdk是否安装成功&#xff0c;可在【开始】中搜索cmd&#xff…

perl 安装GD 出错解决方案

perl 安装GD 出错具体如下 install GD Running install for module GD Checksum for /root/.cpan/sources/authors/id/R/RU/RURBAN/GD-2.73.tar.gz ok Configuring R/RU/RURBAN/GD-2.73.tar.gz with Makefile.PL Package gdlib was not found in the pkg-config sea…

C语言学习一个月后感想

C语言学习一个月后感想 感谢李晓东老板及计算机工程师联盟的学长学姐和某神秘同级同学的辛勤指导&#xff0c;感谢宋雨田的督促和陪伴。 初识C的1、、体会 我本以为凭借瓜皮思维和花里胡哨操作可以让我熟练地学习语言&#xff0c;现在发现只是python的易入门给我的错觉。。。错…

使用Java 8处理并行数据库流

什么是并行数据库流&#xff1f; 阅读这篇文章&#xff0c;了解如何使用并行流和Speedment并行处理数据库中的数据。 在许多情况下&#xff0c;并行流可能比通常的顺序流快得多。 随着Java 8的引入&#xff0c;我们得到了期待已久的Stream库。 流的优点之一是使流并行非常容易…