Skip to content

Commit

Permalink
Fix bugs from BalancerAddress changes
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK committed Aug 30, 2023
1 parent 311f878 commit 45c0065
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 8 deletions.
11 changes: 8 additions & 3 deletions src/Grpc.Net.Client/Balancer/Subchannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#endregion

#if SUPPORT_LOAD_BALANCING
using System.Diagnostics;
using System.Net;
using Grpc.Core;
using Grpc.Net.Client.Balancer.Internal;
Expand Down Expand Up @@ -173,6 +174,10 @@ public void UpdateAddresses(IReadOnlyList<BalancerAddress> addresses)
return;
}

// Get a copy of the current address before updating addresses.
// Updating addresses to not contain this value changes the property to return null.
var currentAddress = CurrentAddress;

_addresses.Clear();
_addresses.AddRange(addresses);

Expand All @@ -186,11 +191,11 @@ public void UpdateAddresses(IReadOnlyList<BalancerAddress> addresses)
requireReconnect = true;
break;
case ConnectivityState.Ready:
// Transport uses the subchannel lock but take copy in an abundance of caution.
var currentAddress = CurrentAddress;
// Check if the subchannel is connected to an address that's not longer present.
// In this situation require the subchannel to reconnect to a new address.
if (currentAddress != null)
{
if (GetAddressByEndpoint(_addresses, currentAddress.EndPoint) != null)
if (GetAddressByEndpoint(_addresses, currentAddress.EndPoint) is null)
{
SubchannelLog.ConnectedAddressNotInUpdatedAddresses(_logger, Id, currentAddress);
requireReconnect = true;
Expand Down
11 changes: 8 additions & 3 deletions src/Grpc.Net.Client/Balancer/SubchannelsLoadBalancer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#endregion

#if SUPPORT_LOAD_BALANCING
using System.Diagnostics;
using Grpc.Core;
using Grpc.Net.Client.Balancer.Internal;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -144,8 +145,11 @@ public override void UpdateChannelState(ChannelState state)
// The new subchannel address has the same endpoint so the connection isn't impacted.
if (!BalancerAddressEqualityComparer.Instance.Equals(address, newOrCurrentSubchannel.Address))
{
newOrCurrentSubchannel = new AddressSubchannel(
newOrCurrentSubchannel.Subchannel,
address,
newOrCurrentSubchannel.LastKnownState);
newOrCurrentSubchannel.Subchannel.UpdateAddresses(new[] { address });
newOrCurrentSubchannel = new AddressSubchannel(newOrCurrentSubchannel.Subchannel, address);
}

SubchannelLog.SubchannelPreserved(_logger, newOrCurrentSubchannel.Subchannel.Id, address);
Expand Down Expand Up @@ -306,15 +310,16 @@ protected override void Dispose(bool disposing)
/// <returns>A subchannel picker.</returns>
protected abstract SubchannelPicker CreatePicker(IReadOnlyList<Subchannel> readySubchannels);

[DebuggerDisplay("Subchannel = {Subchannel.Id}, Address = {Address}, LastKnownState = {LastKnownState}")]
private sealed class AddressSubchannel
{
private ConnectivityState _lastKnownState;

public AddressSubchannel(Subchannel subchannel, BalancerAddress address)
public AddressSubchannel(Subchannel subchannel, BalancerAddress address, ConnectivityState lastKnownState = ConnectivityState.Idle)
{
Subchannel = subchannel;
Address = address;
_lastKnownState = ConnectivityState.Idle;
_lastKnownState = lastKnownState;
}

// Track connectivity state that has been updated to load balancer.
Expand Down
19 changes: 18 additions & 1 deletion test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,18 @@ public async Task ChangeAddresses_HasReadySubchannel_OldSubchannelShutdown()
});

services.AddSingleton<ResolverFactory>(new TestResolverFactory(resolver));
services.AddSingleton<ISubchannelTransportFactory>(new TestSubchannelTransportFactory());

var subChannelConnections = new List<Subchannel>();
var transportFactory = new TestSubchannelTransportFactory((s, c) =>
{
lock (subChannelConnections)
{
subChannelConnections.Add(s);
}
return Task.FromResult(new TryConnectResult(ConnectivityState.Ready));
});
services.AddSingleton<ISubchannelTransportFactory>(transportFactory);

var serviceProvider = services.BuildServiceProvider();
var logger = serviceProvider.GetRequiredService<ILoggerProvider>().CreateLogger(GetType().FullName!);

Expand Down Expand Up @@ -95,6 +106,12 @@ public async Task ChangeAddresses_HasReadySubchannel_OldSubchannelShutdown()
Assert.AreEqual(1, subchannels[0]._addresses.Count);
Assert.AreEqual(new DnsEndPoint("localhost", 81), subchannels[0]._addresses[0].EndPoint);
Assert.AreEqual(ConnectivityState.Ready, subchannels[0].State);

lock (subChannelConnections)
{
Assert.AreEqual(2, subChannelConnections.Count);
Assert.AreSame(subChannelConnections[0], subChannelConnections[1]);
}
}

[Test]
Expand Down
38 changes: 37 additions & 1 deletion test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,15 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged()

var connectState = ConnectivityState.Ready;

var transportFactory = new TestSubchannelTransportFactory((s, c) => Task.FromResult(new TryConnectResult(connectState)));
var subChannelConnections = new List<Subchannel>();
var transportFactory = new TestSubchannelTransportFactory((s, c) =>
{
lock (subChannelConnections)
{
subChannelConnections.Add(s);
}
return Task.FromResult(new TryConnectResult(connectState));
});
services.AddSingleton<TestResolver>(s =>
{
return new TestResolver(
Expand Down Expand Up @@ -351,9 +359,15 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged()
Assert.AreEqual(new DnsEndPoint("localhost", 82), subchannels[2]._addresses[0].EndPoint);

// Preserved because port 81, 82 is in both refresh results
var discardedSubchannel = subchannels[0];
var preservedSubchannel1 = subchannels[1];
var preservedSubchannel2 = subchannels[2];

await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync(
serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(GetType()),
channel.ConnectionManager,
expectedCount: 3).DefaultTimeout();

var address2 = new BalancerAddress("localhost", 82);
address2.Attributes.Set(new BalancerAttributesKey<int>("test"), 1);

Expand All @@ -364,7 +378,13 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged()
new BalancerAddress("localhost", 83)
});

await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync(
serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(GetType()),
channel.ConnectionManager,
expectedCount: 3).DefaultTimeout();

subchannels = channel.ConnectionManager.GetSubchannels();
var newSubchannel = subchannels[2];
Assert.AreEqual(3, subchannels.Count);

Assert.AreEqual(1, subchannels[0]._addresses.Count);
Expand All @@ -379,6 +399,22 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged()

// Test that the channel's address was updated with new attribute with new attributes.
Assert.AreSame(preservedSubchannel2.CurrentAddress, address2);

lock (subChannelConnections)
{
try
{
Assert.AreEqual(4, subChannelConnections.Count);
Assert.Contains(discardedSubchannel, subChannelConnections);
Assert.Contains(preservedSubchannel1, subChannelConnections);
Assert.Contains(preservedSubchannel2, subChannelConnections);
Assert.Contains(newSubchannel, subChannelConnections);
}
catch (Exception ex)
{
throw new Exception("Connected subchannels: " + Environment.NewLine + string.Join(Environment.NewLine, subChannelConnections), ex);
}
}
}
}
#endif

0 comments on commit 45c0065

Please sign in to comment.