"Received 0x01 while expecting 0xce" errors when using python-amqp #6232

Closed
mskogorevrmc opened this issue Jul 16, 2020 · 6 comments

@mskogorevrmc

mskogorevrmc commented Jul 16, 2020

On Debian Linux with Python 3.5 and these packages:

celery                 4.4.0
billiard               3.6.3.0
kombu                  4.6.11
amqp                   2.6.0

I use RabbitMQ as the broker.
I occasionally get the following errors:

 Received 0x00 while expecting 0xce
Traceback (most recent call last):
  File "/var/www/my_app/app/mod_axmsg/dispatch.py", line 376, in execute
    result_value = func_meta.func(*task.args_list, **task.kwargs_dict)
  File "/var/www/my_app/app/mod_rmc/libs/health.py", line 91, in update_all_health
    health.update_all_health()
  File "/var/www/my_app/app/mod_core/libs/health.py", line 515, in update_all_health
    result.join()
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/result.py", line 772, in join
    disable_sync_subtasks=disable_sync_subtasks,
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/result.py", line 228, in get
    on_message=on_message,
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 193, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 260, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 56, in drain_events_until
    yield self.wait_for(p, wait, timeout=1)
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 65, in wait_for
    wait(timeout=timeout)
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/rpc.py", line 63, in drain_events
    return self._connection.drain_events(timeout=timeout)
  File "/usr/local/my_app/lib/python3.5/site-packages/kombu/connection.py", line 323, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/my_app/lib/python3.5/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/connection.py", line 505, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/connection.py", line 510, in blocking_read
    frame = self.transport.read_frame()
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/transport.py", line 280, in read_frame
    'Received {0:#04x} while expecting 0xce'.format(ch))
amqp.exceptions.UnexpectedFrame: Received 0x00 while expecting 0xce

----------------------------------
----------------------------------

 Received 0x50 while expecting 0xce
Traceback (most recent call last):
  File "/var/www/my_app/app/mod_axmsg/dispatch.py", line 376, in execute
    result_value = func_meta.func(*task.args_list, **task.kwargs_dict)
  File "/var/www/my_app/app/mod_rmc/libs/health.py", line 91, in update_all_health
    health.update_all_health()
  File "/var/www/my_app/app/mod_core/libs/health.py", line 570, in update_all_health
    result.join()
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/result.py", line 772, in join
    disable_sync_subtasks=disable_sync_subtasks,
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/result.py", line 228, in get
    on_message=on_message,
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 193, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 260, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 56, in drain_events_until
    yield self.wait_for(p, wait, timeout=1)
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 65, in wait_for
    wait(timeout=timeout)
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/rpc.py", line 63, in drain_events
    return self._connection.drain_events(timeout=timeout)
  File "/usr/local/my_app/lib/python3.5/site-packages/kombu/connection.py", line 323, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/my_app/lib/python3.5/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/connection.py", line 505, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/connection.py", line 510, in blocking_read
    frame = self.transport.read_frame()
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/transport.py", line 280, in read_frame
    'Received {0:#04x} while expecting 0xce'.format(ch))
amqp.exceptions.UnexpectedFrame: Received 0x50 while expecting 0xce

----------------------------------
----------------------------------

 Received 0x39 while expecting 0xce
Traceback (most recent call last):
  File "/var/www/my_app/app/mod_axmsg/dispatch.py", line 376, in execute
    result_value = func_meta.func(*task.args_list, **task.kwargs_dict)
  File "/var/www/my_app/app/mod_rmc/libs/health.py", line 91, in update_all_health
    health.update_all_health()
  File "/var/www/my_app/app/mod_core/libs/health.py", line 570, in update_all_health
    result.join()
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/result.py", line 772, in join
    disable_sync_subtasks=disable_sync_subtasks,
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/result.py", line 228, in get
    on_message=on_message,
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 193, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 260, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 56, in drain_events_until
    yield self.wait_for(p, wait, timeout=1)
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 65, in wait_for
    wait(timeout=timeout)
  File "/usr/local/my_app/lib/python3.5/site-packages/celery/backends/rpc.py", line 63, in drain_events
    return self._connection.drain_events(timeout=timeout)
  File "/usr/local/my_app/lib/python3.5/site-packages/kombu/connection.py", line 323, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/my_app/lib/python3.5/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/connection.py", line 505, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/connection.py", line 510, in blocking_read
    frame = self.transport.read_frame()
  File "/usr/local/my_app/lib/python3.5/site-packages/amqp/transport.py", line 280, in read_frame
    'Received {0:#04x} while expecting 0xce'.format(ch))
amqp.exceptions.UnexpectedFrame: Received 0x39 while expecting 0xce

There are other tickets for this error (#1779 and #2066), but neither shows any activity.

@auvipy
Member

auvipy commented Jul 16, 2020

Update to the latest version.

@kheyer

kheyer commented Apr 13, 2024

Encountering this with the following versions:

amqp              5.2.0
billiard          4.2.0
celery            5.3.6
kombu             5.3.7

Using the RabbitMQ 3.13.1 Docker image.

Is there a solution or workaround? It seems to be triggered by starting several tasks at once.

@Nusnus
Member

Nusnus commented Apr 13, 2024

> Encountering this with the following versions:
>
> amqp              5.2.0
> billiard          4.2.0
> celery            5.3.6
> kombu             5.3.7
>
> Using the RabbitMQ 3.13.1 Docker image.
>
> Is there a solution or workaround? It seems to be triggered by starting several tasks at once.

This error seems to happen when two amqp connections share the same socket. Both end up blocked on recv, and once one connection is unblocked the other is unblocked as well and reads data from the buffer that it shouldn't, which breaks the AMQP protocol.
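
To illustrate the mechanism described above, here is a minimal sketch. It does not use python-amqp at all: `build_frame`, `read_frame`, the `between_reads` hook, and the local `UnexpectedFrame` class are hypothetical stand-ins written for this example. The only thing taken from the protocol is the AMQP 0-9-1 frame layout (type, channel, payload size, payload, then the frame-end octet 0xCE). The hook simulates a second reader taking one byte from the shared socket in the middle of a frame, which is enough to shift the first reader out of alignment.

```python
import socket
import struct

FRAME_END = 0xCE  # AMQP 0-9-1 frame-end octet


class UnexpectedFrame(Exception):
    """Local stand-in for amqp.exceptions.UnexpectedFrame (illustration only)."""


def build_frame(frame_type, channel, payload):
    # Layout: type (1 byte), channel (2 bytes), payload size (4 bytes),
    # payload, frame-end octet.
    header = struct.pack(">BHI", frame_type, channel, len(payload))
    return header + payload + bytes([FRAME_END])


def recv_exact(sock, n):
    # Read exactly n bytes from a stream socket.
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("socket closed")
        buf += chunk
    return buf


def read_frame(sock, between_reads=None):
    # Simplified frame reader. `between_reads` is a hypothetical hook used
    # to simulate another reader touching the same socket mid-frame.
    frame_type, channel, size = struct.unpack(">BHI", recv_exact(sock, 7))
    if between_reads is not None:
        between_reads()
    payload = recv_exact(sock, size)
    end = recv_exact(sock, 1)[0]
    if end != FRAME_END:
        raise UnexpectedFrame(
            "Received {0:#04x} while expecting 0xce".format(end))
    return frame_type, channel, payload


def demo():
    broker, client = socket.socketpair()
    try:
        # Two well-formed frames are queued on the socket.
        broker.sendall(build_frame(1, 1, b"PAYLOAD9") +
                       build_frame(1, 1, b"next"))
        try:
            # A second reader steals one byte after the header was parsed.
            read_frame(client, between_reads=lambda: recv_exact(client, 1))
        except UnexpectedFrame as exc:
            return str(exc)
        return None
    finally:
        broker.close()
        client.close()


if __name__ == "__main__":
    print(demo())
```

In this sketch the stolen byte makes the reader consume the real frame-end octet as the last payload byte, so the byte it then checks is the type octet of the next frame. Which byte shows up in the real error would depend on where the streams get interleaved, which would be consistent with the varying values (0x00, 0x50, 0x39) in the original report.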

@kheyer

kheyer commented Apr 13, 2024

Can anything be done about that?

Edit: for anyone who runs into this, my current workaround is replacing Celery/Redis with Dask Distributed

@Nusnus
Member

Nusnus commented Apr 14, 2024

> Can anything be done about that?

Do you have steps to reproduce?

@kheyer

kheyer commented Apr 14, 2024

I don't have a minimal reproducible example, but I believe this is a long-standing issue caused by many tasks being started concurrently.

My system uses a web app to start Celery tasks. There are often bursts of a hundred or so tasks being started concurrently.

Initially I was using Celery with a Redis backend, and I ran into #6335 and redis/redis-py#3184

I swapped the Redis backend for RabbitMQ, which produced the "Received 0x01 while expecting 0xce" issue.

Comments on #6335 speculate that the issue is caused by two threads reading from the same socket, which would be the same underlying mechanism as your theory about two amqp connections sharing the same socket.

It seems that when a lot of Celery tasks are started at the same time, two connections, threads, or similar end up using the same socket, producing different errors depending on which broker is used.
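
No fix is confirmed in this thread. If the shared-socket theory is right, one generic way to avoid two threads reading from the same socket is to give each thread its own connection. The sketch below shows only that general pattern using `threading.local`; the `PerThread` class and its `factory` argument are hypothetical names for this example and are not part of Celery, kombu, or amqp.

```python
import threading


class PerThread:
    """Lazily create one resource per thread via a caller-supplied factory.

    Hypothetical helper for illustration; `factory` would be whatever
    function opens a new connection in the application.
    """

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def get(self):
        # Each thread sees its own attributes on a threading.local object,
        # so the resource is created once per thread and never shared.
        if not hasattr(self._local, "resource"):
            self._local.resource = self._factory()
        return self._local.resource


def demo():
    # `object` stands in for a real connection factory.
    holder = PerThread(object)
    seen = {}

    def worker(name):
        # Two calls from the same thread should return the same resource.
        seen[name] = (holder.get(), holder.get())

    threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen


if __name__ == "__main__":
    result = demo()
    print(result["a"][0] is result["a"][1])      # same thread, same resource
    print(result["a"][0] is not result["b"][0])  # different threads differ
```

Whether this pattern can be applied here depends on where the connection is actually created and shared, which this thread does not establish.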
