威海 网站开发女性时尚网站带论坛php程序
news/
2025/10/1 4:15:47/
文章来源:
威海 网站开发,女性时尚网站带论坛php程序,中国核工业二三建设有限公司怎么样,网站建设天猫店文章目录一、消费者监听1. 启动zk2. 启动kafka3. 创建主题4. 消费者监听消息二、生产者工程2.1. 依赖2.2. 生产者代码#xff08;同步#xff09;2.3. 生产者代码#xff08;异步#xff09;2.4. 发送消息2.5. 消费者监听消息2.6. 结果返回一、消费者监听
1. 启动zk
zkSe…
文章目录一、消费者监听1. 启动zk2. 启动kafka3. 创建主题4. 消费者监听消息二、生产者工程2.1. 依赖2.2. 生产者代码同步2.3. 生产者代码异步2.4. 发送消息2.5. 消费者监听消息2.6. 结果返回一、消费者监听
1. 启动zk
zkServer.sh start# 监听运行状态
zkServer.sh status2. 启动kafka
# 后台启动kafka
kafka-server-start.sh -daemon /app/kafka_2.12-2.8.0/config/server.properties3. 创建主题
# 创建一个主题名称为topic_1 该主题分区1个分区 该分区有1个副本
kafka-topics.sh --zookeeper localhost:2181/mykafka --create --topic topic_1 --partitions 1 --replication-factor 14. 消费者监听消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1二、生产者工程
2.1. 依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.7.1/version/dependency2.2. 生产者代码同步
package com.gblfy.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {MapString, Object configs new HashMapString, Object();//指定初始化连接用到的broker地址configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.159.102:9092);//指定key序列化类configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//指定value序列化类configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//构造生产者对象 指定发送的key和value的类型 配置的参数列表必填参数辅助参数KafkaProducerInteger, String producer new KafkaProducerInteger, String(configs);//用于设置用户自定义的消息头字段ListHeader headers new ArrayList();headers.add(new RecordHeader(biz.name, producer.demo.getBytes()));//构造record封装发送消息主体ProducerRecordInteger, String record new ProducerRecordInteger, String(topic_1, //指定发送主题0,//指定发送分区0,//指定发送keyhello gblfy 0,//指定发送消息主题headers//用于设置用户自定义的消息头字段);//消息的同步确认 调用send方法发送消息final FutureRecordMetadata future producer.send(record);//调用get方法接收消息final RecordMetadata metadata future.get();System.out.println(消息的主题: metadata.topic());System.out.println(消息的分区: metadata.partition());System.out.println(消息的偏移量: metadata.offset());//关闭生产者producer.close();}
}
2.3. 生产者代码异步
package com.gblfy.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {MapString, Object configs new HashMapString, Object();//指定初始化连接用到的broker地址configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.159.102:9092);//指定key序列化类configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);//指定value序列化类configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//构造生产者对象 指定发送的key和value的类型 配置的参数列表必填参数辅助参数KafkaProducerInteger, String producer new KafkaProducerInteger, String(configs);//用于设置用户自定义的消息头字段ListHeader headers new ArrayList();headers.add(new RecordHeader(biz.name, producer.demo.getBytes()));//构造record封装发送消息主体ProducerRecordInteger, String record new ProducerRecordInteger, String(topic_1, //指定发送主题0,//指定发送分区0,//指定发送keyhello gblfy 0,//指定发送消息主题headers//用于设置用户自定义的消息头字段);//消息的同步确认 调用send方法发送消息final FutureRecordMetadata future producer.send(record);//调用get方法接收消息// final RecordMetadata metadata future.get();// System.out.println(消息的主题: metadata.topic());// System.out.println(消息的分区: metadata.partition());// System.out.println(消息的偏移量: metadata.offset());//消息的异步确认producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {System.out.println(消息的主题: metadata.topic());System.out.println(消息的分区: metadata.partition());System.out.println(消息的偏移量: metadata.offset());} else {System.out.println(异常消息);}}});//关闭生产者producer.close();}
}
2.4. 发送消息 消息有同步发送和异步发送二种 2.5. 消费者监听消息 2.6. 结果返回
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/923490.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!