cdn网站加速如何免费创建自己的小程序
news/
2025/9/22 21:05:57/
文章来源:
cdn网站加速,如何免费创建自己的小程序,seo营销怎么做,wordpress安装过程说明 在大量数据处理任务下的缓存与分发 这个算是来自顾同学的助攻1#xff0c;我有点java绝缘体的体质#xff0c;碰到和java相关的安装部署总会碰到点奇怪的问题#xff0c;不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素#xff0…说明 在大量数据处理任务下的缓存与分发 这个算是来自顾同学的助攻1我有点java绝缘体的体质碰到和java相关的安装部署总会碰到点奇怪的问题不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素可以简单认为kafka的速度是10万/秒级的。
本次文章的目的是
1 搭建一个平时工作中常用的队列服务2 方便自己或者其他同事再次搭建
内容
1 搭建过程
共要搭建两个服务zookeeper和kafka。
1.1 创建zookeeper
这个是基础服务必须要最先启动
docker run -d --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT2181 -e \
ZOOKEEPER_TICK_TIME2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100通常来说这个服务启动后就不用管了但是偶尔如果需要debug的时候
docker exec -it zookeeper bash
bin/zkCli.sh -server 127.0.0.1:2181
ls /brokers/ids1.2 创建持久化路径
这个会实际保存kafka的消息
mkdir -p /data/kafka-logs1.3 创建kafka
一种场景是只监听外网IP(WAN_IP)另一种场景是同时监听内外网(LAN_IP)。
只监听外网的比较简单
WAN_IP111
LAN_IP222
docker run -it --rm --name kafka \-p 24666:24666 \--link zookeeper:zk \-e HOST_IPlocalhost \-e KAFKA_BROKER_ID1 \-e KAFKA_ZOOKEEPER_CONNECTzk:2181 \-e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://${WAN_IP}:24666 \-e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:24666 \-e KAFKA_LOG_DIRS/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100
同时监听内外网的比较麻烦(且要求端口不同)
WAN_IP111
LAN_IP222
docker run -d --name kafka \-p 24666:24666 \-p 9092:9092 \--link zookeeper:zk \-e HOST_IPlocalhost \-e KAFKA_BROKER_ID1 \-e KAFKA_ZOOKEEPER_CONNECTzk:2181 \-e KAFKA_ADVERTISED_LISTENERSINTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:24666 \-e KAFKA_LISTENERSINTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:24666 \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAPINTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \-e KAFKA_LISTENER_NAMEINTERNAL \-e KAFKA_LISTENER_NAMEEXTERNAL \-e KAFKA_INTER_BROKER_LISTENER_NAMEINTERNAL \-e KAFKA_LOG_DIRS/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100配置解释 KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092 用于所有网络接口监听。 EXTERNAL://0.0.0.0:24666 用于所有网络接口监听。 KAFKA_ADVERTISED_LISTENERS: INTERNAL://IP:9092 用于内网客户端。 EXTERNAL://IP:24666 用于外网客户端。 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT 和 EXTERNAL:PLAINTEXT 映射了每个监听器名称和协议类型。
注释
• docker run -d --name kafka启动一个名为 kafka 的容器并在后台运行。
• -p 9092:9092将主机的 9092 端口映射到容器的 9092 端口这是 Kafka 的默认端口。
• --link zookeeper:zk将名为 zookeeper 的容器链接到当前容器并在当前容器中以 zk 作为别名进行访问。
• -e HOST_IPlocalhost设置环境变量 HOST_IP 为 localhost。
• -e KAFKA_BROKER_ID1设置 Kafka 的 broker ID 为 1。【如果有多个应该在这里区分】
• -e KAFKA_ZOOKEEPER_CONNECTzk:2181指定 Zookeeper 的连接地址。
• -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://xxx:9092设置 Kafka 的广告监听器地址。【这个是实际上Consumer一定会用的。】
• -e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092设置 Kafka 的监听地址。
• -e KAFKA_LOG_DIRS/data/kafka-logs指定 Kafka 日志存储目录。
• -v /data/kafka-logs:/data/kafka-logs将主机的 /data/kafka-logs 目录挂载到容器的 /data/kafka-logs 目录以持久化存储 Kafka 日志。2 测试
2.1 生产者测试
from pydantic import BaseModel, field_validator
import json
import pandas as pd
class KafkaJsonMsgList(BaseModel):json_list : listpropertydef msg_list(self):return pd.Series(self.json_list).apply(json.loads).to_list()from func_timeout import func_set_timeout,FunctionTimedOutimport json
from confluent_kafka import Producer
# func_set_timeout(60)def send_messages(bootstrap_servers None, topic None, messages None):发送消息到 Kafka 主题:param bootstrap_servers: Kafka 服务器地址:param topic: Kafka 主题:param messages: 要发送的消息列表# 创建 Producer 实例producer Producer(**{bootstrap.servers: bootstrap_servers,acks: 1 })for msg in messages:try:producer.produce(topic, msg)except BufferError:# 如果队列已满等待队列空出空间producer.poll(1)# 定期调用poll以确保消息传递producer.poll(0)# 确保所有消息都被发送producer.flush()msg_list [json.dumps({id:i ,value:aaa,aa:this is test}) for i in range(3)]
topic my_test6
# 外网
## bootstrap_servers WAN_IP:24666
# 内网
bootstrap_servers LAN_IP:9092send_messages(bootstrap_serversbootstrap_servers,topictopic,messages msg_list)2.2 消费者测试
from confluent_kafka import Consumer# 如果是非json的直接拿到就可以了
# func_set_timeout(60)def consume_messages(config None, topic None, max_messages 3):# Create Consumer instanceconsumer Consumer(config)# Subscribe to topicconsumer.subscribe([topic])consumed_count 0res_list []try:while consumed_count max_messages:msg consumer.poll(1.0)if msg is None:print(Empty Q)break else:res_list.append(msg.value().decode(utf-8))consumed_count 1if consumed_count max_messages:breakexcept KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()return res_list # 外网
config {
# User-specific properties that you must set
bootstrap.servers: WAN_IP:24666,
group.id:group1,
auto.offset.reset: earliest,
enable.auto.commit: True
}
# 内网
config {
# User-specific properties that you must set
bootstrap.servers: LAN_IP:9092,
group.id:group1,
auto.offset.reset: earliest,
enable.auto.commit: True
}
topic my_test6
import time
tick1 time.time()
max_messages 100 # 这里设置要消费的消息数量
json_list consume_messages(config, topic, max_messages)
tick2 time.time()
kj KafkaJsonMsgList(json_list json_list)
msg_list kj.msg_list
tick3 time.time()2.3 性能测试
发送端1.48秒发送10万条消息稍微弱了点不过考虑这个是一台仅仅4核8G且繁忙的机器那就还好(我默认的方式是需要json序列化的)。 tick1 time.time()
msg_list_10w [json.dumps({id:i ,value:aaa,aa:this is test}) for i in range(100000)]
topic my_test6
send_messages(bootstrap_serversbootstrap_servers,topictopic,messages msg_list_10w)
tick2 time.time()
print(takes %.2f to send 100000 % (tick2-tick1))
takes 1.48 to send 100000
接收端
python
topic my_test6
import time
tick1 time.time()
max_messages 100000 # 这里设置要消费的消息数量
json_list consume_messages(config, topic, max_messages)
tick2 time.time()
kj KafkaJsonMsgList(json_list json_list)
msg_list kj.msg_list
tick3 time.time()
print(tick2-tick1, get_time)
print(tick3-tick2, parse-time)1.3391587734222412 get_time
0.24841904640197754 parse-time
总体上还是满意的可以了。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/910372.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!