python fast api websocket 连接事例

news/2025/10/11 18:10:33/文章来源:https://www.cnblogs.com/zhaoyingjie/p/19135712

python fast api websocket 连接事例

服务端事例:

# -*- coding: utf-8 -*-
import asyncio
import traceback
import json
import uuid
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, List, Optional
import uvicorn
from redis.asyncio import from_url, Redis# ============= 基本配置 =============
REDIS_URL = "redis://localhost:6379/0"
CHANNEL_NAME = "websocket_messages"
SERVER_ID = str(uuid.uuid4())print(f"当前服务器ID: {SERVER_ID}")app = FastAPI(title="分布式WebSocket服务")# ============= 工具函数 =============
def log(msg):print(f"[Server {SERVER_ID[:8]}] {msg}")# ============= 核心管理类 =============
class ConnectionManager:def __init__(self):self.active_connections: Dict[str, WebSocket] = {}self.rooms: Dict[str, List[str]] = {}self.redis: Optional[Redis] = Noneself.pubsub = Noneself.subscription_task: Optional[asyncio.Task] = Noneasync def initialize(self):"""初始化 Redis 连接并订阅"""self.redis = from_url(REDIS_URL, decode_responses=False)self.pubsub = self.redis.pubsub()self.subscription_task = asyncio.create_task(self._subscribe_to_channel())log("Redis 初始化完成")async def _subscribe_to_channel(self):"""订阅 Redis 频道"""try:await self.pubsub.subscribe(CHANNEL_NAME)log(f"已订阅 Redis 频道: {CHANNEL_NAME}")async for message in self.pubsub.listen():if message["type"] == "message":await self._handle_external_message(message["data"])except Exception as e:log(f"Redis订阅异常: {e}\n{traceback.format_exc()}")async def _handle_external_message(self, data: bytes):"""处理来自其他服务器的消息"""try:msg = json.loads(data.decode())if msg.get("server_id") == SERVER_ID:return  # 跳过自身消息
msg_type = msg["type"]if msg_type == "broadcast":await self._broadcast_local(msg["message"])elif msg_type == "personal_message":await self.send_personal_message(msg["message"], msg["user_id"], remote=True)elif msg_type == "room_message":await self._send_room_local(msg["message"], msg["room_id"])except Exception as e:log(f"处理外部消息异常: {e}\n{traceback.format_exc()}")async def connect(self, websocket: WebSocket, user_id: str):await websocket.accept()self.active_connections[user_id] = websocketlog(f"用户 {user_id} 已连接")# 确保订阅频道if not self.pubsub:self.redis = await from_url(REDIS_URL)  # 如果没有 Redis 连接,重新连接self.pubsub = self.redis.pubsub()  # 获取 PubSub 对象await self.pubsub.subscribe(CHANNEL_NAME)  # 订阅频道log("已订阅Redis频道:", CHANNEL_NAME)async def disconnect(self, user_id: str):if user_id in self.active_connections:del self.active_connections[user_id]for room in list(self.rooms.keys()):if user_id in self.rooms[room]:self.rooms[room].remove(user_id)if not self.rooms[room]:del self.rooms[room]await self._publish_message({"type": "broadcast","message": f"用户 {user_id} 离开了聊天室","server_id": SERVER_ID,})log(f"用户 {user_id} 已断开")async def send_personal_message(self, message: str, user_id: str, remote: bool = False):"""发送私聊消息"""if user_id in self.active_connections:await self.active_connections[user_id].send_text(message)elif not remote:await self._publish_message({"type": "personal_message","message": message,"user_id": user_id,"server_id": SERVER_ID,})log(f"发送私聊消息给用户 {user_id}: {message}")async def broadcast(self, message: str):"""广播到所有服务器"""await self._broadcast_local(message)await self._publish_message({"type": "broadcast","message": message,"server_id": SERVER_ID,})async def _broadcast_local(self, message: str):"""仅在当前服务器内广播"""for ws in self.active_connections.values():await ws.send_text(message)async def join_room(self, user_id: str, room_id: str):self.rooms.setdefault(room_id, [])if user_id not in self.rooms[room_id]:self.rooms[room_id].append(user_id)await self._send_room_local(f"用户 {user_id} 加入了房间 {room_id}", room_id)await self._publish_message({"type": "room_message","message": f"用户 {user_id} 加入了房间 {room_id}","room_id": room_id,"server_id": SERVER_ID,})async def _send_room_local(self, message: str, room_id: str):"""仅在本地房间发送"""for uid in self.rooms.get(room_id, []):await self.send_personal_message(message, uid, remote=True)async def _publish_message(self, msg: dict):"""发布 Redis 消息"""if self.redis:log(f"发布消息:{msg}")await self.redis.publish(CHANNEL_NAME, json.dumps(msg).encode())async def shutdown(self):"""关闭 Redis 连接"""if self.subscription_task:self.subscription_task.cancel()if self.pubsub:await self.pubsub.unsubscribe(CHANNEL_NAME)if self.redis:await self.redis.close()manager = ConnectionManager()# ============= FastAPI 路由 =============
@app.on_event("startup")
async def startup():await manager.initialize()@app.on_event("shutdown")
async def shutdown():await manager.shutdown()@app.websocket("/ws/{user_id}")
async def websocket_endpoint(ws: WebSocket, user_id: str):await manager.connect(ws, user_id)try:await ws.send_text(f"欢迎,用户 {user_id} 已连接!")while True:msg = await ws.receive_text()if msg.startswith("join:"):await manager.join_room(user_id, msg.split(":", 1)[1])elif msg.startswith("room:"):_, rid, content = msg.split(":", 2)await manager._send_room_local(f"[{rid}] {user_id}: {content}", rid)await manager._publish_message({"type": "room_message","message": f"[{rid}] {user_id}: {content}","room_id": rid,"server_id": SERVER_ID,})else:await manager.broadcast(f"{user_id}: {msg}")except WebSocketDisconnect:await manager.disconnect(user_id)except Exception as e:log(f"WebSocket异常: {e}\n{traceback.format_exc()}")await manager.disconnect(user_id)@app.get("/")
async def get_page():return HTMLResponse("<h3>WebSocket服务器运行中</h3><p>使用 ws://localhost:8000/ws/{user_id} 连接</p>")@app.get("/push/{user_id}/{msg}")
async def push_message(user_id: str, msg: str):await manager.send_personal_message(msg, user_id)return {"status": "sent", "user_id": user_id, "message": msg}if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)

 

2.客户端通过postman 进行连接

image

 

3.通过接口向客户端推送消息 或直接调用send_personal_message

http://0.0.0.0:8000/push/oneday/hello_abc

await manager.send_personal_message(msg, user_id)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/934977.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Idea摸鱼看小说插件(YsQy-Book)-免费使用

前言 之前用的idea看小说插件要么要收费,要么就不好用,这我那里受得了,所以就决定自己开发一个idea看小说插件。 idea搜索:YsQy-Book Github地址 https://github.com/ZHJJ03/YsQy-Book-plugin 用法 先去设置里选择…

贴牛皮纸铝卷生产商推荐/铝卷生产厂家/铝卷哪家好

在当今的工业生产与建筑领域中,贴牛皮纸铝卷以其独特的性能和广泛的应用而备受青睐。它不仅具有良好的防腐保温效果,还在装饰、包装等方面发挥着重要作用。今天,就为大家推荐一家值得信赖的贴牛皮纸铝卷生产商——济…

2025浇注型聚氨酯厂家口碑排行榜:品质与服务双优之选

2025浇注型聚氨酯厂家口碑排行榜:品质与服务双优之选随着工业技术的不断进步,浇注型聚氨酯作为一种高性能材料,在众多领域中得到了广泛应用。从机械制造到建筑施工,从汽车工业到电子电器,浇注型聚氨酯凭借其优异的…

查询top cpu占用排行

查询top cpu占用排行1、命令行ps aux --sort=-%cpu | head -n 10

RAFT 共识算法

Leader - Follower 消息同步以Kafka为例子 在 Kafka 中,Leader 节点确保所有 Follower 节点成功接收消息的机制,主要通过 ISR(In-Sync Replicas,同步副本列表) 和 acks 消息确认机制 实现,具体流程如下: 1. 核心…

2025氧化镁厂家最新推荐榜:高纯度与优质服务并重的行业先锋

2025氧化镁厂家最新推荐榜:高纯度与优质服务并重的行业先锋随着工业和科技的快速发展,氧化镁作为一种重要的化工原料,在多个领域发挥着关键作用。为了帮助筛选氧化镁品牌,特此发布权威推荐榜单,为采购决策提供专业…

【Vue】LangChain4j大模型对话-前端页面完成(vite+vue3+router)

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

【Vue】LangChain4j大模型对话-前端页面完成(vite+vue3+router)

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

contenteditable 深度交互教程

contenteditable 交互式教程 - 百灵AI * { box-sizing: border-box; margin: 0; padding: 0; font-family: "Segoe UI", system-ui, -apple-system, sans-serif } body { background: linear-gradient(135de…

【gradio】使用Gradio快速开发前端界面:基础知识

【gradio】使用Gradio快速开发前端界面:基础知识使用Gradio快速开发前端界面:基础知识前言一、什么是Gradio?二、安装Gradio三、快速入门:构建一个简单的文本处理界面四、Gradio 的核心组件五、核心组件的演示六、…

2025风机盘管厂家口碑推荐榜:高效节能与稳定性能的行业首选

2025风机盘管厂家口碑推荐榜:高效节能与稳定性能的行业首选随着建筑行业的快速发展,风机盘管作为中央空调系统中的重要组成部分,其市场需求日益增长。高效节能与稳定性能成为用户选择风机盘管时的重要考量因素。为了…

距离和

http://noip.ybtoj.com.cn/contest/1121/problem/3 1.3 距离和 图解1685. 有序数组中差绝对值之和 14962615. 等值距离和 17932602. 使数组元素全部相等的最少操作次数 19032968. 执行操作使频率分数最大 24441703. 得…

痞子衡嵌入式:在i.MXRT下测试启动特性时可改写OTP Shadow寄存器而不烧OTP

大家好,我是痞子衡,是正经搞技术的痞子。今天痞子衡给大家介绍的是i.MXRT下测试启动特性时可改写OTP Shadow寄存器而不烧OTP。我们知道恩智浦 i.MXRT 系列除了 BOOT 相关引脚电平配置之外,主要通过片内 eFuse/OTP 存…

Open WebUI:打造友好且强大的自托管 AI 平台

Open WebUI:打造友好且强大的自托管 AI 平台目录 摘要 一、引言 二、Open WebUI 的概念讲解 (一)功能特性 (二)技术基础 三、Open WebUI 的架构设计 (一)系统架构图 (二)架构模块介绍 四、Open WebUI 的安装与…

直流微电网运行控制仿真算法设计与实现

一、仿真模型构建 1. 核心组件建模电源模块光伏电池:采用单二极管等效电路模型,结合扰动观察法实现MPPT控制 % MPPT控制仿真代码片段 V_pv = linspace(0, V_oc, 1000); % 光伏输出电压范围 I_pv = solar_cell_model…

基于MATLAB的多棵树分类器(随机森林)

一、实现流程 1. 数据预处理 % 加载鸢尾花数据集 load fisheriris X = meas(:,1:2); % 使用前两个特征 Y = species;% 处理缺失值(若有) cleanData = rmmissing([X, categorical(Y)]);% 特征标准化 X_scaled = zsco…

车载360环视平台:米尔RK3576开发板支持12路低延迟推流

在汽车智能化、网联化快速发展的今天,360环视系统 已成为智能驾驶和自动泊车的标配技术。无论是泊车入位、低速行车还是复杂路口的安全辅助,360环视都依赖于 多路摄像头 的实时接入与高效处理。然而,传统硬件平台往…

2025双氧水厂家最新推荐榜:品质卓越与环保安全的首选品牌!

2025双氧水厂家最新推荐榜:品质卓越与环保安全的首选品牌!随着环保意识的不断提升,双氧水作为一种高效、环保的氧化剂,在多个行业中的应用越来越广泛。从工业清洗到医疗消毒,从污水处理到食品加工,双氧水的需求量…

TDengine 3.3.6.0 使用Docker部署3节点集群

一、使用 Docker Swarm 管理 TDengine+Nginxdocker版本为26。 使用docker swarm 进行集群管理。 使用taosAdapter实现RESTful 接口访问。 使用nginx代理访问集群。二、架构设计 基于TDengine 节点 IP:192.168.0.1/db1…