04_kafka_java-api

Table of contents

  • Using the API for topic CRUD
    • pom.xml
    • log4j.properties
    • Creating and querying topics
    • Producer and consumer API
    • Custom producer partitioning strategy
    • Custom serializers
    • Custom producer interceptors
    • Offset commit control
    • Acknowledgements (acks) and retries
    • Idempotent messages
    • Producer transactions
    • Producer + consumer transactions



Using the API for topic CRUD

  • Note that the IP-to-hostname mappings for the brokers (kafka_1, kafka_2, kafka_3) must also be configured on the development machine; see the hosts-file sketch below.
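A sketch of the required hosts entries (the IP addresses are placeholders; substitute your brokers' real addresses):

# /etc/hosts on Linux/macOS, or C:\Windows\System32\drivers\etc\hosts on Windows
192.168.56.101  kafka_1
192.168.56.102  kafka_2
192.168.56.103  kafka_3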

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.qww</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>1.0</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>
</project>

log4j.properties

log4j.rootLogger = info,console
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern =  %p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n

Creating and querying topics

package cn.qww.topic;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class TopicCreate {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Connection parameters
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_1:9092,kafka_2:9092,kafka_3:9092");
        KafkaAdminClient adminClient = (KafkaAdminClient) KafkaAdminClient.create(props);

        // Create a topic: name topic03, 2 partitions, replication factor 3
        List<NewTopic> newTopics = Arrays.asList(new NewTopic("topic03", 2, (short) 3));
        CreateTopicsResult topics = adminClient.createTopics(newTopics);
        Map<String, KafkaFuture<Void>> values = topics.values();
        System.out.println(values);

        // Delete a topic
        // adminClient.deleteTopics(Arrays.asList("topic02"));

        listTopic(adminClient);
        describeTopic(adminClient);
        adminClient.close();
    }

    /**
     * List topic names
     */
    private static void listTopic(KafkaAdminClient adminClient) throws ExecutionException, InterruptedException {
        KafkaFuture<Set<String>> nameFutures = adminClient.listTopics().names();
        for (String name : nameFutures.get()) {
            System.out.println("topic_name: " + name);
        }
    }

    /**
     * Describe a topic (partitions, leaders, replicas)
     */
    private static void describeTopic(KafkaAdminClient adminClient) throws ExecutionException, InterruptedException {
        DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("topic02"));
        Map<String, TopicDescription> tdm = describeTopics.all().get();
        for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
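Deletion is commented out in the example above; when running it for real, wait on the returned future so that failures (for example, a nonexistent topic) surface as exceptions. A minimal sketch reusing adminClient from the example:

// DeleteTopicsResult.all() returns a KafkaFuture<Void>; get() blocks until the broker responds
adminClient.deleteTopics(Arrays.asList("topic02")).all().get();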

Producer and consumer API

  • producer
package cn.qww.curpdr;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) throws InterruptedException {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_1:9092,kafka_2:9092,kafka_3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the producer and send ten keyed records
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            Thread.sleep(100);
            ProducerRecord<String, String> record = new ProducerRecord<>("topic03", "key_" + i, "value_" + i);
            producer.send(record);
        }
        producer.close();
    }
}
  • consumer (attaches with subscribe; an assign variant is sketched after this example)
package cn.qww.curpdr;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;

public class Consumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_1:9092,kafka_2:9092,kafka_3:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 3. Subscribe to every topic whose name starts with "topic"
            consumer.subscribe(Pattern.compile("^topic.*$"));
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
                while (recordIterator.hasNext()) {
                    ConsumerRecord<String, String> record = recordIterator.next();
                    String key = record.key();
                    String value = record.value();
                    long offset = record.offset();
                    int partition = record.partition();
                    System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
                }
            }
        }
    }
}
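subscribe hands partition assignment to the group coordinator; the consumer API also supports pinning specific partitions with assign, which skips group rebalancing entirely. A minimal sketch, placed where the subscribe call is in the example above (the partition number is illustrative; add imports for java.util.Arrays and org.apache.kafka.common.TopicPartition):

// Pin partition 0 of topic03 instead of subscribing; group rebalancing does not apply
TopicPartition p0 = new TopicPartition("topic03", 0);
consumer.assign(Arrays.asList(p0));
// Optionally rewind to the earliest available offset before polling
consumer.seekToBeginning(Arrays.asList(p0));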

Custom producer partitioning strategy

  • Default producer partitioning: records with a key are assigned by hashing the key (murmur2); records without a key are distributed round-robin.

  • Custom partitioner class (plus a shared Config holder for the bootstrap servers)

package cn.qww;

public class Config {
    public static String BOOTSTRAP_SERVERS_CONFIG = "kafka_1:9092,kafka_2:9092,kafka_3:9092";
}

---

package cn.qww.c_custompartition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomPartition implements Partitioner {
    private AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null || keyBytes.length == 0) {
            // No key: round-robin over the partitions
            int i = atomicInteger.addAndGet(1);
            int nodeI = (i & Integer.MAX_VALUE) % numPartitions;
            System.out.println(nodeI);
            return nodeI;
        } else {
            // Key present: murmur2 hash, same as the default partitioner
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("configure");
    }
}
  • producer
package cn.qww.c_custompartition;

import cn.qww.Config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerUseCustomPartition {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class.getName());

        // 2. Create the producer; records without keys exercise the round-robin branch
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 6; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "value_" + i);
            // ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_" + i, "value_" + i);
            producer.send(record);
        }
        producer.close();
    }
}
  • consumer
package cn.qww.c_custompartition;

import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;

public class ConsumerTest {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3. Subscribe to every topic whose name starts with "topic"
        consumer.subscribe(Pattern.compile("^topic.*$"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}

Custom serializers

  • serializer
package cn.qww.d_serializer;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

public class ObjectSerializer implements Serializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("configure");
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        // Java serialization via the commons-lang3 dependency
        return SerializationUtils.serialize((Serializable) data);
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}
  • deserializer

package cn.qww.d_serializer;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("configure");
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}
  • custom payload class

package cn.qww.d_serializer;

import java.io.Serializable;

public class CustomObj implements Serializable {
    private Integer id;
    private String name;

    public CustomObj(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() {
        return "CustomObj{" + "id=" + id + ", name='" + name + '\'' + '}';
    }
}
  • consumer

package cn.qww.d_serializer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class SerConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Values are deserialized with the custom ObjectDeserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, CustomObj> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic02"));
        while (true) {
            ConsumerRecords<String, CustomObj> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, CustomObj>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, CustomObj> record = recordIterator.next();
                String key = record.key();
                CustomObj value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}
  • producer

package cn.qww.d_serializer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SerProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Values are serialized with the custom ObjectSerializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());

        // 2. Create the producer and send CustomObj values
        KafkaProducer<String, CustomObj> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, CustomObj> record = new ProducerRecord<>("topic02", "key" + i, new CustomObj(i, "name_" + i));
            producer.send(record);
        }
        producer.close();
    }
}

Custom producer interceptors

  • interceptor class

package cn.qww.e_interceptors;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CustomProducerInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        // Wrap the outgoing record: suffix the key and value and add a header
        ProducerRecord wrapRecord = new ProducerRecord(record.topic(), record.key() + "_suf", record.value() + "_val_suf");
        wrapRecord.headers().add("header:", "header_val".getBytes());
        System.out.println("topic:" + wrapRecord.topic() + ", partition: " + wrapRecord.partition());
        return wrapRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Invoked when the broker acknowledges the record or the send fails
        System.out.println("metadata:" + metadata + ", exception:" + exception);
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("configure");
    }
}
  • consumer

package cn.qww.e_interceptors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class IcptConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic01"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                // The header added by the interceptor is visible here
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset + ",header:" + record.headers());
            }
        }
    }
}
  • producer

package cn.qww.e_interceptors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IcptProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom interceptor
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "error" + i);
            producer.send(record);
        }
        producer.close();
    }
}

Offset commit control

  • auto.offset.reset controls where a consumer group starts reading the first time it connects, i.e. when it has no committed offset yet
package cn.qww.f_offset_autocommit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class AutoOffsetResetConfigConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // On a group's first connection (no committed offset), "earliest" reads historical messages
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic01"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}
  • Disabling auto-commit and committing each offset manually (a synchronous per-batch variant is sketched after the example)
package cn.qww.f_offset_autocommit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class CloseAutoCommitConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g4");
        // Disable auto-commit; offsets are committed manually below
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic01"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                // Commit the offset of the NEXT record to read, hence offset + 1
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(new TopicPartition(record.topic(), partition), new OffsetAndMetadata(offset + 1));
                consumer.commitAsync(offsets, (committedOffsets, exception) -> System.out.println("committed: " + committedOffsets));
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}
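commitAsync above commits after every record. A simpler pattern is one synchronous commit per poll batch, which blocks until the broker has stored the offsets. A sketch under the same configuration (auto-commit still disabled):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("key:" + record.key() + ",value:" + record.value() + ",partition:" + record.partition() + ",offset:" + record.offset());
    }
    if (!records.isEmpty()) {
        // Blocks until the offsets returned by the last poll() are committed
        consumer.commitSync();
    }
}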
  • Adjusting the auto-commit interval: auto.commit.interval.ms
package cn.qww.f_offset_autocommit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;

public class ModifyCommitIntervalConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        // Auto-commit stays on, but only fires every 10 seconds
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3. Subscribe to every topic whose name starts with "topic"
        consumer.subscribe(Pattern.compile("^topic.*$"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}
  • producer
package cn.qww.f_offset_autocommit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
            producer.send(record);
        }
        producer.close();
    }
}

Acknowledgements (acks) and retries

  • producer: configuring acks and retries (a callback sketch for observing the acknowledgements follows the example)
package cn.qww.g_acks_retries;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;

public class AckRetriesProducer {
    public static void main(String[] args) throws IOException {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 1 ms is deliberately too short: the request times out, which triggers the retries
        // and can leave duplicate records on the broker
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
        // Wait for all in-sync replicas to acknowledge
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 4);

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key-ack", "value-retries");
        producer.send(record);
        producer.flush();
        producer.close();
        // System.in.read();
    }
}
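Whether a send ultimately succeeded or failed can be observed by passing a callback to send; the callback fires once per record, after the producer has either received the acknowledgement or exhausted its internal retries. A minimal sketch (the printed text is illustrative):

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Fires only after the internal retries are exhausted
        System.out.println("send failed: " + exception);
    } else {
        System.out.println("acked: partition=" + metadata.partition() + ", offset=" + metadata.offset());
    }
});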
  • consumer
package cn.qww.g_acks_retries;

import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}

Idempotent messages

  • Enable idempotent delivery on the producer: enable.idempotence=true
package cn.qww.i_idempotence;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IdempotenceProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Tiny timeout again forces retries; with idempotence on, the retries no longer duplicate records
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
        // Idempotence requires acks=all (-1) and retries > 0
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key-idempotence", "value-idempotence");
        producer.send(record);
        producer.flush();
        producer.close();
    }
}
  • consumer
package cn.qww.i_idempotence;

import cn.qww.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}

Producer transactions

  • producer: enabling transactions
package cn.qww.h_transaction;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

public class TxProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Transactional id; must be unique
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id" + UUID.randomUUID().toString());
        // Batching configuration
        /*
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20);
        // How long to wait for more records when a batch is not full: 10 ms
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        */

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // initialize transactions
        try {
            producer.beginTransaction(); // begin the transaction
            for (int i = 0; i < 10; i++) {
                if (i == 7) {
                    // Deliberate ArithmeticException to demonstrate the abort path
                    int k = i / 0;
                    System.out.println(k);
                }
                ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_tx_" + i, "tx_val_" + i);
                producer.send(record);
                producer.flush();
            }
            producer.commitTransaction(); // commit
        } catch (Exception e) {
            e.printStackTrace();
            producer.abortTransaction(); // abort on failure
        }
        producer.close();
    }
}
  • consumer: set isolation.level=read_committed to hide the aborted records; with the default read_uncommitted, records from aborted transactions (the seven sent before the deliberate exception above) are still visible
package cn.qww.h_transaction;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        // Default is read_uncommitted; uncomment to hide records from aborted transactions
        // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}

Producer + consumer transactions

  • upstream producer —(msg)—> [topic04 ===> mid-stream processor —(msg_processed)—> topic02] ===> downstream consumer

  • upstream producer

package cn.qww.h_transaction.conxpdr;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

public class SrcProducer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Transactional id; must be unique
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id" + UUID.randomUUID().toString());
        // Batching configuration
        /*
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 20);
        // How long to wait for more records when a batch is not full: 10 ms
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        */

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // initialize transactions
        try {
            producer.beginTransaction(); // begin the transaction
            for (int i = 0; i < 10; i++) {
                /*
                if (i == 7) {
                    int k = i / 0;
                    System.out.println(k);
                }
                */
                ProducerRecord<String, String> record = new ProducerRecord<>("topic04", "key_tx_" + i, "tx_val_" + i);
                producer.send(record);
                producer.flush();
            }
            producer.commitTransaction(); // commit
        } catch (Exception e) {
            e.printStackTrace();
            producer.abortTransaction(); // abort on failure
        }
        producer.close();
    }
}
  • consumer (only used to verify that the upstream producer actually sent its messages; it can be omitted)
package cn.qww.h_transaction.conxpdr;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        // Default is read_uncommitted; uncomment to hide records from aborted transactions
        // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}
  • mid-stream processor: reads from the upstream topic and sends the processed records to the downstream topic; the consumed offsets are committed through sendOffsetsToTransaction, so consumption and production commit or abort as a single transaction
package cn.qww.h_transaction.conxpdr;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class TxConsumerProducer {
    public static void main(String[] args) {
        // Consumer: manual offset management, read_committed isolation
        Properties propsConsumer = new Properties();
        propsConsumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        propsConsumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsConsumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsConsumer.put(ConsumerConfig.GROUP_ID_CONFIG, "group02");
        propsConsumer.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsConsumer.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(propsConsumer);
        consumer.subscribe(Arrays.asList("topic04"));

        // Producer: transactional
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-id");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // initialize transactions

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
                // Begin the transaction covering both the produced records and the consumed offsets
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                while (consumerRecordIterator.hasNext()) {
                    ConsumerRecord<String, String> record = consumerRecordIterator.next();
                    // Forward the (processed) record to the downstream topic
                    ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic02", record.key(), record.value());
                    producer.send(producerRecord);
                    // Track the next offset to consume for each partition
                    offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                }
                // Commit the consumed offsets and the produced records atomically
                producer.sendOffsetsToTransaction(offsets, "group02");
                producer.commitTransaction();
            }
        } catch (Exception e) {
            producer.abortTransaction(); // abort on failure
        } finally {
            producer.close();
        }
    }
}
  • consumer of the downstream topic
package cn.qww.h_transaction.conxpdr;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

public class DownStreamConsumer {
    public static void main(String[] args) {
        // 1. Connection parameters
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cn.qww.Config.BOOTSTRAP_SERVERS_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        // Default is read_uncommitted; read_committed hides records from aborted transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic02"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:" + key + ",value:" + value + ",partition:" + partition + ",offset:" + offset);
            }
        }
    }
}
