【kafka】kafka概念,使用技巧go示例

1. Kafka基础概念

1.1 什么是Kafka?

Kafka是一个分布式流处理平台,用于构建实时数据管道和流式应用。核心特点:

  • 高吞吐量:每秒可处理百万级消息
  • 持久化存储:消息按Topic分区存储在磁盘
  • 分布式架构:支持水平扩展
  • 高可用性:通过副本机制保证数据不丢失
1.2 核心组件
  • Topic(主题):消息的逻辑分类,如user_loginorder_create
  • Partition(分区):Topic的物理分片,每个分区是有序的日志文件
  • Broker(代理):Kafka集群中的服务器节点
  • Producer(生产者):向Topic发送消息的应用
  • Consumer(消费者):从Topic接收消息的应用
  • Consumer Group(消费者组):多个消费者组成的组,共同消费Topic数据

2. Go语言操作Kafka

2.1 选择客户端库

Go语言中推荐使用confluent-kafka-go库,它基于librdkafka实现,性能优秀且功能完整:

go get -u github.com/confluentinc/confluent-kafka-go/kafka
2.2 生产者示例
package mainimport ("fmt""os""os/signal""syscall""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 配置生产者p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092",  // Kafka集群地址"acks":              "all",             // 所有副本确认"retries":           5,                 // 重试次数})if err != nil {panic(err)}defer p.Close()// 异步处理发送结果go func() {for e := range p.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v\n", ev.TopicPartition)}}}}()// 发送消息topic := "user_login"for i := 0; i < 10; i++ {value := fmt.Sprintf("Hello Kafka %d", i)p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(value),}, nil)}// 等待所有消息发送完成p.Flush(15 * 1000)  // 超时15秒// 优雅退出sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)<-sigchan
}
2.3 消费者示例
package mainimport ("fmt""os""os/signal""syscall""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 配置消费者c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092","group.id":          "my-group","auto.offset.reset": "earliest",  // 从最早的消息开始消费})if err != nil {panic(err)}defer c.Close()// 订阅主题topic := "user_login"c.SubscribeTopics([]string{topic}, nil)// 处理信号,优雅退出sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)run := truefor run {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsedefault:ev := c.Poll(100)  // 轮询100msif ev == nil {continue}switch e := ev.(type) {case *kafka.Message:fmt.Printf("Message on %s: %s\n",e.TopicPartition, string(e.Value))// 手动提交偏移量c.CommitMessage(e)case kafka.Error:fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)if e.Code() == kafka.ErrAllBrokersDown {run = false}default:// 忽略其他事件}}}fmt.Println("Closing consumer")
}

3. 高级特性与最佳实践

3.1 消息分区策略

Kafka通过分区实现并行处理,生产者可指定分区策略:

// 1. 轮询(默认):均匀分布消息到各分区
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(value),
}, nil)// 2. 基于Key哈希:相同Key的消息发到同一分区
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Key:            []byte(userID),  // 根据用户ID哈希到固定分区Value:          []byte(value),
}, nil)
3.2 消费者组与分区分配
  • 同一消费者组内的消费者共同消费Topic的所有分区
  • 每个分区只能被组内一个消费者消费
  • 消费者数量超过分区数时,多余的消费者空闲
3.3 手动提交偏移量
// 配置手动提交
config := &kafka.ConfigMap{"bootstrap.servers": "localhost:9092","group.id":          "my-group","enable.auto.commit": false,  // 禁用自动提交
}// 消费消息后手动提交
for {msg, err := c.ReadMessage(-1)  // 阻塞读取if err == nil {fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))// 处理消息...// 手动提交当前消息的偏移量_, err := c.CommitMessage(msg)if err != nil {fmt.Printf("Failed to commit offset: %v\n", err)}}
}
3.4 事务处理
// 配置事务生产者
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092","transactional.id":  "my-transactional-id",
})
if err != nil {panic(err)
}// 初始化事务
p.InitTransactions(10 * time.Second)// 开始事务
p.BeginTransaction()// 发送多条消息
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic1}, Value: []byte("msg1")}, nil)
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic2}, Value: []byte("msg2")}, nil)// 提交事务
err = p.CommitTransaction(10 * time.Second)
if err != nil {p.AbortTransaction(10 * time.Second)  // 回滚
}

4. 企业级实战案例

4.1 异步日志处理
// 生产者:收集应用日志发送到Kafka
func LogToKafka(level, message string) {p, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092"})defer p.Close()topic := "app_logs"msg := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Key:            []byte(level),Value:          []byte(message),}p.Produce(msg, nil)p.Flush(2 * 1000)  // 等待2秒
}// 消费者:从Kafka读取日志并存储到Elasticsearch
func ConsumeAndIndex() {c, _ := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092","group.id":          "log-consumer-group",})c.SubscribeTopics([]string{"app_logs"}, nil)for {msg, err := c.ReadMessage(-1)if err == nil {// 发送到ElasticsearchsendToES(string(msg.Key), string(msg.Value))}}
}
4.2 微服务间事件驱动通信
// 订单服务:创建订单后发送事件
func CreateOrder(userID, productID string, amount float64) {// 1. 创建订单orderID := generateOrderID()saveOrderToDB(orderID, userID, productID, amount)// 2. 发送订单创建事件到Kafkap, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092"})defer p.Close()topic := "order_created"event := fmt.Sprintf(`{"order_id": "%s", "user_id": "%s", "amount": %.2f}`, orderID, userID, amount)p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(event),}, nil)
}// 库存服务:监听订单创建事件并扣减库存
func StartInventoryService() {c, _ := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092","group.id":          "inventory-service-group",})c.SubscribeTopics([]string{"order_created"}, nil)for {msg, err := c.ReadMessage(-1)if err == nil {// 解析订单事件var orderEvent struct {OrderID string  `json:"order_id"`UserID  string  `json:"user_id"`Amount  float64 `json:"amount"`}json.Unmarshal(msg.Value, &orderEvent)// 扣减库存deductInventory(orderEvent.ProductID, 1)}}
}

5. 性能优化与常见问题

5.1 生产者性能优化
  • 批量发送:设置batch.sizelinger.ms
  • 压缩消息:启用compression.type(如snappylz4
  • 异步发送:使用回调函数处理发送结果
5.2 消费者性能优化
  • 增加分区数:提高并行消费能力
  • 多消费者实例:通过消费者组水平扩展
  • 合理批量处理:批量拉取消息,批量提交偏移量
5.3 常见问题排查
问题原因解决方案
消息丢失acks配置不当、副本数不足设置acks=all,确保至少2个副本
消费滞后消费速度慢、分区数不足增加消费者、提高处理效率、增加分区数
重复消费偏移量提交时机不当处理完消息后再提交偏移量,或使用事务
生产者吞吐量低批处理参数不合理、网络延迟增大batch.sizelinger.ms,优化网络连接

6. 生产环境部署建议

  1. 多Broker集群:至少3个Broker,提高可用性
  2. 合理分区数:根据业务量预估,建议单个Topic分区数≥3
  3. 数据备份:定期备份Kafka日志
  4. 监控系统:集成Prometheus、Grafana监控Kafka性能
  5. 安全配置:启用SSL/TLS加密、SASL认证

总结:Go语言使用Kafka的最佳实践

  1. 生产者

    • 使用异步发送提高吞吐量
    • 合理配置acks和重试次数保证消息不丢失
    • 根据业务需求选择分区策略
  2. 消费者

    • 使用消费者组实现水平扩展
    • 手动提交偏移量确保消息处理可靠性
    • 处理消息失败时考虑重试或死信队列
  3. 性能与可靠性

    • 批量处理提高效率
    • 监控关键指标(如Lag、吞吐量)
    • 设计幂等消费逻辑应对重复消息

https://github.com/0voice

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/83797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

掌握Git:版本控制与高效协作指南

一、初始Git 提出问题&#xff1a;无论是在工作还是学习&#xff0c;我们在编写各种文档的时候&#xff0c;更改失误&#xff0c;失误后恢复到原来版本&#xff0c;不得不复制出一个副本。 每个版本由各自的内容&#xff0c;但最终只有一个报告需要被我们使用。 但在此之前的…

【生活相关-日语-日本-东京-搬家后-引越(ひっこし)(3)-踩坑点:国民健康保险】

【生活相关-日语-日本-东京-搬家后-引越&#xff08;ひっこし&#xff09;&#xff08;3&#xff09;-注意点&#xff1a;国民健康保险】 1、前言2、情况说明&#xff08;1&#xff09;问题说明&#xff08;2&#xff09;情况说明&#xff08;1&#xff09;收到情况&#xff08…

linux——mysql故障排查与生产环境优化

目录 一&#xff0c;mysql数据库常见的故障 1&#xff0c;故障现象1 2&#xff0c;故障现象2 3&#xff0c;故障现象3 &#xff14;&#xff0c;故障现象&#xff14; &#xff15;&#xff0c;故障现象&#xff15; &#xff16;&#xff0c;故障现象&#xff16; 二&…

【C#】用 DevExpress 创建带“下拉子表”的参数表格视图

展示如何用 DevExpress 创建带“下拉子表”的参数表格视图。主表为 参数行 ParamRow&#xff0c;子表为 子项 ChildParam。 一、创建模型类 public class ParamRow {public string Pn { get; set; }public string DisplayName { get; set; }public string Value { get; set; }…

【JavaScript】用 Proxy 拦截对象属性

目录 一、Proxy 的基本结构&#xff08;打地基&#xff09; 二、最常用的两个拦截方法&#xff1a;get 和 set 1. get(target, key) 2. set(target, key, value) 三、说到这&#xff0c;那就可以回到题目来 四、什么是 Reflect&#xff1f; 总结不易&#xff0c;本章节对…

[IMX] 02.GPIO 寄存器

目录 手册对应章节 1.GPIO 复用&#xff08;引脚功能选择&#xff09;- IOMUXC_SW_MUX_CTL_PAD_xxx 2.GPIO 电气特性 - IOMUXC_SW_PAD_CTL_PAD_xxx 3.GPIO 数据与控制寄存器 3.1.数据 - DR 3.2.输入/输出选择 - GDIR 3.3.状态 - PSR 3.4.中断触发控制 - ICR 3.5.中断使…

Tomcat 配置 HTTPS 访问全攻略(CentOS 环境)

Tomcat 配置 HTTPS 访问全攻略&#xff08;CentOS 环境&#xff09; 一、环境说明 操作系统&#xff1a;CentOS Tomcat 版本&#xff1a;Apache Tomcat/9.0.105 服务器 IP&#xff1a;192.168.1.35 目标&#xff1a;将 Tomcat 默认的 HTTP 访问升级为 HTTPS&#xff0c;提…

Flink 运维监控与指标采集实战(Prometheus + Grafana 全流程)

一、引言:为什么 Flink 运维监控如此重要? 在实时计算场景中,Flink 作业 724 小时运行,对性能、资源、故障感知、状态变化的实时监控非常关键。没有有效的运维可观测体系: 不知道任务是否在稳定运行 发生问题难以快速定位 无法感知背压、延迟、反压等状态 因此,构建完善…

【prometheus+Grafana篇】基于Prometheus+Grafana实现Oracle数据库的监控与可视化

&#x1f4ab;《博主主页》&#xff1a; &#x1f50e; CSDN主页 &#x1f50e; IF Club社区主页 &#x1f525;《擅长领域》&#xff1a;擅长阿里云AnalyticDB for MySQL(分布式数据仓库)、Oracle、MySQL、Linux、prometheus监控&#xff1b;并对SQLserver、NoSQL(MongoDB)有了…

【数据仓库面试题合集③】实时数仓建模思路与实践详解

实时数据仓库已经成为各大企业构建核心指标监控与业务实时洞察的基础能力。面试中,关于实时建模的题目频繁出现,尤其聚焦于建模思路、宽表设计、状态管理、乱序处理等方面。本文整理典型题目及答题思路,帮助你应对相关考察。 一、建模原则与数仓分层认知 1. 实时数仓与离线…

鸿蒙PC操作系统:从Linux到自研微内核的蜕变

鸿蒙PC操作系统是否基于Linux内核,需要结合其技术架构、发展阶段和官方声明综合分析。以下从多个角度展开论述: 一、鸿蒙操作系统的多内核架构设计 多内核混合架构 根据资料,鸿蒙操作系统(HarmonyOS)采用分层多内核架构,内核层包含Linux内核、LiteOS-m内核、LiteOS-a内核…

LabVIEW数据库使用说明

介绍LabVIEW如何在数据库中插入记录以及执行 SQL 查询&#xff0c;适用于对数据库进行数据管理和操作的场景。借助 Database Connectivity Toolkit&#xff0c;可便捷地与指定数据库交互。 各 VI 功能详述 左侧 VI 功能概述&#xff1a;实现向数据库表中插入数据的操作。当输入…

【docker】--docker file编写教程

文章目录 构建docker file 镜像常用命令速查表一、基础指令&#xff08;指定镜像和执行命令&#xff09;二、构建上下文管理三、设置镜像内部环境四、容器运行配置五、多阶段构建&#xff08;可选进阶&#xff09; 构建docker file 镜像 # -f 指定dockerfile # -t 镜像名和tag…

WeakAuras Lua Script <BiaoGe>

WeakAuras Lua Script <BiaoGe> 表格拍卖插件WA字符串 表格字符串代码&#xff1a; !WA:2!S3xA3XXXrcoE2VH9l7ZFy)C969PvDpSrRgaeuhljFlUiiSWbxaqXDx(4RDd0vtulB0fMUQMhwMZJsAO5HenLnf1LPSUT4iBrjRzSepL(pS)e2bDdWp5)cBEvzLhrMvvnAkj7zWJeO7mJ8kYiJmYiImYF0b(XR)JR9JRD…

虚幻引擎5-Unreal Engine笔记之什么时候新建GameMode,什么时候新建关卡?

虚幻引擎5-Unreal Engine笔记之什么时候新建GameMode,什么时候新建关卡&#xff1f; code review! 参考笔记&#xff1a; 1.虚幻引擎5-Unreal Engine笔记之GameMode、关卡&#xff08;Level&#xff09; 和 关卡蓝图&#xff08;Level Blueprint&#xff09;的关系 2.虚幻引擎…

开源模型应用落地-模型上下文协议(MCP)-Resource Template-资源模板的使用逻辑(六)

一、前言 在数字化进程加速的今天,如何高效管理动态资源已成为开发者们的核心课题。Resource Template(资源模板)作为Model Context Protocol(MCP)中的关键机制,正通过参数化设计重新定义资源调用的边界——它不仅是静态数据的容器,更是动态上下文生成的引擎。与传统的R…

uniapp小程序获取手机设备安全距离

utils.js let systemInfo null;export const getSystemInfo () > {if (!systemInfo) {systemInfo uni.getSystemInfoSync();// 补充安全区域默认值systemInfo.safeAreaInsets systemInfo.safeAreaInsets || {top: 0,bottom: 0,left: 0,right: 0};// 确保statusBarHei…

【线下沙龙】NineData x Apache Doris x 阿里云联合举办数据库技术Meetup,5月24日深圳见!

5月24日下午&#xff0c;NineData 将联合 Apache Doris、阿里云一起&#xff0c;在深圳举办数据库技术Meetup。本次技术沙龙聚焦「数据实时分析」与「数据同步迁移」 两大核心领域&#xff0c;针对企业数据战略中的痛点&#xff0c;特邀行业资深技术大咖&#xff0c;结合多年技…

企业网站架构部署与优化 --web技术与nginx网站环境部署

一、Web 基础 本节将介绍Web 基础知识,包括域名的概念、DNS 原理、静态网页和动态网页的 相关知识。 1、域名和DNS 1.1、域名的概念 网络是基于TCP/IP 协议进行通信和连接的&#xff0c;每一台主机都有一个唯一的标识(固定的IP 地址),用以区别在网络上成千上万个用户和计算机。…

java实现poi-ooxml导出Excel的功能

文章目录 1. 添加poi-ooxml依赖2. Excel导出工具类3.核心逻辑说明4.扩展建议5.HSSF、XSSF、SXSSF 的核心原则和场景建议&#xff0c;帮助你在不同需求下快速决策&#xff1a; 以下是一个基于 Apache POI 实现的简单、通用的Java导出Excel工具类&#xff0c;代码逻辑清晰且注释详…