golang封装调用kafka的工具包

封装一个golang调用kafka的工具包,包含了consumer,producer,auth,在自己的生产环境上做过验证。可以做参考作用,也可以直接使用。

部分代码

// Run 执行消费动作
func (cg *ConsumerGroup) Run(ctx context.Context) {defer cg.close()for {select {case err := <-cg.consumer.Errors():cg.logger.WithError(err).Errorln("Error channel")cg.handleConsumeError(err)case <-ctx.Done():err := ctx.Err()if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {// 正常退出return}cg.logger.WithError(err).Errorln("上下文异常退出")default:if err := cg.consumer.Consume(ctx, cg.options.topics, cg.options); err != nil {cg.logger.WithError(err).Errorln("Consume Error channel")cg.handleConsumeError(err)}}}
}
func (kc *KafkaClient) CreateTopic(ctx context.Context, topicName string, ops ...TopicOption) (err error) {topicConf := &sarama.TopicDetail{NumPartitions:     3,ReplicationFactor: 1,ConfigEntries: map[string]*string{"cleanup.policy": &TopicTTLPolicy,"retention.ms":   &TopicTTLRetention,},}for _, op := range ops {op(topicConf)}return kc.cli.CreateTopic(topicName, topicConf, false)
}

代码太多,全部写出来不现实,详细代码在这个下载包里,有test文件可以查看是如何调用的

另外需要自己处理消费时的速度,比如用channel控制同时消费的数量
golang调用kafka下载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/829885.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux——(关于权限常见的3个问题)

文章目录 1.修改文件或者目录的拥有者和所属组1.1chown指令1.2chgrp指令 2.常见的权限三个问题2.1对应一个目录&#xff0c;如果要进入&#xff0c;需要什么权限&#xff1f;2.2为什么我们创建的文件默认权限不是7772.2.1关于Linux下的权限掩码 2.3文件能否被删除取决于什么2.3…

Beyond Chain-of-Thought: A Survey of Chain-of-X Paradigms for LLMs论文阅读笔记(未完待续)

地址&#xff1a;https://arxiv.org/html/2404.15676v1 一些论文合集&#xff1a;https://github.com/atfortes/Awesome-LLM-Reasoning 背景 思维链 &#xff08;Chain-of-Thought&#xff0c;CoT&#xff09; 是一种被广泛采用的提示方法&#xff0c;它激发了大型语言模型 …

《HCIP-openEuler实验指导手册》2.1安装和测试Nginx

知识点 Nginx (发音为 “engine x”) 是一个开源的高性能 HTTP 和反向代理服务器&#xff0c;也是一个 IMAP/POP3/SMTP 代理服务器。由 Igor Sysoev 创建并维护&#xff0c;其设计用于处理高并发连接&#xff0c;具有高度的可扩展性和灵活性。 安装步骤 yum方式安装 dn…

go语言实现简单ngnix样例

目录 1、代码实现样例&#xff1a; 2、postman调用ngnix&#xff0c;转发&#xff1a; 1、代码实现样例&#xff1a; package mainimport ("bytes""encoding/json""io""log""net/http""net/http/httputil""…

ruoyi-nbcio-plus基于vue3的flowable收回任务后重新进行提交表单的处理

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://218.75.87.38:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a; h…

Python AI库 Pandas的常见操作的扩展知识

Python AI库 Pandas的常见操作的扩展知识 本文默认读者具备以下技能&#xff1a; 熟悉python基础知识&#xff0c;vscode或其它编辑工具 熟悉表格文件的基本操作 具备自主扩展学习能力 前文中对Pandas的数据结构以及基础操作做了介绍,本文中会在前文的基础上,对常见的操作进…

selenium拉动滚动条

selenium拉动滚动条 # 导包 from selenium import webdriver from time import sleep # 获取浏览器驱动对象 driver webdriver.Edge() # 最大化浏览器 driver.maximize_window() # 隐式等待 driver.implicitly_wait(30) # 打开url url r"C:\Users\黄永生\Desktop\软件测…

计算机找不到vcruntime140_1.dll,无法继续执行代码快速解决方法

vcruntime140_1.dll是一个重要的Windows操作系统中的动态链接库&#xff08;DLL&#xff09;文件&#xff0c;它是微软Visual C Redistributable软件包的组成部分。以下是该文件的详细介绍&#xff1a; 名称含义&#xff1a;“vcruntime”代表Visual C Runtime&#xff0c;表明…

static page 项目

static page 项目 作者&#xff1a;不染心 博客地址&#xff1a;https://blog.csdn.net/qq_38234785 源码地址&#xff1a;https://mbd.pub/o/bread/ZpWVlJps 未经允许&#xff0c;不得转载 文档版本v1&#xff0c;还没写完持续更新 一、引言 1. 软件概述和背景 本软件是…

STM32f103 HAL库读保护以及解除

读保护 void Flash_EnableReadProtection(void) {FLASH_OBProgramInitTypeDef OBInit;__HAL_FLASH_PREFETCH_BUFFER_DISABLE();HAL_FLASHEx_OBGetConfig(&OBInit);if(OBInit.RDPLevel OB_RDP_LEVEL_0){OBInit.OptionType | OPTIONBYTE_RDP;OBInit.RDPLevel OB_RDP_LEVEL…

FIR滤波器——DSP学习笔记三(包含一个滤波器设计的简明案例)

​​​​​​ 背景知识 FIR滤波器的特性与优点 可精确地实现线性相位响应&#xff08;Linear phase response&#xff09;&#xff0c;无相位失真&#xff1b; 总是稳定的&#xff0c;所有极点都位于原点 线性相位FIR滤波器的性质、类型及零点位置 冲击响应满足&#xff1a;奇…

【PyTorch】torch.gather() 用法

gather常被用于image做mask的操作中&#xff0c;对哪些地方进行赋值0/1 API&#xff1a; torch.gather — PyTorch 2.2 documentation torch.gather(input, dim, index, outNone) → Tensor gather()的意义&#xff1a; 顾名思义&#xff0c;聚集、集合&#xff1a;gather…

VS2019配合QT5.9开发IRayAT430相机SDK

环境配置 VS2019 QT5.9 编译器版本 MSVC2017_64添加系统环境变量&#xff08;完毕后重启电脑&#xff09; 从VS2019中下载Qt插件 从VS2019中添加单个编译组件 上述操作完成后用VS打开工程文件&#xff0c;工程文件地址 &#xff1a; C:\Users\86173\Desktop\IRCNETSDK_W…

数据分析:生存分析原理和应用实例

介绍 生存分析的目的是分析某个时间点的“生存概率”是多少。基于这样的研究目的,需要提供生存数据,它是一种由不同的开始时间和结束时间组成的事件-时间的数据,比如在癌症研究领域,研究手术到死亡的过程、治疗到疾病进展等等。 在开展生存分析前,需要了解什么是删失(c…

时间序列生成数据,TransformerGAN

简介&#xff1a;这个代码可以用于时间序列修复和生成。使用transformer提取单变量或者多变时间窗口的趋势分布情况。然后使用GAN生成分布类似的时间序列。 此外&#xff0c;还实现了基于prompt的数据生成&#xff0c;比如指定生成某个月份的数据、某半个月的数据、某一个星期的…

哈夫曼编码---一种无损数据压缩算法

哈夫曼编码是一种无损数据压缩算法&#xff0c;该算法在数据压缩&#xff0c;存储和网络传输等领域广泛引用&#xff0c;对互联网的发展也产生了深远的影响。 大家熟知的数据无损压缩软件&#xff0c;如WinRAR&#xff0c;gzip&#xff0c;bzip&#xff0c;lzw&#xff0c;7-z…

mac M2 配置item2 rzsz

背景 apple m 系列处理器安装的 homebrew 跟 intel 处理器略有不同&#xff0c;其中安装目录的区别&#xff1a; m 系列处理器安装目录为 /usr/local/bin/homebrew intel 处理器安装目录为 /opt/homebrew 问题1: 卡住 产生原因&#xff1a; m 系列使用 brew install lrzs…

Vscode——SSH连接不进去服务器的万能解决办法

一、查看当前版本VSCode的commit_id Help -> About -> Commit&#xff08;对应中文版本&#xff1a;帮助 -> 关于 -> 提交&#xff09; 会得到一串数字字母&#xff0c;我们简称 ID。 二、手动下载对应的VSCode包 浏览器输入&#xff1a;https://update.code.v…

手撕红黑树(kv模型模拟)

目录 前言 一、相关概念 二、性质介绍 红黑树平衡说明 三、红黑树模拟&#xff08;kv结构&#xff09; 1、红黑树节点 2、红黑树插入 2、特殊处理情况 声明&#xff1a; 情况一&#xff1a;cur为红&#xff0c;p为红&#xff0c;g为黑&#xff0c;u存在&#xff0c;且…

Spring Cloud学习笔记(Feign):配置类(未完成)

这是本人学习的总结&#xff0c;主要学习资料如下 - 马士兵教育 1、给Feign配置的方式1.1、通过Bean配置1.2、application.yaml配置 2、配置日志2.1、日志级别1.2、指定日志级别1.2.1、通过Bean配置1.2.2、application.yaml配置 3、Inteceptor配置 1、给Feign配置的方式 我们有…