文章目录
- 使用api 实现 topic 增删改查
- pom.xml
- log4j.properties
- 创建、查询 Topic
- 生产者、消费者 api
- 自定义 生产者分区发送策略
- 自定义序列化器
- 自定义 生产者拦截器
- offset 提交控制
- 确认-acks 与 重试-retries
- 幂等消息
- 生产者事务
- 生产者+消费者事务
-
04_kafka_java-api
-
使用api 实现增删改查
-
发布,订阅 (sub/ assign)
使用api 实现 topic 增删改查
- 需要额外注意的是:开发机器上也必须配置 IP 与主机名的映射(hosts 文件),否则客户端无法解析 kafka_1、kafka_2、kafka_3 等 broker 主机名;
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.qww</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>
</project>
log4j.properties
log4j.rootLogger = info,console
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n
创建、查询 Topic
package cn.qww.topic;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;public class TopicCreate {public static void main(String[] args) throws ExecutionException, InterruptedException {//配置连接参数Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_1:9092,kafka_2:9092,kafka_3:9092");KafkaAdminClient adminClient = (KafkaAdminClient) KafkaAdminClient.create(props);//创建TopicsList<NewTopic> newTopics = Arrays.asList(new NewTopic("topic03", 2, (short) 3));CreateTopicsResult topics = adminClient.createTopics(newTopics);Map<String, KafkaFuture<Void>> values = topics.values();System.out.println(values);//删除Topic// adminClient.deleteTopics(Arrays.asList("topic02"));listTopic(adminClient);describeTopic(adminClient);adminClient.close();}/*** 查询topics*/private static void listTopic(KafkaAdminClient adminClient) throws ExecutionException, InterruptedException {KafkaFuture<Set<String>> nameFutures = adminClient.listTopics().names();for (String name : nameFutures.get()) {System.out.println("topic_name: " + name);}}/*** describeTopic*/private static void describeTopic(KafkaAdminClient adminClient) throws ExecutionException, InterruptedException {DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("topic02"));Map<String, TopicDescription> tdm = describeTopics.all().get();for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {System.out.println(entry.getKey() + "\t" + entry.getValue());}}
}
生产者、消费者 api
- producer
package cn.qww.curpdr;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Producer {public static void main(String[] args) throws InterruptedException {//1.创建链接参数Properties props=new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka_1:9092,kafka_2:9092,kafka_3:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//2.创建生产者KafkaProducer<String,String> producer= new KafkaProducer<>(props);for(int i = 0; i< 10; i++){Thread.sleep(100);ProducerRecord<String, String> record = new ProducerRecord<>("topic03", "key_" + i, "value_" + i);producer.send(record);}producer.close();}
}
- consumer
package cn.qww.curpdr;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;public class Consumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props=new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka_1:9092,kafka_2:9092,kafka_3:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");//2.创建Topic消费者try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {//3.订阅topic开头的消息队列consumer.subscribe(Pattern.compile("^topic.*$"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);}}}}
}
自定义 生产者分区发送策略
-
默认生产者分发策略:消息有key 使用hash,没有key 使用轮询
-
自定义分区类
package cn.qww;public class Config {public static String BOOTSTRAP_SERVERS_CONFIG = "kafka_1:9092,kafka_2:9092,kafka_3:9092";
}---
package cn.qww.c_custompartition;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;public class CustomPartition implements Partitioner {private AtomicInteger atomicInteger = new AtomicInteger(0);@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int numPartitions = cluster.partitionsForTopic(topic).size();if (keyBytes == null || keyBytes.length == 0) {int i = atomicInteger.addAndGet(1);int nodeI = (i & Integer.MAX_VALUE) % numPartitions;System.out.println(nodeI);return nodeI;} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}@Overridepublic void close() {System.out.println("close");}@Overridepublic void configure(Map<String, ?> configs) {System.out.println("configure");}
}
- 生产者
package cn.qww.c_custompartition;import cn.qww.Config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class ProducerUseCustomPartition {public static void main(String[] args) {//1.创建链接参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());//2.创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (Integer i = 0; i < 6; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "value_" + i);
// ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_" + i,"value_" + i);producer.send(record);}producer.close();}
}
- 消费者
package cn.qww.c_custompartition;import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;public class ConsumerTest {public static void main(String[] args) {//1.创建Kafka链接参数Properties props=new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");//2.创建Topic消费者KafkaConsumer<String,String> consumer= new KafkaConsumer<>(props);//3.订阅topic开头的消息队列consumer.subscribe(Pattern.compile("^topic.*$"));while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);}}}
}
自定义序列化器
- 序列化器
package cn.qww.d_serializer;import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;import java.io.Serializable;
import java.util.Map;public class ObjectSerializer implements Serializer<Object> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {System.out.println("configure");}@Overridepublic byte[] serialize(String topic, Object data) {// commons-lang3 包return SerializationUtils.serialize((Serializable) data);}@Overridepublic void close() {System.out.println("close");}
}
-
反序列化器
package cn.qww.d_serializer;import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.common.serialization.Deserializer;import java.util.Map;public class ObjectDeserializer implements Deserializer<Object> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {System.out.println("configure");}@Overridepublic Object deserialize(String topic, byte[] data) {return SerializationUtils.deserialize(data);}@Overridepublic void close() {System.out.println("close");} }
-
自定义类
package cn.qww.d_serializer;import java.io.Serializable;public class CustomObj implements Serializable {private Integer id;private String name;public CustomObj(Integer id, String name) {this.id = id;this.name = name;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return "CustomObj{" +"id=" + id +", name='" + name + '\'' +'}';} }
-
消费者
package cn.qww.d_serializer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class SerConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props=new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ObjectDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");//2.创建Topic消费者KafkaConsumer<String, CustomObj> consumer=new KafkaConsumer<String, CustomObj>(props);consumer.subscribe(Arrays.asList("topic02"));while (true){ConsumerRecords<String, CustomObj> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, CustomObj>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecord<String, CustomObj> record = recordIterator.next();String key = record.key();CustomObj value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);}}} }
-
生产者
package cn.qww.d_serializer;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class SerProducer {public static void main(String[] args) {//1.创建链接参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());//2.创建生产者KafkaProducer<String, CustomObj> producer = new KafkaProducer<String, CustomObj>(props);for (Integer i = 0; i < 10; i++) {ProducerRecord<String, CustomObj> record = new ProducerRecord<>("topic02", "key" + i, new CustomObj(i, "name_" + i));producer.send(record);}producer.close();} }
自定义 生产者拦截器
-
拦截器类
package cn.qww.e_interceptors;import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CustomProducerInterceptor implements ProducerInterceptor {@Overridepublic ProducerRecord onSend(ProducerRecord record) {ProducerRecord wrapRecord = new ProducerRecord(record.topic(), record.key() + "_suf", record.value() + "_val_suf");wrapRecord.headers().add("header:", "header_val".getBytes());System.out.println("topic:" + wrapRecord.topic() + ", partition: " + wrapRecord.partition());return wrapRecord;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {System.out.println("metadata:" + metadata + ", exception:" + exception);}@Overridepublic void close() {System.out.println("close");}@Overridepublic void configure(Map<String, ?> configs) {System.out.println("configure");} }
-
消费者
package cn.qww.e_interceptors;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; import java.util.regex.Pattern;public class IcptConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");//2.创建Topic消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("topic01"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset + ",header:" + record.headers());}}} }
-
生产者
package cn.qww.e_interceptors;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class IcptProducer {public static void main(String[] args) {//1.创建链接参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());//2.创建生产者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for (Integer i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "error" + i);producer.send(record);}producer.close();} }
offset 提交控制
- 第一次连接消息队列, auto.offset.reset 设置
package cn.qww.f_offset_autocommit;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;public class AutoOffsetResetConfigConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 第一次访问时: 读到历史的消息
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");props.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//2.创建Topic消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//3.订阅topic开头的消息队列consumer.subscribe(Arrays.asList("topic01"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);}}}
}
- 关闭自动提交
package cn.qww.f_offset_autocommit;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;public class CloseAutoCommitConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "g4");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//2.创建Topic消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic01"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();offsets.put(new TopicPartition(record.topic(), partition), new OffsetAndMetadata(offset + 1));consumer.commitAsync(offsets, (offsets1, exception) -> System.out.println("完成:" + offsets1 + "提交!"));System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);}}}
}
- 调整自动提交间隔 auto.commit.interval.ms
package cn.qww.f_offset_autocommit;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;public class ModifyCommitIntervalConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props=new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,10000);//2.创建Topic消费者KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);//3.订阅topic开头的消息队列consumer.subscribe(Pattern.compile("^topic.*$"));while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);}}}
}
- 生产者
package cn.qww.f_offset_autocommit;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Producer {public static void main(String[] args) {//1.创建链接参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//2.创建生产者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);for (Integer i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);producer.send(record);}producer.close();}
}
确认-acks 与 重试-retries
- 生产者:配置 acks 与 retries
package cn.qww.g_acks_retries;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.io.IOException;
import java.util.Properties;public class AckRetriesProducer {public static void main(String[] args) throws IOException {//1.创建链接参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 4);//2.创建生产者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key-ack", "value-retries" );producer.send(record);producer.flush();producer.close();
// System.in.read();}
}
- 消费者
package cn.qww.g_acks_retries;import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;public class ConsumerTest {public static void main(String[] args) {//1.创建Kafka链接参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");//2.创建Topic消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic04"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);}}}
}
幂等消息
- 生产者设置幂等消息:enable.idempotence=true
package cn.qww.i_idempotence;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class IdempotenceProducer {public static void main(String[] args) {//1.创建链接参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);props.put(ProducerConfig.ACKS_CONFIG, "-1");props.put(ProducerConfig.RETRIES_CONFIG, 3);props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//2.创建生产者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key-idempotence", "value-idempotence" );producer.send(record);producer.flush();producer.close();}
}
- 消费者
package cn.qww.i_idempotence;import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;public class ConsumerTest {public static void main(String[] args) {//1.创建Kafka链接参数Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");//2.创建Topic消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic04"));while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()) {ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);}}}
}
生产者事务
- 生产者:开启事务
package cn.qww.h_transaction;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.UUID;public class TxProducer {public static void main(String[] args) {//1.创建链接参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 事务 id, 具有唯一性props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id" + UUID.randomUUID().toString());// 配置批处理size/*props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20);// batch 中数据不足时,等待时间 10msprops.put(ProducerConfig.LINGER_MS_CONFIG, 10);*///2.创建生产者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);producer.initTransactions();//初始化事务try {producer.beginTransaction();//开启事务for (Integer i = 0; i < 10; i++) {if (i == 7) {int k = i / 0;System.out.println(k);}ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_tx_" + i, "tx_val_" + i);producer.send(record);producer.flush();}producer.commitTransaction();//提交事务} catch (Exception e) {e.printStackTrace();producer.abortTransaction();//终止事务}producer.close();}
}
- 消费者: 设置读已提交:read_committed
package cn.qww.h_transaction;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;public class ReadCommittedConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props=new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");// 默认值 read_uncommitted
// props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");//2.创建Topic消费者KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("topic04"));while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);}}}
}
生产者+消费者事务
- 整体流程:上游生产者 —(msg)—> 【topic04 ===> 中游业务 —(msg_processed)—> topic02】 ===> 下游消费者
- 上游生产者
package cn.qww.h_transaction.conxpdr;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.UUID;public class SrcProducer {public static void main(String[] args) {//1.创建链接参数Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 事务 id, 具有唯一性props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id" + UUID.randomUUID().toString());// 配置批处理size/*props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20);// batch 中数据不足时,等待时间 10msprops.put(ProducerConfig.LINGER_MS_CONFIG, 10);*///2.创建生产者KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);producer.initTransactions();//初始化事务try {producer.beginTransaction();//开启事务for (Integer i = 0; i < 10; i++) {/*if (i == 7) {int k = i / 0;System.out.println(k);}*/ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_tx_" + i, "tx_val_" + i);producer.send(record);producer.flush();}producer.commitTransaction();//提交事务} catch (Exception e) {e.printStackTrace();producer.abortTransaction();//终止事务}producer.close();}
}
- 消费者(可选,仅用于验证上游生产者确实已经发送了消息)
package cn.qww.h_transaction.conxpdr;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;public class ReadCommittedConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props=new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");// 默认值 read_uncommitted
// props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");//2.创建Topic消费者KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("topic04"));while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);}}}
}
- 中游业务:从上游 topic 读取数据,并将处理后的数据发送到下游 topic;消费、生产与 offset 提交整体在同一个事务中完成
package cn.qww.h_transaction.conxpdr;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;public class TxConsumerProducer {public static void main(String[] args) {Properties propsConsumer=new Properties();propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG,"group02");propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);propsConsumer.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(propsConsumer);consumer.subscribe(Arrays.asList("topic04"));Properties props=new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tx-id");KafkaProducer<String,String> producer= new KafkaProducer<String, String>(props);producer.initTransactions();//初始化事务try{while(true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();//开启事务控制producer.beginTransaction();Map<TopicPartition, OffsetAndMetadata> offsets= new HashMap<>();while (consumerRecordIterator.hasNext()){ConsumerRecord<String, String> record = consumerRecordIterator.next();//创建RecordProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>("topic02",record.key(),record.value());producer.send(producerRecord);//记录元数据offsets.put(new TopicPartition(record.topic(),record.partition()),new 
OffsetAndMetadata(record.offset()+1));}//提交事务producer.sendOffsetsToTransaction(offsets,"group02");producer.commitTransaction();}}catch (Exception e){producer.abortTransaction();//终止事务}finally {producer.close();}}
}
- 下游topic 的消费者
package cn.qww.h_transaction.conxpdr;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;public class DownStreamConsumer {public static void main(String[] args) {//1.创建Kafka链接参数Properties props=new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");// 默认值 read_uncommittedprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");//2.创建Topic消费者KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("topic02"));while (true){ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();while (recordIterator.hasNext()){ConsumerRecord<String, String> record = recordIterator.next();String key = record.key();String value = record.value();long offset = record.offset();int partition = record.partition();System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);}}}
}