网站首页布局分析网站开发工程师 招聘

web/2025/10/2 21:09:04/文章来源:
网站首页布局分析,网站开发工程师 招聘,西安有几个区,乐清本地网站文章目录 案例信息介绍后端异步处理请求和后端同步处理请求同步方式异步方式 环境文件目录配置.envrequirements.txt 完整代码ext.pyapp.pykafka_create_user.py 运行方式本地安装 kafka运行 app.py使用 postman 测试建立 http 长连接#xff0c;等待后端处理结果发送 RAW DAT… 文章目录 案例信息介绍后端异步处理请求和后端同步处理请求同步方式异步方式 环境文件目录配置.envrequirements.txt 完整代码ext.pyapp.pykafka_create_user.py 运行方式本地安装 kafka运行 app.py使用 postman 测试建立 http 长连接等待后端处理结果发送 RAW DATA 在看这个文章之前建议先学习 kafka的工作原理 这个系列视频讲得很好虽然基于 Java 但是理解原理并不用区分语言。只需要看懂工作原理即可。 案例信息介绍 假设我的网站需要高并发地处理 user 注册这个简单的功能。前端会发送 {user_id: xxxx, psw:xxx} 的信息到后端完成创建 前端用 postman 来模拟后端用 flask 框架来简单演示 下面我用一张大致的图来表示代码的架构 前端的原始数据进入后端之后后端要用 kafka 的架构在有序地处理 user 的请求在这个任务中所有 user 的请求都是 register因此我们就创建一个 kafka 的 topic 专门用来处理 user 的这类请求同时由于 kafka 是通过队列的方式 异步地处理 user 的请求所以当 kafka 处理完 user 的请求后我们需要找到这个处理结果并返回给对应的 user 如果大家对于 异步处理 user 请求和同步处理没有概念那么下面一章我先给大家讲一下同步处理请求和异步处理的区别 后端异步处理请求和后端同步处理请求 同步方式 file: app.py.pyTime : 2024/3/30Author : Peinuan qinimport threading import json from flask import Flask, jsonify, request from flask_cors import CORS from ext import FLASK_HOST, FLASK_PORTapp Flask(__name__) CORS(app)app.route(/login, methods[POST]) def create_user_post():data request.jsonregister user code ....return jsonify({status: 200, msg: success})if __name__ __main__:app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue)上述方式可以看到我的 create_user_post 负责接受前端的数据并且即刻处理处理之后将结果返回前端 jsonify({status: 200, msg: success})这个过程是一行接着一行发生的如果中途出现了很耗时的操作那么程序会一直等着。在 Flask 应用中如果 register user code .... 处理需要20秒这确实会阻塞处理该请求的线程直到该过程完成。由于 Flask 开发服务器默认是单线程的这意味着在这20秒内服务器将无法处理来自其他用户的任何其他请求。为了允许 Flask 同时处理多个请求你可以启用多线程模式。这可以通过在 app.run() 中设置 threadedTrue 来实现 app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue, threadedTrue)。这样Flask 将能够为每个请求启动一个新的线程从而允许同时处理多个请求。但这仍然并不是一种很好的方法因为整个服务器来看不具备扩展性。 假设我们服务器为每个 user 的请求开一个线程那么服务器资源是有限的当服务器宕机也并不能很快的恢复这就导致扩展性很差。 异步方式 import threading import json from flask import Flask, jsonify, request from flask_cors import CORS from ext import FLASK_HOST, FLASK_PORT, users_streams, LOGIN_TOPIC, producer, logger, ResponseConsumer from kafka_create_user import kafka_consumer_taskapp Flask(__name__) CORS(app)app.route(/login, methods[POST]) def create_user_post():data request.json# 发送数据到Kafkaproducer.produce(LOGIN_TOPIC, keystr(data[user_id]).encode(utf-8), valuejson.dumps(data).encode(utf-8))producer.flush()logger.info(send message to consumer)return jsonify({msg: 你好请求正在处理})我们先忽略其他的代码只看这一部分。这里相当于我们接受 user 的请求之后通过 kafka 把处理请求的需要转移到外部的服务器集群上去了。而 kafka 的特性在于非常高的可扩展性。增加 kafka 的节点就可以线性地将任务处理的数量提高。如果你看我上面给的那张图kafka 可以通过无限制增加 consumer 的数量来提高数据的处理能力。而后端的服务器需要做的就是把这些数据不断地派发出去这个步骤相比于直接在后端将所有的请求处理来说可以忽略不计。 环境文件目录 . ├── app.py ├── ext.py ├── kafka_create_user.py └── requirements.txt配置 .env 首先构建一个配置文件 .env 来存放基础的配置信息 FLASK_HOST0.0.0.0 FLASK_PORT9300 # LOGIN 这个 topic 是用来处理用户注册这个业务的 LOGIN_TOPICLOGIN# RESPONSE_TOPIC 则是用来构建 response 来返回前端成功或者失败 RESPONSE_TOPICRESPONSE_TOPICrequirements.txt kafka-python2.0.0 colorlog6.7.0 configparser5.3.0 flask2.3.2 flask_basicauth0.2.0 Flask-JWT-Extended4.6.0 Flask-Limiter3.5.1 Flask-PyMongo2.3.0 requests2.31.0 gunicorn21.0.0 pymongo4.6.0 pdfminer.six20231228 flask_cors4.0.0 python-dotenv orjson3.10.0 langchain langchain-community langchain_openai chromadbpython3.10 完整代码 ext.py file: ext.py.pyTime : 2024/3/30Author : Peinuan qin import json import logging import os import queue import threading import colorlog from confluent_kafka import Producer, Consumer, KafkaError from dotenv import load_dotenv from confluent_kafka.admin import AdminClient, NewTopic# 加载 .env 中的变量 load_dotenv()FLASK_HOST os.environ[FLASK_HOST] FLASK_PORT os.environ[FLASK_PORT] LOGIN_TOPIC os.environ[LOGIN_TOPIC] RESPONSE_TOPIC os.environ[RESPONSE_TOPIC]TOPICS [LOGIN_TOPIC, RESPONSE_TOPIC]def create_topic():# Kafka服务器配置admin_client AdminClient({bootstrap.servers: localhost:9092})# 创建新主题的配置topic_list [NewTopic(topic, num_partitions3, replication_factor1) for topic in TOPICS]# 注意: replication_factor 和 num_partitions 可能需要根据你的Kafka集群配置进行调整# 创建主题fs admin_client.create_topics(topic_list)# 处理结果for topic, f in fs.items():try:f.result() # The result itself is Nonelogger.info(fTopic {topic} created)except Exception as e:logger.error(fFailed to create topic {topic}: {e})# Handler for logging handler colorlog.StreamHandler() formatter colorlog.ColoredFormatter(%(log_color)s%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s,datefmt%Y-%m-%d %H:%M:%S,log_colors{DEBUG: cyan,INFO: green,WARNING: yellow,ERROR: red,CRITICAL: red,bg_white,} ) handler.setFormatter(formatter)# Logger logger colorlog.getLogger(__name__) logger.addHandler(handler) logger.setLevel(logging.INFO) 尝试创建 topiccreate_topic()# 初始化Kafka生产者 producer_config {bootstrap.servers: localhost:9092 } producer Producer(**producer_config) 定义专门用来回复 response 的 consumer class ResponseConsumer:专门用来将各种处理好的结果返回给 user 作为 response 也就是图中针对 RESPONSE TOPIC 的 consumerdef __init__(self):self.users_streams {}self.config {bootstrap.servers: localhost:9092,group.id: user-response, # 设置 groupid如果不知道为什么要设置 groupid 可以去先看 kafka 的讲解视频auto.offset.reset: earliest} # 告诉 Kafka 消费者在找不到初始偏移量offset或者偏移量无效时比如指定的偏移量已经被删除应该从哪里开始消费消息。它可以设置为 earliest 或 latest。设置为 earliest 意味着消费者将从主题的开始处开始读取数据即尽可能不漏掉任何消息设置为 latest 意味着消费者将从新产生的消息开始读取即只消费自启动之后产生的消息。self.consumer Consumer(**self.config)logger.info(Create Response Consumer)self.consumer.subscribe([RESPONSE_TOPIC])logger.info(Subscribe Response Topic)# 因为可能有多个线程一起操作 consumer所以通过 lock 来保证线程安全self.lock threading.Lock()def get_or_make(self, user_id):获取某个 user_id 的 response queue 如果当前 user_id 的 response queue 不存在就创建一个每个 user_id 的 response queue 中都是返回给前端 user 的信息也就是图中的 RESPONSE MSG:param user_id::return:with self.lock:# 如果当前 user_id 还没有 queue就构建一个q self.users_streams.get(user_id, queue.Queue())self.users_streams[user_id] qreturn qdef pop(self, user_id):with self.lock:self.users_streams.pop(user_id, None)def put(self, user_id, msg_dict):当 user_id 的请求处理完产生的 RESPONSE MSG 放到 user_id 的队列里面:param user_id: :param msg_dict: :return: q self.get_or_make(user_id)if q:with self.lock:self.users_streams[user_id].put(msg_dict)logger.info(fput {msg_dict} into {user_id}s queue)return Trueelse:return Falsedef listen_for_response(self):不断拉取 RESPONSE TOPIC 的 producer 生成的结果:return:try:while True:msg self.consumer.poll(timeout1.0) # 1秒超时if msg is None:continueif msg.error():if msg.error().code() KafkaError._PARTITION_EOF:# End of partition eventcontinueelse:print(msg.error())break如果拉取到了就放到对应的 user_id 的 queue 里面if msg:logger.info(freceived data: {msg})msg_data json.loads(msg.value().decode(utf-8))user_id msg.key().decode(utf-8)logger.info(fmsg_data: {msg_data})logger.info(fuser_id: {user_id})put_flag self.put(user_id, msg_data)if not put_flag:logger.error(fCreate RESPONSE MSG for {user_id} failed)else:logger.info(fcreate RESPONSE MSG response for {user_id})except Exception as e:self.consumer.close() app.py file: app.py.pyTime : 2024/3/30Author : Peinuan qinimport threading import json from flask import Flask, jsonify, request from flask_cors import CORS from ext import FLASK_HOST, FLASK_PORT, users_streams, LOGIN_TOPIC, producer, logger, ResponseConsumer from kafka_create_user import kafka_consumer_taskapp Flask(__name__) CORS(app)response_consumer ResponseConsumer()app.route(/login, methods[POST]) def create_user_post():data request.json# 发送数据到Kafkaproducer.produce(LOGIN_TOPIC, keystr(data[user_id]).encode(utf-8), valuejson.dumps(data).encode(utf-8))producer.flush()logger.info(send message to login consumer)return jsonify({msg: 你好请求正在处理})app.route(/stream) def stream():user_id request.args.get(user_id) # 假设用户ID通过查询参数传入logger.info(fuid: {user_id})logger.info(fuser_streams: {response_consumer.users_streams})def event_stream(user_id):# 这里需要一种机制来持续发送数据给特定用户的流q response_consumer.get_or_make(user_id)logger.info(f{user_id} s queue is: {q})while True:if not q.empty():message q.get()logger.info(fmessage: {message})yield fdata: {json.dumps(message)}\n\nreturn app.response_class(event_stream(user_id), content_typetext/event-stream)def run_multi_thread():consumer_thread threading.Thread(targetkafka_consumer_task)response_thread threading.Thread(targetresponse_consumer.listen_for_response, daemonTrue)logger.info(Start APP ...)consumer_thread.start()logger.info(Create User Consumer start ...)response_thread.start()logger.info(Response Consumer start ...)app.run(hostFLASK_HOST, portFLASK_PORT, debugTrue, use_reloaderFalse)if __name__ __main__:run_multi_thread()kafka_create_user.py file: kafka_create_user.pyTime : 2024/3/30Author : Peinuan qin import json import os from queue import Queue import threading# 初始化全局消息队列 from confluent_kafka import Consumer, KafkaError from kafka import KafkaConsumer, KafkaProducer from dotenv import load_dotenv from ext import logger, LOGIN_TOPIC, RESPONSE_TOPIC, producerdef kafka_consumer_task():这里定义了 LOGIN TOPIC 的 consumer 的行为也就是对 user_id 传过来的 RAW DATA 如何处理:return: # Kafka配置config {bootstrap.servers: localhost:9092,group.id: user-login-group,auto.offset.reset: earliest}consumer Consumer(**config)consumer.subscribe([LOGIN_TOPIC])# 读取数据try:while True:msg consumer.poll(timeout1.0) # 1秒超时if msg is None:continueif msg.error():if msg.error().code() KafkaError._PARTITION_EOF:# End of partition eventcontinueelse:print(msg.error())breakif msg:data json.loads(msg.value().decode(utf-8))key msg.key().decode(utf-8)print(key:, key)为了观察我们将 user 传过来的数据保存到本地with open(f{key}.json, w) as f:json.dump(data, f, ensure_asciiFalse, indent4)logger.info(fsuccessfully saved the {key}.json)完成任务后通过 RESPONSE TOPIC 的 producer 生成 response并发送给 RESPONSE TOPIC 等待对应的 consumer 来取并且返回给前端producer.produce(RESPONSE_TOPIC, keymsg.key(), valuejson.dumps({msg: fsuccessfully create user {key}}).encode(utf-8))producer.flush()logger.info(send processed data to response consumer)except KeyboardInterrupt:passfinally:# 清理操作consumer.close()producer.flush()producer.close()强调一下 如果你也是基于 Flask 框架虽然这里的 debugTrue 可以保证每次更改代码后对代码进行重载方便你进行调试。但是关于内存中的一些变量会消失所以保证我上面的代码能够顺利运行我设置了 use_reloaderFalse 否则 response_consumer.users_streams 总是为空因为重载变量会造成混淆引发未知的程序错误。app.py 中的 stream 是以 SSE 的方式让服务器可以主动通知 user本质是 user 向服务器建立长连接然后 kafka 完成任务后通过这个端口将信息发送给 user 运行方式 本地安装 kafka 不知道如何安装请 参考 运行 app.py 直接用 pycharm 运行就可以 使用 postman 测试 建立 http 长连接等待后端处理结果 新建窗口建立 http 连接针对 stream 端口并且是 GET 方法注意选中 http 协议哦通过左上角的符号不要选择其他协议同时在 Params 下面的 key 和 value 输入你 user_id 的信息要和下面的 /login 的一致然后点击 send长连接就会成功建立了 发送 RAW DATA 打开另一个新的窗口输入你本地运行的地址和端口并且选择 post 方法选择 body 和 raw 选择 json 的格式并在文本框中键入 json 数据发送就会收到 阶段性的服务器回复 这代表后端已经通过 kafka 来异步处理数据 这个时候很快你应该可以看到在长连接的那个 postman 窗口里面出现 {msg: successfully create user peinuan}并且每次你在 /login send 一次这里就会成功获得一次结果前端获得成功的信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/85814.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

郯城县网站建设消防公司宣传册设计样本

参考自:计算机体系结构-寄存器重命名 - 知乎 (zhihu.com) 这里的重点在红色部分描述上,也是容易想不通的地方; 在SRAM方法中要恢复映射表状态,需要事先把分支指令来临时刻的映射表全份拷贝下来,然后在恢复时把备份复制…

网站增加权重吗wordpress带样式备份

这里写目录标题 基于Python微博舆情数据爬虫可视化分析系统(NLP情感分析爬虫机器学习)一、项目概述二、微博热词统计析三、微博文章分析四、微博评论分析五、微博舆情分析六、项目展示七、结语 基于Python微博舆情数据爬虫可视化分析系统(NLP情感分析爬虫机器学习) 一、项目概…

aspnet网站开发教程数据库重庆网站建设师

玩转幻兽帕鲁服务器,阿里云推出新手0基础一键部署幻兽帕鲁服务器教程,傻瓜式一键部署,3分钟即可成功创建一台Palworld专属服务器,成本仅需26元,阿里云服务器网aliyunfuwuqi.com分享2024年新版基于阿里云搭建幻兽帕鲁服…

宝塔搭建网站教程集团微网站建设

来到大学,好多时候填写一些班级或者身份证的表格时都要求设置文本格式,具体该怎么做呢,music走起嘿嘿。 Excel实现设置文本格式随手录了一个小视频,希望对你有所帮助,有帮到你的话可以点个赞,鼓励一下创作哈…

网站建设捌金手指专业1做网站项目前期工作包括哪些

文章目录 一. 斐波那契数列模型1. 第 N 个泰波那契数2. 三步问题3. 使用最小花费爬楼梯解法一:从左往右填表解法二:从右往左填表 一. 斐波那契数列模型 解题步骤: 确定状态表示(最重要):明确dp表里的值所…

白酒网站模板南方数据企业网站管理系统

在Linux上有很多监控工具,比如Zabbix、Prometheus、APM和ELK 监控工具是确保系统稳定运行的关键组件之一,它可以帮助系统管理员和开发人员及时发现并解决问题。 以下是几种流行的监控工具的简要介绍: Zabbix: Zabbix 是一个企…

南通企业模板建站wordpress 调用侧边栏

丝杆支撑座是丝杆和电机之间连接的重要组成部分,发挥着非常重要的功能。提到丝杆支撑座和滚珠丝杆,很多人都会想到支撑关系,但丝杆支撑座作为滚珠丝杆系统中至关重要的角色,其作用远不止于简单的支撑。 丝杆支撑座安装过程非常简单…

公司网站建设方案拓扑图ps网页入口设计步骤

解释器的结果通常通过上述表格展示: 1. select_type 表示查询的类型 simple: 表示简单的选择查询,没有子查询或连接操作 primary:表示主查询,通常是最外层的查询 subquery :表示子查询,在主查询中嵌套的查询 derived: 表示派…

网站建设费计入无形资产做网站买域名要买几个后缀最安全

系列文章目录 什么是计算机网络? 什么是网络协议? 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能(1)——速率、带宽、延迟 计算机网络性能(2)…

完整网站模板网站营销理念

在此推荐2篇发表在lancet以及jama子刊上的paf文章,这两篇文章套路是一样的,只是在不同国家进行。 在计算combined PAF或者说weighted PAF的时候,先建立了相关矩阵,再做主成分分析,得到communality。详细信息大家可翻阅…

做品牌的人常用的网站关键词优化如何做

目录 前言一、准备工作—准备各类文件1、准备实体类、枚举类、常量类2、准备配置文件 二、先编写Controller三、再编写Service、serviceImpl四、再编写mapper后续敬请期待 前言 Java开发功能简单思路 一、准备工作—准备各类文件 提前知道需要用到哪些类可以提前准备好&#…

北京微信网站建设报价旅游网站建设可行性分析

文章目录 1. 前言2. 常用api介绍3. 需求假设(获取离我最近的停车场)4. 代码示例 1. 前言 接着上一篇Redis那些事儿(二) ,这一篇主要介绍Redis基于Geo数据结构实现的地理服务,它提供了一种方便的方式来存储和…

高州市网站建设广州越秀公司网站建设

Full authentication is required to access this resource解决办法 报错问题:在使用 postman 测试接口时,该接口需要在 Header 中传入 access_token,实际上也在请求的 Header 中添加上了 access_token 参数,但是服务端还是返回4…

做淘宝客建网站要多少费用网站编辑心得体会

四 路由配置及页面跳转 (1)路由配置 uni-app页面路由全部交给框架统一管理,需要在pages.json里配置每个路由页面的路径以及页面样式(类似小程序在app.json中配置页面路由) (2)路由跳转 uni-app有两种页面路由跳转方式:使用navigator组件跳转(标签式导航)、调用API跳…

唐山网站制作案例网站设计流程软件

背景:在测试中,测试MM总喜欢连续重复点击Button,如果click事件的处理业务,稍微有些耗时,或者设备反应比较慢时,就会响应2遍处理,导致错误的现象出现。 前提:click事件的处理业务&…

企业网站模板 html广告公司平面设计培训

目录 一、下载 二、移植数据库 三、测试sqlite3 一、下载 SQLite Download Page 暂时先下载最新版的试试,我们以前其实在ubuntu上直接使用过 嵌入式数据库sqlite3_常见的嵌入式数据库-CSDN博客 当时我把常用的操作和怎么使用记录下来了 现在把他移植到开发板…

做计划的网站类似设计师联盟的网站

查看原文>>>【深度解析】WRF-LES与PALM微尺度气象大涡模拟 针对微尺度气象的复杂性,大涡模拟(LES)提供了一种无可比拟的解决方案。微尺度气象学涉及对小范围内的大气过程进行精确模拟,这些过程往往与天气模式、地形影响和…

怎么建立自己网站wordpress 生成页面

UML精简概述 UML精简概述 UML精简概述UML的定义常见的关系 在学习设计模式之前,需要掌握一些预备知识,主要包括UML类图和面向对象设计原则,它们是“基础内功”,将为后续的“深入修行”奠定基础。UML类图可用于描述每一个设计模式的…

杭州网站前端建设嘉定网站设计制作公司

FPGA里面的可执行文件都涉及到 *.bit, *.mcs, *.bin 和 *.elf。 bit文件 bit 文件一般用于JTAG在线进行调试的时候,是把bit文件是烧写到FPGA中进行在线调试。 bin文件 bin 文件是二进制文件,按顺序只包含原始字节流&#xff0c…

网站建设开发岗位职责产品设计大师作品

http://www.68design.net/Development/Aspnet/Basis-AspNet/26011-1.html 转载于:https://www.cnblogs.com/faxian/p/4402910.html