Skip to content

Commit

Permalink
6.3.2 (#167)
Browse files Browse the repository at this point in the history
* fix: intermittent disconnections from write errors
  • Loading branch information
aembke committed Sep 19, 2023
1 parent 0b2887f commit a9bf850
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 6.3.2

* Fix a bug with connection errors unexpectedly ending the connection task.

## 6.3.1

* Update various dependencies
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "fred"
version = "6.3.1"
version = "6.3.2"
authors = ["Alec Embke <aembke@gmail.com>"]
edition = "2021"
description = "An async Redis client built on Tokio."
Expand Down
6 changes: 5 additions & 1 deletion src/router/centralized.rs
Expand Up @@ -26,7 +26,11 @@ pub async fn send_command(
Ok(utils::write_command(inner, writer, command, force_flush).await)
} else {
_debug!(inner, "Failed to read connection for {}", command.kind.to_str_debug());
Err((RedisError::new(RedisErrorKind::IO, "Missing connection."), command))
Ok(Written::Disconnect((
None,
Some(command),
RedisError::new(RedisErrorKind::IO, "Missing connection."),
)))
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/router/clustered.rs
Expand Up @@ -106,7 +106,12 @@ pub async fn send_command(
server,
command.kind.to_str_debug()
);
Err((RedisError::new(RedisErrorKind::IO, "Missing connection."), command))

Ok(Written::Disconnect((
Some(server.clone()),
Some(command),
RedisError::new(RedisErrorKind::IO, "Missing connection."),
)))
}
}

Expand Down Expand Up @@ -162,7 +167,7 @@ pub async fn send_all_cluster_command(
if let Written::Disconnect((server, _, err)) = utils::write_command(inner, writer, cmd, true).await {
_debug!(
inner,
"Exit all nodes command early ({}/{}: {}) from error: {:?}",
"Exit all nodes command early ({}/{}: {:?}) from error: {:?}",
idx + 1,
num_nodes,
server,
Expand Down
4 changes: 2 additions & 2 deletions src/router/commands.rs
Expand Up @@ -139,8 +139,8 @@ async fn write_with_backpressure(
continue;
},
Ok(Written::Disconnect((server, command, error))) => {
_debug!(inner, "Handle disconnect for {} from {:?}", server, error);
let commands = router.connections.disconnect(inner, Some(&server)).await;
_debug!(inner, "Handle disconnect for {:?} due to {:?}", server, error);
let commands = router.connections.disconnect(inner, server.as_ref()).await;
router.buffer_commands(commands);
if let Some(command) = command {
router.buffer_command(command);
Expand Down
8 changes: 4 additions & 4 deletions src/router/mod.rs
Expand Up @@ -44,7 +44,7 @@ pub enum Written {
/// Indicates that the command was sent to all servers.
SentAll,
/// Disconnect from the provided server and retry the command later.
Disconnect((Server, Option<RedisCommand>, RedisError)),
Disconnect((Option<Server>, Option<RedisCommand>, RedisError)),
/// Indicates that the result should be ignored since the command will not be retried.
Ignore,
/// (Cluster only) Synchronize the cached cluster routing table and retry.
Expand Down Expand Up @@ -294,7 +294,7 @@ impl Connections {
if let Some(writer) = writers.remove(server) {
_debug!(inner, "Disconnecting from {}", writer.server);
let commands = writer.graceful_close().await;
out.extend(commands.into_iter());
out.extend(commands);
}
}
out
Expand Down Expand Up @@ -784,7 +784,7 @@ impl Router {

match write_result {
Written::Disconnect((server, command, error)) => {
let buffer = self.connections.disconnect(&inner, Some(&server)).await;
let buffer = self.connections.disconnect(&inner, server.as_ref()).await;
self.buffer_commands(buffer);
self.sync_network_timeout_state();

Expand Down Expand Up @@ -996,7 +996,7 @@ impl Router {
}

warn!(
"{}: Disconnect from {} while replaying command: {:?}",
"{}: Disconnect from {:?} while replaying command: {:?}",
self.inner.id, server, error
);
self.disconnect_all().await; // triggers a reconnect if needed
Expand Down
4 changes: 2 additions & 2 deletions src/router/utils.rs
Expand Up @@ -173,10 +173,10 @@ pub async fn write_command(
_debug!(inner, "Error sending command {}: {:?}", command.kind.to_str_debug(), e);
if command.should_send_write_error(inner) {
command.respond_to_caller(Err(e.clone()));
Written::Disconnect((writer.server.clone(), None, e))
Written::Disconnect((Some(writer.server.clone()), None, e))
} else {
inner.notifications.broadcast_error(e.clone());
Written::Disconnect((writer.server.clone(), Some(command), e))
Written::Disconnect((Some(writer.server.clone()), Some(command), e))
}
} else {
Written::Sent((writer.server.clone(), should_flush))
Expand Down

0 comments on commit a9bf850

Please sign in to comment.