Python 全栈系列266 Kafka服务的Docker搭建

说明

在大量数据处理任务下的缓存与分发

这个算是来自顾同学的助攻+1,我有点java绝缘体的体质,碰到和java相关的安装部署总会碰到点奇怪的问题,不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素,可以简单认为kafka的速度是10万/秒级的。

本次文章的目的是:

  • 1 搭建一个平时工作中常用的队列服务
  • 2 方便自己或者其他同事再次搭建

内容

1 搭建过程

共要搭建两个服务:zookeeper和kafka。

1.1 创建zookeeper

这个是基础服务,必须要最先启动

docker run -d --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT=2181 -e \
ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100

通常来说,这个服务启动后就不用管了,但是偶尔如果需要debug的时候:

docker exec -it zookeeper bash
bin/zkCli.sh -server 127.0.0.1:2181
ls /brokers/ids

1.2 创建持久化路径

这个会实际保存kafka的消息

mkdir -p /data/kafka-logs

1.3 创建kafka

一种场景是只监听外网IP(WAN_IP),另一种场景是同时监听内外网(LAN_IP)。

只监听外网的比较简单

WAN_IP=111
LAN_IP=222
docker run -it --rm --name kafka \-p 24666:24666 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${WAN_IP}:24666  \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:24666 \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

同时监听内外网的比较麻烦(且要求端口不同)

WAN_IP=111
LAN_IP=222
docker run -d --name kafka \-p 24666:24666 \-p 9092:9092 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:24666 \-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:24666 \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \-e KAFKA_LISTENER_NAME=INTERNAL \-e KAFKA_LISTENER_NAME=EXTERNAL \-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

配置解释
KAFKA_LISTENERS:

  • INTERNAL://0.0.0.0:9092 用于所有网络接口监听。

  • EXTERNAL://0.0.0.0:24666 用于所有网络接口监听。

  • KAFKA_ADVERTISED_LISTENERS:

  • INTERNAL://IP:9092 用于内网客户端。

  • EXTERNAL://IP:24666 用于外网客户端。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:

  • INTERNAL:PLAINTEXT 和 EXTERNAL:PLAINTEXT 映射了每个监听器名称和协议类型。

注释
• docker run -d --name kafka:启动一个名为 kafka 的容器,并在后台运行。
• -p 9092:9092:将主机的 9092 端口映射到容器的 9092 端口,这是 Kafka 的默认端口。
• --link zookeeper:zk:将名为 zookeeper 的容器链接到当前容器,并在当前容器中以 zk 作为别名进行访问。
• -e HOST_IP=localhost:设置环境变量 HOST_IP 为 localhost。
• -e KAFKA_BROKER_ID=1:设置 Kafka 的 broker ID 为 1。【如果有多个,应该在这里区分】
• -e KAFKA_ZOOKEEPER_CONNECT=zk:2181:指定 Zookeeper 的连接地址。
• -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx:9092:设置 Kafka 的广告监听器地址。【这个是实际上Consumer一定会用的。】
• -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:设置 Kafka 的监听地址。
• -e KAFKA_LOG_DIRS=/data/kafka-logs:指定 Kafka 日志存储目录。
• -v /data/kafka-logs:/data/kafka-logs:将主机的 /data/kafka-logs 目录挂载到容器的 /data/kafka-logs 目录,以持久化存储 Kafka 日志。

2 测试

2.1 生产者测试

from pydantic import BaseModel, field_validator
import json 
import pandas as pd 
class KafkaJsonMsgList(BaseModel):json_list : list@propertydef msg_list(self):return pd.Series(self.json_list).apply(json.loads).to_list()from func_timeout import func_set_timeout,FunctionTimedOutimport json
from confluent_kafka import Producer
# @func_set_timeout(60)def send_messages(bootstrap_servers = None, topic= None, messages= None):"""发送消息到 Kafka 主题:param bootstrap_servers: Kafka 服务器地址:param topic: Kafka 主题:param messages: 要发送的消息列表"""# 创建 Producer 实例producer = Producer(**{'bootstrap.servers': bootstrap_servers,'acks': 1 })for msg in messages:try:producer.produce(topic, msg)except BufferError:# 如果队列已满,等待队列空出空间producer.poll(1)# 定期调用poll以确保消息传递producer.poll(0)# 确保所有消息都被发送producer.flush()msg_list = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(3)]
topic = 'my_test6'
# 外网
## bootstrap_servers = 'WAN_IP:24666'
# 内网
bootstrap_servers = 'LAN_IP:9092'send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list)

2.2 消费者测试

from confluent_kafka import Consumer# 如果是非json的,直接拿到就可以了
# @func_set_timeout(60)def consume_messages(config = None, topic = None, max_messages = 3):# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topicconsumer.subscribe([topic])consumed_count = 0res_list = []try:while consumed_count < max_messages:msg = consumer.poll(1.0)if msg is None:print('Empty Q')break else:res_list.append(msg.value().decode('utf-8'))consumed_count += 1if consumed_count >= max_messages:breakexcept KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()return res_list # 外网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'WAN_IP:24666',
'group.id':'group1',
'auto.offset.reset': 'earliest', 
'enable.auto.commit': True
}
# 内网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'LAN_IP:9092',
'group.id':'group1',
'auto.offset.reset': 'earliest', 
'enable.auto.commit': True
}
topic = 'my_test6'
import time 
tick1 = time.time()
max_messages = 100  # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()

2.3 性能测试

发送端,1.48秒发送10万条消息,稍微弱了点,不过考虑这个是一台仅仅4核8G且繁忙的机器,那就还好(我默认的方式是需要json序列化的)。


tick1 = time.time()
msg_list_10w = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(100000)]
topic = 'my_test6'
send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list_10w)
tick2 = time.time()
print('takes %.2f to send 100000' % (tick2-tick1))
takes 1.48 to send 100000
```接收端
````python
topic = 'my_test6'
import time 
tick1 = time.time()
max_messages = 100000  # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()
print(tick2-tick1, 'get_time')
print(tick3-tick2, 'parse-time')1.3391587734222412 get_time
0.24841904640197754 parse-time
```总体上还是满意的,可以了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/878708.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

风机设计基础

目录 1、风机分类按气体出口压力&#xff08;或升压&#xff09;来进行风机分类按风机全压来进行分类 2、风机定律及效率、功率、压力计算风机轴功率与扭矩关系&#xff1a;风机全压、静压效率计算公式&#xff1a;全压、动压、静压计算公式&#xff1a; 3、风机噪声1、离散噪声…

修改jupyter notebook 默认浏览器(不动配置文件,改系统默认浏览器)

最开始把联想浏览器切到EDGE就是用的修改系统的默认浏览器。不知怎么的现在搜到的方法都是在说修改配置文件&#x1f613;。 不想动配置文件&#xff0c;平时对默认浏览器没有特殊要求的&#xff0c;可以用这个方法。 这里是把默认浏览器改成联想浏览器&#xff0c;电脑也是联…

macos 系统 降级, 重装, 升级图文教程

最近一不小心mac被升级到了最新版本, 在使用vscode的时候经常卡顿,于是有了降级macos的想法! 于是就有了这macos的系统降级 , 重装, 升级图文教程. 本文重点介绍macos降级, 重装过程, 至于升级, 这个一不小心就会被升级(通过应用商店)基本上都是自动升级的,所以不做更多说明. …

低代码平台赋能:烟花鞭炮企业数字化转型新篇章

随着数字化转型的浪潮席卷全球&#xff0c;传统制造业正面临着前所未有的变革机遇。烟花鞭炮行业&#xff0c;作为承载深厚文化底蕴与独特工艺的传统产业&#xff0c;也不例外。近年来&#xff0c;我国政府高度重视中小企业数字化转型&#xff0c;出台了一系列扶持政策&#xf…

基于大数据的电商平台电脑销售数据分析系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长 QQ 名片 :) 1. 项目简介 随着电子商务的蓬勃发展&#xff0c;各大电商平台积累了海量的商品数据。如何从这些数据中提取有价值的信息&#xff0c;对于商家来说至关重要。本项目利用网络爬虫技术从京东电商平台采集各类品牌…

春秋云镜(OpenSSH)·CVE-2023-51385

靶标介绍&#xff1a; OpenSSH 是使用 SSH 协议进行远程登录的连接工具。在OpenSSH 9.6版本之前的ssh中&#xff0c;如果用户名或主机名中含有shell元字符&#xff08;如 | "等&#xff09;&#xff0c;并且ssh_config中ProxyCommand、LocalCommand指令或"match exe…

Mybatis 基础知识

目录 一、简介 1、JDK&#xff1a; 2、JRE&#xff1a; 3、JVM&#xff1a; 4、Java SE&#xff1a; 5、Java EE&#xff1a; 6、持久层&#xff1a; 7、轻量级&#xff1a; 8、半自动化&#xff1a; 9、ORM&#xff1a; 10、框架&#xff1a; 二、三层架构 1、业…

数据结构-----双向链表

1.什么是内存泄漏&#xff1f; 主动申请到的内存空间没有进行内存释放&#xff0c;导致无内存空间可用。 2.双向链表&#xff1f; 双向链表&#xff08;Doubly Linked List&#xff09;是链表的一种&#xff0c;它允许我们在链表中的任何一个节点向前或 向后遍历。与单向链表…

display flex 的div 被子元素撑开不显示滚动条的一个解决demo

display flex 的div 被子元素撑开&#xff0c;不显示y轴滚动条的 一个解决demo。 注&#xff1a; 不一定适用所有人的的相同问题 less # less .contact {display: flex;flex-grow: 1;overflow: hidden auto;flex-direction: column;.contact-items {flex: 1 1 0;display: flex…

Python和Java及MATLAB和CUDA显微镜导图

&#x1f3af;要点 交互式设备控制和图像处理图像背景和阴影校正可视化萤光团位置算法和读取光学图像读写转换显微镜图像生物医学细胞图像分析荧光图像算法计算亮度数据和模拟表征新型染料和缓冲液强度估计细菌图像分析扫描透射和高分辨率透射图像模拟多模态成像分割可视化透射…

Hive服务部署及Datagrip工具使用

目录 Hive服务部署 Hiveserver2服务 1&#xff09;用户说明 2&#xff09;Hiveserver2部署 &#xff08;1&#xff09;Hadoop端配置 &#xff08;2&#xff09;Hive端配置 3&#xff09;测试 &#xff08;1&#xff09;启动Hiveserver2 &#xff08;2&#xff09;使用命…

深入学习电路基础:从理论到实践

引言 电路是电子学的核心&#xff0c;也是现代科技的基石。从简单的灯泡开关到复杂的计算机处理器&#xff0c;电路在各类电子设备中都起到了至关重要的作用。深入学习电路知识不仅有助于理解电子设备的工作原理&#xff0c;还能够为实际设计和开发电子产品打下坚实的基础。 …

flutter语法:var、late、const、final区别

var: 用于声明可变变量&#xff0c;支持类型推断并能多次赋值&#xff0c;但只能是同类型的数据赋值。之后其类型更改&#xff0c;会抛出异常。 var number 10;void main() {print(number); // 这将打印10。number 20; // 再次赋值&#xff0c;但必须同类型print(number); /…

某云彩SRM2.0任意文件下载漏洞

文章目录 免责申明搜索语法漏洞描述漏洞复现修复建议 免责申明 本文章仅供学习与交流&#xff0c;请勿用于非法用途&#xff0c;均由使用者本人负责&#xff0c;文章作者不为此承担任何责任 搜索语法 fofa icon_hash"1665918155"漏洞描述 某云采 SRM2.0是一款先…

扁平数组转化分类树

使用下列数组生成一个分类树&#xff08;数组中每项中的pid是父节点的id&#xff0c;pid:0表示顶层&#xff0c;pid:1&#xff0c;表示这个节点属于id为1的节点&#xff0c;children该节点的子节点数组&#xff09; const jsona [{"ID": 1,"CreatedAt": …

【计算机视觉前沿研究 热点 顶会】ECCV 2024中扩散模型有关的论文

神经辐射场修复的驯服潜在扩散模型 神经辐射场(NERF)是一种从多视角图像进行三维重建的表示法。尽管最近的一些工作表明&#xff0c;在编辑具有扩散先验的重建的 NERF 方面取得了初步成功&#xff0c;但他们仍然在努力在完全未覆盖的区域中合成合理的几何图形。一个主要原因是…

使用大型语言模型进行监督微调(SFT)

大型语言模型&#xff08;LLMs&#xff09;通常经过几个阶段的训练&#xff0c;包括预训练和几个微调阶段&#xff1b;请参见下文。尽管预训练很昂贵&#xff08;即需要数十万美元的计算成本&#xff09;&#xff0c;但相比之下&#xff0c;微调LLM&#xff08;或进行上下文学习…

应用商店优化(ASO)的四大误区

应用商店优化 (ASO) 是移动营销中最重要的部分之一&#xff0c;可以帮助开发人员吸引自然流量并在应用推广方面取得预期效果。近年来ASO优化在开发者中越来越受欢迎。虽然它已经证明了其有效性和对应用成功的影响力&#xff0c;但尽管如此仍然存在与ASO相关的误解&#xff0c;导…

Day-04-QFile打开文件的两种方式

一、UI界面设置两个按键&#xff0c;并直接转到槽函数 二、两种代码展示 #include <QFile> #include <QDebug>//此两种方式中调用函数&#xff0c;应包含的头文件void Widget::on_btnReadFile01_clicked()//第一种打开方式 {//1. 打开文件QFile file;file.setFile…

ARM发布新一代高性能处理器N3

简介 就在2月21日&#xff0c;ARM发布了新一代面向服务器的高性能处理器N3和V3&#xff0c;N系列平衡性能和功耗&#xff0c;而V系列则注重更高的性能。此次发布的N3&#xff0c;单个die最高32核&#xff08;并加入到CCS&#xff0c;Compute Subsystems&#xff0c;包含Core&a…