Skip to content

Commit

Permalink
Make the connection callback methods public again, add documentation (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kristjanvalur authored and dvora-h committed Feb 25, 2024
1 parent 7c13191 commit 3f4f5e3
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Connection.register_connect_callback() is made public.
* Fix async `read_response` to use `disable_decoding`.
* Add 'aclose()' methods to async classes, deprecate async close().
* Fix #2831, add auto_close_connection_pool=True arg to asyncio.Redis.from_url()
Expand Down
6 changes: 3 additions & 3 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ async def __aexit__(self, exc_type, exc_value, traceback):

def __del__(self):
if self.connection:
self.connection._deregister_connect_callback(self.on_connect)
self.connection.deregister_connect_callback(self.on_connect)

async def aclose(self):
# In case a connection property does not yet exist
Expand All @@ -795,7 +795,7 @@ async def aclose(self):
async with self._lock:
if self.connection:
await self.connection.disconnect()
self.connection._deregister_connect_callback(self.on_connect)
self.connection.deregister_connect_callback(self.on_connect)
await self.connection_pool.release(self.connection)
self.connection = None
self.channels = {}
Expand Down Expand Up @@ -858,7 +858,7 @@ async def connect(self):
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection._register_connect_callback(self.on_connect)
self.connection.register_connect_callback(self.on_connect)
else:
await self.connection.connect()
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
Expand Down
16 changes: 14 additions & 2 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,24 @@ def repr_pieces(self):
def is_connected(self):
return self._reader is not None and self._writer is not None

def _register_connect_callback(self, callback):
def register_connect_callback(self, callback):
"""
Register a callback to be called when the connection is established either
initially or reconnected. This allows listeners to issue commands that
are ephemeral to the connection, for example pub/sub subscription or
key tracking. The callback must be a _method_ and will be kept as
a weak reference.
"""
wm = weakref.WeakMethod(callback)
if wm not in self._connect_callbacks:
self._connect_callbacks.append(wm)

def _deregister_connect_callback(self, callback):
def deregister_connect_callback(self, callback):
"""
De-register a previously registered callback. It will no-longer receive
notifications on connection events. Calling this is not required when the
listener goes away, since the callbacks are kept as weak methods.
"""
try:
self._connect_callbacks.remove(weakref.WeakMethod(callback))
except ValueError:
Expand Down
4 changes: 2 additions & 2 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ def __del__(self) -> None:
def reset(self) -> None:
if self.connection:
self.connection.disconnect()
self.connection._deregister_connect_callback(self.on_connect)
self.connection.deregister_connect_callback(self.on_connect)
self.connection_pool.release(self.connection)
self.connection = None
self.health_check_response_counter = 0
Expand Down Expand Up @@ -751,7 +751,7 @@ def execute_command(self, *args):
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection._register_connect_callback(self.on_connect)
self.connection.register_connect_callback(self.on_connect)
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
self.connection._parser.set_push_handler(self.push_handler_func)
connection = self.connection
Expand Down
2 changes: 1 addition & 1 deletion redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1775,7 +1775,7 @@ def execute_command(self, *args):
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection._register_connect_callback(self.on_connect)
self.connection.register_connect_callback(self.on_connect)
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
self.connection._parser.set_push_handler(self.push_handler_func)
connection = self.connection
Expand Down
16 changes: 14 additions & 2 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,24 @@ def _construct_command_packer(self, packer):
else:
return PythonRespSerializer(self._buffer_cutoff, self.encoder.encode)

def _register_connect_callback(self, callback):
def register_connect_callback(self, callback):
"""
Register a callback to be called when the connection is established either
initially or reconnected. This allows listeners to issue commands that
are ephemeral to the connection, for example pub/sub subscription or
key tracking. The callback must be a _method_ and will be kept as
a weak reference.
"""
wm = weakref.WeakMethod(callback)
if wm not in self._connect_callbacks:
self._connect_callbacks.append(wm)

def _deregister_connect_callback(self, callback):
def deregister_connect_callback(self, callback):
"""
De-register a previously registered callback. It will no-longer receive
notifications on connection events. Calling this is not required when the
listener goes away, since the callbacks are kept as weak methods.
"""
try:
self._connect_callbacks.remove(weakref.WeakMethod(callback))
except ValueError:
Expand Down

0 comments on commit 3f4f5e3

Please sign in to comment.