Middleware — HTTP Body Stream Exhaustion
The ASGI request body is like a one-time-use tape recorder. Once you've listened to the tape (read the body), it's 'played out' and cannot be listened to again by subsequent components in the request processing pipeline, unless you explicitly re-spool it or record a new copy.
The Setup
You want to implement an audit middleware to inspect incoming HTTP requests. The middleware logs payload data prior to forwarding the request to downstream controllers.
What Does This Print?
from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def audit_logger(request: Request, call_next):
# Reading request body consumes the underlying byte stream
body = await request.body()
print(f"Incoming API call body: {body.decode()}")
response = await call_next(request)
return response
@app.post("/submit")
async def submit(request: Request):
# This line executes next but must read the payload again
payload = await request.json()
return {"status": "received", "data": payload}
/submit with a JSON payload of {"item": "server"}.
The Output
The server starts processing the request, logs the audit line, and then hangs indefinitely, eventually causing client connection timeouts or Gateway Timeout (504) responses.
Why Python Does This
Under the hood, FastAPI utilizes ASGI (Asynchronous Server Gateway Interface). The incoming request body is transmitted as an asynchronous stream of bytes. When you call await request.body() inside your middleware, the stream is read completely until EOF. However, when the downstream handler attempts to parse the JSON with await request.json(), it tries to pull from the same ASGI connection stream. Because the stream has already been exhausted, the reader waits endlessly for more data packet events that will never arrive because the client has completed transmission.
The Fix
from fastapi import FastAPI, Request
from starlette.requests import Request as StarletteRequest
app = FastAPI()
@app.middleware("http")
async def audit_logger(request: Request, call_next):
# Check if content type expects body payload
if request.method in ["POST", "PUT", "PATCH"]:
# Read and cache the body on the request object state
body = await request.body()
print(f"Incoming API call body: {body.decode()}")
# Re-initialize or override the request stream reader so downstream routes can read it
async def receive_wrapper():
return {"type": "http.request", "body": body, "more_body": False}
request._receive = receive_wrapper
response = await call_next(request)
return response
@app.post("/submit")
async def submit(request: Request):
payload = await request.json()
return {"status": "received", "data": payload}
The fix typically involves reading the body, storing it, and then creating a new Request object (or patching the existing one) with a new ASGI receive channel that 'plays back' the buffered body. This allows the middleware to inspect the body while ensuring the original endpoint can still receive it as if it were the first reader.
How This Fails in Real Systems
A security auditing middleware was deployed to capture and log injection payloads in real time. Because the ASGI stream was consumed and not re-initialized, every POST request to the API's file processing server began hanging, rendering the backend completely unresponsive.