community[patch]: Graceful handling of redis errors in RedisCache and AsyncRedisCache #17171

Merged · 2 commits · Feb 21, 2024
50 changes: 34 additions & 16 deletions libs/community/langchain_community/cache.py
@@ -461,22 +461,31 @@ def __init__(self, redis_: Any, *, ttl: Optional[int] = None):
     def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
         """Look up based on prompt and llm_string."""
         # Read from a Redis HASH
-        results = self.redis.hgetall(self._key(prompt, llm_string))
-        return self._get_generations(results)  # type: ignore[arg-type]
+        try:
Collaborator:
There are two other reasonable behaviors.

  1. Retry once prior to giving up
  2. Circuit breaker pattern (out of scope)

What are your thoughts on these?

Contributor Author:
  1. Adding a retry with some timeout makes sense.
  2. About a circuit breaker for the long-term failure scenario:
    • We keep logging errors while Redis is down, until the circuit opens.
    • Once the circuit is open, do the error logs continue for Redis failures during that window?
      Or
    • Do we log a single error saying the circuit is open, so Redis is not being used?

Ideally it would be better to continue emitting error logs: without the cache, API costs are incurred, so the service user should stay aware of the Redis failure even while the circuit remains open due to continued failures.

I can start with the retry, close this PR, and open a new PR for the circuit breaker covering all the scenarios, or do it all here. Let me know what you suggest.

Collaborator:
I think users would appreciate being able to configure the retry behavior, since a retry adds latency and isn't guaranteed to pay off.

Could you suggest a parameterization of the initializer that you think would make sense for configuring the retry behavior?
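
For illustration, one possible shape (the parameter names `retries` and `retry_delay` are hypothetical, not an agreed-on API, and this is not part of the changeset):

```python
# Sketch only: a configurable retry for RedisCache, with hypothetical
# parameter names. Defaults to no retry so the cache stays cheap.
import logging
import time
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)


class RedisCache:  # simplified; the real class has more methods
    def __init__(
        self,
        redis_: Any,
        *,
        ttl: Optional[int] = None,
        retries: int = 0,          # 0 = give up after the first failure
        retry_delay: float = 0.1,  # seconds between attempts
    ) -> None:
        self.redis = redis_
        self.ttl = ttl
        self.retries = retries
        self.retry_delay = retry_delay

    def _with_retry(self, op: Callable[[], Any]) -> Optional[Any]:
        """Run a Redis operation, retrying up to self.retries times."""
        for attempt in range(self.retries + 1):
            try:
                return op()
            except Exception as e:
                if attempt == self.retries:
                    logger.error(
                        f"Redis operation failed after {attempt + 1} attempt(s): {e}"
                    )
                    return None
                time.sleep(self.retry_delay)
        return None  # unreachable; keeps type checkers happy
```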


For a circuit breaker, I think it probably makes sense to log periodically, but not on every request. We should probably figure out a separate way to tackle this. I'd appreciate it if we could design the circuit breaker as a higher-level primitive that accepts a cache instance and dresses it up with circuit-breaker behavior.


cc @cbornet / @dzmitry-kankalovich, tagging you in case this is of interest to you.
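
A rough sketch of what such a higher-level wrapper might look like (all names are hypothetical; this is not part of the changeset):

```python
# Sketch only: a circuit breaker as a higher-level primitive wrapping any
# cache object that exposes lookup(). Note: this assumes the wrapped cache
# propagates Redis errors (fail-fast mode); with the swallow-and-log
# behavior in this PR, the breaker would never trip on lookup().
import logging
import time
from typing import Any, Optional

logger = logging.getLogger(__name__)


class CircuitBreakerCache:
    def __init__(
        self, cache: Any, *, failure_threshold: int = 5, reset_after: float = 30.0
    ) -> None:
        self.cache = cache
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after  # seconds the circuit stays open
        self._failures = 0
        self._opened_at: Optional[float] = None

    def _circuit_open(self) -> bool:
        if self._opened_at is None:
            return False
        if time.monotonic() - self._opened_at >= self.reset_after:
            # Half-open: let the next request probe Redis again.
            self._opened_at = None
            self._failures = 0
            return False
        return True

    def lookup(self, prompt: str, llm_string: str) -> Optional[Any]:
        if self._circuit_open():
            return None  # treat as a cache miss; log periodically if desired
        try:
            result = self.cache.lookup(prompt, llm_string)
            self._failures = 0  # any success closes the circuit fully
            return result
        except Exception as e:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = time.monotonic()
                logger.error(
                    f"Cache circuit opened after {self._failures} consecutive failures: {e}"
                )
            return None
```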

Contributor:

I think the current changeset is great even as-is, because cache-layer IO should not break LLM calls, in my opinion.

Taking it further, I do agree that more configuration around it would be nice: for example, a flag for whether it should error out or not (defaulting to "not"?); and if a retry is added, it definitely needs configuration as well (which I'd probably set to no retry by default; as you rightly mentioned, with cache retries you need to be sure this is what you want, since they can defeat the purpose of a cache in the first place). A sketch of the error-out flag follows below.

And yes, a general circuit breaker for the cache interface would be great, but that is probably a substantial changeset, which I don't know whether @snsten would go for.
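
For concreteness, a minimal sketch of that flag on the sync lookup path from this diff (the name `raise_on_error` is hypothetical):

```python
# Sketch only: an opt-in fail-fast flag inside RedisCache (see diff above).
# `raise_on_error` is a hypothetical constructor argument, default False.
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
    """Look up based on prompt and llm_string."""
    try:
        results = self.redis.hgetall(self._key(prompt, llm_string))
        return self._get_generations(results)  # type: ignore[arg-type]
    except Exception as e:
        if self.raise_on_error:  # hypothetical flag: restore old behavior
            raise
        logger.error(f"Redis lookup failed: {e}")
        return None
```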

Collaborator:
Agree on all points!

@snsten do you want to proceed with the PR as is and just update the logger.warning -> logger.error? This will probably spam someone's logs at some point, but at least their service will stay up.

And if you want to tackle more configuration (or a circuit breaker) separately, that would be great.

Contributor Author:

OK, I'll change only the error log in this PR, for the two classes mentioned.

Collaborator:

Thank you!

+            results = self.redis.hgetall(self._key(prompt, llm_string))
+            return self._get_generations(results)  # type: ignore[arg-type]
+        except Exception as e:
+            logger.error(f"Redis lookup failed: {e}")
+            return None

     def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
         """Update cache based on prompt and llm_string."""
         self._ensure_generation_type(return_val)
         key = self._key(prompt, llm_string)
-
-        with self.redis.pipeline() as pipe:
-            self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
-            pipe.execute()
+        try:
+            with self.redis.pipeline() as pipe:
+                self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
+                pipe.execute()
+        except Exception as e:
+            logger.error(f"Redis update failed: {e}")

     def clear(self, **kwargs: Any) -> None:
         """Clear cache. If `asynchronous` is True, flush asynchronously."""
-        asynchronous = kwargs.get("asynchronous", False)
-        self.redis.flushdb(asynchronous=asynchronous, **kwargs)
+        try:
+            asynchronous = kwargs.get("asynchronous", False)
+            self.redis.flushdb(asynchronous=asynchronous, **kwargs)
+        except Exception as e:
+            logger.error(f"Redis clear failed: {e}")


 class AsyncRedisCache(_RedisCacheBase):

@@ -525,8 +534,12 @@ def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:

     async def alookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
         """Look up based on prompt and llm_string. Async version."""
-        results = await self.redis.hgetall(self._key(prompt, llm_string))
-        return self._get_generations(results)  # type: ignore[arg-type]
+        try:
+            results = await self.redis.hgetall(self._key(prompt, llm_string))
+            return self._get_generations(results)  # type: ignore[arg-type]
+        except Exception as e:
+            logger.error(f"Redis async lookup failed: {e}")
+            return None

     def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
         """Update cache based on prompt and llm_string."""

@@ -541,10 +554,12 @@ async def aupdate(
         """Update cache based on prompt and llm_string. Async version."""
         self._ensure_generation_type(return_val)
         key = self._key(prompt, llm_string)
-
-        async with self.redis.pipeline() as pipe:
-            self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
-            await pipe.execute()  # type: ignore[attr-defined]
+        try:
+            async with self.redis.pipeline() as pipe:
+                self._configure_pipeline_for_update(key, pipe, return_val, self.ttl)
+                await pipe.execute()  # type: ignore[attr-defined]
+        except Exception as e:
+            logger.error(f"Redis async update failed: {e}")

     def clear(self, **kwargs: Any) -> None:
         """Clear cache. If `asynchronous` is True, flush asynchronously."""

@@ -558,8 +573,11 @@ async def aclear(self, **kwargs: Any) -> None:
         Clear cache. If `asynchronous` is True, flush asynchronously.
         Async version.
         """
-        asynchronous = kwargs.get("asynchronous", False)
-        await self.redis.flushdb(asynchronous=asynchronous, **kwargs)
+        try:
+            asynchronous = kwargs.get("asynchronous", False)
+            await self.redis.flushdb(asynchronous=asynchronous, **kwargs)
+        except Exception as e:
+            logger.error(f"Redis async clear failed: {e}")


 class RedisSemanticCache(BaseCache):
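
For context, a small usage sketch of the behavior this patch enables; the Redis connection details here are placeholders:

```python
# With this patch, a Redis outage degrades to a cache miss instead of
# raising from inside the LLM call path.
from redis import Redis
from langchain_community.cache import RedisCache

cache = RedisCache(redis_=Redis(host="localhost", port=6379))

# If Redis is unreachable, lookup() logs "Redis lookup failed: ..." and
# returns None (indistinguishable from a miss), so the caller falls
# through to the LLM instead of crashing.
result = cache.lookup("some prompt", "some llm_string")
print(result)  # None on a miss or on Redis failure
```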