当前位置:   article > 正文

fastapi项目内使用socketio_fastapi socketio

fastapi socketio

参考:https://github.com/pyropy/fastapi-socketio

一、app中启用socketio

如下,一般fastapi项目会这样创建一个应用

  1. from fastapi import FastAPI
  2. app = FastAPI()

那么如果在项目中需要用到socketio服务时,可以通过以下方式来实现绑定;当然也可以把socketio的服务单独起一个项目。

  1. # 初始化socketio
  2. sio = socketio.AsyncServer(async_mode='asgi')
  3. # app绑定socketio
  4. app.mount('/', socketio.ASGIApp(socketio_server=sio)) # 使用默认的socket path
  5. # 默认的socketio_path是socket.io,通过http://localhost/socket.io就可以访问socketio服务,也可以自定义修改,例如改为ws
  6. # sio中的事件函数可以指定namespace,前端通过路由来进入不同的namespace,例如
  7. # sio.on('connect', connect, namespace='/chat')
  8. # 那么前端的调用接口应该是:http://localhost/socket.io/chat

二、简单示例

在fastapi项目中建立一个init_socketio目录,结构如下,chat_room.html和socket.io-client-4.1.2可以在python socketio 实现(极)简单聊天室中拿到

  1. ├── __init__.py
  2. ├── chat_room.html
  3. └── static
  4. └── socket.io-client-4.1.2

 __init__.py

  1. import os
  2. import socketio
  3. from urllib import parse
  4. from bidict import bidict
  5. from pydantic import BaseModel, Field
  6. from fastapi.responses import JSONResponse, HTMLResponse
  7. from fastapi import APIRouter
  8. from fastapi.staticfiles import StaticFiles
  9. user_sid = bidict()
  10. pwd_path = os.path.dirname(os.path.abspath(__file__))
  11. class Login(BaseModel):
  12. user_name: str = Field(..., description='1')
  13. router = APIRouter()
  14. @router.get('/chat_room', summary='聊天室界面')
  15. async def chat_room():
  16. chat_room = os.path.join(pwd_path, 'chat_room.html')
  17. with open(chat_room) as f:
  18. return HTMLResponse(f.read())
  19. @router.post('/login', summary='登录接口')
  20. async def login(item: Login):
  21. user_name = item.user_name
  22. JSONResponse.media_type = 'application/json; charset=utf-8'
  23. if user_sid.get(user_name, None):
  24. return JSONResponse({'code': 'fail'}, status_code=400)
  25. else:
  26. return JSONResponse({'code': 'success'}, status_code=200)
  27. def init_socketio(app):
  28. # 事件函数
  29. async def connect(sid, environ):
  30. query_params = environ['QUERY_STRING'].split('&')
  31. params = dict()
  32. for query_param in query_params:
  33. a, b = query_param.split('=')
  34. params[a] = parse.unquote(b)
  35. user_name = params['name']
  36. user_sid[user_name] = sid
  37. await sio.emit('reply', f'{user_name}连线成功!', namespace='/chat')
  38. async def message(sid, data):
  39. await sio.emit('reply', f"{user_sid.inv[sid]}: {data}", namespace='/chat')
  40. async def disconnect(sid):
  41. user_name = user_sid.inv[sid]
  42. del user_sid[user_name]
  43. await sio.emit('reply', f'{user_name}退出连线!', namespace='/chat')
  44. # 静态文件
  45. app.mount("/static", StaticFiles(directory=f"{pwd_path}/static"), name="static")
  46. # 初始化socketio
  47. sio = socketio.AsyncServer(async_mode='asgi')
  48. # 绑定事件函数
  49. sio.on('disconnect', disconnect, namespace='/chat')
  50. sio.on('chat message', message, namespace='/chat')
  51. sio.on('connect', connect, namespace='/chat')
  52. # app绑定socketio
  53. app.mount('/', socketio.ASGIApp(socketio_server=sio)) # 使用默认的socket path

然后在项目的启动文件中添加

  1. from fastapi import FastAPI
  2. from init_socketio import router, init_socketio
  3. app = FastAPI()
  4. async def startup_event():
  5. """项目初始化"""
  6. app.include_router(router)
  7. init_socketio(app)

启动后,打开http://localhost:8080/chat_room即可测试

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/863453?site
推荐阅读
相关标签
  

闽ICP备14008679号