Async Python Patterns for AI Backends (That I Learned the Hard Way)

FastAPI and async Python are the obvious choice for AI backends — until you hit subtle concurrency bugs, blocked event loops, and streaming responses that silently drop chunks. Here's how I actually structure these systems.

Afzal Zubair · February 24, 2026 · 5 min read

When I first started building AI backends, I picked FastAPI because everyone said it was fast and async-native. Both things are true. What nobody mentioned is that async Python has a specific failure mode that's particularly nasty: blocking the event loop looks exactly like slow code until it completely breaks under load.

I've built AI backends at Allia Health (clinical note generation, telehealth sessions) and Fortell AI (voice AI, real-time analytics). Here's what I've actually learned.

The Event Loop is Single-Threaded and You Will Block It

This is the foundational thing to internalise. FastAPI runs on a single async event loop. Everything runs on it — your request handlers, your background tasks, your WebSocket connections. If anything blocks that loop, everything stalls.

The sneaky part: CPU-bound code blocks it silently. Parsing a large document, running a regex over a big string, serialising a massive JSON object — all of these will block the event loop and cause latency spikes that look like random slowness.

# This blocks the event loop — DO NOT do this in an async handler
@app.post("/summarise")
async def summarise(doc: DocumentRequest):
    # This is CPU-bound, it blocks everything else while it runs
    chunks = naive_chunk_document(doc.text)  # 200ms of pure Python
    summary = await llm.complete(chunks)
    return {"summary": summary}

The fix: offload CPU-bound work to a thread pool.

import asyncio
from concurrent.futures import ThreadPoolExecutor
 
executor = ThreadPoolExecutor(max_workers=4)
 
@app.post("/summarise")
async def summarise(doc: DocumentRequest):
    # Run CPU-bound chunking in a thread, don't block the loop
    loop = asyncio.get_running_loop()
    chunks = await loop.run_in_executor(executor, naive_chunk_document, doc.text)
    summary = await llm.complete(chunks)
    return {"summary": summary}

Streaming LLM Responses: Get This Right or Don't Use It

Streaming is one of those things where the happy path is simple and every edge case is a footgun.

The basic pattern in FastAPI with the Anthropic SDK:

import json

from fastapi.responses import StreamingResponse
 
@app.post("/stream")
async def stream_completion(req: CompletionRequest):
    async def generate():
        async with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": req.prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # SSE format
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield "data: [DONE]\n\n"
 
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Critical for nginx — disables buffering
        },
    )

The X-Accel-Buffering: no header is the non-obvious one. If you're behind nginx (you probably are in production), it will buffer your entire streaming response before sending it to the client unless you explicitly tell it not to. This makes streaming look broken — the client receives nothing for 10 seconds then gets everything at once.
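It's worth knowing what the client actually receives: each event is a `data: …` line followed by a blank line. A minimal parser for that wire format (a hypothetical helper, not part of any SDK) looks like this:

```python
import json

def parse_sse_events(raw: str) -> list:
    """Parse text/event-stream payloads into a list.

    Decoded JSON dicts come back as dicts; the [DONE] sentinel as "DONE".
    """
    events = []
    for block in raw.split("\n\n"):          # events are separated by blank lines
        for line in block.splitlines():
            if not line.startswith("data: "):
                continue                      # ignore comments and other SSE fields
            payload = line[len("data: "):]
            if payload == "[DONE]":
                events.append("DONE")
            else:
                events.append(json.loads(payload))
    return events

stream = 'data: {"text": "Hel"}\n\ndata: {"text": "lo"}\n\ndata: [DONE]\n\n'
events = parse_sse_events(stream)
# → [{"text": "Hel"}, {"text": "lo"}, "DONE"]
```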

Handle client disconnects. If a user navigates away mid-stream, you want to stop the LLM call — otherwise you're burning tokens for nobody.

async def generate():
    try:
        async with client.messages.stream(...) as stream:
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield "data: [DONE]\n\n"
    except asyncio.CancelledError:
        # Client disconnected: stop the upstream call, don't yield again
        raise

FastAPI cancels the generator when the client disconnects. The CancelledError is your signal to stop and clean up. Re-raise it rather than swallowing it, and don't yield again afterwards; there's no client left to receive anything.
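The mechanism is easy to verify without FastAPI: cancelling the task that drives an async generator raises CancelledError inside it at its current await point, which is exactly where you hook in cleanup.

```python
import asyncio

cleanup_log = []

async def generate():
    try:
        while True:
            yield "chunk"
            await asyncio.sleep(0.01)   # simulates waiting on the LLM stream
    except asyncio.CancelledError:
        cleanup_log.append("cancelled")  # stop the upstream call here
        raise

async def consume():
    async for _ in generate():
        pass

async def main():
    task = asyncio.create_task(consume())
    await asyncio.sleep(0.05)   # client "reads" for a while...
    task.cancel()               # ...then disconnects
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())
# cleanup_log == ["cancelled"]
```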

Background Tasks for Long-Running Operations

Some AI operations take too long to stream — generating a full analysis report, processing a batch of documents, running an evaluation pipeline. These shouldn't be in the request handler at all.

My pattern for this: accept the job, return a task ID immediately, process in the background, store state in Redis.

import json
import uuid

from fastapi import BackgroundTasks, HTTPException
 
@app.post("/analyse", response_model=TaskResponse)
async def start_analysis(req: AnalysisRequest, background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    await redis.setex(f"task:{task_id}", 3600, json.dumps({"status": "pending"}))
 
    background_tasks.add_task(run_analysis, task_id, req)
    return {"task_id": task_id}
 
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    raw = await redis.get(f"task:{task_id}")
    if not raw:
        raise HTTPException(status_code=404, detail="Task not found")
    return json.loads(raw)
 
async def run_analysis(task_id: str, req: AnalysisRequest):
    try:
        await redis.setex(f"task:{task_id}", 3600, json.dumps({"status": "running"}))
        result = await do_heavy_analysis(req)
        await redis.setex(
            f"task:{task_id}", 3600,
            json.dumps({"status": "complete", "result": result})
        )
    except Exception as e:
        await redis.setex(
            f"task:{task_id}", 3600,
            json.dumps({"status": "failed", "error": str(e)})
        )

For anything more complex than this, use a proper task queue. Celery with Redis broker is the standard choice. For lighter workloads, arq is worth a look — it's async-native and much simpler to set up.
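Whatever runs the job, the client's side of the contract is the same: poll the status endpoint until the task resolves. A sketch with the HTTP call injected so the loop stands on its own (`fetch_status` is a stand-in for your GET /tasks/{task_id} request):

```python
import time

def poll_until_done(fetch_status, task_id: str,
                    interval: float = 0.5, timeout: float = 60.0) -> dict:
    """Poll fetch_status(task_id) until a terminal status or the timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status(task_id)
        if status["status"] in ("complete", "failed"):
            return status
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} did not finish within {timeout}s")

# Fake backend for illustration: pending, running, then complete
responses = iter([
    {"status": "pending"},
    {"status": "running"},
    {"status": "complete", "result": 42},
])
result = poll_until_done(lambda _: next(responses), "abc", interval=0.01)
# → {"status": "complete", "result": 42}
```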

Connection Pooling with Async Clients

Every tutorial initialises the LLM client inside the request handler. Don't do this — you pay the connection setup cost on every request, and at scale you'll exhaust file descriptors.

# Bad — new client per request
@app.post("/complete")
async def complete(req: CompletionRequest):
    client = anthropic.AsyncAnthropic()  # New connection every time
    return await client.messages.create(...)
 
# Good — shared client, initialised at startup
from contextlib import asynccontextmanager
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.llm = anthropic.AsyncAnthropic()
    app.state.db = await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)
    yield
    await app.state.llm.close()
    await app.state.db.close()
 
app = FastAPI(lifespan=lifespan)
 
@app.post("/complete")
async def complete(req: CompletionRequest, request: Request):
    return await request.app.state.llm.messages.create(...)

The lifespan context manager (the modern replacement for on_event("startup")) is the right place to initialise shared clients and connection pools. They live for the lifetime of the application, not the request.
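The semantics are plain asynccontextmanager semantics: everything before the `yield` runs before the app starts serving, everything after it runs at shutdown. You can see the ordering without FastAPI at all:

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def lifespan():
    events.append("startup")     # create clients and pools here
    yield
    events.append("shutdown")    # close them here

async def serve():
    async with lifespan():
        events.append("handling requests")

asyncio.run(serve())
# events == ["startup", "handling requests", "shutdown"]
```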

Rate Limiting and Cost Control

LLM API costs are real and they'll surprise you. An endpoint that calls GPT-4o with no rate limiting will happily bankrupt you during a traffic spike or if a user finds a way to hammer it.

The minimum I put on every LLM endpoint in production:

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
 
@app.post("/complete")
@limiter.limit("10/minute")  # Per-IP limit
async def complete(request: Request, req: CompletionRequest):
    ...

For per-user limits you'll want to key on the user ID instead of IP, and store counts in Redis so limits work across multiple instances. But even the basic IP-based limit prevents the most egregious abuse.
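The Redis-backed per-user version is usually a fixed-window counter: INCR a key like `ratelimit:{user_id}:{window}`, EXPIRE it on first increment, reject once the count exceeds the limit. Here's that windowing logic sketched in-memory (the Redis calls swapped for a dict, so it's testable on its own; the key names are illustrative):

```python
import time
from collections import defaultdict
from typing import Optional

class FixedWindowLimiter:
    """Fixed-window rate limiter; mirrors the Redis INCR + EXPIRE pattern."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.counts = defaultdict(int)

    def allow(self, user_id: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # One counter per (user, window); same shape as the Redis key
        window_key = (user_id, int(now // self.window))
        self.counts[window_key] += 1    # INCR
        return self.counts[window_key] <= self.limit

limiter = FixedWindowLimiter(limit=3, window_seconds=60)
allowed = [limiter.allow("user-1", now=t) for t in (0, 1, 2, 3, 61)]
# → [True, True, True, False, True]  (the window resets at t=60)
```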

AI backends aren't fundamentally different from other backends — the same async patterns, the same connection pooling discipline, the same observability requirements. The difference is the failure modes are more expensive and latency tolerances are tighter. Getting the plumbing right lets you focus on the actually interesting problems.
