赞
踩
参考:https://github.com/pyropy/fastapi-socketio
如下,一般fastapi项目会这样创建一个应用
- from fastapi import FastAPI
-
- app = FastAPI()
那么如果在项目中需要用到socketio服务时,可以通过以下方式来实现绑定;当然也可以把socketio的服务单独起一个项目。
- # 初始化socketio
- sio = socketio.AsyncServer(async_mode='asgi')
- # app绑定socketio
- app.mount('/', socketio.ASGIApp(socketio_server=sio)) # 使用默认的socket path
-
- # 默认的socketio_path是socket.io,通过http://localhost/socket.io就可以访问socketio服务,也可以自定义修改,例如改为ws
- # sio中的事件函数可以指定namespace,前端通过路由来进入不同的namespace,例如
- # sio.on('connect', connect, namespace='/chat')
- # 那么前端的调用接口应该是:http://localhost/socket.io/chat
在fastapi项目中建立一个init_socketio目录,结构如下,chat_room.html和socket.io-client-4.1.2可以在python socketio 实现(极)简单聊天室中拿到
- ├── __init__.py
- ├── chat_room.html
- └── static
- └── socket.io-client-4.1.2
- import os
- import socketio
- from urllib import parse
- from bidict import bidict
- from pydantic import BaseModel, Field
- from fastapi.responses import JSONResponse, HTMLResponse
- from fastapi import APIRouter
- from fastapi.staticfiles import StaticFiles
-
- user_sid = bidict()
- pwd_path = os.path.dirname(os.path.abspath(__file__))
-
-
- class Login(BaseModel):
- user_name: str = Field(..., description='1')
-
-
- router = APIRouter()
-
-
- @router.get('/chat_room', summary='聊天室界面')
- async def chat_room():
- chat_room = os.path.join(pwd_path, 'chat_room.html')
- with open(chat_room) as f:
- return HTMLResponse(f.read())
-
-
- @router.post('/login', summary='登录接口')
- async def login(item: Login):
- user_name = item.user_name
- JSONResponse.media_type = 'application/json; charset=utf-8'
- if user_sid.get(user_name, None):
- return JSONResponse({'code': 'fail'}, status_code=400)
- else:
- return JSONResponse({'code': 'success'}, status_code=200)
-
-
- def init_socketio(app):
- # 事件函数
- async def connect(sid, environ):
- query_params = environ['QUERY_STRING'].split('&')
- params = dict()
- for query_param in query_params:
- a, b = query_param.split('=')
- params[a] = parse.unquote(b)
- user_name = params['name']
- user_sid[user_name] = sid
- await sio.emit('reply', f'{user_name}连线成功!', namespace='/chat')
-
- async def message(sid, data):
- await sio.emit('reply', f"{user_sid.inv[sid]}: {data}", namespace='/chat')
-
- async def disconnect(sid):
- user_name = user_sid.inv[sid]
- del user_sid[user_name]
- await sio.emit('reply', f'{user_name}退出连线!', namespace='/chat')
- # 静态文件
- app.mount("/static", StaticFiles(directory=f"{pwd_path}/static"), name="static")
-
- # 初始化socketio
- sio = socketio.AsyncServer(async_mode='asgi')
- # 绑定事件函数
- sio.on('disconnect', disconnect, namespace='/chat')
- sio.on('chat message', message, namespace='/chat')
- sio.on('connect', connect, namespace='/chat')
- # app绑定socketio
- app.mount('/', socketio.ASGIApp(socketio_server=sio)) # 使用默认的socket path

然后在项目的启动文件中添加
- from fastapi import FastAPI
- from init_socketio import router, init_socketio
-
- app = FastAPI()
-
- async def startup_event():
- """项目初始化"""
-
- app.include_router(router)
- init_socketio(app)
启动后,打开http://localhost:8080/chat_room即可测试
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。