Fix crash when using global_keyprefix with a sentinel connection #1838

Merged
merged 5 commits into from Jan 9, 2024

Conversation

adam-homeboost
Contributor

Fix for #1809.

When global_keyprefix is set, self.Client is no longer a class, it is a closure to create the class with the prefix argument. This breaks in a place that expects this to be a class and not just any callable. Fortunately, in this particular case, the place that is breaking doesn't actually need or care about the prefix and so it is safe to use the main redis client class. It only cares about the pool that comes back. This fix does not prevent the key prefix from being used later by kombu's own client extension.
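The failure mode described above can be reproduced without redis at all. The sketch below is only an illustration: `Client` and `master_for` are hypothetical stand-ins (not kombu's or redis-py's real classes), and it assumes the prefixed client is built with `functools.partial`, as the traceback in this thread suggests.

```python
import functools


class Client:
    """Hypothetical stand-in for a redis client class (not the real one)."""

    def __init__(self, connection_pool=None, global_keyprefix=""):
        self.connection_pool = connection_pool
        self.global_keyprefix = global_keyprefix

    @classmethod
    def from_pool(cls, connection_pool):
        return cls(connection_pool=connection_pool)


def master_for(redis_class, pool="sentinel-pool"):
    """Hypothetical caller that needs a real class, not just any callable."""
    return redis_class.from_pool(pool)


# With a prefix configured, the "class" becomes a partial: callable, but
# without the classmethods of the class it wraps.
prefixed = functools.partial(Client, global_keyprefix="some_prefix")

try:
    master_for(prefixed)
    error = None
except AttributeError as exc:
    error = str(exc)

# Passing the plain class works; only the pool from the result is needed.
client = master_for(Client)
pool = client.connection_pool

# The prefix can still be applied later, when the real client is created.
prefixed_client = prefixed(connection_pool=pool)
```

The last two steps mirror the reasoning of the fix: the plain class is enough to obtain the pool, and the prefixed callable is still used afterwards to build the client that actually issues commands.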

@auvipy auvipy self-requested a review January 4, 2024 10:19
kombu/transport/redis.py Outdated Show resolved Hide resolved
@50-Course

Hello @adam-homeboost, thank you for providing this patch. I retraced the steps and built both variants for comparison: one with this patch and one replicating the environment that contains the bug.

Here is my analysis:

before (matching your exact debug information):

[2024-01-08 09:55:01,745: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'functools.partial' object has no attribute 'from_pool'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 951, in create_channel
    return self._avail_channels.pop()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/connection.py", line 21, in start
    c.connection = c.connect()
                   ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 469, in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 475, in connection_for_read
    return self.ensure_connected(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 526, in ensure_connected
    conn = conn.ensure_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 406, in ensure_connection
    self._ensure_connection(*args, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
    return fun(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 934, in _connection_factory
    self._connection = self._establish_connection()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 860, in _establish_connection
    conn = self.transport.establish_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 975, in establish_connection
    self._avail_channels.append(self.create_channel(self))
                                ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 953, in create_channel
    channel = self.Channel(connection)
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 741, in __init__
    self.client.ping()
    ^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/utils/objects.py", line 31, in __get__
    return super().__get__(instance, owner)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1254, in client
    return self._create_client(asynchronous=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1210, in _create_client
    return self.Client(connection_pool=self.async_pool)
                                       ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1248, in async_pool
    self._async_pool = self._get_pool(asynchronous=True)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1433, in _get_pool
    return self._sentinel_managed_pool(asynchronous)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1427, in _sentinel_managed_pool
    return sentinel_inst.master_for(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/redis/sentinel.py", line 356, in master_for
    return redis_class.from_pool(
           ^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'functools.partial' object has no attribute 'from_pool'

after:

I am trying to run the tests but have hit a dead end. Specifically, I can't select the test with:

$ python t/unit/transport/test_redis.py::test_can_create_connection_with_global_prefix

Other commands I have tried:

$ python -k <test_name::removed_for_conciseness> -o python_files=t/unit/transport/test_redis.py

Screenshots: (two images, not reproduced here)

Comment on lines 1740 to 1748
connection = Connection(
    'sentinel://localhost:65534/',
    transport_options={
        'global_keyprefix': 'some_prefix',
        'master_name': 'not_important',
    },
)
with pytest.raises(ConnectionError):
    connection.channel()

Can we consider one more approach here:

try:
    connection = Connection(
        'sentinel://localhost:65534/',
        transport_options={
            'global_keyprefix': 'some_prefix',
            'master_name': 'not_important',
        },
    )
    with pytest.raises(ConnectionError):
        connection.channel()
finally:
    connection.close()  # Ensure connection is closed even if an exception occurs

Should we also consider testing potential exceptions that could occur during connection creation?
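One caveat with the try/finally shape above: if the constructor itself raised, `connection` would be unbound by the time `finally` runs. A minimal sketch of the same cleanup pattern with construction moved outside the `try`, using a hypothetical `FakeConnection` stand-in rather than kombu's `Connection`:

```python
class FakeConnection:
    """Hypothetical stand-in for a connection whose channel() always fails."""

    def __init__(self, url):
        self.url = url
        self.closed = False

    def channel(self):
        raise ConnectionError(f"cannot reach {self.url}")

    def close(self):
        self.closed = True


def check_channel_failure(url):
    # Construct outside the try so `connection` is always bound in finally.
    connection = FakeConnection(url)
    try:
        try:
            connection.channel()
        except ConnectionError:
            raised = True
        else:
            raised = False
    finally:
        # Runs whether or not channel() raised.
        connection.close()
    return raised, connection.closed


result = check_channel_failure("sentinel://localhost:65534/")
```

In a real pytest test the inner try/except would be the `pytest.raises(ConnectionError)` block; it is spelled out here only to keep the sketch free of dependencies.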

Contributor Author

@50-Course

These tests are inside a test class. Try:

pytest t/unit/transport/test_redis.py::test_RedisSentinel::test_can_create_connection_with_global_keyprefix

Member

@50-Course

These tests are inside a test class. Try:

pytest t/unit/transport/test_redis.py::test_RedisSentinel::test_can_create_connection_with_global_keyprefix

class test_Redis:

    def setup(self):
        self.connection = Connection(transport=Transport)
        self.exchange = Exchange('test_Redis', type='direct')
        self.queue = Queue('test_Redis', self.exchange, 'test_Redis')

    def teardown(self):
        self.connection.close()

If I am not mistaken, this teardown only closes self.connection, so it does not apply to the local connection object in your test. I therefore agree with a safer approach like the one @50-Course proposed.

Also, a sample of the other tests shows that most of them clean up their resources manually and consistently, so it would make sense to do the same here.
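To illustrate why a class-level teardown does not cover a connection created inside a test method, here is a small sketch. The `Conn` and `test_Example` classes are hypothetical stand-ins, and the setup/teardown hooks are called by hand instead of by pytest:

```python
class Conn:
    """Hypothetical stand-in for a connection that records close() calls."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class test_Example:
    def setup(self):
        self.connection = Conn()

    def teardown(self):
        # Only the connection stored on self is cleaned up here.
        self.connection.close()

    def test_local_connection(self):
        local = Conn()
        # Kept on self only so the sketch can inspect it afterwards.
        self.local = local


case = test_Example()
case.setup()
case.test_local_connection()
case.teardown()

shared_closed = case.connection.closed
local_closed = case.local.closed
```

After the run, the shared connection is closed while the locally created one is not, which is why the test body needs its own cleanup.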

@50-Course 50-Course left a comment

Hi @adam-homeboost,

Thanks for the PR addressing the global_keyprefix issue! I was able to reproduce #1809 locally (check out https://github.com/50-Course/global-prefix-error-celery for my setup).

Overall, the proposed fix makes sense and avoids future key prefix problems. The explanation is clear and the logic seems solid.

What if we added a test case to cover potential connection creation errors? Here's an example:

  • This would catch any unexpected problems while connecting and further solidify our confidence in the fix.

  • Besides that, the PR looks great! With suggested changes, I'd be happy to give it a green light.

Any thoughts on potential side effects? I am open to discussion!

cc: @auvipy

@adam-homeboost
Contributor Author

adam-homeboost commented Jan 8, 2024

In terms of adding things to the test, I hesitate because:

  1. This is just the testing of the parameterization of the connection, not the connection itself.

  2. This specific structure is a direct copy of the other sentinel connection tests in the same test class and follows the same methodology. Note that this does not include connection to a REAL sentinel as that is tested elsewhere, as are the specific varieties of exceptions that can happen.

  3. I believe strongly that unit tests should be as focused as possible on the one specific thing they are testing, as this makes the code simpler, easier to understand, and easier for future contributors to grasp and contribute to. Testing specific kinds of connection error outside the scope of this change belongs elsewhere (in my opinion) and (hopefully) already exists in the main redis module.

But open to other opinions, just not sure where I would even begin.

Member

@Nusnus Nusnus left a comment

All in all looks good - just needs to resolve the comments on the PR

@@ -1429,7 +1429,7 @@ def _sentinel_managed_pool(self, asynchronous=False):

         return sentinel_inst.master_for(
             master_name,
-            self.Client,
+            redis.StrictRedis,
Member

StrictRedis is deprecated.
Better use Redis instead.

@adam-homeboost
Contributor Author

As requested: changed connection class to non-deprecated, added resource cleanup to exception test.

Member

@Nusnus Nusnus left a comment

LGTM
Waiting for @50-Course @auvipy to approve/comment before merge.

@50-Course

First, thank you for the heads-up on the pytest issue. I had omitted the test class (test_channel) to which the function in question belongs, and only noticed it much later.

Onto our conversation,

In terms of adding things to the test, I hesitate because:

1. This is just the testing of the parameterization of the connection, not the connection itself.

Thank you for the clarification. I understand that this tests the parameterization of the connection, specifically the global_keyprefix issue that arises when creating the connection.

2. This specific structure is a direct copy of the other sentinel connection tests in the same test class and follows the same methodology.  Note that this does not include connection to a REAL sentinel as that is tested elsewhere, as are the specific varieties of exceptions that can happen.

Following up on this, I have reviewed a few of the other tests. In this scenario the test aims to catch an error from the connection, such as the get_pool_error. My suggestion was that additional tests could also be considered, so that the connection pool is not left without cleanup.

3. I believe strongly that unit tests should be as focused as possible on the one specific thing they are testing, as this makes the code simpler, easier to understand, and easier for future contributors to grasp and contribute to. Testing specific kinds of connection error outside the scope of this change belongs elsewhere (in my opinion) and (hopefully) already exists in the main redis module.

Point understood! I am also a firm believer in the single-responsibility principle, and this would have been a good approach. However, given that a connection is set up, we might as well clean it up properly, as in the snippet shared by @Nusnus above.

@50-Course 50-Course left a comment

LGTM 🚀

@auvipy auvipy merged commit 159e9df into celery:main Jan 9, 2024
17 checks passed
@Nusnus
Member

Nusnus commented Jan 9, 2024

Good work all!
Thank you @adam-homeboost for your contribution ❤️
