Use the correct protocol for SQS requests #1807
Conversation
Force-pushed e1d02d7 to 1ee41ae.
I think we'd better revert the change first, then do other stuff. That is the safest thing for the short term.
You can rebase this now for further review, and make the diff smaller for easier review. We have to be careful merging this, so let us take some time. Thanks for your continuous effort.
Sure, I will do that. Having said that, I discovered an issue which I need to address before we can merge this PR.
Well, it will take some time to merge this PR, so you can work on other issues as well :)
Force-pushed 9cbdcec to 1506a00.
I managed to fix the issue and confirmed that my updated code works with both the query and JSON protocols. However, I am still marking it as WIP, as I don't want it to get merged before getting approvals from the following:

Additionally, we need to do load testing and generate metrics to make sure we are not introducing any performance degradation. As such, let's aim for at least October 20 before we merge this PR.
Force-pushed bee6c08 to f4a17b2.
@auvipy, it is a bit of a pain to get all the tests to run successfully, and I keep getting some weird formatting issues. Is there a way to run the tests locally before updating the PR?
Here you go @rafidka:
- Unit
- Integration
- Lint (pre-commit)
kombu/asynchronous/aws/connection.py (Outdated)

@@ -229,9 +229,11 @@ def get_status(self, operation, params, path='/', parent=None, verb='GET', callb

def _on_list_ready(self, parent, markers, operation, response):
    service_model = self.sqs_connection.meta.service_model
    protocol = service_model.protocol
Could this be a class level static variable?
No, since self.sqs_connection itself is an instance variable rather than a static variable. We could make protocol an instance variable as well, but there isn't much advantage, and I would rather not, since we can easily access the protocol from the service model.
kombu/asynchronous/aws/ext.py (Outdated)

with the exception that instead of hard-coding the protocol to 'query',
we expect the caller to specify the protocol. This helps us deal with both
jquery and json protocols. For more context on this, the reader is refered
what is jquery?
Oops, it should be query. I will fix that.
kombu/asynchronous/aws/ext.py (Outdated)

def get_response(operation_model, http_response, protocol):
    """Parses the HTTP response of an AWS operation.

    This is an almost identical copy of botocore's get_response:
Can the botocore implementation be aliased and operation_model.metadata['protocol'] be set to the protocol parameter?
It would be safer if the botocore implementation was used inside this function to prevent drift.
Actually, we don't need this method anymore, and I will just remove it. Initially, I was trying to have get_response always parse the response as query. This, however, didn't work, and I decided to craft the request to match the format the operation is using; hence we don't need our own version of get_response, and I will accordingly rename the PR.
# query-based opts
param_payload = {'params': params}

return AWSRequest(method=verb, url=queue_url, **param_payload)
Can we keep verb consistent with method used elsewhere?
@dhegberg, I tried to stay consistent with the original implementation to reduce the amount of changes. Having said that, yeah, it is confusing, so I will just rename everything to method.
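To make the agreed rename concrete, here is a minimal sketch of the query-protocol branch from the quoted diff with verb renamed to method. The helper name build_request_kwargs is hypothetical; in the real code these keyword arguments feed botocore's AWSRequest constructor.

```python
def build_request_kwargs(method, queue_url, params):
    # Query-based opts: parameters travel as form params on the request.
    # `method` replaces the original `verb` argument for consistency.
    param_payload = {'params': params}
    return dict(method=method, url=queue_url, **param_payload)


# Example: the resulting kwargs for a ReceiveMessage call.
kwargs = build_request_kwargs(
    'POST', 'https://sqs.example/queue', {'Action': 'ReceiveMessage'})
```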
else:
    raise Exception(f'Unsupported protocol: {protocol}.')

signing_type = 'presignurl' if request.method.lower() == 'get' \
Why use presignurl on get? It's not clear from https://github.com/boto/botocore/blob/develop/botocore/signers.py when pre-signed urls should be used. Can you provide some documentation?
It also confused me, and the only explanation I have is that the documentation here is inaccurate and they meant to instead say: "This should be used when pre-signing a GET* request". Having said that, I believe this code (which I copied from the original implementation of make_request in the kombu.aws.connection.AsyncAWSQueryConnection class) is actually incorrect, since the signer name is presign-url rather than presignurl, but I was planning on keeping it as is until I get feedback from the botocore team.
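For readers following along, a stand-alone sketch of the signing-type selection under discussion. The function wrapper is hypothetical; the 'presignurl' string is exactly what the original make_request code used (which, per the comment above, may itself be the bug), and the 'standard' fallback is assumed from kombu's existing code.

```python
def choose_signing_type(method):
    # Mirrors the original make_request logic: GET requests are signed
    # as pre-signed URLs; everything else uses standard signing.
    # Note: 'presignurl' is kept verbatim from the original code even
    # though the botocore signer name may actually be 'presign-url'.
    return 'presignurl' if method.lower() == 'get' else 'standard'
```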
Latest update:
After all the process is complete, we will need test coverage as well.
service_model = self.sqs_connection.meta.service_model
protocol = service_model.protocol
It might be worth leaving a comment here explaining why this method is being updated with this branching. Just set a little bit of the context with some links. I know it could be gotten from the git blame/commit, but it's nice to have a bit in the code as well to hint people to go looking.
Totally. I will do that.
Added.
    operation_model.metadata['targetPrefix'],
    operation_model.name,
)
headers['X-Amz-Target'] = target
Looking at a sample request in the docs, I also see Content-Encoding: amz-1.0, X-Amz-Date: <date>, and a few others in the header as well? I suppose those aren't required and we'll have no issues excluding them?
Which document are you referring to? What I did was check what botocore is doing in their JSONSerializer, and I don't see those fields in there.
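As a reference point for this exchange, a sketch of the header construction being discussed. The helper is hypothetical; the X-Amz-Target value mirrors the targetPrefix/operation-name concatenation in the quoted diff, and the Content-Type follows the application/x-amz-json-<version> convention used by botocore's JSON serializer.

```python
def json_protocol_headers(target_prefix, operation_name, json_version='1.0'):
    # X-Amz-Target identifies the operation, e.g. 'AmazonSQS.ReceiveMessage'.
    target = '%s.%s' % (target_prefix, operation_name)
    return {
        'Content-Type': 'application/x-amz-json-%s' % json_version,
        'X-Amz-Target': target,
    }


headers = json_protocol_headers('AmazonSQS', 'ReceiveMessage')
```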
I'll find the docs, I think they were in a slack thread that I will share with you.
> I'll find the docs, I think they were in a slack thread that I will share with you.

Would you mind doing a last round of review?
@auvipy, can you explain what kind of coverage you are looking for?
I am considering this and a few other PRs to include in a bug fix release tomorrow or the day after. Should we pin the boto versions with this patch as well, as a safety measure?
Why does it matter what the underlying protocol is? If anything, customers will get a performance boost, since JSON is better. At the end of the day, when we communicate with AWS, what matters is the request being sent and the response being received; the underlying wire protocol shouldn't matter. Yes, it would have mattered if it introduced a performance degradation, but in this case, it shouldn't.
No, I don't think we should pin the botocore versions. The whole point of this PR is that we don't have to pin the version, and that kombu works with any botocore version. If we plan to pin the version of botocore, then we should probably not merge this PR. In my humble opinion, what we need is for the open source community to do as much testing as possible on this PR, and then merge. I did a lot of testing internally, but my tests will probably not be as thorough as those of someone from the Celery/Kombu team, since my exposure to Celery/Kombu is relatively limited.
As mentioned above, I don't recommend pinning the boto version, if by pinning you mean a specific version. However, I do recommend limiting the range of supported botocore versions, since Celery is no longer compatible with the new botocore.
I know this isn't a bulletproof test, as it's not using real SQS, but I've managed to replicate this in our local Docker Compose stack using the https://hub.docker.com/r/softwaremill/elasticmq-native image in place of SQS. We have a Celery container and a separate container for Beat, so it is easy to see when one is sending tasks to the other.

For all tests the Celery version is 5.3.1, because Poetry complained about this PR being versioned as kombu 5.3.2, which is lower than the requirements for 5.3.5 (no testing has shown main Celery to be the issue, so I don't think that matters):

boto3 1.28.80 + botocore 1.31.80 + kombu 5.3.3: Messages processed successfully

Not sure how much this helps everyone, as it isn't actually using SQS, but it is using the latest botocore, and it has been incredibly useful when we were debugging which version we should pin our environment to.
@auvipy, thanks for merging this PR. I am just curious to learn about the performance tests that Celery currently does as part of the release process.

I am asking because with both my PRs (this one and #1759) I didn't get the impression that the Celery team did performance testing; it was mostly my own testing, which makes me slightly worried, as I cannot possibly cover the different use cases of Celery, and my testing is mostly focused on Airflow/MWAA.
@rafidka thanks for raising the concern/question. @Nusnus is planning/doing something similar here: https://github.com/celery/pytest-celery, which is a work in progress. The previous stress test tools are out of date now, so we can't do much right now, but hopefully that is going to change soon.
I have been under this internal struggle for a long time now between progressing with the project and communicating my plans. So far, I have chosen 100% focus on progress to get pytest-celery v1.0.0 out as soon as I possibly can. My MVP strategy goes like this:

This way, I lead the MVP feature list based on actual tests I am developing in parallel, which helps me define the project scope on the fly, TDD style. I have a

This means I have a list of tests I plan to implement that, once done and passing, will allow me to binge-doc the entire project end-to-end and start moving into beta/RC stages, etc. @rafidka - This could be an excellent time to collaborate on possibly adding SQS support to

Timeline-wise, I hope to start binge-docing in about a month, hopefully less. The end game is having Celery v5.4 level up in terms of QA for production environments.

If you're up for it, I can contact you when the time comes (let's say Dec/Jan) to collaborate together on it; could be fun and interesting!

P.S. I am currently working on about 5 active branches in both Celery and Pytest Celery, for the infra, the smoke tests, fixes for bugs I found (waiting for merge for v5.4), and more... It's a git rollercoaster, I have to admit haha 😄
Just don't overwhelm yourself with 5.4 stuff now. You will reach the goals gradually and eventually. And don't forget to take breaks to avoid possible burnout. Celery needs you for a long time! So besides Celery, don't forget to take care of yourself too.
Thank you, @Nusnus. I discussed with my manager, and we will try to fit some time for this into our roadmap, though it is unlikely for December; there's a good chance next year. I think what will help is if you have a document where you jot down your ideas of what you will be working on. Then I could also add some ideas for SQS-related stuff in Celery (we can only promise help on this front, unfortunately).
Sounds great!

P.S.
@rafidka v1.0.0 is pretty much complete feature-wise. Bug fixing & documentation are my highest priorities right now, alongside supporting the newly merged test suites, in preparation for the Celery v5.4 release.

P.S.
We need to bump boto3 to bring in the changes to the sesv2 API to add support for custom headers. I went through the boto3 changelogs with a fine-tooth comb; nothing there is particularly alarming. Here's a filtered list of just the lines I think will affect our apps. Note that we only use boto3 for interacting with things programmatically, so while there may have been changes to, e.g., boto3's ecs API, since we don't interact with ECS from our apps we don't need to worry about that. See the filtered list[^1]. Notably, there were some changes to the boto3 SQS protocol, and celery needed to be bumped to support that; see the kombu PR[^2] if you're interested in learning more.

[^1]: https://gist.github.com/leohemsted/0572ad15c57fdbaef8b9697cc6e5eaa4
[^2]: celery/kombu#1807
TL;DR - The use of boto3 in #1759 resulted in relying on blocking (synchronous) HTTP requests, which caused the performance issue reported in #1783.
kombu previously used to craft AWS requests manually, as explained in detail in #1726, which resulted in an outage when botocore temporarily changed the default protocol to JSON (before rolling back due to the impact on Celery and Airflow). To fix the issue, I submitted #1759, which changes kombu to use boto3 instead of manually crafting AWS requests. This way, when boto3 changes the default protocol, kombu won't be impacted.

While working on #1759, I did extensive debugging to understand the multi-threading nature of kombu. What I discovered is that there isn't actual multi-threading in the true sense of the word, but an event loop that runs on the same thread and process and orchestrates the communication with SQS. As such, it didn't appear to me that there was anything to worry about with my change, and the testing I did didn't uncover any issue. However, it turns out that while kombu's event loop doesn't have actual multi-threading, its reliance on pycurl (and thus libcurl) meant that the requests to AWS were being done asynchronously. On the other hand, boto3 requests are always done synchronously, i.e. they are blocking requests.
The above meant that my change introduced blocking on the event loop of kombu. This is fine in most of the cases, since the requests to SQS are pretty fast. However, in the case of using long-polling, a call to SQS's ReceiveMessage can last up to 20 seconds (depending on the user configuration).
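To illustrate why this matters, a toy sketch: the sleep below stands in for a synchronous boto3 receive_message call with WaitTimeSeconds set (the function and its behavior are illustrative stand-ins, not kombu or boto3 code). While the call blocks, a single-threaded event loop can run nothing else.

```python
import time

def blocking_long_poll(wait_seconds):
    # Stand-in for a blocking ReceiveMessage long poll: nothing else on
    # the event loop's thread can run until this returns.
    time.sleep(wait_seconds)
    return []  # no messages arrived during the poll

start = time.monotonic()
blocking_long_poll(0.2)
elapsed = time.monotonic() - start  # the whole loop stalled for ~0.2s
```

With a real 20-second long poll, every timer and callback on the loop would be delayed by up to that amount.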
To solve this problem, I rolled back my earlier changes and, instead, to address the issue reported in #1726, I changed the AsyncSQSConnection class such that it crafts either a query or a json request depending on the protocol used by the SQS client. Thus, when botocore changes the default protocol of SQS to JSON, kombu won't be impacted, since it crafts its own requests and, after my change, it uses a hard-coded protocol based on the crafted requests.

This solution shouldn't be the final one; it is more of a workaround that does the job for now. The final solution should be to rely completely on boto3 for any communication with AWS, while ensuring that all requests are async in nature (non-blocking). This, however, is a fundamental change that requires a lot of testing, in particular performance testing.
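The protocol branching described above can be sketched as follows. This is a simplified illustration, not the actual AsyncSQSConnection code: the function name is hypothetical, and the header values are assumptions based on the query/json wire-format conventions discussed in this thread.

```python
import json
from urllib.parse import urlencode

def craft_sqs_request(operation_name, params, protocol):
    # Branch on the protocol the SQS client's service model reports, so
    # kombu is unaffected when botocore flips the default protocol.
    if protocol == 'query':
        # Query protocol: form-encoded body carrying the Action parameter.
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        body = urlencode({'Action': operation_name, **params})
    elif protocol == 'json':
        # JSON protocol: the operation goes in the X-Amz-Target header.
        headers = {
            'Content-Type': 'application/x-amz-json-1.0',
            'X-Amz-Target': 'AmazonSQS.%s' % operation_name,
        }
        body = json.dumps(params)
    else:
        raise Exception(f'Unsupported protocol: {protocol}.')
    return headers, body
```

Both branches produce the same logical request; only the serialization differs, which is why the change is transparent to callers.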