Kafka分区策略实现

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class RoundRobinProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;public class RandomPartitioner implements Partitioner {private final Random random = new Random();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return random.nextInt(partitions.size());}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用随机分区器的生产者示例
    public class RandomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "RandomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class KeyBasedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);producer.send(record);}producer.close();}
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 自定义分区逻辑,这里简单示例根据消息值的长度分区String message = (String) value;return message.length() % partitions.size();}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用自定义分区器的生产者示例
    public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/894506.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++ stl 遍历算法和查找算法

概述&#xff1a; 算法主要由头文件<algorithm> <functional> <numeric> 提供 <algorithm> 是所有 STL 头文件中最大的一个&#xff0c;提供了超过 90 个支持各种各样算法的函数&#xff0c;包括排序、合并、搜索、去重、分解、遍历、数值交换、拷贝和…

2.2 实现双向链表的快速排序

实现一个双向链表的快速排序。 1>程序代码 #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h>…

力扣动态规划-19【算法学习day.113】

前言 ###我做这类文章一个重要的目的还是记录自己的学习过程&#xff0c;我的解析也不会做的非常详细&#xff0c;只会提供思路和一些关键点&#xff0c;力扣上的大佬们的题解质量是非常非常高滴&#xff01;&#xff01;&#xff01; 习题 1.矩形中移动的最大次数 题目链接…

Gurobi基础语法之 addConstr, addConstrs, addQConstr, addMQConstr

在新版本的 Gurobi 中&#xff0c;向 addConstr 这个方法中传入一个 TempConstr 对象&#xff0c;在模型中就会根据这个对象生成一个约束。更重要的是&#xff1a;TempConstr 对象可以传给所有addConstr系列方法&#xff0c;所以下面先介绍 TempConstr 对象 TempConstr TempC…

五子棋对弈

问题描述 "在五子棋的对弈中&#xff0c;友谊的小船说翻就翻&#xff1f;" 不&#xff01;对小蓝和小桥来说&#xff0c;五子棋不仅是棋盘上的较量&#xff0c;更是心与心之间的沟通。这两位挚友秉承着"友谊第一&#xff0c;比赛第二"的宗旨&#xff0c;决…

使用 HTTP::Server::Simple 实现轻量级 HTTP 服务器

在Perl中&#xff0c;HTTP::Server::Simple 模块提供了一种轻量级的方式来实现HTTP服务器。该模块简单易用&#xff0c;适合快速开发和测试HTTP服务。本文将详细介绍如何使用 HTTP::Server::Simple 模块创建和配置一个轻量级HTTP服务器。 安装 HTTP::Server::Simple 首先&…

在AI技术深度渗透的背景下,2025年传媒互联网行业的哪些细分场景和产品形态将迎来爆发式增长?

一、AI技术重构传媒互联网行业版图&#xff1a;从底层逻辑到应用场景 近年来&#xff0c;AI技术已从实验室走向商业化落地&#xff0c;而传媒互联网行业因其庞大的用户基数、高频交互场景和丰富的数据积累&#xff0c;成为AI应用的主战场。根据华源证券最新行业周报&#xff0…

Docker Hub 镜像 Pull 失败的解决方案

目录 引言一、问题二、原因三、解决方法四、参考文献 引言 在云原生技术火热的当下&#xff0c;Docker可谓是其基础&#xff0c;由于其简单以及方便性&#xff0c;让开发人员不必再为环境配置问题而伤脑筋&#xff0c;因为可将其看作一个虚拟机程序去理解。所以掌握好它可谓是…

neo4j-community-5.26.0 create new database

1.edit neo4j.conf 把 # The name of the default database initial.dbms.default_databasehonglouneo4j # 写上自己的数据库名称 和 # Name of the service #5.0 server.windows_service_nameneo4j #4.0 dbms.default_databaseneo4j #dbms.default_databaseneo4jwind serve…

unity实现回旋镖函数

最近学习unity2D&#xff0c;想实现一个回旋镖武器&#xff0c;发出后就可以在角色周围回旋。 一、目标 1.不是一次性的&#xff0c;扔出去、返回、没有了&#xff1b;而是扔出去&#xff0c;返回到角色后方相同距离&#xff0c;再次返回&#xff1b;再次返回&#xff0c;永远…

【C++基础】字符串/字符读取函数解析

最近在学C以及STL&#xff0c;打个基础 参考&#xff1a; c中的char[] ,char* ,string三种字符串变量转化的兼容原则 c读取字符串和字符的6种函数 字符串结构 首先明确三种字符串结构的兼容关系&#xff1a;string>char*>char [] string最灵活&#xff0c;内置增删查改…

求一个数的数根(高精度)

上一期我们讲的是求一个数的数根&#xff0c;和本期唯一不同的是&#xff0c;数据范围不同了&#xff0c;上一期这个数是小于等于10的18次方的&#xff0c;这一期是小于等于10的1000次方的&#xff0c;开一个变量&#xff1f;肯定不行&#xff0c;我们需要再开一个数组&#xf…

SpringBoot源码解析(九):Bean定义接口体系

SpringBoot源码系列文章 SpringBoot源码解析(一)&#xff1a;SpringApplication构造方法 SpringBoot源码解析(二)&#xff1a;引导上下文DefaultBootstrapContext SpringBoot源码解析(三)&#xff1a;启动开始阶段 SpringBoot源码解析(四)&#xff1a;解析应用参数args Sp…

Vue 3 30天精进之旅:Day 13 - 路由守卫

在构建单页面应用时&#xff0c;路由守卫是一个非常重要的概念。它允许我们在路由进入或离开时执行一些操作&#xff0c;比如验证用户权限、处理数据加载、执行导航确认等。Vue Router提供了多种类型的路由守卫&#xff0c;使我们能够灵活地控制路由的行为。在今天的学习中&…

【TypeScript】基础:数据类型

文章目录 TypeScript一、简介二、类型声明三、数据类型anyunknownnervervoidobjecttupleenumType一些特殊情况 TypeScript 是JavaScript的超集&#xff0c;代码量比JavaScript复杂、繁多&#xff1b;但是结构更清晰 一、简介 为什么需要TypeScript&#xff1f; JavaScript的…

C++模板编程——可变参函数模板

目录 1. 可变参函数模板基本介绍 2. 参数包展开——通过递归函数 3. 参数包展开——通过编译期间if语句(constexpr if) 4. 重载 5. 后记 进来看的小伙伴们应该对C中的模板有了一定了解&#xff0c;下面给大家介绍一下可变参函数模板。过于基础的概念将不仔细介绍。 1. 可变…

ChatGPT-4o和ChatGPT-4o mini的差异点

在人工智能领域&#xff0c;OpenAI再次引领创新潮流&#xff0c;近日正式发布了其最新模型——ChatGPT-4o及其经济实惠的小型版本ChatGPT-4o Mini。这两款模型虽同属于ChatGPT系列&#xff0c;但在性能、应用场景及成本上展现出显著的差异。本文将通过图文并茂的方式&#xff0…

三数之和(15)

15. 三数之和 - 力扣&#xff08;LeetCode&#xff09; 可以一起总结的题目&#xff1a;三角形的最大周长&#xff08;976&#xff09;-CSDN博客 解法&#xff1a; class Solution { public:vector<vector<int>> threeSum(vector<int>& nums) {vector…

2025最新源支付V7全套开源版+Mac云端+五合一云端

2025最新源支付V7全套开源版Mac云端五合一云端 官方1999元&#xff0c; 最新非网上那种功能不全带BUG开源版&#xff0c;可以自己增加授权或二开 拥有卓越的性能和丰富的功能。它采用全新轻量化的界面UI&#xff0c;让您能更方便快捷地解决知识付费和运营赞助的难题 它基于…

9 点结构模块(point.rs)

一、point.rs源码 use super::UnknownUnit; use crate::approxeq::ApproxEq; use crate::approxord::{max, min}; use crate::length::Length; use crate::num::*; use crate::scale::Scale; use crate::size::{Size2D, Size3D}; use crate::vector::{vec2, vec3, Vector2D, V…