broker:消息中间件处理节点,一个broker就是一个kafka节点,一个或者多个broker就组成了一个kafka集群 topic:kafka根据topic对消息进行归类,发布到kafka集群的每个消息,都要指定一个topic producer:消息生产者,向broker发送消息的客户端 consumer:消息消费者,从broker读取消息的客户端 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中 消息的保存是有序的,通过offset偏移量来描述消息的有序性 消费者消费消息时,也是通过offset来描述当前要消费的那条消息的位置 如果多个消费者在同一个消费者组中,那么只有一个消费者可以收到订阅topic中的消息,换言之,同一个消费组中只有一个消费者能收到一个topic中的消息 多播消息:不同的消费组订阅同一个topic,不同的消费组中只有一个消费者能收到消息,实际上也是多个消费组中的多个消费者收到了消息 Kafka集群中的broker在zk中创建节点的时候,会有一个临时节点序号,序号最小的节点,会被当做集群的controller,负责管理集群中的所有分区和副本的状态 当某个分区的leader副本出现故障,由控制器负责为该分区选举新的leader副本 当检测到某个分区的ISR集合发生变化的时候,由控制器负责通知所有的broker更新其元数据信息 当使用kafka-topic.sh脚本为某个topic增加分区数量的时候,同样还是由控制器负责让新分区被其它节点感知到 前提是消费者没有指定分区进行消费,当消费组中的消费者或者分区关系发生变化的时候,就会触发rebalance机制,这个机制会调整消费者消费哪个分区 在触发rebalance机制之前,消费者消费哪个分区有三种策略: range:通过公示来计算某个消费者消费哪个分区 轮询:所有消费者轮着消费 sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整  LEO是某个副本最后消息的消息位置(log-end-offset) HW是已完成同步的位置,消息在写入broker时,且每个broker都完成了这条消息的同步后,hw才会变化,这之前,消费者是消费不到这条消息的,同步完成后,HW调整后,消费者才能消费这条消息,这样做是为了方式消息丢失 消息积压问题的出现:消息的消费者的消费速度远远赶不上生产者生产消息的速度,导致kafka中有大量的数据没有被消费,随着没有被消费的消息越来越多,消费者寻址的性能越来越差,最后导致整个kafka对外提供的服务的性能越来越差,从而造成其它服务的访问速度很慢,造成服务雪崩。 消息积压的解决方案: 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息; 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者,该消费者poll下来的消息,直接转发到新的主题上,使用多个消费者消费新主题的消息–该方法不常用  docker  search kafka
docker  pull bitnami/kafka
docker  run -d  --name  kafka1 --network  mynetwork \ -p  9092 :9092 \ --env  KAFKA_BROKER_ID = 0  \ --env  KAFKA_ZOOKEEPER_CONNECT = 192.168 .228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \ --env  KAFKA_ADVERTISED_LISTENERS = PLAINTEXT://192.168.228.5:9092 \ --env  KAFKA_LISTENERS = PLAINTEXT://0.0.0.0:9092 bitnami/kafka docker  run -d  --name  kafka2 --network  mynetwork \ -p  9093 :9092 \ --env  KAFKA_BROKER_ID = 1  \ --env  KAFKA_ZOOKEEPER_CONNECT = 192.168 .228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \ --env  KAFKA_ADVERTISED_LISTENERS = PLAINTEXT://192.168.228.6:9092 \ --env  KAFKA_LISTENERS = PLAINTEXT://0.0.0.0:9092 bitnami/kafka docker  run -d  --name  kafka3 --network  mynetwork \ -p  9094 :9092 \ --env  KAFKA_BROKER_ID = 2  \ --env  KAFKA_ZOOKEEPER_CONNECT = 192.168 .228.2:2181,92.168.228.3:2181,92.168.228.4:2181 \ --env  KAFKA_ADVERTISED_LISTENERS = PLAINTEXT://192.168.228.7:9092 \ --env  KAFKA_LISTENERS = PLAINTEXT://0.0.0.0:9092 bitnami/kafka 
docker  start kafka1
docker  start kafka2
docker  start kafka3
springboot引用kafka的生产者和消费者 
server : port :  8080 servlet : context-path :  /
spring : application : name :  mvcLearn
web : resources : static-locations : -  classpath: /hwc/kafka : bootstrap-servers :  127.0.0.1: 9092 , 127.0.0.1: 9093 , 127.0.0.1: 9094 producer : acks :  1 retries :  3 batch-size :  16384 buffer-memory :   33554432 key-serializer :  org.apache.kafka.common.serialization.StringSerializervalue-serializer :  org.apache.kafka.common.serialization.StringSerializerconsumer : group-id :  default- groupenable-auto-commit :  false auto-offset-reset :  earliestkey-deserializer :  org.apache.kafka.common.serialization.StringDeserializervalue-deserializer :   org.apache.kafka.common.serialization.StringDeserializerlistener : ack-mode :  manual_immediate
package  com. huwc. mvclearn. controller ; import  org. springframework. beans. factory. annotation.  Autowired ; 
import  org. springframework. kafka. core.  KafkaTemplate ; 
import  org. springframework. web. bind. annotation.  GetMapping ; 
import  org. springframework. web. bind. annotation.  PathVariable ; 
import  org. springframework. web. bind. annotation.  RestController ; @RestController 
public  class  MyKafkaControlller  { private  final  static  String  TOPIC_NAME  =  "my_two_partition_topic" ; @Autowired private  KafkaTemplate < String ,  String > ; @GetMapping ( "/send/{msg}" ) public  String  sendMessage ( @PathVariable ( "msg" )  String  msg) { template. send ( TOPIC_NAME ,  0 ,  "key" ,  msg) ; return  "send success"  ; } 
} 
package  com. huwc. mvclearn. consumer ; import  org. apache. kafka. clients. consumer.  ConsumerRecord ; 
import  org. apache. kafka. clients. consumer.  ConsumerRecords ; 
import  org. springframework. kafka. annotation.  KafkaListener ; 
import  org. springframework. kafka. support.  Acknowledgment ; 
import  org. springframework. stereotype.  Component ; @Component 
public  class  MyKafkaConsumer  { @KafkaListener ( topics =  "my_two_partition_topic" ,  groupId =  "MyGroup1" ) public  void  listenGroup ( ConsumerRecord < String ,  String > ,  Acknowledgment  ack) { String  key =  record. key ( ) ; String  value =  record. value ( ) ; System . out. println ( "key = "  +  key) ; System . out. println ( "value = "  +  value) ; System . out. println ( "record = "  +  record) ; ack. acknowledge ( ) ; } 
}