天德建设集团网站如何做旅游小视频网站
news/
2025/10/8 7:06:40/
文章来源:
天德建设集团网站,如何做旅游小视频网站,免费在线图片编辑器,昆山做网站的那家好kafka中文文档 本文环境#xff1a;ubuntu:18.04
kafka安装、配置与基本使用(单节点)
安装kafka
下载 0.10.0.1版本并解压缩 tar -xzf kafka_2.11-0.10.0.1.tgzcd kafka_2.11-0.10.0.1.tgzkafka简单配置 vi config/server.properties主要注意三个地方ubuntu:18.04
kafka安装、配置与基本使用(单节点)
安装kafka
下载 0.10.0.1版本并解压缩 tar -xzf kafka_2.11-0.10.0.1.tgzcd kafka_2.11-0.10.0.1.tgzkafka简单配置 vi config/server.properties主要注意三个地方broker.id 标识本机log.dirs 是kafka接收消息存放路径zookeeper.connect 指定连接的zookeeper集群地址启动服务器
启动ZooKeeper服务器 如果没有ZooKeeper服务器,可以通过与kafka打包在一起的便捷脚本来创建一个单节点ZooKeeper实例 bin/zookeeper-server-start.sh config/zookeeper.properties#后台启动: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[2021-02-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...启动Kafka服务器 bin/kafka-server-start.sh config/server.properties#后台启动: bin/kafka-server-start.sh -daemon config/server.properties
[2021-02-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2021-02-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...创建一个 topic
创建一个名为“test”的topic它有一个分区和一个副本 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test查看所有 topic列表 bin/kafka-topics.sh --list --zookeeper localhost:2181
test删除一个 topic 标记: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test发送一些消息
Kafka自带一个命令行客户端它从文件或标准输入中获取输入并将其作为message消息发送到Kafka集群。默认情况下每行将作为单独的message发送。 运行 producer然后在控制台输入一些消息以发送到服务器。 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message启动一个 consumer
Kafka 还有一个命令行consumer消费者将消息转储到标准输出。 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message查看消费组
查看所有消费组 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer查看单个消费组的消费详情 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test(消费组name) --new-consumer发布-订阅模式
Topic Kafka中的Topics总是多订阅者模式一个topic可以拥有一个或者多个消费者来订阅它的数据 对于每一个topic Kafka集群都会维持一个分区日志每个分区都是有序且顺序不可变的记录集并且不断地追加到结构化的commitlog文件 分区中的每一个记录都会分配一个id号来表示顺序我们称之为 offsetoffset用来唯一的标识分区中每一条记录 Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制. 每一个消费者中唯一保存的元数据是offset偏移量即消费在log中的位置.
偏移量由消费者所控制:通常读取记录后消费者会以线性的方式增加偏移量但是实际上由于这个位置由消费者控制所以消费者可以采用任何顺序来消费记录.
生产者 生产者可以将数据发布到所选择的topic主题中
消费者 消费者使用一个 消费组 名称来进行标识发布到topic中的每条记录被分配给 订阅消费组 中的一个消费者实例. 如果所有的消费者实例在同一消费组中消息记录会负载平衡到每一个消费者实例 如果所有的消费者实例在不同的消费组中每条消息记录会广播到所有的消费者进程 消费者消费是以消费组为单位消费组之间数据互不影响组内数据均分
Go实现(Go 1.15)
kafka_p.go
package kafka_pimport (github.com/Shopify/saramalogtime
)// 定义Kafka生产者对象
type KafkaProducer struct {Config *sarama.ConfigAddress []stringTopic string
}// 定义生产者接口
type Producer interface {MsgContent() []byte
}//实例化sarama: Config
func (kP *KafkaProducer) ProducerConfigInit() {config : sarama.NewConfig()//是否开启消息发送成功后通知 successes channelconfig.Producer.Return.Successes true//重试次数config.Producer.Retry.Max 3//失败后再次尝试的间隔时间config.Producer.Retry.Backoff 1 * time.Second//指定kafka版本,不指定,使用最小版本,高版本的新功能可能无法正常使用.config.Version sarama.V0_10_0_1kP.Config config
}//同步消息模式
func (kP *KafkaProducer) SyncProducer(p Producer) error {//初始化客户端producer, err : sarama.NewSyncProducer(kP.Address, kP.Config)if err ! nil {log.Printf(sarama.NewSyncProducer err, message%s \n, err)return err}defer producer.Close()//发送消息msg : sarama.ProducerMessage{Topic: kP.Topic,Value: sarama.ByteEncoder(p.MsgContent()),}part, offset, err : producer.SendMessage(msg)if err ! nil {log.Printf(send message(%s) err%s \n, p.MsgContent(), err)return err} else {log.Printf(发送成功partition%d, offset%d \n, part, offset)return nil}
}// 启动Kafka生产者
func (kP *KafkaProducer) StartP(p Producer) error {kP.ProducerConfigInit()return kP.SyncProducer(p)
}
kafka_c.go
package kafka_cimport (github.com/Shopify/saramacluster github.com/bsm/sarama-clusterlogtime
)// 定义Kafka消费者对象
type KafkaConsumer struct {Config *cluster.ConfigAddress []stringTopics []stringGroupId string
}// 定义消费者接口
type Consumer interface {Consumer([]byte) error
}//实例化sarama: Config
func (kC *KafkaConsumer) ConsumerConfigInit() {config : cluster.NewConfig()//接收失败通知config.Consumer.Return.Errors true//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置config.Version sarama.V0_10_0_1//初始从最新的offset开始config.Consumer.Offsets.Initial sarama.OffsetNewestkC.Config config
}//cluster消费组 消费
func (kC *KafkaConsumer) ClusterConsumer(c Consumer) error {//初始化客户端consumer, err : cluster.NewConsumer(kC.Address, kC.GroupId, kC.Topics, kC.Config)if err ! nil {log.Printf(Failed open consumer: %s \n, err)return err}defer consumer.Close()go func() {for err : range consumer.Errors() {log.Printf(consumer err: %s \n, err)}}()go func() {for note : range consumer.Notifications() {log.Println(Rebalanced: , note)}}()//接收消息for msg : range consumer.Messages() {if err c.Consumer(msg.Value); err ! nil {return nil}else {// MarkOffset 并不是实时写入kafka有可能在程序crash时丢掉未提交的offsetconsumer.MarkOffset(msg, )time.Sleep(1 * time.Second)}}return nil
}// 启动Kafka消费者
func (kC *KafkaConsumer) StartC(c Consumer) error {kC.ConsumerConfigInit()return kC.ClusterConsumer(c)
}配置kafka_config.go
package kafka_configimport fmt// Address
var Address []string{IP:9092}var Topic test // 生产者topic
var Topics []string{test} // 消费者topics
var GroupId test-group-1 // 消费组id//发送者
type TestP struct {MsgData []byte
}// 实现发送者
func (t *TestP) MsgContent() []byte {fmt.Println(string(t.MsgData))return t.MsgData
}//接收者
type TestC struct {MsgData string
}// 实现接收者
func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}type KafkaMessage struct {FileDir string json:dirFileName string json:fileOperateType string json:operationOldData string json:old_dataNewData string json:new_data
}type KafkaMessageP struct {UserName string json:user_nameMsgId string json:msg_idMessages []KafkaMessage json:msg
}
消费者main.go
package mainimport (golong/kafka_cgolong/kafka_config
)func main() {//消费者c : kafka_config.TestC{}kC : kafka_c.KafkaConsumer{Address: kafka_config.Address,Topics: kafka_config.Topics,GroupId: kafka_config.GroupId,}_ kC.StartC(c)
}
生产者main.go
package mainimport (encoding/jsonfmtgolong/kafka_configgolong/kafka_p
)func main() {//生产者msgKp : kafka_config.KafkaMessageP{}msgKp.UserName fxmmsgKpBytes, _ : json.Marshal(msgKp)p : kafka_config.TestP{MsgData: msgKpBytes,}kP : kafka_p.KafkaProducer{Address: kafka_config.Address,Topic: kafka_config.Topic,}err : kP.StartP(p)if err ! nil {fmt.Println(添加异常 !!!)}
}
panic: non-positive interval for NewTicker 问题处理
//处理1 找到这个consumer.go源码位置上面的第二个报错有标注位置github.com/bsm/sarama-cluster.(*Consumer).cmLoop(0xc000212000, 0xc0002ba1e0)D:/work/mygo/pkg/mod/github.com/bsm/sarama-clusterv2.1.15incompatible/consumer.go:452 0x61// 修改452行// ticker : time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)ticker : time.NewTicker(c.client.config.Consumer.Offsets.AutoCommit.Interval)// 保存重新build即可//处理2把 sarama 版本改成 从 v1.26.1 -- v1.24.1 就可以用啦 github.com/Shopify/sarama v1.24.1gomod 的配置改下版本号就可以github.com/Shopify/sarama v1.24.1github.com/bsm/sarama-cluster v2.1.15incompatible
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/931171.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!