Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub error when running in threads with redis-py 5.0.3 #3184

Open
spicy-sauce opened this issue Mar 14, 2024 · 1 comment
Open

PubSub error when running in threads with redis-py 5.0.3 #3184

spicy-sauce opened this issue Mar 14, 2024 · 1 comment

Comments

@spicy-sauce
Copy link

pubsub.py:

import sys
import time

import redis

# Global variable to indicate if the event handler is invoked
event_handler_invoked = False


def event_handler(message):
    # Event handler function
    global event_handler_invoked
    print("Event handler invoked with message:", message)
    event_handler_invoked = True


def setup_pubsub():
    # Function to set up Redis pub/sub
    # Connect to Redis
    redis_client = redis.Redis(host="localhost", port=12000)

    # Subscribe to a channel pattern
    pubsub = redis_client.pubsub()
    pubsub.psubscribe(**{'test_channel*': event_handler})

    # Start a thread to handle messages
    pubsub.run_in_thread(sleep_time=0.1)


def main():
    setup_pubsub()

    # Publish a test message
    redis_client = redis.Redis(host="localhost", port=12000)
    redis_client.publish('test_channel', 'Hello, Redis!')

    # Wait for the event handler to be invoked
    timeout = 5  # Timeout in seconds
    start_time = time.time()
    while not event_handler_invoked:
        if time.time() - start_time > timeout:
            print("Timeout reached. Event handler not invoked.")
            sys.exit(1)
        time.sleep(0.1)

    print("Event handler successfully invoked.")


if __name__ == "__main__":
    main()

In redis-py 4.6.0 it works as expected:

$ python3 pubsub.py
Event handler invoked with message: {'type': 'pmessage', 'pattern': b'test_channel*', 'channel': b'test_channel', 'data': b'Hello, Redis!'}
Event handler successfully invoked.

With redis-py 5.0.3 it constantly fails with:

$ python3 pubsub.py
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/_parsers/socket.py", line 69, in _read_from_socket
    buf.write(data)
ValueError: I/O operation on closed file.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 1162, in run
    pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 1024, in get_message
    response = self.parse_response(block=(timeout is None), timeout=timeout)
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 835, in parse_response
    response = self._execute(conn, try_read)
  File "/home/ubuntu//venv3.8/lib/python3.8/site-packages/redis/client.py", line 811, in _execute
    return conn.retry.call_with_retry(
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 812, in <lambda>
    lambda: command(*args, **kwargs),
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/client.py", line 829, in try_read
    if not conn.can_read(timeout=timeout):
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/connection.py", line 490, in can_read
    return self._parser.can_read(timeout)
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/_parsers/base.py", line 128, in can_read
    return self._buffer and self._buffer.can_read(timeout)
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/_parsers/socket.py", line 95, in can_read
    return bool(self.unread_bytes()) or self._read_from_socket(
  File "/home/ubuntu/my-project/venv3.8/lib/python3.8/site-packages/redis/_parsers/socket.py", line 90, in _read_from_socket
    buf.seek(current_pos)
ValueError: I/O operation on closed file.
^CTraceback (most recent call last):
  File "pubsub.py", line 54, in <module>
    main()
  File "pubsub.py", line 48, in main
    time.sleep(0.1)

Tested in Python3.8 and Python3.11, same result.

@kheyer
Copy link

kheyer commented Apr 12, 2024

Running into a similar issue with Celery/Redis. Downgrading to 4.6.0 didn't help. For me, the issue triggers when I try to get the results of celery tasks

Stack trace when gathering celery task results.

     results = await asyncio.gather(*[asyncio.to_thread(task.get) for task in tasks])
   File "/usr/local/lib/python3.9/asyncio/threads.py", line 25, in to_thread
     return await loop.run_in_executor(None, func_call)
   File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 58, in run
     result = self.fn(*self.args, **self.kwargs)
   File "/usr/local/lib/python3.9/site-packages/celery/result.py", line 251, in get
     return self.backend.wait_for_pending(
   File "/usr/local/lib/python3.9/site-packages/celery/backends/asynchronous.py", line 221, in wait_for_pending
     for _ in self._wait_for_pending(result, **kwargs):
   File "/usr/local/lib/python3.9/site-packages/celery/backends/asynchronous.py", line 287, in _wait_for_pending
     for _ in self.drain_events_until(
   File "/usr/local/lib/python3.9/site-packages/celery/backends/asynchronous.py", line 54, in drain_events_until
     yield self.wait_for(p, wait, timeout=interval)
   File "/usr/local/lib/python3.9/site-packages/celery/backends/asynchronous.py", line 63, in wait_for
     wait(timeout=timeout)
   File "/usr/local/lib/python3.9/site-packages/celery/backends/redis.py", line 161, in drain_events
     message = self._pubsub.get_message(timeout=timeout)
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1690, in get_message
     response = self.parse_response(block=(timeout is None), timeout=timeout)
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1542, in parse_response
     response = self._execute(conn, try_read)
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1518, in _execute
     return conn.retry.call_with_retry(
  File "/usr/local/lib/python3.9/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1519, in <lambda>
     lambda: command(*args, **kwargs),
   File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 1536, in try_read
    if not conn.can_read(timeout=timeout):
   File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 869, in can_read
    return self._parser.can_read(timeout)
   File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 344, in can_read
     return self._buffer and self._buffer.can_read(timeout)
   File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 242, in can_read
     return bool(self.unread_bytes()) or self._read_from_socket(
   File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 237, in _read_from_socket
     buf.seek(current_pos)
ValueError: I/O operation on closed file.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants