Kafka消费者是消息系统的关键组成部分,掌握/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh工具的使用对于调试、测试和监控都至关重要。本文将全面介绍该工具的各种用法,帮助您高效地从Kafka消费消息。 
 
 
1 基础消费模式
 
1.1 从最新位置消费
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic
 
  
  参数解析: 
  - --bootstrap-server: 指定Kafka集群地址
 - --topic: 指定消费的主题名称
 
    特点: 
  - 从该消费者组最后提交的offset开始消费
 - 如果没有提交记录,则从最新消息开始
 
   
 
1.2 从最早位置消费
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--from-beginning
 
  
  关键参数: 
  - --from-beginning: 从主题最早的消息开始消费
 
    应用场景: 
  
 
2 消息元数据展示
 
2.1 显示消息Key
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.key=true \--property key.separator=":"
 
  
  参数说明: 
  - print.key=true: 显示消息key
 - key.separator: 指定key/value分隔符
 
   
 
2.2 显示完整元数据
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.key=true \--property print.value=true \--property print.partition=true \--property print.offset=true \--property print.timestamp=true \--property key.separator=":" \--property line.separator="\n"
 
  
  元数据参数: 
  - print.partition: 显示分区号
 - print.offset: 显示消息offset
 - print.timestamp: 显示时间戳
 
   
 
3 精准消费控制
 
3.1 指定分区消费
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--partition 1
 
  
  参数说明: 
  
 
3.2 指定Offset消费
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--partition 0 \--offset 1000
 
  
 
  参数说明: 
  - --offset: 指定开始消费的offset位置
 
   
 
3.3 消费超时设置
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--timeout-ms 10000
 
  
  参数说明: 
  - --timeout-ms: 设置无消息时的超时时间(毫秒)
 
   
 
4 消费者组管理
 
4.1 使用消费者组
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--group mygroup
 
  
 
  参数说明: 
   特点: 
  - 支持offset自动提交
 - 支持消费者组rebalance
 
   
 
4.2 手动控制offset提交
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--group mygroup \--property enable.auto.commit=false
 
  
  参数说明: 
  - enable.auto.commit=false: 关闭自动提交
 
   
 
5 高级消费配置
 
5.1 限制消费消息数
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--max-messages 100
 
  
  参数说明: 
  
 
5.2 过滤消费
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property filter.key.regex="test.*"
 
  
  参数说明: 
  - filter.key.regex: 按key正则过滤
 
   
 
5.3 消费速率控制
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property fetch.min.bytes=1024 \--property fetch.max.wait.ms=500
 
  
  参数说明: 
  - fetch.min.bytes: 最小拉取字节数
 - fetch.max.wait.ms: 最大等待时间
 
   
 
6 常用命令扩展
 
6.1 消费多个主题
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--whitelist "topic1|topic2"
 
  
6.2 显示消息头信息
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.headers=true
 
  
6.3 消费特定时间后的消息
 
 /export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property offsets.storage=kafka \--property timestamp=1625097600000