FastAPI服务器构建中的常见问题与解决方案

本文深入探讨了构建FastAPI服务器时常见的项目结构设计、工厂模式与依赖注入、生命周期与请求作用域管理以及并发处理等问题,提供了实用的解决方案和最佳实践,帮助开发者避免常见陷阱并构建高性能的API服务。

FastAPI服务器构建中的常见问题与解决方案

FastAPI提供了许多开箱即用的功能:异步并发、Pydantic验证、中间件、错误处理、自动文档和依赖注入。它非常直观,许多团队能够快速上手 - 有时甚至不需要深入思考系统设计。

在本文中,我们将探讨常见的陷阱,以及如何通过实用的模式来解决这些问题,包括项目结构和路由器组合、生命周期与请求作用域资源、以及并发处理(包括事件循环阻塞和卸载)。

项目结构

当项目开始时,功能优先级通常高于结构。随着复杂性的增长,定位端点代码变得更加困难,重复和循环导入问题逐渐出现。虽然具体结构因业务需求而异,但大多数Web服务器都有类似的关注点,可以包含以下按功能划分的文件夹:

  • main.py - 创建应用;生命周期连接;注册中间件和错误处理程序
  • validators/ - 可重用的输入检查(ID、枚举、业务规则),由Pydantic使用
  • middleware/ - 横切层(例如,负载大小保护)
  • error_handlers/ - 异常到HTTP响应的映射
  • utilities/ - 小型辅助工具(响应包装器、分页、格式化)
  • models/ - 用于验证传入请求和传出响应的Pydantic模型
  • services/ - 业务逻辑;协调存储库/客户端
  • clients/ - 客户端:数据库引擎/会话生成器、HTTP客户端、日志记录/跟踪适配器
  • factories/ - 依赖注入友好的客户端/服务构建器
  • endpoints/ - 按业务领域分组的APIRouter(账户/、购买/、订单/)
  • tests/ - 单元/集成测试;使用依赖覆盖和测试设置

棘手的部分不是文件夹 - 而是父/子路由器关系。

让我们以这个API设计为目标:

1
2
3
4
/accounts
/accounts/{account_id}
/accounts/{account_id}/orders
/accounts/{account_id}/orders/{order_id}

为了实现这个目标,我们将创建两个路由器:

子路由器:

  • 位置:app/endpoints/accounts/orders/root.py
  • 服务:/{account_id}/orders
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
orders = APIRouter(
    prefix="/{account_id}/orders", 
    tags=["orders"]
)

@orders.get("/")
async def list_orders(
    account_id: Annotated[UUID, Path()], 
    last_id: str | None = None, 
    limit: int = 50
):
    return {
      "account": str(account_id), 
      "items": [], 
      "next": None
    }

@orders.post("/")
async def create_order(
    account_id: Annotated[UUID, Path()]
): return { "account": str(account_id), "ok": True}

父路由器:

  • 位置:app/endpoints/accounts/root.py
  • 服务:/accounts
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from .orders.root import orders as orders_router

accounts = APIRouter(
    prefix="/accounts", 
    tags=["accounts"]
)

async def load_account(
    account_id: Annotated[UUID, Path()]
): return {"id": account_id}

@accounts.get("/")
async def list_accounts(): return [{"id": "12345"}]

@accounts.get("/{account_id}")
async def get_account(account_id: UUID):
    return {"id": account_id}

accounts.include_router(
    orders_router,
    # 对订单路由施加依赖注入
    dependencies=[Depends(load_account)],
)

这种结构确保"orders"不能在没有指定account_id的情况下访问,并且允许开发人员快速定位端点的位置并创建多层的子路由器。通过这种设计,可以在其他地方重用"orders router",并调整依赖关系。

工厂模式与依赖注入

工厂通过使用应用状态和各种环境变量和配置来封装客户端和服务的构建。

工厂可用于管理:

  • 云客户端 - SQS、S3、SNS等
  • 数据库客户端 - Redis、Postgres、Mongo
  • 配置连接 - 应用级和请求级(如果每个客户端都有特定配置)
  • 服务类,封装业务逻辑 - AccountService、OrderService
  • 辅助类 - 日志记录器、用户会话等
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class Factory:
    @classmethod
    async def get_http_client(
        cls, 
        settings
    ): return AsyncClient(base_url=settings.api)

    @classmethod
    async def get_account_service(
      cls, 
      request: Request
    ) -> AccountService:
      settings: Settings = request.app.state.settings 
      return AccountService(
          client=await cls.get_http_client(settings),
      )

基于这个例子,我们看到一个设计良好的工厂可以轻松地插入到FastAPI的依赖注入中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@app.get("/accounts/{account_id}")
async def account_overview(
    account_id: str,
    account_service: Annotated[
        AccountService,
        Depends(Factory.get_account_service)
    ]
):
    r = await account_service.fetch_profile(account_id)
    if not r:
        raise HTTPException(
            status_code=404, 
            detail="Account not found"
        )
    return r

端点保持简洁和可读性:它验证输入、协调服务并将内部结果映射到HTTP响应。业务逻辑存在于端点之外;处理程序专注于HTTP相关的问题(连接、状态码和格式化)。

工厂可以极大地简化对服务器上不同日志记录器的访问。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class Factory
    @classmethod
    def get_root_logger(cls) -> logging.Logger:
        return logging.getLogger("app")

    @classmethod
    def get_request_logger(
        cls, 
        request: Request
     ) -> logging.LoggerAdapter:
        base = cls.get_root_logger()
        return logging.LoggerAdapter(
            base,
            {
              "path": request.url.path, 
              "method": request.method,
              "corr_id": request.state.corr_id,
            }
        )

现在所有类型的日志记录器都可以通过工厂在端点中轻松实例化:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
acc_ser_dep = Annotated[
    AccountService,
    Depends(Factory.get_account_service),
]
log_dep = Annotated[
    LoggerAdapter,
    Depends(Factory.get_request_logger),
]

@app.get("/accounts/{account_id}")
async def account_overview(
    account_id: str,
    account_service: acc_ser_dep,
    logger: log_dep,
):
    r = await account_service.fetch_profile(account_id)
    if not r:
        logger.warning(
            "Account not found",
            extra={"account_id": account_id},
        )
        raise HTTPException(404, "Account not found")
    return r

请求作用域的日志记录器需要一个关联ID来将请求的日志拼接成一个连贯的故事。通常在端点内部生成ID,但这会导致重复。更有效的方法是使用一个小型中间件,每个请求设置一次ID,并且可以扩展以包含user_id、firm_id和其他上下文:

1
2
3
4
5
6
7
8
@app.middleware("http")
async def corr_middleware(request: Request, call_next):
    _id = request.headers.get("X-Request-ID") or str(uuid4())
    request.state.corr_id = _id

    response = await call_next(request)
    response.headers["X-Request-ID"] = _id
    return response

回到工厂的例子,有一个容易被忽略的步骤可以简化依赖注入架构。

1
settings: Settings = request.app.state.settings

这行代码假设Settings在启动时初始化(main.py):

1
2
3
4
5
6
7
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.settings = Settings()
    yield
    await Factory.aclose()

app = FastAPI(lifespan=lifespan)

附加到app.state的对象对于每个FastAPI工作进程是生命周期作用域的,应该在关闭阶段关闭。

生命周期与请求作用域

生命周期对象可以在多个请求之间持久存在,并存在于服务器的整个生命周期。使用app.state不是持有它们的唯一方式(单例、带键的缓存对象等),但它是FastAPI中最方便的选项。

请求作用域的"会话",它聚合了业务逻辑所需的请求特定状态(例如,correlation_id、user_id),将在请求完成后立即消亡。

在服务器生命周期中,我们可以使用附加到app.state的对象(数据库连接池、HTTP客户端等)。当服务器关闭时,我们触发这些对象的关闭,并且我们有特殊的逻辑来处理这个问题。

1
2
3
4
5
6
class Factory:
    @classmethod
    async def aclose(cls):
        await app.state.db_pool.aclose()
        await app.state.external_http_client.aclose()
        ...

对象被垃圾回收并不意味着相应的连接已经关闭。这就是为什么我们在服务器生命周期中有await Factory.aclose()。

相同的模式可以通过工厂应用于请求作用域的客户端。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class Factory:
    @classmethod
    async def get_account_service(cls):
        settings: Settings = request.app.state.settings

        account_service = AccountService(
            client=await cls.get_http_client(settings),
        )

        try:
            yield account_service
        finally:
            await account_service.client.aclose()

在上面的例子中,account_service只有在关闭HTTP客户端的连接后才被删除,以便其套接字立即释放。

并发

Settings是一个很好的生命周期对象示例,请求可以通过app.state.settings并发访问它。理想情况下,生命周期对象应该是只读的,并且不应该引用请求作用域的对象(这样这些对象可以被垃圾回收)。

否则,存在内存泄漏和竞态条件的风险。一般来说,任何在请求之间共享的东西都必须内置并发保护(线程/异步安全)。

对于在不同请求甚至工作进程之间共享状态,外部存储是更安全、更可扩展的选择。但是,如果同一工作进程的两个请求几乎同时尝试写入设置会发生什么?让我们考虑这个例子:

1
2
3
4
cfg = request.app.state.settings
old = cfg.threshold
await some_async_call()  # 产生,其他请求在这里运行
request.app.state.settings.threshold = old + 1

这段代码将使共享状态不一致。如果绝对有必要更新状态,应该使用锁。

1
2
3
4
5
6
7
8
app.state.settings_lock = asyncio.Lock()

async def update_settings(app, patch: dict):
    async with app.state.settings_lock:
        settings = app.state.settings
        app.state.settings = settings.model_copy(
            update=patch,
        )

有了这个函数,设置可以安全地更新:

1
await update_settings(request.app,{"threshold":old+1})

让我们考虑一个没有await some_async_call()的例子:

1
2
3
cfg = request.app.state.settings
old = cfg.threshold
request.app.state.settings.threshold = old + 1

代码作为单个块在事件循环线程上运行。这确保了状态的完整性,但增加了服务器的延迟。FastAPI不会自动将同步工作从事件循环线程移动到ThreadPool。必须由开发人员显式完成。

1
2
3
4
5
6
7
@app.get("/update-state")
async def update_state():
    # 移动到线程池
    await anyio.to_thread.run_sync(
        update_settings_sync
    )
    return {"ok": True}

还有另一种通过Depends的方法,但不值得详细探讨,因为让多个线程写入共享状态不是正确的设计。

当不涉及共享状态更新时,两种卸载同步工作的例子都会很好地工作。

1
2
3
4
5
@app.post("/update-state")
async def update_state(
    result: dict = Depends(update_settings_dep)
):
    return result

对事件循环如何与同步和异步代码交互的扎实理解有助于开发人员找到最有效的解决方案。

事件循环耗尽通常发生在异步端点执行繁重的同步工作时。请参阅以下示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def cpu_heavy(n: int) -> float:
    # Python CPU;从不产生
    s = 0.0
    for i in range(n):
        s += math.sqrt(i)
    return s

@app.get("/cpu")
async def cpu(n: int = 10_000_000):
    # 在循环上进行繁重的CPU操作,工作进程上的请求停滞
    return {"sum": cpu_heavy(n)}

@app.get("/sleep")
async def sleep(ms: int = 500):
    # 阻塞睡眠保持循环
    time.sleep(ms / 1000)
    return {"slept_ms": ms}

@app.get("/io")
async def io():
    # 停滞调用直到套接字完成
    r = requests.get(
        "https://httpbin.org/delay/1",
        timeout=5,
    )
    return {"status": r.status_code}

事件循环耗尽很难诊断。开发人员必须查看在停滞请求发生时间附近发生了哪些请求,并逐步检查逻辑以识别可能的罪魁祸首。

通常,在服务器上创建一个特殊的loop_lag函数(通过生命周期)来向OpenTelemetry报告扩展的延迟,但仅凭这个信号很少能确定根本原因。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
async def lag_probe(
    interval: float = 0.5,
    warn_ms: float = 100.0
):
    loop = asyncio.get_running_loop()
    next_t = loop.time() + interval

    while True:
        await asyncio.sleep(interval)
        now = loop.time()
        lag_ms = max(0.0, (now - next_t) * 1000.0)

        if lag_ms > warn_ms:
            print(f"event_loop_lag_ms={lag_ms:.1f}")

        next_t += interval

FastAPI中的一个典型反模式是让一个端点调用另一个端点。通常的动机是避免重复已经其他地方分隔的代码,并掩盖适当服务接口的缺失。这种模式增加了不必要的延迟和开销 - 额外的序列化、身份验证和日志记录。

如果这种反模式在端点上系统地应用,它会迅速失控,放大负载和延迟。由于请求的指数性质,基于传入请求数量建立扩展策略变得更加困难。不用说,随着时间的推移,即使是小的设计错误也会滚雪球般地变成大问题。

另一个反模式是在依赖于繁重内存结构(LLMs、大型Pydantic模型、大缓存等)的服务器中启动多个工作进程。每个工作进程都是一个单独的进程,有自己的事件循环,因此这些对象会按工作进程复制,增加内存和CPU消耗。如果一个工作进程的峰值超过容器的限制,pod将在Gunicorn回收该工作进程之前被OOM杀死(例如,由Kubernetes)。

对于这些工作负载,更喜欢将推理卸载到单独的服务(例如,像HuggingFace这样的推理服务器)或每个pod运行单个工作进程。一般来说,更多的pod优于"每个pod更多工作进程"的设计,但有一个与活动数据库连接相关的注意事项。如果工作进程使用连接池,数据库的CPU将会上升,因为每个连接都保留CPU(≈ pods × workers × pool_size)。这反过来会导致更高的延迟、连接错误和整体性能下降。

没有银弹。模式和最佳实践有帮助,但每个解决方案都应该由特定的业务背景来塑造。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计