网站开发培训周末班网站模仿算侵权吗
news/
2025/10/2 23:54:57/
文章来源:
网站开发培训周末班,网站模仿算侵权吗,装修平台自己做网站有几个,龙岩建网站文章目录一、环境分布1. 版本声明2. 依赖2. case测试2. case2测试一、环境分布
1. 版本声明
linux服务器软件版本jdk1.8kafkakafka_2.13-2.4.0注#xff1a;建议版本和应用依赖的客户端版本依赖保持一致#xff0c;如果需要更高版本#xff0c;可以尝试
但是有一点#x…
文章目录一、环境分布1. 版本声明2. 依赖2. case测试2. case2测试一、环境分布
1. 版本声明
linux服务器软件版本jdk1.8kafkakafka_2.13-2.4.0注建议版本和应用依赖的客户端版本依赖保持一致如果需要更高版本可以尝试
但是有一点小伙伴们要记住linux服务器的kafka版本向下兼容但是kafka的客户端版本不向下兼容这一点很重要
2. 依赖 !-- kafka连接 --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.4.0/version/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.13/artifactIdversion2.4.0/version/dependency2. case测试
package com.sinosoft.a;import kafka.consumer.ConsumerConfig;
import kafka.producer.ProducerConfig;import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;public class KafkaDemo {public static void main(String[] args) throws InterruptedException {// 生产者示例providerDemo();// 消费者示例consumerDemo();}/*** 生产者示例*/public static void providerDemo() {Properties properties new Properties();/*** kafka的服务地址*/properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.521.1314:9092);
// properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.521.1314:9092,192.168.17.137:9092);/*** 在考虑完成请求之前生产者要求leader收到的确认数量。这可以控制发送记录的持久性。允许以下设置* acks 0生产者将不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下无法保证服务器已收到记录并且retries配置将不会生效因为客户端通常不会知道任何故障。* acks 1这意味着leader会将记录写入其本地日志但无需等待所有follower的完全确认即可做出回应。在这种情况下如果leader在确认记录后立即失败但在关注者复制之前则记录将丢失。* acks all这意味着leader将等待完整的同步副本集以确认记录。这保证了只要至少一个同步副本仍然存活记录就不会丢失。这是最强有力的保证。这相当于acks -1设置*/properties.put(ProducerConfig.ACKS_CONFIG, all);/*** 当从broker接收到的是临时可恢复的异常时生产者会向broker重发消息但是不能无限制重发如果重发次数达到限制值生产者将不会重试并返回错误。* 通过retries属性设置。默认情况下生产者会在重试后等待100ms可以通过 retries.backoff.ms属性进行修改*/properties.put(ProducerConfig.RETRIES_CONFIG, 0);/*** 当有多条消息要被发送到同一分区时生产者会把他们放到同一批里。kafka通过批次的概念来 提高吞吐量但是也会在增加延迟。* 以下配置当缓存数量达到16kb就会触发网络请求发送消息*/properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/*** 每条消息在缓存中的最长时间单位ms如果超过这个时间就会忽略batch.size的限制由客户端立即将消息发送出去*/properties.put(ProducerConfig.LINGER_MS_CONFIG, 50);/*** Kafka的客户端发送数据到服务器不是来一条就发一条而是经过缓冲的也就是说通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里然后把很多消息收集成一个一个的Batch再发送到Broker上去的这样性能才可能高。* buffer.memory的本质就是用来约束KafkaProducer能够使用的内存缓冲的大小的默认值32MB。* 如果buffer.memory设置的太小可能导致的问题是消息快速的写入内存缓冲里但Sender线程来不及把Request发送到Kafka服务器会造成内存缓冲很快就被写满。而一旦被写满就会阻塞用户线程不让继续往Kafka写消息了。* 所以“buffer.memory”参数需要结合实际业务情况压测需要测算在生产环境中用户线程会以每秒多少消息的频率来写入内存缓冲。经过压测调试出来一个合理值。*/properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);/*** key的序列化方式*/properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);/*** value序列化方式*/properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer null;try {producer new KafkaProducerString, String(properties);
// for (int i 0; i 100; i) {
// String msg ------Message i;
// producer.send(new ProducerRecordString, String(mytest, msg));
// System.out.println(Sent: msg);
// }String msg ------Message hello world!;// mytest 为topicproducer.send(new ProducerRecordString, String(mytest, msg));System.out.println(Sent: msg);} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}/*** 消费者示例* throws InterruptedException*/public static void consumerDemo() throws InterruptedException {Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.521.1314:9092);
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.17.136:9092,192.168.17.137:9092);// 每个消费者分配独立的组号props.put(ConsumerConfig.GROUP_ID_CONFIG, test1);// 如果value合法则自动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动更新被消费消息的偏移量的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置会话响应的时间超过这个时间kafka可以选择放弃消费或者消费下一条消息props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);// 设置服务返回的最大数据量这不是绝对最大值如果提取的第一个非空分区中的第一条消息大于此值则仍将返回该消息以确保使用者使用。此处设置5MBprops.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 5242880);// 设置服务返回的每个分区的最大数据量此大小必须至少与服务器允许的最大消息大小fetch.max.bytes一样大否则生产者有可能发送大于消费者可以获取的消息。此处设置5MBprops.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5242880);/*** earliest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时从头开始消费* latest当各分区下有已提交的offset时从提交的offset开始消费无提交的offset时消费新产生的该分区下的数据* nonetopic各分区都存在已提交的offset时从offset后开始消费只要有一个分区不存在已提交的offset则抛出异常*/props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumerString, String(props);// mytest 为topicconsumer.subscribe(Arrays.asList(mytest));while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records){System.out.printf(------------------offset %d, key %s, value %s,record.offset(), record.key(), record.value());System.out.println();}/*** 手动提交偏移量* 保证同一个consumer group中下一次读取不论进行了rebalance时既不会重复消费消息也不会遗漏消息。* 防止consumer莫名挂掉后下次进行数据fetch时不能从上次读到的数据开始读而导致Consumer消费的数据丢失*/consumer.commitSync();Thread.sleep(2000);}}}2. case2测试
package com.sinosoft.b;import java.util.Arrays;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;public class kafkaConsumerTest {public static void main(String[] args) {Properties properties new Properties();properties.put(bootstrap.servers, 192.168.521.1314:9092); // 指向kafka集群的IP地址properties.put(group.id, group-1); // Consumer分组IDproperties.put(enable.auto.commit, true);properties.put(auto.commit.interval.ms, 1000); /* 自动确认offset的时间间隔 */properties.put(auto.offset.reset, earliest);properties.put(session.timeout.ms, 30000);properties.put(max.poll.records, 100);// max.poll.records条数据需要在在session.timeout.ms这个时间内处理完properties.put(fetch.min.bytes, 1);//server发送到消费端的最小数据若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。properties.put(fetch.wait.max.ms, 1000);properties.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 反序列化properties.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 反序列化KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);kafkaConsumer.subscribe(Arrays.asList(xuhaitao)); // 设置消费的主题while (true) {ConsumerRecordsString, String records kafkaConsumer.poll(100); // 调用poll方法来轮循Kafka集群的消息其中参数100是超时时间for (ConsumerRecordString, String record : records) {System.out.printf(offsetConsumer %d, value %s, record.offset(), record.value());System.out.println();}}}
}package com.sinosoft.b;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;public class ProducerTest {public static void main(String[] args) {Properties properties new Properties();properties.put(bootstrap.servers, 192.168.521.1314:9092);// 指向kafka集群的IP地址properties.put(acks, all);properties.put(retries, 0);properties.put(batch.size, 16384);properties.put(linger.ms, 1);properties.put(buffer.memory, 33554432);properties.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer null;try {producer new KafkaProducerString, String(properties);for (int i 0; i 100; i) {String msg This is Message i;producer.send(new ProducerRecordString, String(xuhaitao, msg));System.out.println(Sent: msg);}} catch (Exception e) {e.printStackTrace();} finally {producer.close();}}
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/925425.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!