kombu redis requeue concurrency bug #1800

Closed
jiangxianfu opened this issue Sep 27, 2023 · 2 comments
Contributor

jiangxianfu commented Sep 27, 2023

Problem:

When a queue has 3 consumers consuming messages, sometimes not all of the data is received.

For debugging:
I modified the file py3/lib/python3.10/site-packages/kombu/transport/redis.py and added print statements (marked below):

    def restore_by_tag(self, tag, client=None, leftmost=False):

        def restore_transaction(pipe):
            p = pipe.hget(self.unacked_key, tag)
            pipe.multi()
            self._remove_from_indices(tag, pipe)
            if p:
                M, EX, RK = loads(bytes_to_str(p))  # json is unicode
                self.channel._do_restore_message(M, EX, RK, pipe, leftmost)
                print('restore tag')  # added for debugging
            else:
                print('un restore tag')  # added for debugging

        with self.channel.conn_or_acquire(client) as client:
            client.transaction(restore_transaction, self.unacked_key)
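
To make the two branches above easier to reason about, here is a minimal in-memory sketch. It is not kombu code: `FakeBroker`, `consume`, `remove_unacked`, and `simulate` are hypothetical names, a dict stands in for the unacked hash, and a list stands in for the Redis queue. It assumes one possible interleaving, in which a message is redelivered under the same tag and its unacked entry is removed by another client before the second requeue. Under that assumption the lookup returns nothing, the `else` branch runs, and the message ends up in neither structure.

```python
import json


class FakeBroker:
    """In-memory stand-in for the Redis structures (hypothetical, not kombu API)."""

    def __init__(self):
        self.unacked = {}  # tag -> serialized message, like the unacked hash
        self.queue = []    # pending messages, like the Redis list

    def publish(self, body):
        self.queue.append(body)

    def consume(self, tag):
        # Deliver the next message and track it as unacked under `tag`.
        body = self.queue.pop(0)
        self.unacked[tag] = json.dumps(body)
        return body

    def remove_unacked(self, tag):
        # Assumption: some other client clears the entry for this tag.
        self.unacked.pop(tag, None)

    def restore_by_tag(self, tag):
        # Same branch structure as the excerpt: look up, drop the index, restore if found.
        p = self.unacked.get(tag)    # plays the role of pipe.hget(self.unacked_key, tag)
        self.unacked.pop(tag, None)  # plays the role of self._remove_from_indices(tag, pipe)
        if p:
            self.queue.insert(0, json.loads(p))
            return "restore tag"
        return "un restore tag"


def simulate():
    broker = FakeBroker()
    broker.publish({"name": "test-0"})

    broker.consume("tag-0")
    first = broker.restore_by_tag("tag-0")   # entry present: message goes back to the queue

    broker.consume("tag-0")                  # assumption: redelivered under the same tag
    broker.remove_unacked("tag-0")           # assumption: entry removed before the requeue
    second = broker.restore_by_tag("tag-0")  # entry missing: nothing is restored

    return first, second, broker.queue, broker.unacked


if __name__ == "__main__":
    print(simulate())
```

After `simulate()` the message is in neither the queue nor the unacked store, which matches the symptom of an item going missing after "un restore tag" is printed. Whether this is the exact interleaving behind the report is not established by the log alone.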

Reproduction steps:

  1. Create a queue and enqueue 3 items.

(py3) [jiangxf@xxx ~]$ python test_kombu.py --init
Namespace(init=True, requeue=False, ack=False)
ok 0
ok 1
ok 2

  2. Simulate two consumers that get items from the Redis queue but only requeue them.

(py3) [jiangxf@xxx ~]$ python test_kombu.py --requeue
Namespace(init=False, requeue=True, ack=False)
{'test-1': 1695786971.69251}
restore tag
finished-requeue
{'test-2': 1695786971.6956441}
restore tag
finished-requeue
{'formtest-0': 1695785710.66742}
restore tag
finished-requeue
{'formtest-1': 1695785711.2942934}

(py3) [jiangxf@xxx ~]$ python test_kombu.py --requeue
Namespace(init=False, requeue=True, ack=False)
{'test-1': 1695787583.5387304}
restore tag
finished-requeue
{'test-0': 1695787583.5360556}
restore tag
finished-requeue
{'test-2': 1695787583.5412734}
restore tag
finished-requeue
{'test-2': 1695787583.5412734}
restore tag
finished-requeue
{'test-0': 1695787583.5360556}
un restore tag ---the data is missing
finished-requeue

  3. Simulate a normal consumer that gets items from the Redis queue and acks them.

(py3) [jiangxf@xxx ~]$ python test_kombu.py --ack
Namespace(init=False, requeue=False, ack=True)
{'test-1': 1695787583.5387304}
finished-ack

{'test-2': 1695787583.5412734}
finished-ack

Test file:
test_kombu.txt

Result:
test-0 is missing.

jiangxianfu changed the title from "kombu requeue concurrency bug" to "kombu redis requeue concurrency bug" on Sep 27, 2023
Member

auvipy commented Oct 4, 2023

If you can try to fix it, that will be highly appreciated

jiangxianfu added a commit to jiangxianfu/kombu that referenced this issue Oct 7, 2023
Contributor Author

> If you can try to fix it, that will be highly appreciated

OK, I have pushed a pull request, and the specific problem is described there.

auvipy pushed a commit that referenced this issue Oct 30, 2023
* fix: redis requeue concurrency bug  #1800

* fix: add unit test

* fix: update
auvipy closed this as completed on Nov 14, 2023
auvipy added this to the 5.3.x milestone on Nov 14, 2023