Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streamline client side caching API typing #3216

Merged
merged 12 commits into from
May 8, 2024
36 changes: 36 additions & 0 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,18 @@ def lock(
thread_local=thread_local,
)

def flush_cache(self):
if self.nodes_manager:
self.nodes_manager.flush_cache()

def delete_command_from_cache(self, command):
if self.nodes_manager:
self.nodes_manager.delete_command_from_cache(command)

def invalidate_key_from_cache(self, key):
if self.nodes_manager:
self.nodes_manager.invalidate_key_from_cache(key)


class ClusterNode:
"""
Expand Down Expand Up @@ -1107,6 +1119,18 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:

return ret

def flush_cache(self):
for connection in self._connections:
connection.flush_cache()

def delete_command_from_cache(self, command):
for connection in self._connections:
connection.delete_command_from_cache(command)

def invalidate_key_from_cache(self, key):
for connection in self._connections:
connection.invalidate_key_from_cache(key)


class NodesManager:
__slots__ = (
Expand Down Expand Up @@ -1392,6 +1416,18 @@ def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
return self.address_remap((host, port))
return host, port

def flush_cache(self):
for node in self.nodes_cache.values():
node.flush_cache()

def delete_command_from_cache(self, command):
for node in self.nodes_cache.values():
node.delete_command_from_cache(command)

def invalidate_key_from_cache(self, key):
for node in self.nodes_cache.values():
node.invalidate_key_from_cache(key)


class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands):
"""
Expand Down
42 changes: 24 additions & 18 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,27 @@ def _add_to_local_cache(
):
self.client_cache.set(command, response, keys)

def flush_cache(self):
try:
if self.client_cache:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need the if inside the try block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The try is meant to compensate for the if? You mean we could leave the if without the try? I would find that cleaner.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if-statement will prevent the call to flush/delete_command/invalidate_key that would cause the AttributeError if client_cache is falsy. The try-catch-block will also catch an AttributeError if client_cache is truthy but doesn't provide the method.
I'm missing to much context to say which is more desirable. But if it can be assumed, that a thruthy client_cache always also provides the methods, then the if-statement and try-catch are redundant and it becomes a question of style. You can read more on that here https://realpython.com/python-lbyl-vs-eafp/
Best regards, a random passerby XD

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @CBerndt-Work for your nice explanation. The article is very well written. I realized that I am by now too conditioned towards LBYL, from other languages. Especially here, using AttributeError, which signals issues with the code structure, not with data. So I went for the ifs.

self.client_cache.flush()
except AttributeError:
pass

def delete_command_from_cache(self, command):
try:
if self.client_cache:
self.client_cache.delete_command(command)
except AttributeError:
pass

def invalidate_key_from_cache(self, key):
try:
if self.client_cache:
self.client_cache.invalidate_key(key)
except AttributeError:
pass


class Connection(AbstractConnection):
"Manages TCP communication to and from a Redis server"
Expand Down Expand Up @@ -1252,33 +1273,18 @@ def set_retry(self, retry: "Retry") -> None:

def flush_cache(self):
connections = chain(self._available_connections, self._in_use_connections)

for connection in connections:
try:
connection.client_cache.flush()
except AttributeError:
# cache is not enabled
pass
connection.flush_cache()

def delete_command_from_cache(self, command: str):
connections = chain(self._available_connections, self._in_use_connections)

for connection in connections:
try:
connection.client_cache.delete_command(command)
except AttributeError:
# cache is not enabled
pass
connection.delete_command_from_cache(command)

def invalidate_key_from_cache(self, key: str):
connections = chain(self._available_connections, self._in_use_connections)

for connection in connections:
try:
connection.client_cache.invalidate_key(key)
except AttributeError:
# cache is not enabled
pass
connection.invalidate_key_from_cache(key)


class BlockingConnectionPool(ConnectionPool):
Expand Down
33 changes: 12 additions & 21 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,31 +601,22 @@ def parse_response(self, connection, command_name, **options):
return response

def flush_cache(self):
try:
if self.connection:
self.connection.client_cache.flush()
else:
self.connection_pool.flush_cache()
except AttributeError:
pass
if self.connection:
self.connection.flush_cache()
else:
self.connection_pool.flush_cache()

def delete_command_from_cache(self, command):
try:
if self.connection:
self.connection.client_cache.delete_command(command)
else:
self.connection_pool.delete_command_from_cache(command)
except AttributeError:
pass
if self.connection:
self.connection.delete_command_from_cache(command)
else:
self.connection_pool.delete_command_from_cache(command)

def invalidate_key_from_cache(self, key):
try:
if self.connection:
self.connection.client_cache.invalidate_key(key)
else:
self.connection_pool.invalidate_key_from_cache(key)
except AttributeError:
pass
if self.connection:
self.connection.invalidate_key_from_cache(key)
else:
self.connection_pool.invalidate_key_from_cache(key)


StrictRedis = Redis
Expand Down
36 changes: 36 additions & 0 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,18 @@ def load_external_module(self, funcname, func):
"""
setattr(self, funcname, func)

def flush_cache(self):
if self.nodes_manager:
self.nodes_manager.flush_cache()

def delete_command_from_cache(self, command):
if self.nodes_manager:
self.nodes_manager.delete_command_from_cache(command)

def invalidate_key_from_cache(self, key):
if self.nodes_manager:
self.nodes_manager.invalidate_key_from_cache(key)


class ClusterNode:
def __init__(self, host, port, server_type=None, redis_connection=None):
Expand Down Expand Up @@ -1294,6 +1306,18 @@ def __del__(self):
if self.redis_connection is not None:
self.redis_connection.close()

def flush_cache(self):
if self.redis_connection is not None:
self.redis_connection.flush_cache()

def delete_command_from_cache(self, command):
if self.redis_connection is not None:
self.redis_connection.delete_command_from_cache(command)

def invalidate_key_from_cache(self, key):
if self.redis_connection is not None:
self.redis_connection.invalidate_key_from_cache(key)


class LoadBalancer:
"""
Expand Down Expand Up @@ -1660,6 +1684,18 @@ def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
return self.address_remap((host, port))
return host, port

def flush_cache(self):
for node in self.nodes_cache.values():
node.flush_cache()

def delete_command_from_cache(self, command):
for node in self.nodes_cache.values():
node.delete_command_from_cache(command)

def invalidate_key_from_cache(self, key):
for node in self.nodes_cache.values():
node.invalidate_key_from_cache(key)


class ClusterPubSub(PubSub):
"""
Expand Down
21 changes: 21 additions & 0 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,27 @@ def _add_to_local_cache(
):
self.client_cache.set(command, response, keys)

def flush_cache(self):
try:
if self.client_cache:
self.client_cache.flush()
except AttributeError:
pass

def delete_command_from_cache(self, command):
try:
if self.client_cache:
self.client_cache.delete_command(command)
except AttributeError:
pass

def invalidate_key_from_cache(self, key):
try:
if self.client_cache:
self.client_cache.invalidate_key(key)
except AttributeError:
pass


class Connection(AbstractConnection):
"Manages TCP communication to and from a Redis server"
Expand Down