Borrowing botocore's implementation of get_response
TL;DR: The use of boto3 in celery#1759 meant relying on blocking
(synchronous) HTTP requests, which caused the performance issue reported
in celery#1783.

`kombu` previously crafted AWS requests manually, as explained in
detail in celery#1726, which resulted in an outage when botocore temporarily
changed the default SQS protocol to JSON (before rolling back due to the
impact on celery and airflow). To fix that, I submitted celery#1759,
which changed `kombu` to use `boto3` instead of manually crafting AWS
requests, so that when botocore changes the default protocol, kombu won't
be impacted.

While working on celery#1759, I did extensive debugging to understand the
multi-threading nature of kombu. What I discovered is that there isn't
actual multi-threading in the true sense of the word, but rather an event
loop that runs on a single thread and process and orchestrates the
communication with SQS. As such, it didn't appear to me that there was
anything to worry about in my change, and the testing I did didn't uncover
any issues. However, it turns out that while kombu's event loop doesn't
involve actual multi-threading, its [reliance on
pycurl](https://github.com/celery/kombu/blob/main/kombu/asynchronous/http/curl.py#L48)
(and thus libcurl) meant that the requests to AWS were being made
asynchronously. boto3 requests, on the other hand, are always made
synchronously, i.e. they are blocking requests.
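
To make the distinction concrete, here is a toy sketch (illustrative only,
not kombu's actual hub) of why a single blocking call stalls a
single-threaded event loop, while tasks that yield during I/O keep
everything moving:

```python
import time


def event_loop(tasks):
    """A toy single-threaded event loop: gives each task a turn, round-robin.

    Each task is a generator that yields whenever it would otherwise have
    to wait, which is roughly what pycurl/libcurl let kombu do with
    in-flight HTTP requests.
    """
    while tasks:
        for task in list(tasks):
            try:
                next(task)  # run the task until it yields control back
            except StopIteration:
                tasks.remove(task)


def non_blocking_task(name):
    for i in range(3):
        yield  # "I/O started, check back later" -- the loop stays responsive
        print(f'{name}: step {i} done')


def blocking_task():
    # A synchronous call, like boto3's receive_message with long polling:
    # the entire loop freezes here, since nothing yields for the duration.
    time.sleep(20)
    yield


event_loop([blocking_task(), non_blocking_task('a'), non_blocking_task('b')])
```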

The above meant that my change introduced blocking on kombu's event loop.
This is fine in most cases, since requests to SQS are pretty fast. However,
when long polling is used, a call to SQS's ReceiveMessage can last up to
20 seconds (depending on the user's configuration).
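
For reference, this is roughly what such a long-polling call looks like
with boto3 (the queue URL below is a placeholder); the call does not return
until a message arrives or `WaitTimeSeconds` elapses, and in the meantime
nothing else on the calling thread can run:

```python
import boto3

sqs = boto3.client('sqs', region_name='us-east-1')

# Blocks for up to 20 seconds while SQS long-polls for messages.
response = sqs.receive_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,  # long polling; 20s is the maximum SQS allows
)

for message in response.get('Messages', []):
    print(message['Body'])
```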

To solve this problem, I rolled back my earlier changes. Instead, to
address the issue reported in celery#1726, I borrowed the implementation
of `get_response` from botocore and changed the code so that the
protocol is hard-coded to `query`. This way, when botocore changes the
default SQS protocol to JSON, kombu won't be impacted: it crafts its own
requests and, after my change, it parses the responses with a protocol
hard-coded to match those requests.

This shouldn't be the final solution; it is more of a workaround that
does the job for now. There are two problems with this approach:

1. It doesn't address the fundamental problem discussed in celery#1726, which
   is that `kombu` relies on botocore internals, namely the
   `StreamingBody` class.
2. It still makes an assumption, namely that the communication protocol
   is the `query` protocol. While this is true, and likely to remain true
   for some time, in theory nothing stops SQS (the backend, not the
   client) from changing its default protocol, which would make the
   hard-coded `query` protocol in the new `get_response` method
   problematic. (A sketch of how the protocol could instead be read from
   the service model follows this list.)
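
For illustration, here is a minimal sketch (not part of this commit; the
commented-out expression in the diff below hints at it) of how the
protocol could be read from botocore's service model instead of being
hard-coded:

```python
import boto3

# No credentials are needed just to inspect the service model.
sqs = boto3.client('sqs', region_name='us-east-1')
op_model = sqs.meta.service_model.operation_model('ReceiveMessage')

# 'query' in botocore releases current at the time of this commit;
# it would become 'json' if/when the SQS model switches protocols.
protocol = op_model.metadata['protocol']
print(protocol)
```

Note that reading the protocol from the model would only be safe if the
request-crafting side followed it too; hard-coding both sides to `query`
is what keeps them consistent here.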

As such, the long-term solution should be to rely completely on boto3
for all communication with AWS, while ensuring that all requests are async
in nature (non-blocking). This, however, is a fundamental change that
requires a lot of testing, in particular performance testing.
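
As a sketch of that direction (an assumption on my part, and using asyncio
for brevity even though kombu has its own hub implementation), blocking
boto3 calls could be pushed onto a worker thread so the event loop stays
responsive:

```python
import asyncio

import boto3

sqs = boto3.client('sqs', region_name='us-east-1')


async def receive_messages(queue_url):
    loop = asyncio.get_running_loop()
    # run_in_executor hands the blocking boto3 call to a thread pool,
    # so the event loop keeps servicing other tasks in the meantime.
    return await loop.run_in_executor(
        None,  # default ThreadPoolExecutor
        lambda: sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=20),
    )

# usage: asyncio.run(receive_messages('https://sqs...my-queue'))
```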
rafidka committed Oct 9, 2023
1 parent ff2c474 commit 446a9b1
Showing 1 changed file with 26 additions and 2 deletions.
kombu/asynchronous/aws/ext.py (26 additions, 2 deletions)

```diff
@@ -6,7 +6,8 @@
     import boto3
     from botocore import exceptions
     from botocore.awsrequest import AWSRequest
-    from botocore.response import get_response
+    from botocore.response import StreamingBody
+    from botocore import parsers
 except ImportError:
     boto3 = None
 
@@ -18,9 +19,32 @@ class BotoCoreError(Exception):
     exceptions = _void()
     exceptions.BotoCoreError = BotoCoreError
     AWSRequest = _void()
-    get_response = _void()
 
 
 __all__ = (
     'exceptions', 'AWSRequest', 'get_response'
 )
+
+
+def get_response(operation_model, http_response):
+    protocol = 'query'  # operation_model.metadata['protocol']
+    response_dict = {
+        'headers': http_response.headers,
+        'status_code': http_response.status_code,
+    }
+    # TODO: Unfortunately, we have to have error logic here.
+    # If it looks like an error, in the streaming response case we
+    # need to actually grab the contents.
+    if response_dict['status_code'] >= 300:
+        response_dict['body'] = http_response.content
+    elif operation_model.has_streaming_output:
+        response_dict['body'] = StreamingBody(
+            http_response.raw, response_dict['headers'].get('content-length')
+        )
+    else:
+        response_dict['body'] = http_response.content
+
+    parser = parsers.create_parser(protocol)
+    return http_response, parser.parse(
+        response_dict, operation_model.output_shape
+    )
```
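
To see the borrowed function in action, here is a self-contained sketch.
The `FakeHttpResponse` class and the XML payload are hypothetical stand-ins
for what kombu's async HTTP client would hand over, and it assumes a
botocore release whose SQS service model still matches the `query` wire
format (true when this commit was written):

```python
import io

import boto3

from kombu.asynchronous.aws.ext import get_response


class FakeHttpResponse:
    """Hypothetical stand-in exposing the attributes get_response reads."""

    def __init__(self, status_code, body):
        self.status_code = status_code
        self.headers = {'content-length': str(len(body))}
        self.content = body
        self.raw = io.BytesIO(body)


# A minimal query-protocol (XML) ReceiveMessage response body.
BODY = b"""\
<ReceiveMessageResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
  <ReceiveMessageResult>
    <Message>
      <MessageId>00000000-0000-0000-0000-000000000000</MessageId>
      <ReceiptHandle>example-receipt-handle</ReceiptHandle>
      <MD5OfBody>5d41402abc4b2a76b9719d911017c592</MD5OfBody>
      <Body>hello</Body>
    </Message>
  </ReceiveMessageResult>
  <ResponseMetadata><RequestId>example-request-id</RequestId></ResponseMetadata>
</ReceiveMessageResponse>"""

sqs = boto3.client('sqs', region_name='us-east-1')
op_model = sqs.meta.service_model.operation_model('ReceiveMessage')

_, parsed = get_response(op_model, FakeHttpResponse(200, BODY))
print(parsed['Messages'][0]['Body'])  # -> hello
```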
