返回

FastAPI API 并发优化:解决串行执行的多种方案

python

FastAPI API 调用串行执行问题分析与解决

FastAPI 作为一款高效的 Python Web 框架,常用于构建 API 服务。有些开发者在使用过程中可能会发现,即便在浏览器中并发请求相同的 API 端点,响应过程却并非并行执行。例如,当 API 中包含耗时操作时,不同请求的执行结果会呈现明显的先后顺序,而不是预期中的并发处理,这种现象值得关注。

问题本质

问题的根源在于 Python 的异步特性,以及 FastAPI 如何使用 asyncawait 进行协程管理。 async def 定义的函数实际上是协程,这些协程由事件循环驱动。 在默认情况下,FastAPI 依赖于 ASGI (Asynchronous Server Gateway Interface) 服务器(例如 Uvicorn 或 Gunicorn + Uvicorn)的单进程和单线程特性来处理请求。当 API 函数内遇到 I/O 阻塞或 time.sleep() 这样的耗时操作时,事件循环会被挂起,后续的请求需要等到前面的操作完成后才能继续执行。因此,看似多个并发请求实际上仍然按照串行的方式被处理,并不是真正的并行。

解决方案

以下提供多种方案来解决API调用串行的问题,让请求达到真正意义的并行处理。

1. 使用多进程服务器 (Multiprocessing)

多进程是实现并发最直接的方式。通过启动多个 Python 进程,每个进程可以处理各自独立的请求,从而充分利用多核 CPU 的性能。可以使用 Gunicorn 这类服务器配合 Uvicorn,以 worker 进程模式启动,实现多进程并发。

操作步骤:

  1. 安装 Gunicorn:

    pip install gunicorn
    
  2. 使用 Gunicorn 启动 FastAPI 应用。指定 -k uvicorn.workers.UvicornWorker 来使用 uvicorn 的 worker class。同时用 -w 参数设置 worker 进程数,根据 CPU 核数合理设置此值。

    gunicorn -k uvicorn.workers.UvicornWorker -w 4 main:app
    

    (main:app 假设您的 FastAPI 应用定义在 main.py 文件中的 app 变量下。)

    使用多进程时,系统会将连接均衡的分配给不同的worker,worker们真正并行的执行任务,以此来实现并行的目标。

安全建议: 启动多个进程会消耗更多的系统资源。需根据实际服务器负载和配置情况,合理配置进程数,避免因过度使用资源导致系统不稳定。

2. 利用 asyncio 的并发特性(Concurrently Processing With asyncio.gather)

asyncio 库的 asyncio.gather 提供了在单个事件循环内并发执行多个协程的能力。 如果耗时操作可以进一步拆解为非阻塞的异步任务,那么就可以利用 gather 进行并发优化,达到类似并行的效果。注意 asyncio.gather 本质还是在单进程和单线程的环境下模拟并发,效率并没有多进程方案高,但是适用性较广。

操作步骤:

  1. 修改API代码, 用异步的睡眠asyncio.sleep代替同步的time.sleep:

    import asyncio
    from fastapi import FastAPI, Request
    
    app = FastAPI()
    
    async def delayed_task(seconds: int):
       print("Task starts")
       await asyncio.sleep(seconds)
       print("Task ends")
    
    @app.get("/ping")
    async def ping(request: Request):
      await asyncio.gather(delayed_task(5), delayed_task(5))
      return {"ping": "pong!"}
    
    
  2. 直接运行这个fastapi项目, 然后访问/ping路径。虽然还在单个线程内,但是多个任务并行起来了。

  uvicorn main:app --reload

你会看到如下输出,证实任务并行了:

 Task starts
 Task starts
 Task ends
 Task ends

安全建议: 此方案的核心在于确保执行的耗时任务可以转化为异步非阻塞的操作,适合I/O操作多而CPU密集型计算较少的场景。 如果有大量 CPU 密集型的任务,多进程方案的效果更好。

3. 任务队列 (Task Queues)

利用 Celery 等任务队列可以将耗时操作卸载到独立的进程中执行,主进程只需要向队列发送任务请求即可,而不用阻塞等待结果。 这让应用在响应外部请求时更加的迅捷。

操作步骤:

  1. 安装 Celery 和 Redis (或 RabbitMQ):

    pip install celery redis
    
  2. 配置 Celery 任务:

定义一个 tasks.py:

import time
from celery import Celery

celery_app = Celery(
  'tasks',
   broker='redis://localhost:6379/0', # 连接 Redis Broker
  backend='redis://localhost:6379/0'   # 连接 Redis 后端
  )

@celery_app.task
def long_running_task():
  print("Celery task starts.")
  time.sleep(5) #  模拟一个耗时操作
  print("Celery task ends.")
  return "Task Done."
  1. 修改 FastAPI 代码:
  from fastapi import FastAPI
  from tasks import long_running_task
  from celery.result import AsyncResult
  from uuid import UUID


  app = FastAPI()
  @app.get("/ping")
  async def ping():
       task_result : AsyncResult = long_running_task.delay()
       return {"task_id": str(task_result.id)}

  @app.get("/status/{task_id}")
  async def status(task_id : UUID):
      task = AsyncResult(str(task_id))
      if task.ready():
          return {"status":"done", "result":task.get()}
      else:
        return {"status":"processing"}
  1. 启动 Celery Worker:

    celery -A tasks worker --loglevel=INFO
    
  2. 启动 FastAPI 应用。访问 /ping 端点发起任务,并通过 /status/{task_id} 查询任务状态。

安全建议: 确保任务队列的中间件 (Redis 或 RabbitMQ) 配置恰当,做好访问控制,避免因暴露中间件导致安全风险。

总结

解决 FastAPI 中 API 调用串行执行的问题有多种方法,多进程是提高并发性能的关键。在实际应用中,开发者应该根据项目特点、资源限制和需求复杂度选择合适的方案。理解 Python 异步编程的底层机制,有助于更有效率的使用 FastAPI。