【Spring连载】使用Spring访问 Apache Kafka(二十)----测试应用

【Spring连载】使用Spring访问 Apache Kafka(二十)----测试应用Testing Applications

  • 一、KafkaTestUtils
  • 二、JUnit
  • 三、配置主题Configuring Topics
  • 四、为多个测试类使用相同的broker,Using the Same Broker(s) for Multiple Test Classes
  • 五、@EmbeddedKafka 注解
  • 六、配合JUnit5使用@EmbeddedKafka注解
  • 七、@SpringBootTest注解中的嵌入式Broker
    • 7.1 JUnit4 Class Rule
    • 7.2 @EmbeddedKafka 注解或 EmbeddedKafkaBroker Bean
  • 八、Hamcrest Matchers
  • 九、AssertJ Conditions
  • 十、示例
  • 十一、模拟消费者和生产者

spring-kafka-test jar包含一些有用的工具来帮助测试你的应用程序。

一、KafkaTestUtils

o.s.kafka.test.utils.KafkaTestUtils提供了许多静态helper方法来消费记录、检索各种记录偏移量等。请参阅其Javadocs以获得完整的详细信息。

二、JUnit

o.s.kafka.test.utils.KafkaTestUtils还提供了一些静态方法来设置生产者和消费者属性。下面的清单显示了这些方法签名:

/*** Set up test properties for an {@code <Integer, String>} consumer.* @param group the group id.* @param autoCommit the auto commit.* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.* @return the properties.*/
public static Map<String, Object> consumerProps(String group, String autoCommit,EmbeddedKafkaBroker embeddedKafka) { ... }/*** Set up test properties for an {@code <Integer, String>} producer.* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.* @return the properties.*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }

从2.5版本开始,consumerProps方法设置ConsumerConfig.AUTO_OFFSET_RESET_CONFIG到earliest。这是因为,在大多数情况下,你希望consumer 消费在测试用例中发送的任何消息。ConsumerConfig默认值是latest,这意味着在consumer启动之前,测试已经发送的消息将不会被收到。若要恢复到以前的行为,请在调用方法后将属性设置为latest。
当使用嵌入式(embedded) broker时,通常最佳做法是为每个测试使用不同的主题,以防止串扰(cross-talk)。如果由于某种原因无法做到这一点,请注意consumeFromEmbeddedTopics方法的默认行为是在分配后seek分配的分区到开头。由于它无法访问consumer属性,因此必须使用重载方法,该方法使用seekToEnd布尔参数来seek到末尾而不是开头。
框架提供了一个EmbeddedKafkaBroker的JUnit4@Rule wrapper,用于创建一个嵌入式Kafka和一个嵌入式Zookeeper服务器。(有关在JUnit 5中使用@EmbeddedKafka的信息,请参阅 @EmbeddedKafka注解)。以下列表显示了这些方法的签名:

/*** Create embedded Kafka brokers.* @param count the number of brokers.* @param controlledShutdown passed into TestUtils.createBrokerConfig.* @param topics the topics to create (2 partitions per).*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }/**** Create embedded Kafka brokers.* @param count the number of brokers.* @param controlledShutdown passed into TestUtils.createBrokerConfig.* @param partitions partitions per topic.* @param topics the topics to create.*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }

EmbeddedKafkaBroker类有一个utility方法,可以消费它创建的所有主题。以下示例显示了如何使用它:

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

KafkaTestUtils有一些utility方法可以从消费者那里获取结果。下面的清单显示了这些方法签名:

/*** Poll the consumer, expecting a single record for the specified topic.* @param consumer the consumer.* @param topic the topic.* @return the record.* @throws org.junit.ComparisonFailure if exactly one record is not received.*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }/*** Poll the consumer for records.* @param consumer the consumer.* @return the records.*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }

下面的例子展示了如何使用KafkaTestUtils:

...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...

当嵌入式Kafka和嵌入式Zookeeper服务器由EmbeddedKafkaBroker启动时,名为spring.embedded.kafka.brokers的系统属性被设置为Kafka broker的地址,名为spring.embedded.zookeeper.connect的系统属性则被设置为Zookeeper的地址。EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS 和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT为两个属性提供了方便的常量。
Kafka brokers的地址可以暴露给任意的属性,而不是默认的spring.embedded.kafka.brokers系统属性。为此,可以在启动嵌入式kafka之前设置spring.embedded.kafka.brokers.property (EmbeddedKafkaBroker.BROKER_LIST_PROPERTY)系统属性。例如,对于Spring Boot,需要分别为自动配置的kafka客户端设置spring.kafka.bootstrap-servers配置属性。因此,在随机端口上使用嵌入式Kafka运行测试之前,我们可以将spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers设置为系统属性,EmbeddedKafkaBroker将使用它来暴露其broker地址。这现在是此属性的默认值(从3.0.10版本开始)。
使用EmbeddedKafkaBroker.brokerProperties(Map<String, String>),你可以为Kafka服务器提供额外的属性。有关broker属性的更多信息,请参阅Kafka Config。

三、配置主题Configuring Topics

下面的配置示例创建了名为cat和hat的主题,每个主题包含5个分区,名为thing1的主题包含10个分区,名为thing2的主题包含15个分区:

public class MyTests {@ClassRuleprivate static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");@Testpublic void test() {embeddedKafkaRule.getEmbeddedKafka().addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));...}}

默认情况下,addTopics会在出现问题时抛出异常(例如添加已存在的主题)。版本2.6添加了该方法的新版本,该方法返回Map<String, Exception>;键是主题名称,值为null表示成功,值为Exception表示失败。

四、为多个测试类使用相同的broker,Using the Same Broker(s) for Multiple Test Classes

你可以为多个测试类使用相同的broker,类似于如下代码:

public final class EmbeddedKafkaHolder {private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false).brokerListProperty("spring.kafka.bootstrap-servers");private static boolean started;public static EmbeddedKafkaBroker getEmbeddedKafka() {if (!started) {try {embeddedKafka.afterPropertiesSet();}catch (Exception e) {throw new KafkaException("Embedded broker failed to start", e);}started = true;}return embeddedKafka;}private EmbeddedKafkaHolder() {super();}}

这里假定是Spring Boot环境,嵌入式broker取代了bootstrap servers属性。然后,在每个测试类中,你可以使用类似于以下内容:

static {EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();

如果你没有使用Spring Boot,则可以使用broker.getBrokersAsString()获取bootstrap servers。
前面的示例没有提供在所有测试完成时关闭broker的机制。如果您在Gradle守护进程中运行测试,这可能会成为一个问题。在这种情况下,你不应该使用此技术,或者你应该在测试完成后在EmbeddedKafkaBroker上调用destroy()。
从3.0版本开始,框架为JUnit平台暴露了一个GlobalEmbeddedKafkaTestExecutionListener;它在默认情况下被禁用。这需要JUnit平台1.8或更高版本。这个监听器的目的是为整个测试计划启动一个全局EmbeddedKafkaBroker,并在计划结束时停止它。为了启用这个监听器,并因此为项目中的所有测试提供一个全局嵌入式Kafka集群,spring.kafka.global.embedded.enabled属性必须通过系统属性或JUnit平台配置设置为true。此外,还可以提供以下特性:

  • spring.kafka.embedded.count——要管理的kafka broker的数量;
  • spring.kafka.embedded.ports——每个kafka broker启动的端口(逗号分隔值),如果首选随机端口,则为0;值的数量必须等于上述count;
  • spring.kafka.embedded.topics——要在启动的kafka集群中创建的主题(逗号分隔值);
  • spring.kafka.embedded.partitions——为创建的主题提供的分区数;
  • spring.kafka.embedded.broker.properties.location——其他kafka broker配置属性的文件位置;此属性的值必须遵循Spring资源抽象模式。
    从本质上讲,这些属性模仿了一些@EmbeddedKafka属性。
    请参阅《JUnit 5用户指南》中有关配置属性以及如何提供这些属性的更多信息。例如,spring.embedded.kafka.brokers.property=my.bootstrap-servers条目可以添加到测试类路径中的junit-platform.properties文件中。从3.0.10版本开始,默认情况下,broker会自动将其设置为spring.kafka.bootstrap-servers,用于使用Spring Boot应用程序进行测试。
    建议不要在单个测试套件中组合全局embedded Kafka和每个测试类。两者共享相同的系统属性,因此很可能会导致意外行为。
    spring-kafka-test对junit-jupiter-api和junit-platform-launcher(后者支持全局嵌入式代理)具有可传递的依赖关系。如果你希望使用嵌入式broker,而不使用JUnit,则需要排除这些依赖项。

五、@EmbeddedKafka 注解

我们通常建议你将该rule用作@ClassRule,以避免在测试之间启动和停止broker(并为每个测试使用不同的主题)。从2.0版本开始,如果使用Spring的测试应用程序上下文缓存,还可以声明一个EmbeddedKafkaBroker bean,因此可以在多个测试类中使用单个broker 。为了方便起见,框架提供了一个名为@EmbeddedKafka的测试类级注解来注册EmbeddedKafkaBroker bean。以下示例显示了如何使用它:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,topics = {KafkaStreamsTests.STREAMING_TOPIC1,KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {@Autowiredprivate EmbeddedKafkaBroker embeddedKafka;@Testpublic void someTest() {Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);Consumer<Integer, String> consumer = cf.createConsumer();this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);assertThat(replies.count()).isGreaterThanOrEqualTo(1);}@Configuration@EnableKafkaStreamspublic static class KafkaStreamsConfiguration {@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")private String brokerAddresses;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration kStreamsConfigs() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);return new KafkaStreamsConfiguration(props);}}}

从2.2.4版本开始,你还可以使用@EmbeddedKafka注解来指定Kafka ports属性。
以下示例设置@EmbeddedKafka支持属性占位符解析的topics、brokerProperties和brokerPropertiesLocation属性:

@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },brokerProperties = { "log.dir=${kafka.broker.logs-dir}","listeners=PLAINTEXT://localhost:${kafka.broker.port}","auto.create.topics.enable=${kafka.broker.topics-enable:true}" },brokerPropertiesLocation = "classpath:/broker.properties")

在前面的示例中,属性占位符${kafka.topics.another-topic}, ${kafka.broker.logs-dir}, 和 ${kafka.broker.port}是从Spring环境中解析的。此外,broker属性是从brokerPropertiesLocation指定的broker.properties类路径资源加载的。框架将解析brokerPropertiesLocation URL的属性占位符以及在资源中找到的任何属性占位符。brokerProperties定义的属性会覆盖在brokerPropertiesLocation中找到的属性。
你可以将@EmbeddedKafka注解与JUnit 4或JUnit 5一起使用。

六、配合JUnit5使用@EmbeddedKafka注解

从2.3版本开始,有两种方法可以将@EmbeddedKafka注解与JUnit5一起使用。当与@SpringJunitConfig注解一起使用时,嵌入式broker会添加到测试应用程序上下文中。你可以在类或方法级别将broker自动装配到测试中,以获取broker地址列表。
当不使用spring测试上下文时,EmbdeddedKafkaCondition会创建一个broker;该条件包括一个参数解析器,因此你可以在测试方法中访问代理…

@EmbeddedKafka
public class EmbeddedKafkaConditionTests {@Testpublic void test(EmbeddedKafkaBroker broker) {String brokerList = broker.getBrokersAsString();...}}

如果用@EmbeddedBroker注解的类没有用ExtendedWith(SpringExtension.class)进行注解(或元注解),则将创建一个独立的(而不是Spring测试上下文)broker。@SpringJunitConfig和@SpringBootTest是这样的元注解,并且当他们中任意一个注解存在时,将使用基于上下文的broker。
当有可用的Spring测试应用程序上下文时,主题和broker属性可以包含属性占位符,只要在某个地方定义了属性,就会解析这些占位符。如果没有可用的Spring上下文,则不会解析这些占位符。

七、@SpringBootTest注解中的嵌入式Broker

Spring Initializr现在以test scope自动将spring-kafka-test依赖项添加到项目配置中。
如果你的应用程序在spring-cloud-stream中使用Kafka binder,并且如果你想使用嵌入式broker进行测试,则必须删除spring-cloud-stream-test-support依赖项,因为它用测试用例的测试binder替换了真正的binder。如果希望某些测试使用测试binder,而某些测试使用嵌入式broker,则使用真实binder的测试需要通过排除测试类中的binder自动配置来禁用测试binder。以下示例展示了如何执行此操作:

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.autoconfigure.exclude="+ "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration")
public class MyApplicationTests {...
}

有几种方法可以在Spring Boot应用程序测试中使用嵌入式broker。
它们包括:

  • JUnit4 Class Rule
  • @EmbeddedKafka 注解或 EmbeddedKafkaBroker Bean

7.1 JUnit4 Class Rule

下面的例子展示了如何使用JUnit4 class rule来创建嵌入式代理:

@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {@ClassRulepublic static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1,false, "someTopic").brokerListProperty("spring.kafka.bootstrap-servers");}@Autowiredprivate KafkaTemplate<String, String> template;@Testpublic void test() {...}}

注意,由于这是一个Spring Boot应用程序,因此我们重写broker list属性来设置Boot的属性。

7.2 @EmbeddedKafka 注解或 EmbeddedKafkaBroker Bean

下面的例子展示了如何使用@EmbeddedKafka 注解来创建一个嵌入式broker:

@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {@Autowiredprivate KafkaTemplate<String, String> template;@Testpublic void test() {...}}

从3.0.10版本开始,默认情况下,bootstrapServersProperty被自动设置为spring.kafka.bootstrap-servers。

八、Hamcrest Matchers

o.s.kafka.test.hamcrest.KafkaMatchers提供了以下匹配器:

/*** @param key the key* @param <K> the type.* @return a Matcher that matches the key in a consumer record.*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }/*** @param value the value.* @param <V> the type.* @return a Matcher that matches the value in a consumer record.*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }/*** @param partition the partition.* @return a Matcher that matches the partition in a consumer record.*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }/*** Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.** @param ts timestamp of the consumer record.* @return a Matcher that matches the timestamp in a consumer record.*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {return hasTimestamp(TimestampType.CREATE_TIME, ts);
}/*** Matcher testing the timestamp of a {@link ConsumerRecord}* @param type timestamp type of the record* @param ts timestamp of the consumer record.* @return a Matcher that matches the timestamp in a consumer record.*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {return new ConsumerRecordTimestampMatcher(type, ts);
}

九、AssertJ Conditions

你可以使用以下AssertJ条件:

/*** @param key the key* @param <K> the type.* @return a Condition that matches the key in a consumer record.*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }/*** @param value the value.* @param <V> the type.* @return a Condition that matches the value in a consumer record.*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }/*** @param key the key.* @param value the value.* @param <K> the key type.* @param <V> the value type.* @return a Condition that matches the key in a consumer record.* @since 2.2.12*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }/*** @param partition the partition.* @return a Condition that matches the partition in a consumer record.*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }/*** @param value the timestamp.* @return a Condition that matches the timestamp value in a consumer record.*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}/*** @param type the type of timestamp* @param value the timestamp.* @return a Condition that matches the timestamp value in a consumer record.*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {return new ConsumerRecordTimestampCondition(type, value);
}

十、示例

下面的例子汇集了本文涵盖的大部分主题:

public class KafkaTemplateTests {private static final String TEMPLATE_TOPIC = "templateTopic";@ClassRulepublic static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);@Testpublic void testTemplate() throws Exception {Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",embeddedKafka.getEmbeddedKafka());DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProperties);final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();container.setupMessageListener(new MessageListener<Integer, String>() {@Overridepublic void onMessage(ConsumerRecord<Integer, String> record) {System.out.println(record);records.add(record);}});container.setBeanName("templateTests");container.start();ContainerTestUtils.waitForAssignment(container,embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());Map<String, Object> producerProps =KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());ProducerFactory<Integer, String> pf =new DefaultKafkaProducerFactory<Integer, String>(producerProps);KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);template.setDefaultTopic(TEMPLATE_TOPIC);template.sendDefault("foo");assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));template.sendDefault(0, 2, "bar");ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);assertThat(received, hasKey(2));assertThat(received, hasPartition(0));assertThat(received, hasValue("bar"));template.send(TEMPLATE_TOPIC, 0, 2, "baz");received = records.poll(10, TimeUnit.SECONDS);assertThat(received, hasKey(2));assertThat(received, hasPartition(0));assertThat(received, hasValue("baz"));}}

前面的示例使用Hamcrest匹配器。如果使用AssertJ,最后一部分看起来像下面的代码:

assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));

十一、模拟消费者和生产者

kafka-clients库提供了用于测试目的的MockConsumer和MockProducer 类。
如果您希望在监听器容器或KafkaTemplate的一些测试中分别使用这些类,框架现在提供了MockConsumerFactory和MockProducerFactory实现。
这些工厂可以在监听器容器和template中使用,而不是默认工厂,因为默认工厂需要运行的(或嵌入的)broker。
下面是一个返回单个consumer的简单实现示例:

@Bean
ConsumerFactory<String, String> consumerFactory() {MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);TopicPartition topicPartition0 = new TopicPartition("topic", 0);List<TopicPartition> topicPartitions = Arrays.asList(topicPartition0);Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), tp -> 0L));consumer.updateBeginningOffsets(beginningOffsets);consumer.schedulePollTask(() -> {consumer.addRecord(new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",new RecordHeaders(), Optional.empty()));consumer.addRecord(new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",new RecordHeaders(), Optional.empty()));});return new MockConsumerFactory(() -> consumer);
}

如果希望使用并发性进行测试,则工厂构造函数中的Supplier lambda每次都需要创建一个新实例。对于MockProducerFactory,有两个构造函数; 一个用于创建简单工厂,另一个用于创建支持事务的工厂。下面是一些例子:

@Bean
ProducerFactory<String, String> nonTransFactory() {return new MockProducerFactory<>(() ->new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}@Bean
ProducerFactory<String, String> transFactory() {MockProducer<String, String> mockProducer =new MockProducer<>(true, new StringSerializer(), new StringSerializer());mockProducer.initTransactions();return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}

请注意,在第二种情况下,lambda是一个BiFunction<Boolean, String>,如果调用者想要一个事务生产者,则第一个参数为true;可选的第二个参数包含事务id。这可以是默认值(如构造函数中提供的),也可以由KafkaTransactionManager(或用于本地事务的KafkaTemplate)覆盖(如果配置了此参数)。如果你希望基于事务id使用不同的MockProducer,则会提供事务id。
如果你在多线程环境中使用生产者,BiFunction应该返回多个生产者(可能使用ThreadLocal绑定线程)。
必须通过调用initTransaction()为事务初始化事务性MockProducer。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/656306.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序遮罩层滚动穿透的问题

常见的布局 外层一个遮罩层 里面一层是弹窗以及内容 这里还是个textarea 滚动到底的时候 底部的遮罩层也跟着滚动了 发生滚动穿透 处理方法是添加 <page-meta page-style"{{ showPolish ? overflow: hidden; : }}" /> page-meta必须在第一个节点

【bug】在子组件中watch监听父组件传过来的值,监听不到,已解决

bug复现&#xff1a; 父组件需要传参数autoinputFs&#xff0c;autoinputFs为true的时候&#xff0c;子组件中的输入框&#xff08;只有一个&#xff09;自动聚焦&#xff0c;很多组件复用mp3Search&#xff0c;有些页面不需要&#xff0c;所以定义了autoinputFs控制输入框是否…

开发一个Android App,在项目中完成添加联系人的功能,通过ContentResolver向系统中添加联系人信息。

实现步骤&#xff1a; &#xff08;1&#xff09;添加动态联系人的权限。 &#xff08;2&#xff09;创建Activity和布局文件&#xff0c;添加输入框和按钮等控件。 &#xff08;3&#xff09;完成添加联系人的功能。 代码文件如下&#xff1a; activity_main.xml文件 <!…

Java - 遍历实体类字段值

当从数据库查询出一列数据&#xff0c;需要比对各列字段的最大值或者取不为空的值时&#xff0c;需要遍历字段操作。 示例数据 项目1月成交数量2月成交数量3月成交数量4月成交数量电脑15141019手机223519导管311001418 对应实体类 Data public class ProVo {// 项目private…

sv program module

为了避免races&#xff0c;在验证中引入program&#xff1b; Similarities between program and module block A program block can instantiate another program block in the way how the module is instantiated another module block.Both can have no or more inputs, …

2024.1.29 寒假训练记录(12)

昨天大半夜开了一道2000的组合数学&#xff0c;吗的忘记看场次&#xff0c;根本搜不到啥题解&#xff0c;只能对着别人代码一点点琢磨&#xff0c;终于看明白了&#xff0c;搞了套组合数取模的板子&#xff0c;还行 训练赛剩下的题目明天补 文章目录 CF 1912B Blueprint for S…

uni-app vite+ts+vue3模式 集成微信云开发

1.创建uni-app项目 此处使用的是通过vue-cli命令行方式uni-app官网 使用vue3/vite版 创建以 typescript 开发的工程&#xff08;如命令行创建失败&#xff0c;请直接访问 gitee 下载模板&#xff09; npx degit dcloudio/uni-preset-vue#vite-ts my-vue3-project(我创建失败…

【竞技宝jjb.lol】LOL:经典大龙毁一生 WE鏖战三局力克FPX

北京时间2024年1月30日&#xff0c;英雄联盟LPL2024春季赛在昨天迎来第二周首个比赛日&#xff0c;本日首场比赛由WE对阵FPX。本场比赛双方前两局战至1-1平&#xff0c;决胜局FPX一度建立不小的经济优势&#xff0c;然而太过冒险的打大龙决策最终让其功亏一篑&#xff0c;WE鏖战…

快速入门存内计算—助力人工智能加速深度学习模型的训练和推理

存内计算&#xff1a;提高计算性能和能效的新技术 传统的计算机架构是将数据存储在存储器中&#xff0c;然后将数据传输到计算单元进行处理。这种架构存在一个性能瓶颈&#xff0c;即数据传输延迟。存内计算通过将计算单元集成到存储器中&#xff0c;消除了数据传输延迟&#…

idea控制台出现乱码的解决方案

概述&#xff1a;有时候控制台的关键说明出现乱码&#xff0c;就很令人烦恼 在 IntelliJ IDEA 中出现控制台乱码通常是由于编码设置不正确或者字体显示问题导致的。以下是一些可能的解决方案&#xff1a; 1. 设置项目编码 确保你的项目编码设置正确&#xff1a; 在 Intelli…

element -table,多行或列合并

需求:后端返回的表格数据,如果某列值一样,前端表格样式需要合并他们,需要合并的列的行数未知(所以需要有数据后遍历后端数据对需要合并的属性进行计数)即动态遍历表格合并 效果 - 重点方法;table自带的:span-method="objectSpanMethod"方法 代码环境:vue2 ,…

顺序表的奥秘:高效数据存储与检索

&#x1f37f;顺序表 &#x1f9c0;1、顺序表的实现&#x1f365;1.1 创建顺序表类&#x1f365;1.2 插入操作&#x1f365;1.3 查找操作&#x1f365;1.4 删除操作&#x1f365;1.5 清空操作 &#x1f9c0;2、ArrayList的说明&#x1f9c0;3、ArrayList使用&#x1f365;3.1 A…

app广告变现|如何提升app广告点击率?

提升app内的广告点击率&#xff08;CTR&#xff09;可以增加广告收入&#xff0c;而对广告主来说&#xff0c;广告点击率下降会直接影响广告主的投资回报率&#xff0c;因此&#xff0c;如何提升广告点击率&#xff0c;对app运营来说是一项重要的工作。 AdSet官网 | 聚合SDK广…

分类预测 | Matlab实现SCN-Adaboost随机配置网络模型SCN的Adaboost数据分类预测/故障识别

分类预测 | Matlab实现SCN-Adaboost随机配置网络模型SCN的Adaboost数据分类预测/故障识别 目录 分类预测 | Matlab实现SCN-Adaboost随机配置网络模型SCN的Adaboost数据分类预测/故障识别分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现SCN-Adaboost随机配置网…

C++ ffmpeg RTSP 视频推流实现, arm linux平台

环境&#xff1a; FFmpeg版本&#xff1a;n4.2.2 下载地址&#xff08;下载编译后请确认版本正确&#xff09;&#xff1a; https://ffmpeg.org//download.html 下面地址经过第三方git加速可能存在实效性&#xff1a; https://hub.fgit.cf/FFmpeg/FFmpeg/tree/n4.4.2实现代码…

深入解析Dubbo负载均衡策略

深入解析Dubbo负载均衡策略 I. 引言 在当今日益复杂和庞大的网络环境中&#xff0c;分布式系统的设计和实现成为了现代软件架构的重要组成部分。Dubbo框架作为一种高性能、轻量级的分布式服务框架&#xff0c;为构建分布式系统提供了强大的支持。然而&#xff0c;随着系统规模…

发票pdf文件解析

借助pdfplumber 解析 效果如下&#xff1a; { 发票号码(FPHM): 24322000000011529984, 开票日期(KPRQ): 2024年01月11日, 合计(HJ): 1205.94, 购方: 91320213586657279T, 销方: 91320214MAD1N7EN36, 价税合计(JSHJ): 1218.00, 项目(XM)-1: …

【HarmonyOS应用开发】UIAbility实践第一部分(五)

一、UIAbility概述 1、UIAbility是一种包含用户界面的应用组件&#xff0c;主要用于和用户进行交互。UIAbility也是系统调度的单元&#xff0c;为应用提供窗口在其中绘制界面。 2、每一个UIAbility实例&#xff0c;都对应于一个最近任务列表中的任务。 3、一个应用可以有一个UI…

获取文件夹下所有文件路径

有时候我们会获取文件夹下所有文件的路径以及完成的名称,这时候如果有一个函数库轻松帮我得到数据就好了,还真有. cpp void getFiles(const std::string & path, std::vector<std::string> & files) { //文件句柄 long hFile 0; //文件信息&#xff0c;_fi…

信息安全考证攻略

&#x1f525;在信息安全领域&#xff0c;拥有相关的证书不仅能提升自己的专业技能&#xff0c;更能为职业生涯增添不少光彩。下面为大家盘点了一些国内外实用的信息安全证书&#xff0c;让你一睹为快&#xff01; &#x1f31f;国内证书&#xff08;认证机构&#xff1a;中国信…