Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache request 'body' and 'stream_consumed' in the ASGI scope #1519

Closed
wants to merge 7 commits into from

Conversation

tomchristie
Copy link
Member

@tomchristie tomchristie commented Feb 15, 2022

Cache the request body and stream_consumed in the ASGI scope, rather than on the request instance.

This preserves the existing behaviours when calling .body() or .stream() multiple times, but extends it to also persisting when new Request instances are created. (Eg. both within some middleware, as well as within an endpoint.)

Similar to #944 but should also handle raising RuntimeError("Stream consumed") if user code attempts to stream the request body more than once.

Edit by @Kludex:

@adriangb
Copy link
Member

adriangb commented Feb 15, 2022

This looks nice. I feel like the implementation is easier to grok than #944.

And it's all implementation details so it we can always change it in the future 👍 (for example, I think it may make sense to cache other request state into the asgi scope)

Testing wise, I think we need:

  1. A test like this one to to verify the bug/issue is fixed
  2. A couple tests to verify the current behavior w.r.t. consuming the request body and errors.
  3. Maybe tests to verify interaction with middleware. I know this is an edge case, but I think that with this implementation if a middleware consumes the body and then another middleware replaces receive the behavior would be different than what we currently have. Might be worth documenting the change with a test.

@adriangb
Copy link
Member

This is a test I think is worth at least thinking about:

def test_request_middleware_replaces_receive(test_client_factory):
    """Test the behavior of middleware consuming the request body"""

     async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # a middleware consumes the body
        await Request(scope, receive, send).body()
        # another middleware replaces receive

         async def new_receive() -> Message:
            return {"type": "http.request", "body": b"data"}

         # the endpoint creates a new Request that uses wrapped_receive
        data = await Request(scope, new_receive, send).body()
        await Response(data)(scope, receive, send)

     client = test_client_factory(app)
    resp = client.get("/", content=b"original data")
    assert resp.status_code == 200, resp.content
    assert resp.content == b"data"

It passes on master but not on this branch.

@retrry
Copy link

retrry commented Feb 21, 2022

Any movement with this PR as the #944 was closed?

@tomchristie
Copy link
Member Author

So...

I'm not necessarily convinced that this is a road that we should go down.

Starlette is built around "everything is just ASGI". The relevant part here is that middleware is just ASGI middleware.
There's pros and cons to that. The pros are that it helps to promote reusable components, the cons are that it's sometimes a more fiddly abstraction to understand.

My feeling here is that rather than attempting to kludge our middleware into a request/response style we ought to instead just provide better documentation about the limitations.

In particular:

I get that this is a cookie that a bunch of folks want. But it has some pretty unhealthy ingredients in it, and I'm not sure I'm comfortable selling it.

@retrry
Copy link

retrry commented Mar 7, 2022

Okay, maybe you have some examples where I could see middleware that inspects or uses request body?

@tomchristie
Copy link
Member Author

@retrry Let me pitch this the other way around... if somebody provides one of the concrete use-cases that they're running into, we could then use that to discuss if/how you'd be able to meet that use-case.

I think that tends to be a more productive tactic, since it helps ground the conversion in an actual end-user-facing issue.

@retrry
Copy link

retrry commented Mar 7, 2022

So my concrete use-case would be that I need to log all POST requests bodies. POST request bodies contain queries, which I need to log for debugging and accounting purposes.

@tomchristie
Copy link
Member Author

tomchristie commented Mar 7, 2022

Okay.

ASGI middleware is a little complex, so I need to start from basics in figuring out an example here.

First up I went and reminded myself how Starlette does it's class-based ASGI middleware, which I did by taking a look at this particular case. Because I knew it was a fairly simple one.

Next let's start with an empty example to work from.

class EmptyMiddleware:
    def __init__(
        self,
        app: ASGIApp,
    ) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"]  != "http":
            await self.app(scope, receive, send)
            return

        # Do stuff with the HTTP case...
        await self.app(scope, receive, send)

Now. There's two different options.

First up, we could ingest the entire request body and log it at the point the middleware runs...

class BodyLoggingMiddleware:
    def __init__(
        self,
        app: ASGIApp,
    ) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"]  != "http":
            await self.app(scope, receive, send)
            return

        # Ingest all body messages from the ASGI `receive` callable.
        messages = []
        more_body = True
        while more_body:
            message = await receive()
            messages.append(message)
            more_body = message.get("more_body", False)

        # Log the request
        body = b''.join([message.get("body", b"") for message in messages])
        print(scope["method"], body)

        # Dispatch to the ASGI callable
        async def wrapped_receive():
            # First up we want to return any messages we've stashed.
            if messages:
                return messages.pop(0)

            # Once that's done we can just await any other messages.
            return await receive()

        await self.app(scope, wrapped_receive, send)

Note that I'm also having to refer to https://asgi.readthedocs.io/en/latest/specs/www.html to remind myself how the ASGI HTTP events are structured.

Alternatively we could structure the middleware so that the body is not read entirely into memory, but instead log it only if/when it is accessed by the application.

class BodyLoggingMiddleware:
    def __init__(
        self,
        app: ASGIApp,
    ) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"]  != "http":
            await self.app(scope, receive, send)
            return

        async def wrapped_receive():
            message = await receive()
            body = message.get("body", b"")
            more_body = message.get("more_body", False)
            if body:
                # Log each individual chunk as it is ingested.
                print(scope["method"], body, more_body)
            return message

        await self.app(scope, wrapped_receive, send)

So.

(And yup, these are a bit fiddly for developers to figure out.)

Disclaimer: I haven't tested these, let me know if you use either of these patterns. Feedback and alternate implementations are most welcome.

@adriangb
Copy link
Member

adriangb commented Mar 7, 2022

That generally looks good to me. I went ahead and pushed the second implementation a bit further, intending to handle cases where the app errors or the client disconnects: https://gist.github.com/adriangb/2f003410b05783924d3d6bf3ff18ad6f

Obviously it is a business-level decision if you want to log all of the body even if the client errored, but at lest this shows how you might do that if you wanted to.

@retrry
Copy link

retrry commented Mar 26, 2022

Hey, thanks for the code - it works and we have been using it for our logging implementation.

I have one additional question: is it possible to access response in the same middleware? Our use case in other project is: we were logging body (with patched starlette with this MR: #944) and logging response headers in the same log message. Can I do the same in async middleware or it is not possible and I should split log messages to separate messages?

Edit: I think the example you wrote should be added to starlette docs or repo 🙂

@gnat
Copy link

gnat commented Mar 27, 2022

I strongly feel we need to entertain a merge on this or #944 if for no other reason that:

  1. This is an inevitable stumbling block nearly every Starlette user will run into.
  2. The current resolution path is to search issues on GitHub. 👀

Even if the perfect documentation exists on the official site, the hangup will still be perceived as a "bug" every newbie will run into and we'll need to refer them to the manual for.

Reminder: The reason we do not have a flood of issues from FastAPI users is because they patched this with something similar to #944 in 2019 (see tiangolo/fastapi#589 and tiangolo/fastapi#394 for their big thread).

Early adopters of Starlette such as myself already know to work around this by storing the request body, but I could see newcomers seriously re-assessing whether they want to write pure Starlette or choose something else if they hit a snagging issue like this early on.

A "will not fix" resolution is risky to the future growth of the framework.

@tomchristie Suggestion: Perhaps shipping a tiny, performant, built-in middleware for this instead of modifying the core (make it common like the SessionMiddleware). I'd vastly prefer something like this in core but this may satisfy all parties.

Copy link
Sponsor Member

@Kludex Kludex left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomchristie I think this solution is neat. We are not making much of a sacrifice here, we are just expanding the functionality.

From a user's point of view, this is a huge improvement.

Maybe we can move the _get_request_state and _set_request_state to another module, as we had the talk about moving the Starlette specific extensions to ["extensions"]["starlette"][<extension>].

@Kludex Kludex mentioned this pull request Sep 3, 2022
8 tasks
@Kludex Kludex removed the hold Don't merge it label Sep 3, 2022
@Kludex Kludex added this to the Version 0.21.0 milestone Sep 3, 2022
@Kludex
Copy link
Sponsor Member

Kludex commented Sep 3, 2022

@tomchristie I've added this PR to next milestone release. Please block me if you don't wish to have this in, and if it's the case, perhaps we should take a decision to not have this in until 1.0. We can go back to this after we get the stable release.

If we are not blocked, I'll merge this before the next release (which will be 0.21.0).

@adriangb
Copy link
Member

adriangb commented Sep 3, 2022

I've become convinced that this is not the right approach. Take a look at this example:

from typing import Awaitable, Callable
from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.routing import Route
from starlette.types import ASGIApp, Message, Receive, Scope, Send


CallNext = Callable[[Request], Awaitable[Response]]


class LogRequestBodySize(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: CallNext) -> Response:
        print(len(await request.body()))
        return await call_next(request)


def replace_body_middleware(app: ASGIApp) -> ASGIApp:
    async def wrapped_app(scope: Scope, receive: Receive, send: Send) -> None:
        async def wrapped_rcv() -> Message:
            msg = await receive()
            msg["body"] += b"foo-"
            return msg

        await app(scope, wrapped_rcv, send)

    return wrapped_app


async def endpoint(request: Request) -> Response:
    assert (await request.body()).startswith(b"foo-")
    return Response()


app: ASGIApp = Starlette(routes=[Route("/", endpoint)])
app = replace_body_middleware(app)
app = LogRequestBodySize(app)

When you hit the / endpoint you'll get a 500 response because the body read in the endpoint is not the body

I know that replace_body_middleware seems contrived, but (1) it could be something like uncompressing an incoming request and (2) it's a perfectly valid ASGI middleware.

@Kludex
Copy link
Sponsor Member

Kludex commented Sep 3, 2022

What's the right approach?

@adriangb
Copy link
Member

adriangb commented Sep 3, 2022

I think I'd prefer something like #1692. This doesn't solve the issue for things that use Request outside of BaseHTTPMiddleware, but I think it solves the issue for things that use it inside of BaseHTTPMiddleware which is most of the cases where users encounter this.

I'm also open to saying this is a wontfix and move towards something like #1691

@gnat
Copy link

gnat commented Sep 3, 2022

but I think it solves the issue for things that use it inside of BaseHTTPMiddleware which is most of the cases where users encounter this.

The necessity has been discussed at length previously- it's certainly not just middleware. Using POST data anywhere in a request/response is extremely common for websites. See Toms starting post.

The ideal DX is being able to call post = await request.form() or body = await request.body() any time and not worrying about consuming it--- or storing it in the request state, or storing it yourself in an intermediate variable somewhere.

Just to name a few off the top of my head: Sanic, FastAPI, vanilla PHP, etc. all do this. Not having this is just extra manual busywork people should not have to get hung up on.

@adriangb
Copy link
Member

adriangb commented Sep 3, 2022

Where else would you want to call Request.body()? Like you say it already works in FastAPI endpoints and dependencies (because the same Request instance is used everywhere).

@Kludex Kludex modified the milestones: Version 0.21.0, Version 0.22.0 Sep 24, 2022
@Kludex
Copy link
Sponsor Member

Kludex commented Oct 3, 2022

@adriangb can you confirm that the issues this PR closes, are also closed on #1692? 🙏

@Kludex Kludex closed this Oct 3, 2022
@tomchristie tomchristie deleted the keep-request-state branch October 4, 2022 10:24
@Kludex
Copy link
Sponsor Member

Kludex commented Jan 27, 2023

#1692 doesn't solve the issues this PR solves.

@Kludex Kludex restored the keep-request-state branch January 27, 2023 09:55
@Kludex Kludex reopened this Jan 27, 2023
@adriangb
Copy link
Member

adriangb commented Jan 27, 2023

Mind being more specific about which issue? Also JFYK this change is going to wreak all sorts of havoc

client = test_client_factory(app)

response = client.post("/", data="abc")
assert response.json() == {"body": "abc", "stream": "abc"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert response.json() == {"body": "abc", "stream": "abc"}
assert response.json() == {"body": "abc", "stream": "abc"}
def test_request_body_then_replace_body(test_client_factory):
import zlib
data = "Hello, world!"
gzipped_payload = zlib.compress(data.encode())
def ungzip_middleware(app):
async def wrapped_app(scope, receive, send):
dec = zlib.decompressobj() # offset 16 to skip the header
buffer = bytearray()
async def wrapped_rcv():
more_body = True
while more_body:
msg = await receive()
more_body = msg.get("more_body", False)
buffer.extend(msg["body"])
if len(buffer) > 16 or not more_body:
msg["body"] = dec.decompress(buffer)
yield msg
buffer.clear()
await app(scope, wrapped_rcv().__anext__, send)
return wrapped_app
def log_request_body_middleware(app):
async def wrapped_app(scope, receive, send):
await Request(scope, receive).body()
await app(scope, receive, send)
return wrapped_app
async def app(scope, receive, send):
request = Request(scope, receive)
body = await request.body()
response = JSONResponse({"body": body.decode()})
await response(scope, receive, send)
app = log_request_body_middleware(ungzip_middleware(app))
client = test_client_factory(app)
response = client.post("/", data=gzipped_payload)
assert response.json() == {"body": data}

@gnat this is what I'm referring to

ungzip_middleware is a perfectly valid ASGI middleware. It could even be inside of a mounted ASGI app or something like that. If we implement this PR we completely break this sort of thing and it breaks in a very non-intuitive way. We're essentially dealing with cache invalidation

@enono78
Copy link

enono78 commented Mar 28, 2023

@tomchristie @Kludex
It would be great to get this improvement in Starlette to be able to read safely the request body within a middleware.
I was wondering if this change is planned to be applied also on previous Starlette versions (like 0.19.1)?
I will be happy to discuss also some other potential issues that have been solved (or not) on Starlette 0.19.1

@Kludex
Copy link
Sponsor Member

Kludex commented Mar 28, 2023

I was wondering if this change is planned to be applied also on previous Starlette versions (like 0.19.1)?

No.

@Kludex
Copy link
Sponsor Member

Kludex commented Jun 1, 2023

Given this comment, we can't accept this PR.

@adriangb worked on replacements for this:

Let's continue on them.

@Kludex Kludex closed this Jun 1, 2023
@Kludex Kludex deleted the keep-request-state branch June 1, 2023 18:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
7 participants