网站首页布局分析网站开发工程师 招聘
web/
2025/10/2 21:09:04/
文章来源:
网站首页布局分析,网站开发工程师 招聘,西安有几个区,乐清本地网站文章目录 案例信息介绍后端异步处理请求和后端同步处理请求同步方式异步方式 环境文件目录配置.envrequirements.txt 完整代码ext.pyapp.pykafka_create_user.py 运行方式本地安装 kafka运行 app.py使用 postman 测试建立 http 长连接#xff0c;等待后端处理结果发送 RAW DAT… 文章目录 案例信息介绍后端异步处理请求和后端同步处理请求同步方式异步方式 环境文件目录配置.envrequirements.txt 完整代码ext.pyapp.pykafka_create_user.py 运行方式本地安装 kafka运行 app.py使用 postman 测试建立 http 长连接等待后端处理结果发送 RAW DATA 在看这个文章之前建议先学习 kafka的工作原理 这个系列视频讲得很好虽然基于 Java 但是理解原理并不用区分语言。只需要看懂工作原理即可。
案例信息介绍
假设我的网站需要高并发地处理 user 注册这个简单的功能。前端会发送 {user_id: xxxx, psw:xxx} 的信息到后端完成创建 前端用 postman 来模拟后端用 flask 框架来简单演示 下面我用一张大致的图来表示代码的架构 前端的原始数据进入后端之后后端要用 kafka 的架构在有序地处理 user 的请求在这个任务中所有 user 的请求都是 register因此我们就创建一个 kafka 的 topic 专门用来处理 user 的这类请求同时由于 kafka 是通过队列的方式 异步地处理 user 的请求所以当 kafka 处理完 user 的请求后我们需要找到这个处理结果并返回给对应的 user 如果大家对于 异步处理 user 请求和同步处理没有概念那么下面一章我先给大家讲一下同步处理请求和异步处理的区别 后端异步处理请求和后端同步处理请求
同步方式
file: app.py.pyTime : 2024/3/30Author : Peinuan qinimport threading
import json
from flask import Flask, jsonify, request
from flask_cors import CORS
from ext import FLASK_HOST, FLASK_PORTapp Flask(__name__)
CORS(app)app.route(/login, methods[POST])
def create_user_post():data request.jsonregister user code ....return jsonify({status: 200, msg: success})if __name__ __main__:app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue)上述方式可以看到我的 create_user_post 负责接受前端的数据并且即刻处理处理之后将结果返回前端 jsonify({status: 200, msg: success})这个过程是一行接着一行发生的如果中途出现了很耗时的操作那么程序会一直等着。在 Flask 应用中如果 register user code .... 处理需要20秒这确实会阻塞处理该请求的线程直到该过程完成。由于 Flask 开发服务器默认是单线程的这意味着在这20秒内服务器将无法处理来自其他用户的任何其他请求。为了允许 Flask 同时处理多个请求你可以启用多线程模式。这可以通过在 app.run() 中设置 threadedTrue 来实现 app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue, threadedTrue)。这样Flask 将能够为每个请求启动一个新的线程从而允许同时处理多个请求。但这仍然并不是一种很好的方法因为整个服务器来看不具备扩展性。 假设我们服务器为每个 user 的请求开一个线程那么服务器资源是有限的当服务器宕机也并不能很快的恢复这就导致扩展性很差。
异步方式
import threading
import json
from flask import Flask, jsonify, request
from flask_cors import CORS
from ext import FLASK_HOST, FLASK_PORT, users_streams, LOGIN_TOPIC, producer, logger, ResponseConsumer
from kafka_create_user import kafka_consumer_taskapp Flask(__name__)
CORS(app)app.route(/login, methods[POST])
def create_user_post():data request.json# 发送数据到Kafkaproducer.produce(LOGIN_TOPIC, keystr(data[user_id]).encode(utf-8), valuejson.dumps(data).encode(utf-8))producer.flush()logger.info(send message to consumer)return jsonify({msg: 你好请求正在处理})我们先忽略其他的代码只看这一部分。这里相当于我们接受 user 的请求之后通过 kafka 把处理请求的需要转移到外部的服务器集群上去了。而 kafka 的特性在于非常高的可扩展性。增加 kafka 的节点就可以线性地将任务处理的数量提高。如果你看我上面给的那张图kafka 可以通过无限制增加 consumer 的数量来提高数据的处理能力。而后端的服务器需要做的就是把这些数据不断地派发出去这个步骤相比于直接在后端将所有的请求处理来说可以忽略不计。
环境文件目录
.
├── app.py
├── ext.py
├── kafka_create_user.py
└── requirements.txt配置
.env
首先构建一个配置文件 .env 来存放基础的配置信息
FLASK_HOST0.0.0.0
FLASK_PORT9300
# LOGIN 这个 topic 是用来处理用户注册这个业务的
LOGIN_TOPICLOGIN# RESPONSE_TOPIC 则是用来构建 response 来返回前端成功或者失败
RESPONSE_TOPICRESPONSE_TOPICrequirements.txt
kafka-python2.0.0
colorlog6.7.0
configparser5.3.0
flask2.3.2
flask_basicauth0.2.0
Flask-JWT-Extended4.6.0
Flask-Limiter3.5.1
Flask-PyMongo2.3.0
requests2.31.0
gunicorn21.0.0
pymongo4.6.0
pdfminer.six20231228
flask_cors4.0.0
python-dotenv
orjson3.10.0
langchain
langchain-community
langchain_openai
chromadbpython3.10
完整代码
ext.py
file: ext.py.pyTime : 2024/3/30Author : Peinuan qin
import json
import logging
import os
import queue
import threading
import colorlog
from confluent_kafka import Producer, Consumer, KafkaError
from dotenv import load_dotenv
from confluent_kafka.admin import AdminClient, NewTopic# 加载 .env 中的变量
load_dotenv()FLASK_HOST os.environ[FLASK_HOST]
FLASK_PORT os.environ[FLASK_PORT]
LOGIN_TOPIC os.environ[LOGIN_TOPIC]
RESPONSE_TOPIC os.environ[RESPONSE_TOPIC]TOPICS [LOGIN_TOPIC, RESPONSE_TOPIC]def create_topic():# Kafka服务器配置admin_client AdminClient({bootstrap.servers: localhost:9092})# 创建新主题的配置topic_list [NewTopic(topic, num_partitions3, replication_factor1) for topic in TOPICS]# 注意: replication_factor 和 num_partitions 可能需要根据你的Kafka集群配置进行调整# 创建主题fs admin_client.create_topics(topic_list)# 处理结果for topic, f in fs.items():try:f.result() # The result itself is Nonelogger.info(fTopic {topic} created)except Exception as e:logger.error(fFailed to create topic {topic}: {e})# Handler for logging
handler colorlog.StreamHandler()
formatter colorlog.ColoredFormatter(%(log_color)s%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s,datefmt%Y-%m-%d %H:%M:%S,log_colors{DEBUG: cyan,INFO: green,WARNING: yellow,ERROR: red,CRITICAL: red,bg_white,}
)
handler.setFormatter(formatter)# Logger
logger colorlog.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
尝试创建 topiccreate_topic()# 初始化Kafka生产者
producer_config {bootstrap.servers: localhost:9092
}
producer Producer(**producer_config)
定义专门用来回复 response 的 consumer
class ResponseConsumer:专门用来将各种处理好的结果返回给 user 作为 response 也就是图中针对 RESPONSE TOPIC 的 consumerdef __init__(self):self.users_streams {}self.config {bootstrap.servers: localhost:9092,group.id: user-response, # 设置 groupid如果不知道为什么要设置 groupid 可以去先看 kafka 的讲解视频auto.offset.reset: earliest} # 告诉 Kafka 消费者在找不到初始偏移量offset或者偏移量无效时比如指定的偏移量已经被删除应该从哪里开始消费消息。它可以设置为 earliest 或 latest。设置为 earliest 意味着消费者将从主题的开始处开始读取数据即尽可能不漏掉任何消息设置为 latest 意味着消费者将从新产生的消息开始读取即只消费自启动之后产生的消息。self.consumer Consumer(**self.config)logger.info(Create Response Consumer)self.consumer.subscribe([RESPONSE_TOPIC])logger.info(Subscribe Response Topic)# 因为可能有多个线程一起操作 consumer所以通过 lock 来保证线程安全self.lock threading.Lock()def get_or_make(self, user_id):获取某个 user_id 的 response queue 如果当前 user_id 的 response queue 不存在就创建一个每个 user_id 的 response queue 中都是返回给前端 user 的信息也就是图中的 RESPONSE MSG:param user_id::return:with self.lock:# 如果当前 user_id 还没有 queue就构建一个q self.users_streams.get(user_id, queue.Queue())self.users_streams[user_id] qreturn qdef pop(self, user_id):with self.lock:self.users_streams.pop(user_id, None)def put(self, user_id, msg_dict):当 user_id 的请求处理完产生的 RESPONSE MSG 放到 user_id 的队列里面:param user_id: :param msg_dict: :return: q self.get_or_make(user_id)if q:with self.lock:self.users_streams[user_id].put(msg_dict)logger.info(fput {msg_dict} into {user_id}s queue)return Trueelse:return Falsedef listen_for_response(self):不断拉取 RESPONSE TOPIC 的 producer 生成的结果:return:try:while True:msg self.consumer.poll(timeout1.0) # 1秒超时if msg is None:continueif msg.error():if msg.error().code() KafkaError._PARTITION_EOF:# End of partition eventcontinueelse:print(msg.error())break如果拉取到了就放到对应的 user_id 的 queue 里面if msg:logger.info(freceived data: {msg})msg_data json.loads(msg.value().decode(utf-8))user_id msg.key().decode(utf-8)logger.info(fmsg_data: {msg_data})logger.info(fuser_id: {user_id})put_flag self.put(user_id, msg_data)if not put_flag:logger.error(fCreate RESPONSE MSG for {user_id} failed)else:logger.info(fcreate RESPONSE MSG response for {user_id})except Exception as e:self.consumer.close()
app.py
file: app.py.pyTime : 2024/3/30Author : Peinuan qinimport threading
import json
from flask import Flask, jsonify, request
from flask_cors import CORS
from ext import FLASK_HOST, FLASK_PORT, users_streams, LOGIN_TOPIC, producer, logger, ResponseConsumer
from kafka_create_user import kafka_consumer_taskapp Flask(__name__)
CORS(app)response_consumer ResponseConsumer()app.route(/login, methods[POST])
def create_user_post():data request.json# 发送数据到Kafkaproducer.produce(LOGIN_TOPIC, keystr(data[user_id]).encode(utf-8), valuejson.dumps(data).encode(utf-8))producer.flush()logger.info(send message to login consumer)return jsonify({msg: 你好请求正在处理})app.route(/stream)
def stream():user_id request.args.get(user_id) # 假设用户ID通过查询参数传入logger.info(fuid: {user_id})logger.info(fuser_streams: {response_consumer.users_streams})def event_stream(user_id):# 这里需要一种机制来持续发送数据给特定用户的流q response_consumer.get_or_make(user_id)logger.info(f{user_id} s queue is: {q})while True:if not q.empty():message q.get()logger.info(fmessage: {message})yield fdata: {json.dumps(message)}\n\nreturn app.response_class(event_stream(user_id), content_typetext/event-stream)def run_multi_thread():consumer_thread threading.Thread(targetkafka_consumer_task)response_thread threading.Thread(targetresponse_consumer.listen_for_response, daemonTrue)logger.info(Start APP ...)consumer_thread.start()logger.info(Create User Consumer start ...)response_thread.start()logger.info(Response Consumer start ...)app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue, use_reloaderFalse)if __name__ __main__:run_multi_thread()kafka_create_user.py
file: kafka_create_user.pyTime : 2024/3/30Author : Peinuan qin
import json
import os
from queue import Queue
import threading# 初始化全局消息队列
from confluent_kafka import Consumer, KafkaError
from kafka import KafkaConsumer, KafkaProducer
from dotenv import load_dotenv
from ext import logger, LOGIN_TOPIC, RESPONSE_TOPIC, producerdef kafka_consumer_task():这里定义了 LOGIN TOPIC 的 consumer 的行为也就是对 user_id 传过来的 RAW DATA 如何处理:return: # Kafka配置config {bootstrap.servers: localhost:9092,group.id: user-login-group,auto.offset.reset: earliest}consumer Consumer(**config)consumer.subscribe([LOGIN_TOPIC])# 读取数据try:while True:msg consumer.poll(timeout1.0) # 1秒超时if msg is None:continueif msg.error():if msg.error().code() KafkaError._PARTITION_EOF:# End of partition eventcontinueelse:print(msg.error())breakif msg:data json.loads(msg.value().decode(utf-8))key msg.key().decode(utf-8)print(key:, key)为了观察我们将 user 传过来的数据保存到本地with open(f{key}.json, w) as f:json.dump(data, f, ensure_asciiFalse, indent4)logger.info(fsuccessfully saved the {key}.json)完成任务后通过 RESPONSE TOPIC 的 producer 生成 response并发送给 RESPONSE TOPIC 等待对应的 consumer 来取并且返回给前端producer.produce(RESPONSE_TOPIC, keymsg.key(), valuejson.dumps({msg: fsuccessfully create user {key}}).encode(utf-8))producer.flush()logger.info(send processed data to response consumer)except KeyboardInterrupt:passfinally:# 清理操作consumer.close()producer.flush()producer.close()强调一下 如果你也是基于 Flask 框架虽然这里的 debugTrue 可以保证每次更改代码后对代码进行重载方便你进行调试。但是关于内存中的一些变量会消失所以保证我上面的代码能够顺利运行我设置了 use_reloaderFalse 否则 response_consumer.users_streams 总是为空因为重载变量会造成混淆引发未知的程序错误。app.py 中的 stream 是以 SSE 的方式让服务器可以主动通知 user本质是 user 向服务器建立长连接然后 kafka 完成任务后通过这个端口将信息发送给 user 运行方式
本地安装 kafka
不知道如何安装请 参考
运行 app.py
直接用 pycharm 运行就可以
使用 postman 测试
建立 http 长连接等待后端处理结果 新建窗口建立 http 连接针对 stream 端口并且是 GET 方法注意选中 http 协议哦通过左上角的符号不要选择其他协议同时在 Params 下面的 key 和 value 输入你 user_id 的信息要和下面的 /login 的一致然后点击 send长连接就会成功建立了
发送 RAW DATA
打开另一个新的窗口输入你本地运行的地址和端口并且选择 post 方法选择 body 和 raw 选择 json 的格式并在文本框中键入 json 数据发送就会收到 阶段性的服务器回复 这代表后端已经通过 kafka 来异步处理数据 这个时候很快你应该可以看到在长连接的那个 postman 窗口里面出现 {msg: successfully create user peinuan}并且每次你在 /login send 一次这里就会成功获得一次结果前端获得成功的信息
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/85814.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!