【README】 java作为生产者,centos 作为消费者;
【1】生产者代码
-- pom.xml
<!-- 依赖 --> <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency></dependencies>
生产者
-- 生产者
public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重试次数*/props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.创建生产者对象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.发送数据 */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first01", "first01-20201229--A" + i));System.out.printf("写入数据%s \n", "first01-20201229--A" + i);try {System.out.println(future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.关闭资源 */ producer.close();System.out.println("kafka生产者写入数据完成"); }
}
【2】centos 消费者
[root@centos202 kafka-0.11]# kafka-console-consumer.sh --topic first01 --bootstrap-server centos201:9092 --from-beginning
first01-20201229--2
first01-20201229--6
first01-20201229--A0
first01-20201229--A1
first01-20201229--A2
first01-20201229--A3
first01-20201229--A4
first01-20201229--A5
first01-20201229--A6
first01-20201229--A7
first01-20201229--A8
first01-20201229--A9
【3】生产者发送消息超时问题
3.1、问题现场
kafka Expiring 1 record(s) for first01-3: 31539 ms has passed since batch creation plus linger time
3.2、解决方法
修改本地机器的hosts, 如下:
192.168.163.201 centos201
192.168.163.202 centos202
192.168.163.203 centos203