需求:因为项目需要,边做边学python
,这次需要使用socket
功能,正常在main.py
中写个socket
,还是OK的,但是我想要在其他文件中,直接使用socket
的emit
方法,需要在文件结构上进行一些调整。
使用到的第三方库:python-socketio
官方地址:https://python-socketio.readthedocs.io/en/latest/
1、写一下前端代码:
<template><div><h1>Hello Socket</h1></div>
</template>
<script>
import * as io from 'socket.io-client';
export default {mounted() {this.socket = io.connect("http://localhost:8000");this.socket.emit('Client', "Hello, Server!");this.socket.on("Server", args => {console.log(args)});}
}
</script>
2、下面是python
代码
新建文件socket.py
import socketiobackground_task_started = Falsesio = socketio.AsyncServer(async_mode='asgi',cors_allowed_origins='*') # logger=False)async def background_task():# Example of how to send server generated events to clients.count = 0while True:await sio.sleep(10)count += 1await sio.emit('Server', {'data': "Hello, Client"})@sio.event
async def connect(sid, environ):print(f'{sid} connected!')global background_task_startedif not background_task_started:sio.start_background_task(background_task)background_task_started = Trueawait sio.emit("Server", {'data': "Connect",'count': 0})@sio.event
async def disconnect(sid):print('disconnect', sid)
3、下面是main
代码
在controllers
目录下,新建文件main.py
from fastapi import Depends, FastAPI, Header, HTTPException
from db.mongodb_utils import close_mongo_connection, connect_to_mongo
from fastapi.middleware.cors import CORSMiddleware
import socketio
from controllers.socket import sio
import uvicorn
import zmqdef get_app() -> FastAPI(title="接口",description="api",version="0.0.1"
):app = FastAPI()app.add_event_handler("startup", connect_to_mongo)app.add_event_handler("shutdown", close_mongo_connection)sio_app = socketio.ASGIApp(sio, other_asgi_app=app)app.mount('/', sio_app)return appapp = get_app()if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)
4、其他文件中使用
在其他文件中使用,需要如下代码即可
from controllers.socket import sioawait sio.emit('Server',{'data': "大铁牛"})