当前位置:   article > 正文

python 搭建sse服务_python sse

python sse

服务端(FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import time
from pydantic import BaseModel

# ASGI application object; the /stream route is registered on it and it is
# handed to uvicorn.run() when this file is executed as a script.
app = FastAPI()


# Pydantic model describing the structure of the JSON request body.
class Item(BaseModel):
    # Text supplied by the client; the /stream handler places it in every
    # event it emits.
    msg: str


@app.post("/stream")
async def stream(item: Item):
    """Stream Server-Sent Events that repeat ``item.msg`` with the current time.

    One event is produced every 2 seconds. The generator loops forever; it has
    no exit condition of its own.

    Args:
        item: Parsed JSON request body; ``item.msg`` is echoed in every event.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """

    async def event_stream():
        while True:
            # Simulated event payload: the client's message plus a timestamp.
            payload = '{} {}'.format(item.msg, time.ctime())
            # SSE framing: every payload line must begin with the "data:"
            # field name, and a blank line terminates the event. Putting the
            # message in front of "data:" (as before) makes the whole line an
            # unknown field that spec-conforming parsers ignore.
            lines = payload.replace('\r\n', '\n').replace('\r', '\n').split('\n')
            yield ''.join('data: {}\n'.format(line) for line in lines) + '\n'
            # Non-blocking pause so one event goes out every 2 seconds.
            await asyncio.sleep(2)

    return StreamingResponse(event_stream(), media_type="text/event-stream")


# Start the server only when this file is executed directly as a script.
if __name__ == "__main__":
    import uvicorn
    # host="0.0.0.0" listens on all network interfaces; port 8000 is the port
    # the example clients connect to.
    uvicorn.run(app, host="0.0.0.0", port=8000)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

Python 脚本客户端(requests)

import requests
import json

# SSE endpoint exposed by the server
url = 'http://localhost:8000/stream'

# JSON payload; "msg" is the text the server echoes back in every event
payload = {'msg': '你好'}

# Headers for the SSE request
headers = {'Content-Type': 'application/json', 'Accept': 'text/event-stream'}

# Send the SSE request. stream=True leaves the body unread so lines can be
# consumed as they arrive. The `with` block releases the connection even when
# the loop is interrupted (e.g. Ctrl+C) or raises, which a trailing
# response.close() would not.
with requests.post(url,
                   data=json.dumps(payload),
                   headers=headers,
                   stream=True) as response:
    # Check if the request was successful
    if response.status_code == 200:
        # Process the SSE stream line by line
        for line in response.iter_lines():
            # iter_lines() yields bytes; empty lines are the blank separators
            # between events and are skipped.
            if line:
                print(line.decode('utf-8'))
    else:
        print(f"Request failed with status code: {response.status_code}")

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

WebUI 客户端(Gradio)

import random
import gradio as gr
import requests
import json
# Callback functions taking the (message, history) argument pair.


def random_response(message, history):
    """Callback run after the user submits input; returns a random reply.

    Args:
        message: The message the user entered this turn (not used).
        history: Chat history so far, e.g.
            [["user input 1", "assistant output 1"],
             ["user input 2", "assistant output 2"]] (not used).

    Returns:
        The reply text: "Yes" or "No", chosen at random.
    """
    return random.choice(["Yes", "No"])


def chat(message, history):
    """Chat callback: send ``message`` to the SSE server and relay its events.

    Args:
        message: Text the user typed; sent to the server as the ``msg`` field.
        history: Previous chat turns; not used.

    Yields:
        Each non-empty line of the SSE response, decoded as UTF-8.
    """
    # SSE endpoint exposed by the server
    url = 'http://localhost:8000/stream'

    # Forward the user's actual input (previously a hard-coded greeting was
    # sent regardless of what the user typed).
    payload = {'msg': message}

    # Headers for the SSE request
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream'
    }

    # The `with` block releases the connection even when this generator is
    # closed before the stream ends, in which case code placed after the loop
    # would never run.
    with requests.post(url,
                       data=json.dumps(payload),
                       headers=headers,
                       stream=True) as response:
        if response.status_code != 200:
            print(f"Request failed with status code: {response.status_code}")
            return

        # iter_lines() yields bytes; empty lines are the blank separators
        # between events and are skipped.
        for line in response.iter_lines():
            if line:
                yield line.decode('utf-8')


# Build a chat UI around the `chat` callback and serve it on all network
# interfaces at port 8001.
gr.ChatInterface(chat).launch(server_name='0.0.0.0',
                                         server_port=8001)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/880328
推荐阅读
相关标签
  

闽ICP备14008679号