balancer/leastrequest: Add least request balancer #6510

Merged · 9 commits · Aug 11, 2023
Changes from 3 commits
168 changes: 94 additions & 74 deletions balancer/leastrequest/balancer_test.go
@@ -120,13 +120,12 @@ func (s) TestParseConfig(t *testing.T) {
// setupBackends spins up three test backends, each listening on a port on
// localhost. The three backends always reply with an empty response with no
// error, and for streaming receive until hitting an EOF error.
-func setupBackends(t *testing.T) ([]string, func()) {
+func setupBackends(t *testing.T) []string {
t.Helper()

-backends := make([]*stubserver.StubServer, 4)
-addresses := make([]string, 4)
-// Construct and start 4 working backends.
-for i := 0; i < 4; i++ {
+const numBackends = 3
+addresses := make([]string, numBackends)
+// Construct and start three working backends.
+for i := 0; i < numBackends; i++ {
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
@@ -144,15 +143,10 @@ func setupBackends(t *testing.T) ([]string, func()) {
t.Fatalf("Failed to start backend: %v", err)
}
t.Logf("Started good TestService backend at: %q", backend.Address)
-backends[i] = backend
+t.Cleanup(func() { backend.Stop() })
addresses[i] = backend.Address
}
-cancel := func() {
-for _, backend := range backends {
-backend.Stop()
-}
-}
-return addresses, cancel
+return addresses
}

// checkRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn,
@@ -217,8 +211,7 @@ func (s) TestLeastRequestE2E(t *testing.T) {
index++
return ret
}
-addresses, cancel := setupBackends(t)
-defer cancel()
+addresses := setupBackends(t)

mr := manual.NewBuilderWithScheme("lr-e2e")
defer mr.Close()
@@ -287,55 +280,38 @@ func (s) TestLeastRequestE2E(t *testing.T) {
index = 0
indexes = []uint32{
0, 0, // Causes first stream to be on first address.
-0, 1, // Compares first address (which already has a RPC) to second, so choose second.
-1, 2, // Compares second address (which already has a RPC) to third, so choose third.
-}
-// Start a streaming call on first, but don't finish the stream.
-var peer1 peer.Peer
-stream1, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer1))
-if err != nil {
-t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
-}
+0, 1, // Compares first address (one RPC) to second (no RPCs), so choose second.
+1, 2, // Compares second address (one RPC) to third (no RPCs), so choose third.
+0, 3, // Causes another stream on first address.
+1, 0, // Compares second address (one RPC) to first (two RPCs), so choose second.
+2, 0, // Compares third address (one RPC) to first (two RPCs), so choose third.
+0, 0, // Causes another stream on first address.
+2, 2, // Causes a stream on third address.
+2, 1, // Compares third address (three RPCs) to second (two RPCs), so choose second.
+}
+wantIndex := []uint32{0, 1, 2, 0, 1, 2, 0, 2, 1}

-// Start a second streaming call. From the indexes injected into random
-// number generator, this should compare Address 1, which already has a RPC
-// to Address 2, so thus should start the stream on Address 2.
-var peer2 peer.Peer
-stream2, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer2))
-if err != nil {
-t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
-}

-// Start a third streaming call. From the indexes injected into random
-// number generator, this should compare Address 2, which already has a RPC
-// to Address 3, so thus should start the stream on Address 3.
-var peer3 peer.Peer
-stream3, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer3))
-if err != nil {
-t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
-}

-// Finish the streams to collect peer information.
-stream1.CloseSend()
-if _, err = stream1.Recv(); err != io.EOF {
-t.Fatalf("unexpected error: %v, expected an EOF error", err)
-}
-stream2.CloseSend()
-if _, err = stream2.Recv(); err != io.EOF {
-t.Fatalf("unexpected error: %v, expected an EOF error", err)
-}
-stream3.CloseSend()
-if _, err = stream3.Recv(); err != io.EOF {
-t.Fatalf("unexpected error: %v, expected an EOF error", err)
-}
-if peer1.Addr.String() != peerAtIndex[0] {
-t.Fatalf("got: %v, want: %v", peer1.Addr.String(), peerAtIndex[0])
-}
-if peer2.Addr.String() != peerAtIndex[1] {
-t.Fatalf("got: %v, want: %v", peer2.Addr.String(), peerAtIndex[1])
-}
-if peer3.Addr.String() != peerAtIndex[2] {
-t.Fatalf("got: %v, want: %v", peer3.Addr.String(), peerAtIndex[2])
+// Start streaming RPCs, but do not finish them. Each created stream should
+// be started based on the least request algorithm and injected randomness
+// (see the indexes slice above for exact expectations).
+for _, wantIndex := range wantIndex {
+stream, err := testServiceClient.FullDuplexCall(ctx)
+if err != nil {
+t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
+}
+defer func() {
+stream.CloseSend()
+if _, err = stream.Recv(); err != io.EOF {
+t.Fatalf("unexpected error: %v, expected an EOF error", err)
+}
+}()
Member: I think we can just delete all of this and rely on ctx's cancelation to clean up everything.

Same with the other test. Just make the server's handler do a <-stream.Context().Done() instead of doing a stream.Recv() in a loop.

Contributor Author: Done.
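
For reference, a minimal sketch of the handler shape the reviewer suggests, written against the stubserver used in setupBackends (the FullDuplexCallF hook and the stream type follow grpc-go's test-service conventions; treat the exact identifiers as illustrative, not code from this PR):

backend := &stubserver.StubServer{
	// Park until the test's context is canceled instead of Recv-ing
	// until io.EOF; canceling ctx then tears down every open stream.
	FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
		<-stream.Context().Done()
		return nil
	},
}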

+p, ok := peer.FromContext(stream.Context())
+if !ok {
+t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
+}
+if p.Addr.String() != peerAtIndex[wantIndex] {
+t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), peerAtIndex[wantIndex])
+}
+}
}
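
Both tests rely on the same trick for determinism: the balancer draws randomness through the package-level grpcranduint32 hook, which each test swaps for a scripted sequence and restores on exit. A distilled sketch of the pattern, taken from the hunks above and below (test body elided):

// Restore the real RNG when the test finishes.
defer func(u func() uint32) {
	grpcranduint32 = u
}(grpcranduint32)
// Each pick consumes choiceCount draws; with choiceCount = 2, the pair
// (0, 1) makes the picker compare SubConn 0 against SubConn 1.
indexes := []uint32{0, 0, 1, 1}
var index int
grpcranduint32 = func() uint32 {
	ret := indexes[index%len(indexes)]
	index++
	return ret
}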

@@ -350,15 +326,14 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) {
}(grpcranduint32)
var index int
indexes := []uint32{
-0, 1, 2, 3, 4, 5, // Triggers a round robin distribution of indexes for two addresses or three addresses.
+0, 0, 1, 1,
}
grpcranduint32 = func() uint32 {
ret := indexes[index%len(indexes)]
index++
return ret
}
-addresses, cancel := setupBackends(t)
-defer cancel()
+addresses := setupBackends(t)

mr := manual.NewBuilderWithScheme("lr-e2e")
defer mr.Close()
@@ -423,6 +398,10 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) {
// SubConns. Thus, since address 3 is the new address and the first two
// addresses are populated with RPCs, once the picker update of all 3 READY
// SubConns takes effect, all new streams should be started on address 3.
+index = 0
+indexes = []uint32{
+0, 1, 2, 3, 4, 5,
+}
dfawley marked this conversation as resolved.
lrscJSON = `
{
"loadBalancingConfig": [
@@ -451,18 +430,59 @@ func (s) TestLeastRequestPersistsCounts(t *testing.T) {
t.Fatalf("error in expected round robin: %v", err)
}

-for i := 0; i < 50; i++ {
-var peer peer.Peer
-stream, err := testServiceClient.FullDuplexCall(ctx, grpc.Peer(&peer))
+// Start 25 RPCs, but don't finish them. They should all start on address 3,
+// since the first two addresses both have 25 RPCs (and randomness
+// injection/choiceCount causes all 3 to be compared every iteration).
+for i := 0; i < 25; i++ {
+stream, err := testServiceClient.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
}
-stream.CloseSend() // Finish stream to populate peer.
-if _, err = stream.Recv(); err != io.EOF {
-t.Fatalf("unexpected error: %v, expected an EOF error", err)
+defer func() {
Member: Same.

Contributor Author: Done.

+stream.CloseSend()
+if _, err = stream.Recv(); err != io.EOF {
+t.Fatalf("unexpected error: %v, expected an EOF error", err)
+}
+}()
+p, ok := peer.FromContext(stream.Context())
+if !ok {
+t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
+}
-if peer.Addr.String() != addresses[2] {
-t.Fatalf("got: %v, want: %v", peer.Addr.String(), addresses[2])
+if p.Addr.String() != addresses[2] {
+t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), addresses[2])
}
}

+// Now 25 RPCs are active on each address; the next three RPCs should
+// round robin, since choiceCount is three and the injected random indexes
+// cause it to search all three addresses for fewest outstanding requests on
+// each iteration.
+wantAddrCount := map[string]int{
+addresses[0]: 1,
+addresses[1]: 1,
+addresses[2]: 1,
+}
dfawley marked this conversation as resolved.
+gotAddrCount := make(map[string]int)
+for i := 0; i < len(addresses); i++ {
+stream, err := testServiceClient.FullDuplexCall(ctx)
+if err != nil {
+t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
+}
+defer func() {
Member: Same.

Contributor Author: Done.

+stream.CloseSend()
+if _, err = stream.Recv(); err != io.EOF {
+t.Fatalf("unexpected error: %v, expected an EOF error", err)
+}
+}()
+p, ok := peer.FromContext(stream.Context())
+if !ok {
+t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
+}
+if p.Addr != nil {
Member: Optional: IMO it's cleaner to remove the defensive programming and just let things panic if they're breaking assertions:

p, _ := peer.FromContext(stream.Context())
gotAddrCount[p.Addr.String()]++

But this is fine, too.

Contributor Author: Done.

+gotAddrCount[p.Addr.String()]++
+}
+}
+if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
Member: Nit: if you're not showing the diff, just use !cmp.Equal() instead. (Also in checkRoundRobin.)

Contributor Author: Printed the diff here. In checkRoundRobin there is no need for the diff, so just used !cmp.Equal().

t.Fatalf("addr count got: %v, want (round robin): %v", gotAddrCount, wantAddrCount)
}
}
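
As a footnote to the cmp nit above, a small self-contained sketch of the two assertion styles, assuming github.com/google/go-cmp/cmp (illustrative test, not code from this PR):

package example

import (
	"testing"

	"github.com/google/go-cmp/cmp"
)

func TestAddrCounts(t *testing.T) {
	want := map[string]int{"a": 1, "b": 1}
	got := map[string]int{"a": 1, "b": 1}
	// Use cmp.Diff when showing the mismatch details helps debugging.
	if diff := cmp.Diff(want, got); diff != "" {
		t.Fatalf("unexpected counts (-want +got):\n%s", diff)
	}
	// Use !cmp.Equal when the diff output would go unused anyway.
	if !cmp.Equal(got, want) {
		t.Fatalf("counts got: %v, want: %v", got, want)
	}
}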
11 changes: 5 additions & 6 deletions balancer/leastrequest/leastrequest.go
@@ -37,7 +37,7 @@ var grpcranduint32 = grpcrand.Uint32
// Name is the name of the least request balancer.
const Name = "least_request_experimental"

var logger = grpclog.Component("least request")
var logger = grpclog.Component("least-request")

func init() {
balancer.Register(bb{})
@@ -47,9 +47,9 @@ func init() {
type LBConfig struct {
dfawley marked this conversation as resolved.
serviceconfig.LoadBalancingConfig `json:"-"`

-// ChoiceCount is the number of random SubConns to sample to try and find
-// the one with the Least Request. If unset, defaults to 2. If set to < 2,
-// will become 2, and if set to > 10, will become 10.
+// ChoiceCount is the number of random SubConns to sample to find the one
+// with the fewest outstanding requests. If unset, defaults to 2. If set to
+// < 2, the config will be rejected, and if set to > 10, will become 10.
ChoiceCount uint32 `json:"choiceCount,omitempty"`
}

@@ -82,8 +82,7 @@ func (bb) Name() string {
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*int32)}
baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
-baseBalancer := baseBuilder.Build(cc, bOpts)
-b.Balancer = baseBalancer
+b.Balancer = baseBuilder.Build(cc, bOpts)
return b
}

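Finally, a hedged sketch of how a client could opt into this balancer once the package is imported. The JSON mirrors the lrscJSON service config used by the tests above; the target and the choiceCount value are placeholders:

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	_ "google.golang.org/grpc/balancer/leastrequest" // registers least_request_experimental via init
)

const lrServiceConfig = `{
  "loadBalancingConfig": [
    { "least_request_experimental": { "choiceCount": 2 } }
  ]
}`

func dial(target string) (*grpc.ClientConn, error) {
	return grpc.Dial(target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(lrServiceConfig),
	)
}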