赞
踩
今天整理一篇后端框架的笔记, fastapi框架是比较主流的后端异步web框架,关键是python语言可以写,正好公司最近安排了一些后端服务的活, 所以就看了一个fastapi框架的入门课程(链接在底部),完成任务,这次想把学习的笔记整理下,方便以后回看回练。
这次笔记算虽然是fastapi的入门,但学习完了之后, 用fastapi框级上手开发公司里面的项目应该是没问题的,这个我亲测过, 并且可以比较舒服的学习大佬的后端代码了, 至于具体细节,边学边补就好, 但和同事大佬能交流的前提,就是得知道一些基础知识, 所以这篇文章就抛砖引玉下, 把fastapi这块的基础知识补一下。
大纲如下:
ok, let’s go!
这里主要是为什么要用FastAPI框架呢? 这哥们有几大特点:
性能优越 — 异步的web框架(同步和异步)
开发效率高
减少越40%的人为bug(错误处理方面的体验,错误类型,错误原因等)
直观
易学易用
精简代码,代码重复率更低
自带API交互文档,开发成果随时交付
API开发标准化
Python的类型提示: type hints, 可以帮助我们知道函数里面的每个参数是什么类型,在写代码的时候减少出错。
Starlette是一种轻量级的ASGI框架/工具包(异步web框架),构建高性能Asyncio服务的理想选择。
FastAPI是引入了Pydantic和Starlette库,然后引入了一些新的内容
ASGI服务的Uvicorn和Hypercorn介绍:
基本介绍:
使用Python的类型注解来进行数据校验和settings管理
Pydantic可以在代码运行时提供类型提示,数据校验失败时提供友好的错误提示
定义数据应该如何在纯规范的Python代码中保存,并用Pydantic验证它
这个东西定义的我感觉是请求体,里面的每个参数,我们可以提前指定好类型, 以及可以加一些方法进行校验等,来保证数据不出错。
基本用法:
class User(BaseModel): id: int # 必填字段 name: str = "John Snow" # 有默认值,选填字段 signup_ts: Optional[datetime] = None # 选填字段 friends: List[int] = [] # 列表中元素是int类型或者可以直接转换成int类型 external_data = { "id": 123, "signup_ts": "2023-08-03 15:55", "friends": [1, 2, "3"] } print("\033[31m1. --- Pydantic的基本用法。Pycharm可以安装Pydantic插件 ---\033[0m") # python 解包传给上面的类 user = User(**external_data) print(user.id, user.friends, user.signup_ts) # 实例化之后调用属性 print(user.dict()) print("\033[31m2. --- 校验失败处理 ---\033[0m") try: User(id=1, signup_ts=datetime.today(), friends=[1, 2, "not number"]) except ValidationError as e: print(e.json()) print("\033[31m3. --- 模型类的的属性和方法 ---\033[0m") # 数据转成字典,json, copy print(user.dict()) print(user.json()) print(user.copy()) # 浅拷贝 # 下面解析数据 类名字要注意 print(User.parse_obj(obj=external_data)) print(User.parse_raw('{"id": "123", "signup_ts": "2020-12-22 12:22", "friends": [1, 2, "3"]}')) path = Path('pydantic_tutorial.json') path.write_text('{"id": "123", "signup_ts": "2020-12-22 12:22", "friends": [1, 2, "3"]}') print(User.parse_file(path)) # schema的方法 print(user.schema()) print(user.schema_json()) user_data = {"id": "error", "signup_ts": "2020-12-22 12 22", "friends": [1, 2, 3]} # id是字符串 是错误的 print(User.construct(**user_data)) # 不检验数据直接创建模型类,不建议在construct方法中传入未经验证的数据 # 查看类的所有字段 print(User.__fields__.keys()) # 定义模型类的时候,所有字段都注明类型,字段顺序就不会乱
递归模型: 这个就是一个模型类里面用了另一个模型类
print("\033[31m4. --- 递归模型 ---\033[0m")
class Sound(BaseModel):
sound: str
class Dog(BaseModel):
birthday: date
weight: float = Optional[None]
sound: List[Sound] # 不同的狗有不同的叫声。递归模型(Recursive Models)就是指一个嵌套一个
dogs = Dog(birthday=date.today(), weight=6.66, sound=[Sound.parse_obj(obj={"sound": "wang wang ~"}),
Sound.parse_obj(obj={"sound": "ying ying ~"})])
print(dogs.dict())
创建符合ORM类的实例对象
示例:
print("\033[31m5. --- ORM模型:从类实例创建符合ORM对象的模型 ---\033[0m") # 定义一张数据表的模型类,每个对象,与数据库中的表的一条记录关联 Base = declarative_base() class CompanyOrm(Base): __tablename__ = 'companies' id = Column(Integer, primary_key=True, nullable=False) public_key = Column(String(20), index=True, nullable=False) # index为True, 自动建立索引,就不用单独再用KEY建立索引了 name = Column(String(63), unique=True) domains = Column(ARRAY(String(255))) # 定义pydantic模型类定义的数据规范, 与数据表模型类的对象一一对应,限制其数据格式 class CompanyMode(BaseModel): id: int public_key: constr(max_length=20) name: constr(max_length=63) domains: List[constr(max_length=255)] class Config: orm_mode = True # 这个表示我们建立的数据格式和模型类定义的, 方便后面调用一个from_orm的方法 # 数据表的模型类 co_orm = CompanyOrm( id=123, public_key='foobar', name='Testing', domains=['example.com', 'foobar.com'], ) # pydantic模型类定义的数据格式的规范 创建了一个pydantic模型,符合orm对象 print(CompanyMode.from_orm(co_orm)) print("\033[31m6. --- Pydantic支撑的字段类型 ---\033[0m") # 官方文档:https://pydantic-docs.helpmanual.io/usage/types/
app = FastAPI() class CityInfo(BaseModel): province: str country: str is_affected: Optional[bool] = None # 与bool的区别是可以不传,默认是null @app.get('/') async def hello_world(): return {'hello': 'world'} # @app.get('/city/{city}?q=xx') fastapi里面两个/ /之间的叫做路径参数, ?后面的叫查询参数 @app.get('/city/{city}') async def result(city: str, query_string: Optional[str] = None): return {'city': city, 'query_string': query_string} @app.put('/city/{city}') async def result(city: str, city_info: CityInfo): return {'city': city, 'country': city_info.country, 'is_affected': city_info.is_affected} # 启动命令:uvicorn hello_world:app --reload # SwargUI文档: url/docs 可以打开,理解会显示各个接口,可以在上面进行相关的参数调试
第一个点,一个大的应用里面会包含很多子应用, 主程序里面是FastAPI类进行实例化,子应用是通过接口路由APPRouter的方式进行实例化,然后从主程序里面进行导入。
import uvicorn from fastapi import FastAPI # tutorial 下面的每个py文件相当于一个应用,但是不能每一个都给它建立一个fastapi应用,所以这里通过接口路由的方式去实例化应用, 相当于子应用 # 之所以这里能直接导入app03, 是因为在tutorial的__init__文件中导入app03了,就不用from tutorial.chapter03 import app03了 from tutorial import app03, app04, app05 # 示例化一个fastapi应用 app = FastAPI() # 把接口路由的子应用接到主应用里面来 # 这个前缀就是请求的url, tags表示应用的标题, api文档里面的接口上面都有标题名 app.include_router(app03, prefix='/chapter03', tags=['第三章 请求参数和验证']) app.include_router(app04, prefix='/chapter04', tags=['第四章 响应处理和FastAPI配置']) app.include_router(app05, prefix='/chapter05', tags=['第五章 FastAPI的依赖注入系统']) # 这里也可以只设置应用入口, 具体的prefix, 以及tags在相应的应用里面设置, 这个在实际开发中,会降低主应用与子应用的耦合性 # 主应用里面只管导入子应用, 不管子应用的路径以及tags app.include_router(app03) if __name__ == "__main__": # 等价于之前的命令行启动:uvicorn run:app --reload uvicorn.run("run:app", host='0.0.0.0', port=8000, reload=True, debug=True, workers=1)
下面在app03里面看路径参数和数据的解析验证
"""Path Parameters and Number Validations 路径参数和数字验证""" # GET和POST的区别: 最直观的区别就是GET把参数包含在URL中,POST通过request body传递参数 @app03.get('/path/parameters') def path_params01(): return {"message": "This is a message"} @app03.get('/path/{parameters}') # 函数的顺序就是路由的顺序 def path_params01(parameters: str): return {"message": parameters} # 如果是上面第二种写法, 子应用里面可以这样设置, openapi的作用是,有时候有些接口不需要加权限访问,此时可以用openapi的接口地址 app03 = fastapi.APIRouter(prefix=settings.API_V1_STR, tags=["app 03"]) app03_openapi = fastapi.APIRouter(prefix=settings.OPENAPI_V1_STR, tags=["app 03"]) # 接口定义的时候,还有一种写法是: @app03.api_route('/path/{parameters}', methods=["GET"]) def xxx
这里,如果在下面的接口中输入parameters参数, 看输出会发现匹配的是上面的这个函数。
枚举类型参数:
class CityName(str, Enum):
Beijing = "Beijing China"
Shanghai = "shanghai china"
# 枚举类型参数
@app03.get('/enum/{city}')
async def latest(city: CityName):
if city == CityName.Shanghai:
return {'city_name': city, "confirmed": 1492, 'death': 7}
if city == CityName.Beijing:
return {'city_name': city, "confirmed": 971, 'death': 9}
return {'city_name': city, "latest": "unknown"}
路径参数传递文件的路径:
# 通过path parameters传递文件路径, 参数后面加一个path标识, 这样file_path里面的/就不会作为路径里面的/了
@app03.get("/files/{file_path:path}")
def filepath(file_path: str):
return f"The file path is {file_path}"
# curl请求方式
curl -X 'GET' \
'http://127.0.0.1:8000/chapter03/files/%2Fzhongqinag%2Fhello' \
-H 'accept: application/json'
# http请求
http://127.0.0.1:8000/chapter03/files/%2Fzhongqinag%2Fhello
校验路径参数:
# FastAPI里面的Path类就是校验路径参数用的
@app03.get("/path_/{num}")
def path_params_validate(
num: int = Path(..., title="Your number", description="description", ge=1, le=10) # 传入的num值校验, 必须大于1, 小于10
):
return num
# 请求方式 get, 里面查询参数
curl -X 'GET' \
'http://127.0.0.1:8000/chapter03/query/bool/conversion?param=true' \
-H 'accept: application/json'
# http请求
http://127.0.0.1:8000/chapter03/query/bool/conversion?param=true
"""Query Parameters and String Validations 查询参数和字符串验证""" from fastapi import Query @app03.get("/query") def page_limit(page: int = 1, limit: Optional[int] = None): if limit: return {"path": page, "limit": limit} return {"page": page} # bool类型转换 # bool类型转换:yes on 1 True true会转换成true, 其它为false @app03.get("/query/bool/conversion") def type_conversion(param: bool = False): return param # 验证一个字符串 需要用到FastAPI里面的Query类 @app03.get("/query/validations") def query_params_validate( value: str = Query(..., min_length=8, max_length=16, regex="^a"), # 希望输入的字符串最小长度是8, 最大长度是16, 以a开头 values: List[str] = Query(default=['v1', 'v2'], alias="alias_name") ): # 多个查询参数的列表和参数别名 return value, values # 发送请求的方式 这些都是查询参数了, get请求里面的 curl -X 'GET' \ 'http://127.0.0.1:8000/chapter03/query/validations?value=a2938dkfjk&alias_name=v1&alias_name=v2' \ -H 'accept: application/json' # http请求 http://127.0.0.1:8000/chapter03/query/validations?value=a2938dkfjk&alias_name=v1&alias_name=v2
请求体和多参数混合, 请求体的话, 开始学习post请求了。
"""Request Body and Fields 请求体和字段""" from faskapi import Body # 通过Field添加注解, 可以为这个参数的使用提供一个demo class CityInfo(BaseModel): name: str = Field(..., example='Beijing') # example是注解的作用, 值不会被验证 country: str country_code: str = None # 给一个默认值 country_population: int = Field(default=800, title="人口数量", description="国家的人口数量", ge=800) class Config: schema_extra = { "example": { "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000, } } # 这种pytandic定义的数据类型原来就是请求体类型啊 @app03.post("/request_body/city") def city_info(city: CityInfo): print(city.name, city.country) return city.dict() # 发送请求的方式 curl -X 'POST' \ 'http://127.0.0.1:8000/chapter03/request_body/city' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000 }'
这种继承BaseModel的类, 定义出来的数据类型,原来就是请求体类型啊, fastAPI里面的Body类型。
下面是多参数混合的写法, 前端怎么往后端发送数据
"""Request Body + Path parameters + Query parameters 多参数混合""" # 路径参数name, 请求体参数city01, city02, 查询参数confirmed, death @app03.put("/request_body/city/{name}") def mix_city_info( name: str, city01: CityInfo, city02: CityInfo, # Body可以是多个的 confirmed: int = Query(ge=0, description="确诊数", default=0), death: int = Query(ge=0, description="死亡数", default=0), ): if name == "Shanghai": return {"Shanghai": {"confirmed": confirmed, "death": death}} return city01.dict(), city02.dict() # 请求体参数city, 查询参数confirmed, death @app03.put("/request_body/multiple/parameters") def body_multiple_parameters( city: CityInfo = Body(..., embed=True), # 当只有一个Body参数的时候,embed=True表示请求体参数嵌套。多个Body参数默认就是嵌套的 confirmed: int = Query(ge=0, description="确诊数", default=0), death: int = Query(ge=0, description="死亡数", default=0), ): print(f"{city.name} 确诊数:{confirmed} 死亡数:{death}") return city.dict() 一个boday参数,embed=True, 数据长这个样子,如果是False, 就没有外面这层大括号 { "city": { "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000 } } # 这时候发送请求的方式 curl -X 'PUT' \ 'http://127.0.0.1:8000/chapter03/request_body/city/shanghai?confirmed=0&death=0' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "city01": { "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000 }, "city02": { "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000 } }'
知识普及1:
lt:less than 小于
le:less than or equal to 小于等于
eq:equal to 等于
ne:not equal to 不等于
ge:greater than or equal to 大于等于
gt:greater than 大于
知识普及2:
涉及到HTTP请求, 有GET, POST和PUT三种不同的请求反法, 各自有不同的用途和特点:
GET通常用于检索数据,?q={},
拿到某些数据, POST用于发送需要处理的数据,PUT用户更新或创建资源。
"""Request Body - Nested Models 数据格式嵌套的请求体""" # 使用pydantic定义数据格式的时候,要对数据进行校验, 使用Field # 在fastapi的函数中,对路径参数进行校验,使用Path类 # 在fastapi的函数中,对请求参数进行校验,使用Query类 class Data(BaseModel): city: List[CityInfo] = None # 这里定义的数据格式嵌套的请求体 date: date # 额外的数据类型,还有uuid datetime bytes frozenset等,参考:https://fastapi.tiangolo.com/tutorial/extra-data-types/ confirmed: int = Field(gt=0, description="确诊数", default=0) deaths: int = Field(ge=0, description="死亡数", default=0) recovered: int = Field(ge=0, description="痊愈数", default=0) @app03.put("/request_body/nested") def nested_models(data: Data): return data # 请求方式 curl -X 'PUT' \ 'http://127.0.0.1:8000/chapter03/request_body/nested' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "city": [ { "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000 } ], "date": "2023-08-06", "confirmed": 0, "deaths": 0, "recovered": 0 }'
header请求头参数自动转换的功能以及如何处理请求头里面key重复的参数
cookie参数:
from fastapi import Cookie
"""Cookie 和 Header 参数"""
@app03.get("/cookie") # 效果只能用Postman测试
def cookie(cookie_id: Optional[str] = Cookie(None)): # 定义Cookie参数需要使用Cookie类,否则就是查询参数
return {"cookie_id": cookie_id}
header参数:
from fastapi import Header @app03.get("/header") def header(user_agent: Optional[str] = Header(None, convert_underscores=True), x_token: List[str] = Header(None)): """ 有些HTTP代理和服务器是不允许在请求头中带有下划线的,所以Header提供convert_underscores属性让设置 :param user_agent: convert_underscores=True 会把 user_agent 变成 user-agent :param x_token: x_token是包含多个值的列表 :return: """ return {"User-Agent": user_agent, "x_token": x_token} # 下面这个我自己搞了个请求参数想看下header参数和查询参数的区别,结果发现,header参数如果用Header修饰, 请求中参数放到了-H里面, 而test是普通的查询参数,放到了查询参数里面 @app03.put("/header1") def header(user_agent: Optional[str] = Header(None, convert_underscores=True), x_token: List[str] = Header(None), test: str = 'zhongqiang'): return {"User-Agent": user_agent, "x_token": x_token, "test": test} # 请求方式 curl -X 'PUT' \ 'http://127.0.0.1:8000/chapter03/header1?test=zhongqiang' \ -H 'accept: application/json' \ -H 'user-agent: e532532' \ -H 'x-token: string234,string6'
上面这些参数自己的一些理解:
get请求里面是使用查询参数, 不允许有请求体, 请求的时候,放到url链接中?后面
curl -X 'GET' \
'http://127.0.0.1:8000/chapter03/query/bool/conversion?param=true' \
-H 'accept: application/json'
post请求里面参数放到请求体里面,请求体可以通过pydantic定义的模型去限制相关格式, 请求的时候, 请求体弄成一个json放到-d参数里面
curl -X 'POST' \
'http://127.0.0.1:8000/chapter03/request_body/city' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "shanghai",
"country": "china",
"country_code": "CN",
"country_population": 14000000
}'
put请求, 里面可以放query参数, 这个?后面,也可以加请求体 -d里面
curl -X 'PUT' \ 'http://127.0.0.1:8000/chapter03/request_body/city/shanghai?confirmed=0&death=0' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "city01": { "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000 }, "city02": { "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000 } }'
-H里面也可以有其他的参数,这个用Header来修饰
curl -X 'PUT' \
'http://127.0.0.1:8000/chapter03/header1?test=zhongqiang' \
-H 'accept: application/json' \
-H 'user-agent: e532532' \
-H 'x-token: string234,string6'
下面我搞一个混合的进行一个对比:
# 下面我搞一个混合的进行一个总结 @app03.put("/conclusion/{file_path:path}/{num}/city") def conclusion(file_path: str, test_conclusion: str, city: CityInfo = Body(..., embed=True), num: int = Path(..., title="Your number", description="description", ge=1, le=10), confirmed: int = Query(ge=0, description="确诊数", default=0), user_agent: Optional[str] = Header(None, convert_underscores=True), x_token: List[str] = Header(None) ): """ :param file_path: 路径参数 :param test_conclusion: 查询参数 :param city: 请求体参数 :param num: 路径参数 :param confirmed: 查询参数 :user_agent: header参数 :return: """ return {"User-Agent": user_agent, "x_token": x_token, "file_path": file_path, "test_conclusion": test_conclusion, "city": city, "num": num, "confirmed": confirmed} # 请求方式 curl -X 'PUT' \ 'http://127.0.0.1:8000/chapter03/conclusion/%2Fzhongqiang%2Fstudy/8/city?test_conclusion=faskapi&confirmed=60' \ 路径参数和查询参数 -H 'accept: application/json' \ -H 'user-agent: zhongqiang666' \ header参数 -H 'x-token: string666' \ -H 'Content-Type: application/json' \ -d '{ 请求体参数 "city": { "name": "shanghai", "country": "china", "country_code": "CN", "country_population": 14000000 } }'
上一章是输入的部分,前端给到后面的参数有哪些,以及后端怎么接收这些参数去解析去校验, 那么如何用pydantic对响应的数据进行规范和校验,上一章是request部分, 这一章是response部分了。
请求里面加入response_model参数, 可以规范返回的数据符合响应模型
"""Response Model 响应模型""" # 模拟场景: 用户在前端传递用户名,密码, 邮箱,手机号等信息,后端给前端返回的时候, 返回用户名,邮箱,手机号等,密码不能传 # 建立两个模型类,一个是前端给后端传的模型类,一个是后端给前端返的模型类 class User(BaseModel): username: str mobile: str = "10086" email: EmailStr # 自动校验是否是email类型 address: str = None full_name: Optional[str] = None class UserIn(User): password: str class UserOut(User): pass users = { "user01": {"username": "user01", "password": "123123", "email": "user01@example.com"}, "user02": {"username": "user02", "password": "123456", "email": "user02@example.com", "mobile": "110"} } @app04.post("/response_model/", response_model=UserOut, response_model_exclude_unset=True) async def response_model(user: UserIn): """response_model_exclude_unset=True表示默认值不包含在响应中,仅包含实际给的值,如果实际给的值与默认值相同也会包含在响应中""" print(user.password) # password不会被返回 # return user # 这里面不会返回user01的password属性,因为UserOut中没有这个属性, 也不会返回mobile这个属性,因为UserOut中这个是默认值 # 而又设置了response_model_exclude_unset=True, user01里面没有这个属性, 所以此时采用了UserOut中的默认值,默认值这个会排除掉 # 此时就只有username和email属性 return users["user02"] @app04.post( "/response_model/attributes", # response_model=UserOut, # response_model=Union[UserIn, UserOut], # 返回UserIn和UserOut的并集,返回的类型是UserIn或者是UserOut,注意不是并集属性,如果UserIn在前面,不为None,返回UserIn,此时就会有passwd属性返回,如果不想返回,可以delpassword属性 如果UserOut在前面不为空,返回UserOut response_model=List[UserOut], # 这里返回的时候,还可以返回UserOut类型的列表,也就是多个用户列表,且每个用户都属于UserOut类 # response_model_include=["username", "email"], # 包含哪些字段 response_model_exclude=["mobile"] # 排除哪些字段 ) async def response_model_attributes(user: UserIn): """response_model_include列出需要在返回结果中包含的字段;response_model_exclude列出需要在返回结果中排除的字段""" # del user.password # Union[UserIn, UserOut]后,删除password属性也能返回成功 # return user return [user, user] # 如果上面response_model是个列表,这里可以返回一个列表 # 请求格式 curl -X 'POST' \ 'http://127.0.0.1:8000/chapter04/response_model/attributes' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "username": "string", "mobile": "10086", "email": "user@example.com", "address": "string", "full_name": "string", "password": "string" }'
"""Response Status Code 响应状态码"""
from fastapi import status_code
@app04.post("/status_code", status_code=200)
async def status_code():
return {"status_code": 200}
# 和下面这个等价
@app04.post("/status_attribute", status_code=status.HTTP_200_OK)
async def status_attribute():
print(type(status.HTTP_200_OK)) # int
return {"status_code": status.HTTP_200_OK}
"""Form Data 表单数据处理"""
from fastapi import File, UploadFile
@app04.post("/login")
async def login(username: str = Form(...), password: str = Form(...)): # 定义表单参数
"""用Form类需要pip install python-multipart; Form类的元数据和校验方法类似Body/Query/Path/Cookie"""
return {"username": username}
这个看下请求方式: 右上角这里就是form表单了,而不是之前的application/json了, 注意看header里面的content-type类型这个
关于响应模型,表单数据以及响应状态码, 通过这篇文章可以进行一些补充
"""Request Files 单文件、多文件上传及参数详解""" from fastapi import File, UploadFile @app04.post("/file") async def file_(file: bytes = File(...)): # 如果要上传多个文件 files: List[bytes] = File(...) """使用File类 文件内容会以bytes的形式读入内存 适合于上传小文件""" return {"file_size": len(file)} # 请求方式 curl -X 'POST' \ 'http://127.0.0.1:8000/chapter04/file' \ -H 'accept: application/json' \ -H 'Content-Type: multipart/form-data' \ -F 'file=@截图 2023-08-04 10-13-03.png;type=image/png' # 如果是上传大文件怎么办? @app04.post("/upload_files") async def upload_files(files: List[UploadFile] = File(...)): # 如果要上传单个文件 file: UploadFile = File(...) """ 使用UploadFile类的优势: 1.文件存储在内存中,使用的内存达到阈值后,将被保存在磁盘中 2.适合于图片、视频大文件 3.可以获取上传的文件的元数据,如文件名,创建时间等 4.有文件对象的异步接口 5.上传的文件是Python文件对象,可以使用write(), read(), seek(), close()操作 """ for file in files: contents = await file.read() # 由于使用的是异步操作,所以这里读取的时候,要使用await print(contents) return {"filename": files[0].filename, "content_type": files[0].content_type} # 请求方式 curl -X 'POST' \ 'http://127.0.0.1:8000/chapter04/upload_files' \ -H 'accept: application/json' \ -H 'Content-Type: multipart/form-data' \ -F 'files=@截图 2023-08-06 16-02-31.png;type=image/png' \ -F 'files=@mipilot_trigger_rules.pb.conf' \ -F 'files=@test.html;type=text/html'
这个就是将某个目录下面完全独立的应用挂载过来, 在主程序run.py中加入下面的代码即可
# 静态文件要使用挂载的方式挂载进应用中
# mount表示将某个目录下面完全独立的应用挂载过来,这个不会在API交互文档中显示
# path 表示请求地址,app这个表目录实际的位置
from fastapi.staticfiles import StaticFiles
app.mount(path='/static', app=StaticFiles(directory='./coronavirus/static'), name='static') # .mount()不要在分路由APIRouter().mount()调用,模板会报错
如何在路径里面去配置响应状态码, 标签和描述文档等
"""Path Operation Configuration 路径操作配置""" @app04.post( "/path_operation_configuration", # url 地址 response_model=UserOut, # tags=["Path", "Operation", "Configuration"], # 和主程序里面的tags效果是一样的 summary="This is summary", description="This is description", response_description="This is response description", # 返回给前端数据的结果添加描述 # deprecated=True, # 接口是否已经废弃,如果想废弃,就设置true,这时候文档里面就是会划掉这个接口, 表示已经废弃,但使用还是可以使用的 status_code=status.HTTP_200_OK ) async def path_operation_configuration(user: UserIn): """ Path Operation Configuration 路径操作配置 :param user: 用户信息 :return: 返回结果 """ return user.dict()
tag的作用:上面的3个tags下面都是这个函数
还可以在主应用里面添加一些常用配置项, 给Swagger文档添加的一些配置
app = FastAPI(
title='FastAPI Tutorial and Coronavirus Tracker API Docs', # 给应用加标题
description='FastAPI教程 新冠病毒疫情跟踪器API接口文档,项目代码:https://github.com/liaogx/fastapi-tutorial', # 加个描述
version='1.0.0', # 加个版本
docs_url='/docs', # 自定义swagger UI的地址
redoc_url='/redocs', # 自定义redocs文档的地址
)
看看效果:
如何使用*HTTPException
*和如何自定义异常处理器
默认的HTTPException的用法
@app04.get("/http_exception")
async def http_exception(city: str):
if city != "Beijing":
# 404 没有找到
raise HTTPException(status_code=404, detail="City not found!", headers={"X-Error": "Error"})
return {"city": city}
输入Beijing可以正常返回城市名,输入别的,就会抛出异常。
如果我们想重写错误处理的方法怎么办?对于每次请求错误都用新的逻辑处理, 需要在主程序run.py里面写:
# run.py中加入 from fastapi.exceptions import RequestValidationError from fastapi.responses import PlainTextResponse from fastapi.exceptions import HTTPException from starlette.exceptions import HTTPException as StarletteHTTPException @app.exception_handler(StarletteHTTPException) # 重写HTTPException异常处理器 async def http_exception_handler(request, exc): """ :param request: 这个参数不能省 :param exc: :return: """ return PlainTextResponse(str(exc.detail), status_code=exc.status_code) @app.exception_handler(RequestValidationError) # 重写请求验证异常处理器 async def validation_exception_handler(request, exc): """ :param request: 这个参数不能省 :param exc: :return: """ return PlainTextResponse(str(exc), status_code=400) # charapter04.py中加入 @app04.get("/http_exception/{city_id}") async def override_http_exception(city_id: int): if city_id == 1: raise HTTPException(status_code=418, detail="Nope! I don't like 1.") return {"city_id": city_id}
下面看下效果:
下面再测试下请求参数验证异常是怎么样子的? 在验证bool类型的那个里面传入参数zhongqiang,此时这个参数不能转成bool类型,所以就会抛出请求参数验证的异常, 此时就会发现改异常是个文本类型了,如果不重写的话,是个字符串。
“依赖注入”是指在编程中,为保证代码成功运行, 先导入或声明其所需要的“依赖”, 如子函数,数据库连接等。
优势:
上面是从开发的角度看依赖注入系统,下面是FastAPI框架本身:增强FastAPI的兼容性
我理解这个依赖复用,就是让写的每个路由函数之间有了一定的联系,之前学习的3和4章,都是单个单个的接口,而这里呢,就是单个的接口与另外的接口形成一种依赖,使得接口与接口之间有了联系。
下面是个最简单的例子:
"""Dependencies 创建、导入和声明依赖""" from fastapi import Depends # 先定义一个公共参数 async def common_parameters(q: Optional[str] = None, page: int = 1, limit: int = 100): return {"q": q, "page": page, "limit": limit} # #下面两个函数都会依赖上面这个 @app05.get("/dependency01") async def dependency01(commons: dict = Depends(common_parameters)): # 这个就是上面返回的结果给到当前这个函数进行处理 return commons # # 依赖不区分同步和异步函数 @app05.get("/dependency02") def dependency02(commons: dict = Depends(common_parameters)): # 可以在async def中调用def依赖,也可以在def中导入async def依赖 return commons
上面这个好处就是可以实现一些接口的复用, 减少重复代码,方便维护。
上面是将函数作为依赖项, 下面是看看如何将类作为依赖项。
"""Classes as Dependencies 类作为依赖项""" fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}] class CommonQueryParams: def __init__(self, q: Optional[str] = None, page: int = 1, limit: int = 100): self.q = q self.page = page self.limit = limit # 下面三种写法都可以 # 下面模拟了从前端传过来一个新的请求页,然后后端从数据库里面获取新的数据,更新返回到前端的一个场景 @app05.get("/classes_as_dependencies") # async def classes_as_dependencies(commons: CommonQueryParams = Depends(CommonQueryParams)): # async def classes_as_dependencies(commons: CommonQueryParams = Depends()): # 不直观和不好理解 async def classes_as_dependencies(commons=Depends(CommonQueryParams)): response = {} if commons.q: response.update({"q": commons.q}) # 这里是拿到数据库里面新的页中的数据 items = fake_items_db[commons.page: commons.page + commons.limit] # 更新到response里面去, response.update({"items": items}) return response 这样,前端传入查询的page和limit的时候,就不用下面的函数里面都写page, limit参数了,只接用一个公共的类
这里看下效果:
这个的意思,就是我先写一个函数, 里面写一些公共的处理逻辑,然后再写个函数依赖前面的公共处理逻辑,增加一些新逻辑, 接下来,写个接口,依赖新的这个函数进行处理。
"""Sub-dependencies 子依赖""" def query(q: Optional[str] = None): return q # 这个依赖于上面的函数 def sub_query(q: str = Depends(query), last_query: Optional[str] = None): if not q: return last_query return q # 这个依赖于上面的函数 @app05.get("/sub_dependency") async def sub_dependency(final_query: str = Depends(sub_query, use_cache=True)): """use_cache默认是True, 表示当多个依赖有一个共同的子依赖时,每次request请求只会调用子依赖一次,多次调用将从缓存中获取""" return {"sub_dependency": final_query} # 这里面做的事情,就是如果q参数指定了,最后就会返回q参数,如果q参数没有指定,那么就会返回last_query参数, 看sub_query里面的处理逻辑
上面说的是如何在函数里面作为参数里面导入依赖,下面是如何在路径操作里面导入依赖
"""Dependencies in path operation decorators 路径操作装饰器中的多依赖""" # 场景: 对Header里面输入的token和key进行校验 async def verify_token(x_token: str = Header(...)): """没有返回值的子依赖""" if x_token != "fake-super-secret-token": raise HTTPException(status_code=400, detail="X-Token header invalid") async def verify_key(x_key: str = Header(...)): """有返回值的子依赖,但是返回值不会被调用""" if x_key != "fake-super-secret-key": raise HTTPException(status_code=400, detail="X-Key header invalid") return x_key # 下面导入多个依赖 @app05.get("/dependency_in_path_operation", dependencies=[Depends(verify_token), Depends(verify_key)]) # 这时候不是在函数参数中调用依赖,而是在路径操作中 async def dependency_in_path_operation(): # 如果header里面参数输入的是x_token: fake-super-secret-token, x_key: fake-super-secret-key # 如果不是,就会抛出相应的异常,x_token或者x_key无效 return [{"user": "user01"}, {"user": "user02"}, {"header": }]
假设我有一些子依赖需要提供给站点的所有应用程序,这时候怎么搞, 可以在主程序里面进行添加
from tutorial.chapter05 import verify_key, verify_token
# 示例化一个fastapi应用
app = FastAPI(
title='FastAPI Tutorial and Coronavirus Tracker API Docs', # 给应用加标题
description='FastAPI教程 新冠病毒疫情跟踪器API接口文档,项目代码:https://github.com/liaogx/fastapi-tutorial', # 加个描述
version='1.0.0', # 加个版本
docs_url='/docs', # 自定义swagger UI的地址
redoc_url='/redocs', # 自定义redocs文档的地址,
dependencies=[Depends(verify_token), Depends(verify_key)]
)
# 这样之前写的所有接口函数里面,都增加了两个header参数,来验证token和key
这个在数据库连接共享中可能会用到,下面是伪代码了
"""Dependencies with yield 带yield的依赖""" # 这个需要Python3.7才支持,Python3.6需要pip install async-exit-stack async-generator # 以下都是伪代码 # 先搞一个数据库连接的函数,后面的结果函数都会用到这个连接数据库,进行数据库的访问操作 # 数据库的共享 async def get_db(): db = "db_connection" # 连接数据库,拿到数据库 try: yield db finally: db.endswith("db_close") # 关闭数据库 async def dependency_a(): dep_a = "generate_dep_a()" try: yield dep_a finally: dep_a.endswith("db_close") async def dependency_b(dep_a=Depends(dependency_a)): dep_b = "generate_dep_b()" try: yield dep_b finally: # 关闭的是使用的子依赖的连接,不是其本身 dep_b.endswith(dep_a) async def dependency_c(dep_b=Depends(dependency_b)): dep_c = "generate_dep_c()" try: yield dep_c finally: dep_c.endswith(dep_b)
主要有4种:
这里主要是先学习下第3个,密码授权模式。
原理大概是下面这样:
客户先用授权的身份去服务器中获取一个token, 后续访问服务器资源的时候,必须携带token。
下面从代码的角度,看两种不同的认证方式。
要实现用户权限的认证,需要有用户的相关信息,下面定义两个用户并定义相关的接口,先获取到用户的相关信息。
"""基于 Password 和 Bearer token 的 OAuth2 认证""" # 模拟一个数据库, 假设用户表里面有两条记录, 这个是用户注册完了之后,写入到数据库里面的 # 后面用户输入用户名和密码获取token的时候,需要先看看改用户注册了没有 fake_users_db = { "john snow": { "username": "john snow", "full_name": "John Snow", "email": "johnsnow@example.com", "hashed_password": "fakehashedsecret", "disabled": False, }, "alice": { "username": "alice", "full_name": "Alice Wonderson", "email": "alice@example.com", "hashed_password": "fakehashedsecret2", "disabled": True, # 这个模拟用户的权限, 激活的这个有权限 }, } from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm # 后端需要提供一个让用户获取token的接口 """OAuth2 密码模式和 FastAPI 的 OAuth2PasswordBearer""" """ OAuth2PasswordBearer是接收URL作为参数的一个类:客户端会向该URL发送username和password参数,然后得到一个Token值 OAuth2PasswordBearer并不会创建相应的URL路径操作,只是指明客户端用来请求Token的URL地址 当请求到来的时候,FastAPI会检查请求的Authorization头信息,如果没有找到Authorization头信息,或者头信息的内容不是Bearer token,它会返回401状态码(UNAUTHORIZED) """ # 请求Token的URL地址 http://127.0.0.1:8000/chapter06/token 这个接口下面会实现, 接收用户名和密码,返回一个token oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token") @app06.get("/oauth2_password_bearer") # 这个接口是用来获取用户的token, 这里依赖oauth2_schema, 即需要先输入用户名和密码进行授权, 这个背后调用的就是token这个函数,拿到token之后 # 作为参数传入进来, 返回这个token async def oauth2_password_bearer(token: str = Depends(oauth2_schema)): return {"token": token}
这样就实现了一个接口, 点击右边的锁, 就会出现oauth2_schema的格式,里面输入用户名和密码:
点击授权,就会对于当前用户返回一个token, 后端的接口是/token这个接口
# 模拟加密密码过程 def fake_hash_password(password: str): return "fakehashed" + password # 建立一个用户模型类 class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None # 存储在数据库里面的用户,继承上面的user class UserInDB(User): hashed_password: str # 给用户的用户名和密码,返回一个token @app06.post("/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): # 这里使用的表单是oauth的表单,使用依赖注入的方式导入进来 # 先去查数据库里面用没有用户,然后看用户名和密码是否能对应上 user_dict = fake_users_db.get(form_data.username) if not user_dict: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password") user = UserInDB(**user_dict) # 哈希加密 hashed_password = fake_hash_password(form_data.password) if not hashed_password == user.hashed_password: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect username or password") # 上面都没有问题,正常的逻辑就是为该用户生成一个token就可以了,这里为了简单,生成的token只接是用户名了。 # 下面有个用jwt算法进行生成token的方式 # 下面这个字典的key不要改 access_token是token的固定标识,这个如果改了,会报错"detail": "Invalid authentication credentials" return {"access_token": user.username, "token_type": "bearer"} # 获取用户 def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) # 模拟解码token, 传入token, 返回对应的用户 def fake_decode_token(token: str): user = get_user(fake_users_db, token) return user # 获取当前的用户 # 这个函数要依赖于oauth2_schema, 这个函数获取到token才能用 async def get_current_user(token: str = Depends(oauth2_schema)): user = fake_decode_token(token) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, # OAuth2的规范,如果认证失败,请求头中返回“WWW-Authenticate” ) return user # 获取当前活跃的用户 async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user") return current_user # 获取当前用户信息, 这个函数要依赖于get_current_active_user函数 @app06.get("/users/me") async def read_users_me(current_user: User = Depends(get_current_user)): return current_user
下面主要是看看获取当前用户信息的接口的运行逻辑,就大致上理解这里的认证过程了。
首先, 这个函数接收current_user参数,这是一个User类, 函数依赖于get_current_user函数,这个依赖于oauth2_schema, 即需要用户先提前认证,获取到用户token,如果没有认证,会显示没有认证的报错。
先点击锁进行认证,
认证完成之后,点击try, 就会返回用户的信息。
注意header里面的授权部分,这里是Bearer的授权类型,后面那个是token不是用户名。
下面总结下上面的一个运行逻辑:
用户认证: 点击锁, 此时需要输入用户名和密码,点击授权时, 背后调用一个接口获取token, 指定方式: oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/token")
此时,输入用户名和密码,实际上是走token这个接口的函数逻辑,这个函数的响应要包含access_token和token_type两个字段,指明类型和token的具体值
# 下面会用更规范的方式,上面的这个不是很规范,需要包含access_token和token_type这两个字段
class Token(BaseModel):
"""返回给用户的Token"""
access_token: str
token_type: str
后面的一些接口,如果需要认证的用户才能进行操作,就通过依赖注入的方式,让用户先认证得到一个token, 后面的函数接受的是这个token值,然后进行相关的操作。
# 获取当前的用户
# 这个函数要依赖于oauth2_schema, 这个函数获取到token才能用
async def get_current_user(token: str = Depends(oauth2_schema)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"}, # OAuth2的规范,如果认证失败,请求头中返回“WWW-Authenticate”
)
return user
这两个点是关于认证的核心逻辑,即先写一个接口,让用户输入用户名和秘密,得到token值,后面依赖这个token值进行访问,此时靠的是依赖注入。
其他的涉及数据库的相关操作的不是重点。
下面这个认证方式,其实也是这样的一个逻辑,无非就是生成token的方式不同。
这是一种比较常用的认证方式。
用户在浏览器上输入用户名和密码给到服务器,服务器创建一个JWT的token返回给浏览器, 浏览器接收到JWT的一个token之后, 会把这个放到认证的请求头里面,服务器接收到请求之后,会校验请求头里面的JWT签名, 从JWT里面获取用户的信息, 然后返回响应给到用户。
"""OAuth2 with Password (and hashing), Bearer with JWT tokens 开发基于JSON Web Tokens的认证""" from datetime import datetime, timedelta from jose import JWTError, jwt from passlib.context import CryptContext # 这里依然是先假设有一个用户注册完成了 fake_users_db.update({ "john snow": { "username": "john snow", "full_name": "John Snow", "email": "johnsnow@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } }) SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # 生成密钥 openssl rand -hex 32 ALGORITHM = "HS256" # 算法 ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 访问令牌过期分钟 class Token(BaseModel): """返回给用户的Token""" access_token: str token_type: str # 对用户的密码进行加密 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 搞一个能接收用户名和密码,创建token的接口 oauth2_schema = OAuth2PasswordBearer(tokenUrl="/chapter06/jwt/token") def verity_password(plain_password: str, hashed_password: str): """对密码进行校验""" return pwd_context.verify(plain_password, hashed_password) def jwt_get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) # 获取授权用户 def jwt_authenticate_user(db, username: str, password: str): user = jwt_get_user(db=db, username=username) if not user: return False if not verity_password(plain_password=password, hashed_password=user.hashed_password): return False return user # 为用户创建access token def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: # 默认的过期时间15分钟 expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) # 使用jwt算法生成token encoded_jwt = jwt.encode(claims=to_encode, key=SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt @app06.post("/jwt/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): # 类的依赖的方式导入进来 # 先校验用户 user = jwt_authenticate_user(db=fake_users_db, username=form_data.username, password=form_data.password) if not user: raise HTTPException( status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # 设置token的过期时间 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) # 返回token 这两个参数的key不能变 return {"access_token": access_token, "token_type": "bearer"} # 获取当前用户, 需要认证,用依赖注入的方式 async def jwt_get_current_user(token: str = Depends(oauth2_schema)): credentials_exception = HTTPException( status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token=token, key=SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = jwt_get_user(db=fake_users_db, username=username) if user is None: raise credentials_exception return user async def jwt_get_current_active_user(current_user: User = Depends(jwt_get_current_user)): if current_user.disabled: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user") return current_user # 获取用户自己的信息 @app06.get("/jwt/users/me") async def jwt_read_users_me(current_user: User = Depends(jwt_get_current_user)): return current_user
这个和上面的逻辑基本上是一致的,无非改的是获取token的方式。
测试下生成token的接口,这个算法生成的token就比较正规了。
先看一个比较大的项目里面后端必备的一些文件或者目录吧:
介绍:
database.py
: 创建数据库,配置等相关代码(数据库的名称,地址,创建等等)crud.py
: 函数封装,对数据库的表进行增删改查操作,对数据库进行操作的代码models.py
: 模型类ORM,这里面会写aqlalchemy的orm, 也就是建的所有表(哪些表,表里面哪些属性)schemas.py
: 使用Pydantic的BaseModel 建立与数据表里面的字段一一对应的模型类, 也就是响应体的数据格式规范, 会建立很多类,都继承BaseModel, 类里面的属性与models.py的表字段一一对应,规范数据格式响应给前端main.py
: 业务逻辑,写接口供前端调用这个是database.py, 主要是建立数据库的连接配置等
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker # sqllite 是个文件,这里可以直接指定地址 # '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名' SQLALCHEMY_DATABASE_URL = 'sqlite:///./coronavirus.sqlite3' # SQLALCHEMY_DATABASE_URL = "postgresql://username:password@host:port/database_name" # PostgreSQL的连接方法 # SQLALCHEMY_DATABASE_URL = "mysql://username:password@host:port/database_name" # MySQL的连接方法 engine = create_engine( # echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志, 能在终端看到很多sql语句 # 由于SQLAlchemy是多线程,指定check_same_thread=False来让建立的对象任意线程都可使用。这个参数只在用SQLite数据库时设置 SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True, connect_args={'check_same_thread': False} ) # 在SQLAlchemy中,CRUD都是通过会话(session)进行的,所以我们必须要先创建会话,每一个SessionLocal实例就是一个数据库session # flush()是指发送数据库语句到数据库,但数据库不一定执行写入磁盘; # commit()是指提交事务,将变更保存到数据库文件, 这里不让他自动commit SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True) # 创建基本映射类 Base = declarative_base(bind=engine, name='Base')
这个会在main.py里面进行调用,会生成一个.sqlite3的文件。
模型类就是整个项目中要用到的表,如果是不同类型的表,我看还可以把模型类写成个目录,在目录下写多个py文件来建不同类型的各种表
# 导入列 from sqlalchemy import Column # 导入数据类型 from sqlalchemy import String, Integer, BigInteger, Date, DateTime, ForeignKey # 导入函数 from sqlalchemy import func # 获取当前时间,把他插入到字段 from sqlalchemy.orm import relationship # 关系型字段 from .database import Base # 下面两个类, 两张表, 一对多, 一个城市下面有多条数据 class City(Base): __tablename__ = "city" # 数据表的表名 id = Column(Integer, primary_key=True, index=True, autoincrement=True) province = Column(String(100), unique=True, nullable=False, comment='省/直辖市') # 对这个字段添加注解 country = Column(String(100), nullable=False, comment='国家') country_code = Column(String(100), nullable=False, comment='国家代码') country_population = Column(BigInteger, nullable=False, comment='国家人口') # 关联到数据表 # 'Data'是关联的类名;back_populates来指定反向访问的属性名称, 通过父表查询到子表 data = relationship('Data', back_populates='city') # 当数据创建插入当前的时间 created_at = Column(DateTime, server_default=func.now(), comment='创建时间') updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间') # 获取这张表,让他进行排序 __mapper_args__ = {"order_by": country_code} # 默认是正序,倒序加上.desc()方法 # 类对象显示出来 def __repr__(self): return f'{self.country}_{self.province}' class Data(Base): __tablename__ = 'data' id = Column(Integer, primary_key=True, index=True, autoincrement=True) # 这里是外建字段,关联到另外一张表 # ForeignKey里的字符串格式不是类名.属性名,而是表名.字段名 city_id = Column(Integer, ForeignKey('city.id'), comment='所属省/直辖市') date = Column(Date, nullable=False, comment='数据日期') confirmed = Column(BigInteger, default=0, nullable=False, comment='确诊数量') deaths = Column(BigInteger, default=0, nullable=False, comment='死亡数量') recovered = Column(BigInteger, default=0, nullable=False, comment='痊愈数量') # 'City'是关联的类名;back_populates来指定反向访问的属性名称 city = relationship('City', back_populates='data') created_at = Column(DateTime, server_default=func.now(), comment='创建时间') updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间') __mapper_args__ = {"order_by": date.desc()} # 按日期降序排列 def __repr__(self): # repr 把这个date进行规范化, 年月日这种,增加可读性 return f'{repr(self.date)}:确诊{self.confirmed}例' """ 附上三个SQLAlchemy教程 SQLAlchemy的基本操作大全 http://www.taodudu.cc/news/show-175725.html Python3+SQLAlchemy+Sqlite3实现ORM教程 https://www.cnblogs.com/jiangxiaobo/p/12350561.html SQLAlchemy基础知识 Autoflush和Autocommit https://zhuanlan.zhihu.com/p/48994990 """
这个是schema的文件编写
from datetime import date as date_ from datetime import datetime from pydantic import BaseModel # create data的时候,不需要id, 读数据的时候才需要,所以这个搞成个基类 class CreateData(BaseModel): date: date_ confirmed: int = 0 deaths: int = 0 recovered: int = 0 class ReadData(CreateData): id: int city_id: int updated_at: datetime created_at: datetime class Config: orm_mode = True class CreateCity(BaseModel): province: str country: str country_code: str country_population: int class ReadCity(CreateCity): id: int updated_at: datetime created_at: datetime class Config: orm_mode = True
crud.py
, 这个是封装了一些操作数据库里面数据的函数, 但这些封装应该是基于接口去做的, 业务逻辑很重要, 这里老师是一下子全写出来了,但是实际开发中,这个东西是无法一下写出来的, 根据业务接口,慢慢的去提炼和添加。
from sqlalchemy.orm import Session from coronavirus import models, schemas # 给定city_id,得到城市 def get_city(db: Session, city_id: int): return db.query(models.City).filter(models.City.id == city_id).first() # 根据城市名去得到城市 def get_city_by_name(db: Session, name: str): return db.query(models.City).filter(models.City.province == name).first() # 这里是获取批量城市列表 def get_cities(db: Session, skip: int = 0, limit: int = 10): return db.query(models.City).offset(skip).limit(limit).all() # 创建城市表的数据 def create_city(db: Session, city: schemas.CreateCity): db_city = models.City(**city.dict()) # 下面这3步是常规操作 db.add(db_city) # 提交到数据库 db.commit() # 执行 db.refresh(db_city) # 刷新 return db_city # 获取数据表中的数据,如果传入了城市,就直接获取对应城市的数据,但这里注意外键关联查询的写法 def get_data(db: Session, city: str = None, skip: int = 0, limit: int = 10): if city: # Data类里面有个city字段关联到City类,所以通过Data.city可以访问city类中的字段 return db.query(models.Data).filter(models.Data.city.has(province=city)) # 外键关联查询,这里不是像Django ORM那样Data.city.province return db.query(models.Data).offset(skip).limit(limit).all() # 创建data数据 def create_city_data(db: Session, data: schemas.CreateData, city_id: int): db_data = models.Data(**data.dict(), city_id=city_id) db.add(db_data) db.commit() db.refresh(db_data) return db_data
这个就是开发具体路由接口了,创建城市, 查询城市,查询每个城市的感染数据等, main.py
# 创建路由应用 application = APIRouter() # 导入一个模板 templates = Jinja2Templates(directory='./coronavirus/templates') # 生成所有的表 Base.metadata.create_all(bind=engine) # 数据库连接 后面会用依赖注入的方式,依赖于当前这个数据库连接 def get_db(): db = SessionLocal() try: yield db finally: db.close() # 创建城市接口 # 响应的时候,是响应的模型类, 把读取的数据返回 @application.post("/create_city", response_model=schemas.ReadCity) def create_city(city: schemas.CreateCity, db: Session = Depends(get_db)): # 这里依赖数据库的连接 db_city = crud.get_city_by_name(db, name=city.province) # 如果数据库里面有这个城市, 发出提醒, 城市已经创建 if db_city: raise HTTPException(status_code=400, detail="City already registered") # 创建城市 return crud.create_city(db=db, city=city) # 查询城市接口 这里用到了路径参数,查某个城市 @application.get("/get_city/{city}", response_model=schemas.ReadCity) def get_city(city: str, db: Session = Depends(get_db)): db_city = crud.get_city_by_name(db, name=city) # 如果城市为空 if db_city is None: raise HTTPException(status_code=404, detail="City not found") return db_city # 这个是查询批量城市, 根据想要的数量来查 @application.get("/get_cities", response_model=List[schemas.ReadCity]) def get_cities(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): cities = crud.get_cities(db, skip=skip, limit=limit) return cities # 对数据的创建和查询操作 @application.post("/create_data", response_model=schemas.ReadData) def create_data_for_city(city: str, data: schemas.CreateData, db: Session = Depends(get_db)): # 获取城市, 需要创建哪个城市的数据 根据传入的城市获取到城市 db_city = crud.get_city_by_name(db, name=city) # 创建对应城市的记录,需要传入城市id, data = crud.create_city_data(db=db, data=data, city_id=db_city.id) return data @application.get("/get_data") def get_data(city: str = None, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): data = crud.get_data(db, city=city, skip=skip, limit=limit) return data def bg_task(url: HttpUrl, db: Session): """这里注意一个坑,不要在后台任务的参数中db: Session = Depends(get_db)这样导入依赖""" city_data = requests.get(url=f"{url}?source=jhu&country_code=CN&timelines=false") if 200 == city_data.status_code: db.query(City).delete() # 同步数据前先清空原有的数据 for location in city_data.json()["locations"]: city = { "province": location["province"], "country": location["country"], "country_code": "CN", "country_population": location["country_population"] } crud.create_city(db=db, city=schemas.CreateCity(**city)) coronavirus_data = requests.get(url=f"{url}?source=jhu&country_code=CN&timelines=true") if 200 == coronavirus_data.status_code: db.query(Data).delete() for city in coronavirus_data.json()["locations"]: db_city = crud.get_city_by_name(db=db, name=city["province"]) for date, confirmed in city["timelines"]["confirmed"]["timeline"].items(): data = { "date": date.split("T")[0], # 把'2020-12-31T00:00:00Z' 变成 ‘2020-12-31’ "confirmed": confirmed, "deaths": city["timelines"]["deaths"]["timeline"][date], "recovered": 0 # 每个城市每天有多少人痊愈,这种数据没有 } # 这个city_id是city表中的主键ID,不是coronavirus_data数据里的ID crud.create_city_data(db=db, data=schemas.CreateData(**data), city_id=db_city.id) # 这个接口是同步感染数据用的 @application.get("/sync_coronavirus_data/jhu") def sync_coronavirus_data(background_tasks: BackgroundTasks, db: Session = Depends(get_db)): """从Johns Hopkins University同步COVID-19数据""" background_tasks.add_task(bg_task, "https://coronavirus-tracker-api.herokuapp.com/v2/locations", db) return {"message": "正在后台同步数据..."} # 这个接口,用于和前端的交互, 把一些信息返回给前端, 前端拿到之后,进行展示,具体是下面的home.html @application.get("/") def coronavirus(request: Request, city: str = None, skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): data = crud.get_data(db, city=city, skip=skip, limit=limit) return templates.TemplateResponse("home.html", { "request": request, "data": data, "sync_data_url": "/coronavirus/sync_coronavirus_data/jhu" })
这里是用了JIANJIA2模板自己写了一个前端的简单表格页面, 介绍了semantic UI和JQuery, 这个简单过一下即可,正常大公司开发,前端都是基于框架开发,并且是前后端分离的, 属于不同的团队。 两个团队只需要一份接口文档进行交互。 后端设计了哪些功能接口, 接口路由地址,以及req, resp的格式定义好给到前端即可。
所以下面这个代码简单了解下吧:
<!DOCTYPE html> <html lang="en"> <head> <title>新冠病毒疫情跟踪器</title> <link rel="stylesheet" href="{{ url_for('static', path='/semantic.min.css') }}"> <script src="{{ url_for('static', path='/jquery-3.5.1/jquery-3.5.1.min.js') }}"></script> <script src="{{ url_for('static', path='/semantic.min.js') }}"></script> -- 这里是用jquery写的两个动作函数,把前端的动作传到后端 <script> $(document).ready(function () { $("#filter").click(function () { const city = $("#city").val(); window.location.href = "http://" + window.location.host + "/coronavirus?city=" + city; }); $("#sync").click(function () { $.get("{{ sync_data_url }}", function (result) { alert("Message: " + result.message); }); }); }); </script> </head> -- 下面是个纯页面展示 <body> <div class="ui container"> <h2></h2> <h1 style="text-align: center">新冠病毒疫情跟踪器</h1> <h2></h2> <button id="filter" style="float: left" type="submit" class="ui button alert-secondary">过滤</button> <div class="ui input"> <label for="city"></label><input id="city" type="text" placeholder="城市" value=""> </div> <button id="sync" style="float: right" type="submit" class="ui button primary">同步数据</button> <table class="ui celled table"> <thead> <tr> <th>城市</th> <th>日期</th> <th>累计确诊数</th> <th>累计死亡数</th> <th>累计痊愈数</th> <th>更新时间</th> </tr> </thead> <tbody> {% for d in data %} <tr> <td>{{ d.city.province }}</td> <td>{{ d.date }}</td> <td>{{ d.confirmed }}</td> <td>{{ d.deaths }}</td> <td>{{ d.recovered }}</td> <td>{{ d.updated_at }}</td> </tr> {% endfor %} </tbody> </table> </div> </body> </html>
在实际工程项目中, 后端服务的代码需要一个比较完善的目录结构设计,才可以更好的维护。所以这里我记录下,目前使用的一个比较不错的目录结构设计,这一块,与视频里面的不太一样了,感觉视频里面的还是有些简单,真正的大项目是有很多业务,很多服务的。
目录结构可以设置成这样: 主仓库目录根据业务,建立一个新目录,比如Search/trigger,这个就是主目录, 里面创建一个web目录,专门用于写服务
web/
biz/
: 该目录下主要是写各个业务相关的服务,根据具体业务划分 biz是business的缩写, 实际上是控制层(业务逻辑层),起了一个server服务的角色。
evaluate_biz.py
: 评测相关业务的路由接口实现,建一个服务类,里面写静态方法实现接口的相关逻辑
class EvaluateBiz(object):
@staticmethod
def xxx():
pass
artifact_biz.py
: 制品相关业务的路由接口实现
….
dal
: 主要是各个业务数据表的增删改查, dal: database access layer
, 主要是和各类数据库打交道。
__init __.py
: 创建数据库的连接
url = "mysql+pymysql://tohka:zhongqiang@localhost:3306/mydb"
engine = sqlalchemy.create_engine(
url,
# echo=True,
pool_pre_ping=True,
pool_size=100,
pool_recycle=30,
)
session_maker = sessionmaker(bind=engine)
base = declarative_base()
evaluate_dal.py
: 评测业务的数据表的ORM定义,以及相关数据表的增删改查
class TaskModel(base):
__tablename__ = 'xxx'
class TaskDao(object):
@statisticmethod
def insert
def delete
def get_by_xxx()
artifact_dal.py
: 制品业务的相关模型类,增删该查
model
:模型层,定义各种模型类
request.py
: 请求体相关的pydantic类,主要定义请求格式
import typing
import pydantic
class xxxReq(pydantic.BaseModel):
name: str
team: str
xxx
response.py
: 定义响应的格式
import typing
import pydantic
class xxxRsp(pydantic.BaseModel):
name: str
team: str
xxx
router
: 各个子应用
evaluate.py
: 评测业务所有的路由接口
import typing
import fastapi
from web import settings
from web.service import evaluate_biz
from web.model import request
router = fastapi.APIRouter(prefix=settings.API_V1_STR, tags=["xxx"])
router_openapi = fastapi.APIRouter(prefix=settings.OPENAPI_V1_STR, tags=["xxx"])
@router.api_route("/xxx/xxx", methods=["GET"])
async def xxx(params):
return await evaluate_biz.xxx(params)
artifact.py
: 制品业务相关的路由接口
scheduler
: 存放一些后台执行的定时任务
script
: 存放一些其他脚本,比如访问spark等
main.py
: 主应用启动服务,把上面各个子应用包含进来
import os import fastapi from apscheduler.schedulers.background import BackgroundScheduler from auth_middleware import fastapi_middleware from fastapi.middleware import cors from web import settings from web.router import xxx from web.scheduler import database_manager app = fastapi.FastAPI(docs_url=f"{settings.API_V1_STR}/doc", openapi_url=f"{settings.API_V1_STR}/openapi.json") app.include_router(xxx.router) origins = [ "http://127.0.0.1", "http://127.0.0.1:3000", "http://localhost", "http://localhost:3000", "*", ] app.add_middleware( cors.CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) if os.getenv("ENABLE_AUTH", "").strip().lower() == 'true': app.add_middleware(fastapi_middleware.AUTHMiddleware, exclude_paths=["/openapi", "/api/trigger/v1/doc", "/api/trigger/v1/openapi.json"]) # 后台任务 update_database_manager = database_manager.UpdateDataBaseManager() scheduler = BackgroundScheduler() scheduler.add_job(update_database_manager.update_evaluate_info, 'interval', seconds=60) scheduler.start()
test.py
:测试代码
seetings.py
: 关于服务的一些参数设置
Dockerfile
: docker打服务需要的镜像
pyproject.toml
: poetry环境, 主要是安装依赖的包
trigger.sql
: 数据库里面创建表的代码
Request请求在到达具体的应用之前,可以加一些中间件取拦截request请求, 同样, 在应用与返回结果之间,也可以加一些中间件对结果进行一些后处理之后再形成返回结果。
demo: 下面增加一个中间件, 拦截到所有的http请求, 并计算响应时间,把响应时间加入到response中。
import time
from fastapi import FastAPI, Request
@app.middleware('http')
async def add_process_time_header(request: Request, call_next): # call_next将接收request请求做为参数
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers['X-Process-Time'] = str(process_time) # 添加自定义的以“X-”开头的请求头
return response
这时候,如果再调用接口处理请求的时候,最后响应的Header里面会带上处理时间信息。
需要注意的一个点: 带yield的依赖的退出部分的代码 和 后台任务 会在中间件之后运行
域是由协议, 域名,端口三部分组成,如果有一个不同,就属于不同的域。
上面这两个网址, 协议相同, 域名相同, 端口如果不写默认80, 所以是相同的域,不存在跨域的问题。
下面这个就会出现跨域的问题:
demo: FastAPI 的 CORSMiddleware 实现 CORS跨域资源共享
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
# 允许跨域,添加信任列表
allow_origins=[
"http://127.0.0.1",
"http://127.0.0.1:8080"
],
allow_credentials=True,
allow_methods=["*"], # 允许跨域的方法,可以是get或者post,也可以是*,都可以
allow_headers=["*"], # 允许的请求头
)
当请求的应用里面有一些比较耗时的操作,无法立即返回结果的时候,可以把耗时的操作弄成后台任务去执行,这里学习下fastAPI的后台任务。
下面是一个后台任务的demo:
from fastapi import APIRouter, BackgroundTasks, Depends """Background Tasks 后台任务""" # 写一个bg_task函数,假设该函数非常的耗时 # 实际工作中还真遇到了一个: 就是针对每条数据,需要访问一个接口去实时更新数据,此时如果只接去调用,发现会有几秒的响应延迟, 就可以考虑写成一个后台任务的形式去做,不过我暂时没用 # 而是用了一个apscheduler的方式,加了一个定时更新的后台任务,没有用到当前这个fastapi的后台任务 def bg_task(framework: str): with open("README.md", mode="a") as f: f.write(f"## {framework} 框架精讲") # 这里写个函数, 添加后台任务用 @app08.post("/background_tasks") async def run_bg_task(framework: str, background_tasks: BackgroundTasks): """ :param framework: 被调用的后台任务函数的参数 :param background_tasks: FastAPI.BackgroundTasks :return: """ background_tasks.add_task(bg_task, framework) return {"message": "任务已在后台运行"} # 下面介绍一种依赖注入的方式: # 这里写一个普通函数, 添加后台任务 def continue_write_readme(background_tasks: BackgroundTasks, q: Optional[str] = None): if q: background_tasks.add_task(bg_task, "\n> 整体的介绍 FastAPI,快速上手开发,结合 API 交互文档逐个讲解核心模块的使用\n") return q # 依赖于上面的函数 @app08.post("/dependency/background_tasks") async def dependency_run_bg_task(q: str = Depends(continue_write_readme)): if q: return {"message": "README.md更新成功"}
这里再补充一个启动定时后台任务的一个demo
背景:工作时有一个需要实时调用别组同学的后端接口,去更新一个统计数据,之前设计的方案,就是展示列表的时候,对于每条记录,访问接口,去更新统计数据,但发现这样,响应时间会几秒,用户体验上不是很好。所以我把这个放到了后台, 做成了一个定时任务, 每10s访问更新下统计数据,就把这个问题解决了。
所以整理下逻辑:
# 服务里面写一个函数 service/evaluate_biz.py class EvaluateBiz: @statisticmethod def sync_statistic_data(): # 调用接口,更新数据到自己的数据库 # 定时任务函数 scheluder/database_manager.py class UpdateDataBaseManager(object): @staticmethod def update_evaluate_info(): evaluate_biz.EvaluateBiz.sync_statistic_data() # main.py from apscheduler.schedulers.background import BackgroundScheduler from scheduler import database_manager update_database_manager = database_manager.UpdateDataBaseManager() scheduler = BackgroundScheduler() scheduler.add_job(update_database_manager.update_evaluate_info, 'interval', seconds=60) scheduler.start()
测试用例编写很重要, 可以自己通过测试用例去测试相关的接口。所以这个要学习下。
先建立一个测试文件, 要以”test_
”开头, 比如test_charpter08.py
:
from fastapi.testclient import TestClient from run import app """Testing 测试用例""" client = TestClient(app) # 先pip install pytest def test_run_bg_task(): # 函数名用“test_”开头是 pytest 的规范。注意不是async def response = client.post(url="/chapter08/background_tasks?framework=FastAPI") # 站点的访问路径开始 assert response.status_code == 200 assert response.json() == {"message": "任务已在后台运行"} def test_dependency_run_bg_task(): response = client.post(url="/chapter08/dependency/background_tasks") assert response.status_code == 200 assert response.json() is None def test_dependency_run_bg_task_q(): response = client.post(url="/chapter08/dependency/background_tasks?q=1") assert response.status_code == 200 assert response.json() == {"message": "README.md更新成功"}
运行的时候,不能点击run, 而是需要进入到test_chapter08.py这个目录,输入pytest,就会自己运行这个文件。
下面是我自己的实践: 在我的项目里面新建一个test目录,专门用于每个模块下每个接口的单元测试,
这个test_artifact.py
里面可以测试与artifact相关的接口:
from fastapi.testclient import TestClient from web.main import app """Testing Artifact测试用例""" client = TestClient(app) def test_run_artifact_list(): response = client.get(url="/api/trigger/v1/artifact/list?size=10&page=1&relation_type=child&artifact_type=dev&user_name=wuzhongqiang") assert response.status_code == 200 resp_json = response.json() assert resp_json['message'] == "SUCCESS" def test_run_update_artifact_op_info(): response = client.post("/api/trigger/v1/artifact/update_artifact_op_info", json={ "artifact_id": 1448, "notes": "测试制品,用完删除" }) assert response.status_code == 200 resp_json = response.json() assert resp_json['data'] == "1448制品运营信息更新成功!" ...
接下来在服务的终端,安装两个包:
poetry run pip install pytest httpx
poetry run pip install pytest requests
# 进入test目录,执行命令
poetry run pytest
# 如果想测试其中的某个接口
poetry run pytest -k test_run_artifact_list
这篇笔记比较长,是我听了下面的课程之后整理的一些笔记,也有一些自己的想法, fastapi框架, 首先要是什么, 同步和异步是咋回事,有啥特色等, 其次,就是请求参数和响应参数这块, 这是与前端交互的核心。 后面依赖注入系统是个重点, 增加代码的复用和可维护性。 另外就是知道一个大型项目里面的分层架构,数据层,服务层,模型层,每一层大致上干啥的等, 有了这些知识之后, 就可以直接拿大型项目实践,从里面再补充一些新的知识,完善自己的知识框架等。公司里的大型项目开发顺序:根据产品文档,先设计技术方案,也就是怎么实现某个产品, 数据的存储,服务的搭建等,接下来就是设计接口文档, 也就是如何和前端交互, 后面就是开发测试,上线预发环境联调,提测,最后上线。 这就是一整套流程了。 由于我这是第一次写后端,所以想把学习的东西沉淀下来,方便后续知识架构的搭建。
参考资料:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。