FastAPI服务器构建常见问题与解决方案
FastAPI提供了许多开箱即用的功能:异步并发、Pydantic验证、中间件、错误处理、自动文档和依赖注入。它非常直观,许多团队可以快速上手——有时甚至不需要深入思考系统设计。
在本文中,我们将探讨常见陷阱,以及如何通过实用的项目结构和路由器组合模式、生命周期与请求作用域资源、并发处理(包括事件循环阻塞和卸载)来修复这些问题。
项目结构
项目开始时,功能优先级通常高于结构。随着复杂性增长,定位端点代码变得困难,重复和循环导入问题逐渐出现。虽然具体结构因业务需求而异,但大多数Web服务器都有相似的关注点,可以包含以下按功能划分的文件夹:
- main.py - 创建应用;生命周期连接;注册中间件和错误处理程序
- validators/ - 可重用的输入检查(ID、枚举、业务规则),供Pydantic使用
- middleware/ - 横切层(例如,负载大小保护)
- error_handlers/ - 异常到HTTP响应的映射
- utilities/ - 小型辅助工具(响应包装器、分页、格式化)
- models/ - 用于验证传入请求和传出响应的Pydantic模型
- services/ - 业务逻辑;协调存储库/客户端
- clients/ - 客户端:数据库引擎/会话生成器、HTTP客户端、日志记录/跟踪适配器
- factories/ - 依赖注入友好的客户端/服务构建器
- endpoints/ - 按业务领域分组的APIRouter(账户/、购买/、订单/)
- tests/ - 单元/集成测试;使用依赖覆盖和测试设置
棘手的部分不是文件夹——而是父/子路由器的关系。
让我们以以下API设计为目标:
1
2
3
4
|
/accounts
/accounts/{account_id}
/accounts/{account_id}/orders
/accounts/{account_id}/orders/{order_id}
|
为了实现这个目标,我们将创建两个路由器:
子路由器:
- 位置:app/endpoints/accounts/orders/root.py
- 服务:/{account_id}/orders
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
orders = APIRouter(
prefix="/{account_id}/orders",
tags=["orders"]
)
@orders.get("/")
async def list_orders(
account_id: Annotated[UUID, Path()],
last_id: str | None = None,
limit: int = 50
):
return {
"account": str(account_id),
"items": [],
"next": None
}
@orders.post("/")
async def create_order(
account_id: Annotated[UUID, Path()]
): return { "account": str(account_id), "ok": True}
|
父路由器:
- 位置:app/endpoints/accounts/root.py
- 服务:/accounts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
from .orders.root import orders as orders_router
accounts = APIRouter(
prefix="/accounts",
tags=["accounts"]
)
async def load_account(
account_id: Annotated[UUID, Path()]
): return {"id": account_id}
@accounts.get("/")
async def list_accounts(): return [{"id": "12345"}]
@accounts.get("/{account_id}")
async def get_account(account_id: UUID):
return {"id": account_id}
accounts.include_router(
orders_router,
# 对订单路由施加依赖注入
dependencies=[Depends(load_account)],
)
|
这种结构确保"orders"不能在没有指定account_id的情况下访问,并且允许开发人员快速定位端点位置并创建多层子路由器。通过这种设计,可以在其他地方重用"orders router"并调整依赖关系。
工厂模式与依赖注入
工厂通过使用应用状态和各种环境变量、配置来封装客户端和服务的构建。
工厂可用于管理:
- 云客户端 - SQS、S3、SNS等
- 数据库客户端 - Redis、Postgres、Mongo
- 配置连接 - 应用级和请求级(如果每个客户端都有特定配置)
- 服务类,封装业务逻辑 - AccountService、OrderService
- 辅助类 - 日志记录器、用户会话等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
class Factory:
@classmethod
async def get_http_client(
cls,
settings
): return AsyncClient(base_url=settings.api)
@classmethod
async def get_account_service(
cls,
request: Request
) -> AccountService:
settings: Settings = request.app.state.settings
return AccountService(
client=await cls.get_http_client(settings),
)
|
基于这个例子,我们看到一个设计良好的工厂可以轻松插入FastAPI的依赖注入系统。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@app.get("/accounts/{account_id}")
async def account_overview(
account_id: str,
account_service: Annotated[
AccountService,
Depends(Factory.get_account_service)
]
):
r = await account_service.fetch_profile(account_id)
if not r:
raise HTTPException(
status_code=404,
detail="Account not found"
)
return r
|
端点保持简洁和可读性:它验证输入、协调服务并将内部结果映射到HTTP响应。业务逻辑存在于端点之外;处理程序专注于HTTP相关事项(连接、状态码和格式化)。
工厂可以极大地简化对服务器上不同日志记录器的访问。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
class Factory:
@classmethod
def get_root_logger(cls) -> logging.Logger:
return logging.getLogger("app")
@classmethod
def get_request_logger(
cls,
request: Request
) -> logging.LoggerAdapter:
base = cls.get_root_logger()
return logging.LoggerAdapter(
base,
{
"path": request.url.path,
"method": request.method,
"corr_id": request.state.corr_id,
}
)
|
现在所有类型的日志记录器都可以通过工厂在端点中轻松实例化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
acc_ser_dep = Annotated[
AccountService,
Depends(Factory.get_account_service),
]
log_dep = Annotated[
LoggerAdapter,
Depends(Factory.get_request_logger),
]
@app.get("/accounts/{account_id}")
async def account_overview(
account_id: str,
account_service: acc_ser_dep,
logger: log_dep,
):
r = await account_service.fetch_profile(account_id)
if not r:
logger.warning(
"Account not found",
extra={"account_id": account_id},
)
raise HTTPException(404, "Account not found")
return r
|
请求作用域的日志记录器需要关联ID来将请求的日志拼接成连贯的故事。通常在端点内部生成ID,但这会导致重复。更有效的方法是使用小型中间件,每个请求设置一次ID,并且可以扩展以包含user_id、firm_id和其他上下文:
1
2
3
4
5
6
7
8
|
@app.middleware("http")
async def corr_middleware(request: Request, call_next):
_id = request.headers.get("X-Request-ID") or str(uuid4())
request.state.corr_id = _id
response = await call_next(request)
response.headers["X-Request-ID"] = _id
return response
|
回到工厂示例,有一个容易忽略的步骤可以简化依赖注入架构:
1
|
settings: Settings = request.app.state.settings
|
这行代码假设Settings在启动时初始化(main.py):
1
2
3
4
5
6
7
|
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.settings = Settings()
yield
await Factory.aclose()
app = FastAPI(lifespan=lifespan)
|
附加到app.state的对象对于每个FastAPI工作进程是生命周期作用域的,应该在关闭阶段关闭。
生命周期与请求作用域
生命周期对象可以跨多个请求持久存在,并存在于服务器的整个生命周期。使用app.state不是持有它们的唯一方式(单例、带键的缓存对象等),但它是FastAPI中最方便的选项。
请求作用域的"会话"(聚合业务逻辑所需的请求特定状态,例如correlation_id、user_id)将在请求完成后立即消亡。
在服务器生命周期中,我们可以使用附加到app.state的对象(数据库连接池、HTTP客户端等)。当服务器关闭时,我们触发这些对象的关闭,并为此有特殊逻辑。
1
2
3
4
5
6
|
class Factory:
@classmethod
async def aclose(cls):
await app.state.db_pool.aclose()
await app.state.external_http_client.aclose()
...
|
对象被垃圾回收并不意味着相应的连接已关闭。这就是为什么我们在服务器生命周期中有await Factory.aclose()。
相同的模式可以通过工厂应用于请求作用域的客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
class Factory:
@classmethod
async def get_account_service(cls):
settings: Settings = request.app.state.settings
account_service = AccountService(
client=await cls.get_http_client(settings),
)
try:
yield account_service
finally:
await account_service.client.aclose()
|
在上面的例子中,account_service只有在关闭HTTP客户端的连接后才被删除,以便立即释放其套接字。
并发
Settings是通过app.state.settings并发访问的生命周期对象的好例子。理想情况下,生命周期对象应该是只读的,并且不应该引用请求作用域的对象(以便这些对象可以被垃圾回收)。
否则,存在内存泄漏和竞争条件的风险。通常,任何跨请求共享的内容都必须内置并发保护(线程/异步安全)。
对于在不同请求甚至工作进程之间共享状态,外部存储是更安全、更可扩展的选择。但是,如果同一工作进程的两个请求几乎同时尝试写入设置会发生什么?让我们考虑这个例子:
1
2
3
4
|
cfg = request.app.state.settings
old = cfg.threshold
await some_async_call() # 产生,其他请求在此运行
request.app.state.settings.threshold = old + 1
|
这段代码将使共享状态不一致。如果绝对必须更新状态,应该使用锁。
1
2
3
4
5
6
7
8
|
app.state.settings_lock = asyncio.Lock()
async def update_settings(app, patch: dict):
async with app.state.settings_lock:
settings = app.state.settings
app.state.settings = settings.model_copy(
update=patch,
)
|
有了这个函数,可以安全地更新设置:
1
|
await update_settings(request.app,{"threshold":old+1})
|
让我们考虑一个没有await some_async_call()的例子:
1
2
3
|
cfg = request.app.state.settings
old = cfg.threshold
request.app.state.settings.threshold = old + 1
|
代码作为单个块在事件循环线程上运行。这确保了状态的完整性,但增加了服务器的延迟。FastAPI不会自动将同步工作从事件循环线程移动到ThreadPool。必须由开发人员显式完成。
1
2
3
4
5
6
7
|
@app.get("/update-state")
async def update_state():
# 移动到线程池
await anyio.to_thread.run_sync(
update_settings_sync
)
return {"ok": True}
|
还有另一种通过Depends实现的方法,但不值得详细探讨,因为让多个线程写入共享状态不是正确的设计。
当不涉及共享状态更新时,两种卸载同步工作的例子都会很好地工作。
1
2
3
4
5
|
@app.post("/update-state")
async def update_state(
result: dict = Depends(update_settings_dep)
):
return result
|
对事件循环如何与同步和异步代码交互的扎实理解有助于开发人员找到最有效的解决方案。
事件循环耗尽通常发生在异步端点执行繁重的同步工作时。请看下面的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
def cpu_heavy(n: int) -> float:
# Python CPU;从不产生
s = 0.0
for i in range(n):
s += math.sqrt(i)
return s
@app.get("/cpu")
async def cpu(n: int = 10_000_000):
# 在循环上进行繁重的CPU操作,工作进程上的请求停滞
return {"sum": cpu_heavy(n)}
@app.get("/sleep")
async def sleep(ms: int = 500):
# 阻塞睡眠保持循环
time.sleep(ms / 1000)
return {"slept_ms": ms}
@app.get("/io")
async def io():
# 停滞调用,直到套接字完成
r = requests.get(
"https://httpbin.org/delay/1",
timeout=5,
)
return {"status": r.status_code}
|
事件循环耗尽很难诊断。开发人员必须查看在停滞请求发生时哪些请求正在发生,并逐步检查逻辑以识别可能的罪魁祸首。
通常,在服务器上创建特殊的loop_lag函数(通过生命周期)向OpenTelemetry报告扩展的延迟,但仅凭这个信号很少能 pinpoint 根本原因。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
async def lag_probe(
interval: float = 0.5,
warn_ms: float = 100.0
):
loop = asyncio.get_running_loop()
next_t = loop.time() + interval
while True:
await asyncio.sleep(interval)
now = loop.time()
lag_ms = max(0.0, (now - next_t) * 1000.0)
if lag_ms > warn_ms:
print(f"event_loop_lag_ms={lag_ms:.1f}")
next_t += interval
|
FastAPI中的一个典型反模式是让一个端点调用另一个端点。通常的动机是避免重复已经其他地方分隔的代码,并掩盖适当服务接口的缺失。这种模式增加了不必要的延迟和开销——额外的序列化、身份验证和日志记录。
如果这种反模式在端点上系统性地应用,它会迅速失控,放大负载和延迟。由于它们的指数性质,基于传入请求数量建立扩展策略变得更加困难。不用说,随着时间的推移,即使是小的设计错误也会滚雪球般变成大问题。
另一个反模式是在依赖于繁重内存结构(LLMs、大型Pydantic模型、大缓存等)的服务器中启动多个工作进程。每个工作进程都是具有自己事件循环的独立进程,因此这些对象会按工作进程复制,增加内存和CPU消耗。如果一个工作进程的峰值超过容器的限制,在Gunicorn可以回收该工作进程之前,pod将被OOM杀死(例如,由Kubernetes)。
对于这些工作负载,最好将推理卸载到单独的服务(例如,像HuggingFace这样的推理服务器)或每个pod运行单个工作进程。通常,更多的pod优于"每个pod更多工作进程"的设计,但有一个与活动数据库连接相关的注意事项。如果工作进程使用连接池,数据库的CPU将会上升,因为每个连接都保留CPU(≈ pods × workers × pool_size)。这反过来会导致更高的延迟、连接错误和整体性能下降。
没有银弹。模式和最佳实践有帮助,但每个解决方案都应由特定的业务背景塑造。