Kafka Go客户端--Sarama

Kafka Go客户端

在Go中里面有三个比较有名气的Go客户端。

  • Sarama:用户数量最多,早期这个项目是在Shopify下面,现在挪到了IBM下。
  • segmentio/kafka-go:没啥大的缺点。
  • confluent-kafka-go:需要启用cgo,跨平台问题比较多,交叉编译也不支持。

Sarama 使用入门:tools

IBM/sarama: Sarama is a Go library for Apache Kafka.

在 Sarama 里面提供了一些简单的命令行工具,可以看做是 Shell脚本提供的功能一个子集。

Consumer和 producer中的用得比较多

在这里插入图片描述

1.设置 Go 代理(如果内网无法直连 proxy.golang.org)

export GOPROXY=https://goproxy.cn,direct
export GOSUMDB=sum.golang.google.cn

2.在虚拟机上执行安装命令:

  • ​ go install github.com/IBM/sar ama/tools/kafka-console-consumer@latest
  • ​ go install github.com/lBM/sarama/tools/kafka-console-producer@latest

3.把可执行文件所在目录加到 PATH(如果还没加)

export PATH=$PATH:$(go env GOBIN)

4.确认可执行文件在哪里

# 查看 GOBIN,如果你没显式设置,就会是空
go env GOBIN# 查看 GOPATH,默认是 $HOME/go(对于 root 用户就是 /root/go)
go env GOPATH#我的是/home/cxz/go/lib:/home/cxz/go/work

5.查看安装结果

ls /home/cxz/go/lib/bin
#应该能够看到kafka-console-consumer  kafka-console-producer

6.临时生效

export PATH=$PATH:/home/cxz/go/lib/bin# 然后验证
which kafka-console-consumer
# 应该输出 /home/cxz/go/lib/bin/kafka-console-consumer

7.永久生效

echo 'export PATH=$PATH:/home/cxz/go/lib/bin' >> ~/.bashrc
# 或者,如果你用的是 zsh:
# echo 'export PATH=$PATH:/home/cxz/go/lib/bin' >> ~/.zshrc# 然后重新加载配置
source ~/.bashrc

Sarama 使用入门:发送消息

虚拟机上执行

kafka-console-consumer -topic=test_topic -brokers=192.168.24.101:9094

Goland上执行

package mainimport ("github.com/IBM/sarama""github.com/stretchr/testify/assert""testing"
)var addrs = []string{"192.168.24.101:9094"}func TestSyncProducer(t *testing.T) {//创建一个 Sarama 的配置对象。cfg := sarama.NewConfig()//表示生产者要等待 Kafka 确认消息成功写入后再返回(同步模式)。如果不设置这个,SyncProducer.SendMessage 会一直失败。cfg.Producer.Return.Successes = true //同步的Producer一定要设置//创建一个同步的生产者实例producer, err := sarama.NewSyncProducer(addrs, cfg)assert.NoError(t, err)//构建消息并发送_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic",//消息数据本体Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),//会在生产者和消费者之间传递,消息头,可传递自定义键值对,比如 trace_id 用于链路追踪。Headers: []sarama.RecordHeader{{Key:   []byte("trace_id"),Value: []byte("123456"),},},//只作用于发送过程。元信息,在发送过程中使用,可以用来传递额外信息,发送完成后会原样返回(不会传给消费者)。Metadata: "这是metadata",})assert.NoError(t, err)
}

10.执行结果

Partition:	0
Offset:	0
Key:	
Value:	hello world ,这是一条使用kafka的消息

使用控制台工具连接Kafka

Sarama 使用入门:指定分区

可以注意到,前面所有的消息都被发送到了 Partition 0 上面。

正常来说,在 Sarama 里面,可以通过指定 config 中的Partitioner来指定最终的目标分区。

常见的方法:

  • ​ Random:随机挑一个。
  • ​ RoundRobin:轮询。
  • ​ Hash(默认):根据 key 的哈希值来筛选一个。
  • ​ Manual: 根据 Message 中的 partition 字段来选择。
  • ​ ConsistentCRC:一致性哈希,用的是 CRC32 算法。
  • ​ Custom:实际上不 Custom,而是自定义一部分Hash 的参数,本质上是一个 Hash 的实现。
//默认HashPartitioner  适合: 按用户 ID、订单 ID 等字段分区场景
cfg.Producer.Partitioner = sarama.NewHashPartitioner
//使用 CRC32 算法 计算 Key 的哈希。 适合: 需要高一致性分布的业务,例如日志收集系统
cfg.Producer.Partitioner = sarama.NewConsistentCRCHashPartitioner
//忽略 Key,每条消息随机分配 partition。  适合: 普通消息队列、广播类场景。
cfg.Producer.Partitioner = sarama.NewRandomPartitioner
//需要手动指定 partition(ProducerMessage.Partition 字段)。适合: 明确知道要写哪个 partition,例如做数据分流
cfg.Producer.Partitioner = sarama.NewManualPartitioner
//用于实现你自己的 Partitioner  一般不推荐使用这个空参函数(它会 panic),应实现完整接口。
cfg.Producer.Partitioner = sarama.NewCustomPartitioner()
//允许你使用自定义哈希函数来做 key 分区。  适合: 有特定哈希策略需求时,例如分布要尽可能均匀。
cfg.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 {})Topic: "test_topic",
//分区依据
Key:   sarama.StringEncoder("user_123"), // 🔑 这里是分区依据
//消息数据本体
Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),

最典型的场景,就是利用Partitioner来保证同一个业务的消息一定发送到同一个分区上,从而保证业 有序。

Sarama 使用入门:异步发送

Sarama有一个异步发送的producer,它的用法稍微复杂一点。

  • 把Return.Success和 Errors都设置为true,这是为了后面能够拿到发送结果。
  • 初始化异步producer。
  • 从producer里面拿到Input的channel,并且发送 一条消息。
  • ​ 利用select case,同时**监听Success和Error两个channel,**来获得发送成功与否的信息。
func TestAsyncProducer(t *testing.T) {cfg := sarama.NewConfig()//怎么知道发送是否成功cfg.Producer.Return.Errors = truecfg.Producer.Return.Successes = trueproducer, err := sarama.NewAsyncProducer(addrs, cfg)require.NoError(t, err)messages := producer.Input()go func() {for {messages <- &sarama.ProducerMessage{Topic: "test_topic",//分区依据Key: sarama.StringEncoder("user_123"), // 🔑 这里是分区依据//消息数据本体Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),//会在生产者和消费者之间传递Headers: []sarama.RecordHeader{{Key:   []byte("trace_id"),Value: []byte("123456"),},},//只作用于发送过程Metadata: "这是metadata",}}}()errCh := producer.Errors()succCh := producer.Successes()for {//两个都不满足就会阻塞select {case err := <-errCh:t.Log("发送出了问题", err.Err)case <-succCh:t.Log("发送成功")}}
}

Sarama 使用入门:acks

在Kafka里面,生产者在发送数据的时候,有一个很关键的参数,就是 acks。
有三个取值:

  • ​ 0:客户端发一次,不需要服务端的确认。
  • ​ 1:客户端发送,并且需要服务端写入到主分区。
  • ​ -1:客户端发送,并且需要服务端同步到所有的ISR 上。

从上到下,性能变差,但是数据可靠性上升。需要性能,选 0,需要消息不丢失,选-1。

理解acks你就要抓住核心点,谁ack才算数?

  • 0:TCP协议返回了ack就可以。
  • 1:主分区确认写入了就可以。
  • -1:所有的ISR都确认了就可以。

在这里插入图片描述

ISR (In Sync Replicas),用通俗易懂的话来说,就是跟上了节奏的从分区。

什么叫做跟上了节奏?就是它和主分区保持了数据同步。

所以,当消息被同步到从分区之后,如果主分区崩溃了那么依旧可以保证在从分区上还有数据。

在这里插入图片描述

sarama 使用入门:启动消费者

Sarama的消费者设计不是很直观,稍微有点复杂。

  • ​ 首先要初始化一个ConsumerGroup。
  • ​ 调用ConsumerGroup上的Consume方法。
  • ​ 为 Consume 方法传入一个 ConsumerGroupHandler的辅助方法。
package mainimport ("context""github.com/IBM/sarama""github.com/stretchr/testify/assert""log""testing"
)func TestConsumer(t *testing.T) {cfg := sarama.NewConfig()//正常来说,一个消费者都是归属一个消费者组的//消费者就是你的业务consumerGroup, err := sarama.NewConsumerGroup(addrs, "test_group", cfg)assert.NoError(t, err)err = consumerGroup.Consume(context.Background(), []string{"test_topic"}, testConsumerGroupHandler{})//你消费结束,就会到这里t.Log(err)
}type testConsumerGroupHandler struct {
}func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {log.Println("Setup session:", session)return nil
}func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {log.Println("Cleanup session:", session)return nil
}func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()for msg := range msgs {//var bizMsg MyBizMsg//err := json.Unmarshal(msg.Value, &bizMsg)//if err != nil {//	//这就是消费消息出错//	//大多数时候就是重试//	//记录日志//	continue//}log.Println(string(msg.Value))session.MarkMessage(msg, "")}//什么情况下会到这里//msg被人关了,也就是要退出消费逻辑return nil
}type MyBizMsg struct {Name string
}

sarama 使用入门:ConsumerGroupHandler

下面的代码就是对ConsumerGroupHandler的实现,关键就是在消费了msg之后,如果消费成功了,要记得提交。

也就是调用MarkMessage方法。

至于 Setup 和 Cleanup 方法反而用得不多。

type testConsumerGroupHandler struct {
}func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {log.Println("Setup session:", session)return nil
}func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {log.Println("Cleanup session:", session)return nil
}func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()for msg := range msgs {//var bizMsg MyBizMsg//err := json.Unmarshal(msg.Value, &bizMsg)//if err != nil {//	//这就是消费消息出错//	//大多数时候就是重试//	//记录日志//	continue//}log.Println(string(msg.Value))session.MarkMessage(msg, "")}//什么情况下会到这里//msg被人关了,也就是要退出消费逻辑return nil
}

sarama 使用入门:利用context来控制消费者退出

可以利用初始化ConsumerGroup 时候传入的ctx来控制消费者组退出消息。

下图中,我传入了一个超时的context,那么:

	start := time.Now()//这里是测试,我们就控制消费10sctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()//开始消费,会在这里阻塞住err = consumerGroup.Consume(ctx, []string{"test_topic"}, testConsumerGroupHandler{})//你消费结束,就会到这里t.Log(err, time.Since(start).String())	

下图中,我主动调用了cancel,那么:

	start := time.Now()//这里是测试,我们就控制消费5sctx, cancel := context.WithCancel(context.Background())time.AfterFunc(time.Second*5, func() {cancel()})//开始消费,会在这里阻塞住err = consumerGroup.Consume(ctx, []string{"test_topic"}, testConsumerGroupHandler{})//你消费结束,就会到这里t.Log(err, time.Since(start).String())
  • 如果超时了
  • 如果我主动调用了cancel

以上两种情况,任何一种情况出现了,都会让消费者退出消息。

sarama 使用入门:指定偏移量消费

在部分场景下,我们会希望消费历史消息,或者从某个消息开始消费,那么可以考虑在Setup里面设置偏移量。

关键调用是 ResetOffset。

不过一般建议走离线渠道,操作Kafka集群去重置对应的偏移量。

核心在于,你并不是每次重新部署,重新启动都是要重置这个偏移量的。

只要你的消费者组在这个分区上有过“已提交的 offset”,Kafka 就会优先使用这个提交的 offset,而忽略你在 Setup() 中设置的 offset

// 在每次 rebalance 或初次连接 Kafka 后调用,用于初始化。
func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {//执行一些初始化的事情log.Println("Setup")//假设要重置到0var offset int64 = 0//遍历所有的分区partitions := session.Claims()["test_topic"]for _, p := range partitions {session.ResetOffset("test_topic", p, offset, "")//session.ResetOffset("test_topic", p, sarama.OffsetNewest, "")//session.ResetOffset("test_topic", p, sarama.OffsetOldest, "")}return nil
}

sarama使用入门:异步消费,批量提交

正常来说,为了在异步消费失败之后还能继续重试,可以考虑异步消费一批,提交一批。

下图中,ctx.Done分支用来控制凑够一批的超时机制,防止生产者的速率很低,一直凑不够一批。

func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)//可以通过 session 控制 offset 提交,获取消费者信息,并感知退出时机。session sarama.ConsumerGroupSession,//claim 是你获取消息的入口claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()//设置批量处理的条数const batchSize = 10for {ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)var eg errgroup.Groupvar last *sarama.ConsumerMessagefor i := 0; i < batchSize; i++ {done := falseselect {case <-ctx.Done()://这边表示超时了done = truecase msg, ok := <-msgs:if !ok {cancel()return nil}last = msgmsg1 := msgeg.Go(func() error {//我就在这里消费time.Sleep(time.Second)//你在这里重试log.Println(string(msg1.Value))return nil})}if done {break}}cancel()err := eg.Wait()if err != nil {//这边能怎么办?//记录日志continue}//就这样session.MarkMessage(last, "")}return nil
}

另外一个分支就是读取消息,并且提交到errgroup里面执行。

Sleep是模拟长时间业务执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/906515.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Axure全链路交互设计:快速提升实现能力(基础交互+高级交互)

想让你的设计稿像真实App一样丝滑&#xff1f;本专栏带你玩转Axure交互&#xff0c;从选中高亮到动态面板骚操作&#xff0c;再到中继器表单花式交互&#xff0c;全程动图教学&#xff0c;一看就会&#xff01; 本专栏系统讲解多个核心交互效果&#xff0c;是你的Axure交互急救…

自动化测试脚本点击运行后,打开Chrome很久??

亲爱的小伙伴们大家好。 小编最近刚换了电脑&#xff0c;这几天做自动化测试发现打开Chrome浏览器需要等待好长时间&#xff0c;起初还以为代码有问题&#xff0c;或者Chromedriver与Chrome不匹配造成的&#xff0c;但排查后发现并不是&#xff01;&#xff01; 在driver.py中…

现代人工智能系统的实用设计模式

关键要点 AI设计模式是为现代AI驱动的软件中常见问题提供的可复用解决方案&#xff0c;帮助团队避免重复造轮子。我们将其分为五类&#xff1a;提示与上下文&#xff08;Prompting & Context&#xff09;、负责任的AI&#xff08;Responsible AI&#xff09;、用户体验&…

经典面试题:TCP 三次握手、四次挥手详解

在网络通信的复杂架构里&#xff0c;“三次握手”与“四次挥手”仿若一座无形的桥梁&#xff0c;它们是连接客户端与服务器的关键纽带。这座“桥梁”不仅确保了连接的稳固建立&#xff0c;还保障了连接的有序结束&#xff0c;使得网络世界中的信息能够顺畅、准确地流动。 在面…

食品饮料行业AI转型趋势分析与智能化解决方案探索​

一、行业洞察&#xff1a;AI驱动食品饮料行业价值重构​ 当前&#xff0c;食品饮料行业正面临消费分级显性化、需求多元化与技术范式革新的三重挑战。根据《2024食品饮料行业全营销白皮书》&#xff0c;高收入群体倾向于高端化、个性化产品&#xff0c;而下沉市场更关注性价比…

Electron使用WebAssembly实现CRC-8 ITU校验

Electron使用WebAssembly实现CRC-8 ITU校验 将C/C语言代码&#xff0c;经由WebAssembly编译为库函数&#xff0c;可以在JS语言环境进行调用。这里介绍在Electron工具环境使用WebAssembly调用CRC-8 ITU格式校验的方式。 CRC-8 ITU校验函数WebAssembly源文件 C语言实现CRC-8 I…

python如何遍历postgresql所有的用户表

要遍历PostgreSQL数据库中的所有用户表&#xff0c;可以按照以下步骤操作&#xff1a; 安装必要依赖库 pip install psycopg2-binary使用标准SQL查询方案&#xff08;推荐&#xff09; import psycopg2def list_user_tables():try:conn psycopg2.connect(host"your_ho…

面试相关的知识点

1 vllm 1.1常用概念 1 vllm&#xff1a;是一种大模型推理的框架&#xff0c;使用了张量并行原理&#xff0c;把大型矩阵分割成低秩矩阵&#xff0c;分散到不同的GPU上运行。 2 模型推理与训练&#xff1a;模型训练是指利用pytorch进行对大模型进行预训练。 模型推理是指用训…

node.js如何实现双 Token + Cookie 存储 + 无感刷新机制

node.js如何实现双 Token Cookie 存储 无感刷新机制 为什么要实施双token机制&#xff1f; 优点描述安全性Access Token 短期有效&#xff0c;降低泄露风险&#xff1b;Refresh Token 权限受限&#xff0c;仅用于获取新 Token用户体验用户无需频繁重新登录&#xff0c;Toke…

MySQL——6、内置函数

内置函数 1、日期函数2、字符串函数3、数学函数4、其他函数 1、日期函数 1.1、获取当前日期&#xff1a; 1.2、获取当前时间&#xff1a; 1.3、获取当前时间戳&#xff1a; 1.4、获取当前日期时间&#xff1a; 1.5、提取出日期&#xff1a; 1.6、给日期添加天数或时间…

【Linux】Shell脚本中向文件中写日志,以及日志文件大小、数量管理

1、写日志 shell脚本中使用echo命令,将字符串输入到文件中 覆盖写入:echo “Hello, World!” > laoer.log ,如果文件不存在,则会创建文件追加写入:echo “Hello, World!” >> laoer.log转移字符:echo -e “Name:\tlaoer\nAge:\t18” > laoer.log,\t制表符 …

深度学习中ONNX格式的模型文件

一、模型部署的核心步骤 模型部署的完整流程通常分为以下阶段&#xff0c;用 “跨国旅行” 类比&#xff1a; 步骤类比解释技术细节1. 训练模型学会一门语言&#xff08;如中文&#xff09;用 PyTorch/TensorFlow 训练模型2. 导出为 ONNX翻译成国际通用语言&#xff08;如英语…

基于两阶段交互控制框架的互联多能系统协同自治优化

摘要&#xff1a;从多能源集成系统的效益出发&#xff0c;建立了基于交互控制的双层两阶段框架&#xff0c;以实现互联多能源系统(MESs)间的最优能量供应。在下层&#xff0c;各MES通过求解成本最小化问题自主确定其可控资产的最优设定值&#xff0c;其中滚动时域优化用于处理负…

matlab编写的BM3D图像去噪方法

BM3D&#xff08;Block-Matching and 3D Filtering&#xff09;是一种基于块匹配和三维滤波的图像去噪方法&#xff0c;广泛应用于图像处理领域。它通过在图像中寻找相似的块&#xff0c;并将这些块堆叠成三维数组进行滤波处理&#xff0c;从而有效地去除噪声&#xff0c;同时保…

前端(小程序)学习笔记(CLASS 1):组件

1、小程序中组件的分类 小程序中的组件也是由宿主环境提供的&#xff0c;开发者可以基于组件快速搭建出漂亮的页面结构。官方把小程序的组件分为了9大类&#xff0c;分别是&#xff1a; * 视图容器&#xff0c;* 基础内容&#xff0c;* 表单组件&#xff0c;* 导航组件 媒体…

基于亚马逊云科技构建音视频直播审核方案

1. 前言 随着互联网内容形态的多样化发展&#xff0c;用户生成内容&#xff08;UGC&#xff09;呈现爆发式增长。社交平台、直播、短视频、语聊房等应用场景中&#xff0c;海量的音视频内容需要进行实时审核&#xff0c;以维护平台安全与用户体验。 然而&#xff0c;企业在构…

linux基础操作11------(运行级别)

一.前言 这个是linux最后一章节内容&#xff0c;主要还是介绍一下&#xff0c;这个就和安全有关系了&#xff0c;内容还是很多的&#xff0c;但是呢&#xff0c;大家还是做个了解就好了。 二.权限掩码 运行级别 0 关机 运行级别 1 单用户 &#xff0c;这个类似于windows安全…

QT+Visual Studio 配置开发环境教程

一、QT架构 Qt Creator 是一个轻量级、跨平台的 IDE&#xff0c;专为 Qt 开发量身打造&#xff0c;内置对 qmake/CMake 的深度支持、Kits 配置管理、原生 QML 调试器以及较低的资源占用维基百科。 而在 Windows 环境下&#xff0c;Visual Studio 配合 Qt VS Tools 扩展则可将 Q…

(2)JVM 内存模型更新与 G1 垃圾收集器优化

JVM 内存模型更新与 G1 垃圾收集器优化 &#x1f680; 掌握前沿技术&#xff0c;成为顶尖 Java 工程师 2️⃣ JVM 内存模型更新 &#x1f449; 点击展开题目 JVM内存模型在Java 17中有哪些重要更新&#xff1f;如何优化G1垃圾收集器在容器化环境的表现&#xff1f; &#x1…

TASK04【Datawhale 组队学习】构建RAG应用

目录 将LLM接入LangChain构建检索问答链运行成功图遇到的问题 langchain可以便捷地调用大模型&#xff0c;并将其结合在以langchain为基础框架搭建的个人应用中。 将LLM接入LangChain from langchain_openai import ChatOpenAI实例化一个 ChatOpenAI 类,实例化时传入超参数来…