Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: support channel idleness #6263

Merged
merged 21 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
bb51629
grpc: support channel idleness
easwars May 5, 2023
968a05f
call cc.Connect only when IDLE at blocking dial time
easwars May 12, 2023
fe30e55
support an atomic idleness manager
easwars May 12, 2023
1693446
align 64-bit fields for proper atomic access
easwars May 16, 2023
0b2a531
make vet happy
easwars May 16, 2023
87ae17b
implement Doug's suggestion
easwars May 17, 2023
32875a1
minor fix + access timer under lock
easwars May 17, 2023
5c6910e
also check activity in tryEnterIdleMode
easwars May 17, 2023
fa1a004
skip error log when calls count is negative in onCallEnd
easwars May 18, 2023
2b04a5b
create and destroy resolver wrapper instead of supporting idleness in it
easwars May 18, 2023
0748ab4
start channel in idle, and kick it out of idle at the end of Dial
easwars May 18, 2023
93c990e
review comments + remove mutexIdlenessManager
easwars May 18, 2023
a0eaa5c
reset ccb.curBalancerName when exiting idle
easwars May 18, 2023
37c286a
remove the mutex idleness manager from tests
easwars May 18, 2023
0682572
todo to switch balancer/picker wrappers to the same approach as resol…
easwars May 18, 2023
460400c
remove unused consts
easwars May 18, 2023
f13ba8a
reset ccb.curBalancerName when entering idle instead of when exiting …
easwars May 18, 2023
2068ba0
refactor new idleness manager to hide implementation details
easwars May 19, 2023
d69513f
defer the call to onCallEnd()
easwars May 19, 2023
f1f091a
test cleanups
easwars May 20, 2023
f94f923
handle small flake by trying the RPC more frequently
easwars May 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {
// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
ccb.buildLoadBalancingPolicy(ccb.curBalancerName)
// Reset the current balancer name so that we act on the next call to
// switchTo by creating a new balancer specified by the new resolver.
ccb.curBalancerName = ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to do this when entering idle instead of exiting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")

Expand Down
6 changes: 6 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ func (cc *ClientConn) enterIdleMode() error {
conns := cc.conns
cc.conns = make(map[*addrConn]struct{})

// TODO: Currently, we close the resolver wrapper upon entering idle mode
// and create a new one upon exiting idle mode. This means that the
// `cc.resolverWrapper` field would be overwritten every time we exit idle
// mode. While this means that we need to hold `cc.mu` when accessing
// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
// try to do the same for the balancer and picker wrappers too.
cc.resolverWrapper.close()
cc.blockingpicker.enterIdleMode()
cc.balancerWrapper.enterIdleMode()
Expand Down
273 changes: 118 additions & 155 deletions idle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,87 +122,68 @@ func (s) TestIdlenessManager_Disabled(t *testing.T) {
// is enabled. Ensures that when there are no RPCs, the timer callback is
// invoked and the enterIdleMode() method is invoked on the enforcer.
func (s) TestIdlenessManager_Enabled_TimerFires(t *testing.T) {
for _, name := range []string{"mutex", "atomic"} {
t.Run(name, func(t *testing.T) {
callbackCh := overrideNewTimer(t)

enforcer := newTestIdlenessEnforcer()
var mgr idlenessManager
if name == "mutex" {
mgr = newMutexIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
} else if name == "atomic" {
mgr = newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
}
defer mgr.close()
callbackCh := overrideNewTimer(t)

// Ensure that the timer callback fires within an appropriate amount of time.
select {
case <-callbackCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}
enforcer := newTestIdlenessEnforcer()
mgr := newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
defer mgr.close()

// Ensure that the channel moves to idle mode eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
// Ensure that the timer callback fires within an appropriate amount of time.
select {
case <-callbackCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}

})
// Ensure that the channel moves to idle mode eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
}

// TestIdlenessManager_Enabled_OngoingCall tests the case where the idle manager
// is enabled. Ensures that when there is an ongoing RPC, the channel does not
// enter idle mode.
func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) {
for _, name := range []string{"mutex", "atomic"} {
t.Run(name, func(t *testing.T) {
callbackCh := overrideNewTimer(t)

enforcer := newTestIdlenessEnforcer()
var mgr idlenessManager
if name == "mutex" {
mgr = newMutexIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
} else if name == "atomic" {
mgr = newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
}
defer mgr.close()

// Fire up a goroutine that simulates an ongoing RPC that is terminated
// after the timer callback fires for the first time.
timerFired := make(chan struct{})
go func() {
mgr.onCallBegin()
<-timerFired
mgr.onCallEnd()
}()
callbackCh := overrideNewTimer(t)

// Ensure that the timer callback fires and unblock the above goroutine.
select {
case <-callbackCh:
close(timerFired)
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}
enforcer := newTestIdlenessEnforcer()
mgr := newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
defer mgr.close()

// Fire up a goroutine that simulates an ongoing RPC that is terminated
// after the timer callback fires for the first time.
timerFired := make(chan struct{})
go func() {
mgr.onCallBegin()
<-timerFired
mgr.onCallEnd()
}()

// Ensure that the timer callback fires and unblock the above goroutine.
select {
case <-callbackCh:
close(timerFired)
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}

// The invocation of the timer callback should not put the channel in idle
// mode since we had an ongoing RPC.
select {
case <-enforcer.enterIdleCh:
t.Fatalf("enterIdleMode() called on enforcer when active RPC exists")
case <-time.After(defaultTestShortTimeout):
}
// The invocation of the timer callback should not put the channel in idle
// mode since we had an ongoing RPC.
select {
case <-enforcer.enterIdleCh:
t.Fatalf("enterIdleMode() called on enforcer when active RPC exists")
case <-time.After(defaultTestShortTimeout):
}

// Since we terminated the ongoing RPC and we have no other active RPCs, the
// channel must move to idle eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
})
// Since we terminated the ongoing RPC and we have no other active RPCs, the
// channel must move to idle eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
}

Expand All @@ -211,111 +192,93 @@ func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) {
// period (even though there is no active call when the timer fires), the
// channel does not enter idle mode.
func (s) TestIdlenessManager_Enabled_ActiveSinceLastCheck(t *testing.T) {
for _, name := range []string{"mutex", "atomic"} {
t.Run(name, func(t *testing.T) {
callbackCh := overrideNewTimer(t)

enforcer := newTestIdlenessEnforcer()
var mgr idlenessManager
if name == "mutex" {
mgr = newMutexIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
} else if name == "atomic" {
mgr = newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
}
defer mgr.close()
callbackCh := overrideNewTimer(t)

// Fire up a goroutine that simulates unary RPCs until the timer callback
// fires.
timerFired := make(chan struct{})
go func() {
for ; ; <-time.After(defaultTestShortTimeout) {
mgr.onCallBegin()
mgr.onCallEnd()
enforcer := newTestIdlenessEnforcer()
mgr := newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
defer mgr.close()

select {
case <-timerFired:
return
default:
}
}
}()
// Fire up a goroutine that simulates unary RPCs until the timer callback
// fires.
timerFired := make(chan struct{})
go func() {
for ; ; <-time.After(defaultTestShortTimeout) {
mgr.onCallBegin()
mgr.onCallEnd()

// Ensure that the timer callback fires, and that we don't enter idle as
// part of this invocation of the timer callback, since we had some RPCs in
// this period.
select {
case <-callbackCh:
close(timerFired)
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}
select {
case <-enforcer.enterIdleCh:
t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period")
case <-time.After(defaultTestShortTimeout):
case <-timerFired:
return
default:
}
}
}()

// Since the unary RPC terminated and we have no other active RPCs, the
// channel must move to idle eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
})
// Ensure that the timer callback fires, and that we don't enter idle as
// part of this invocation of the timer callback, since we had some RPCs in
// this period.
select {
case <-callbackCh:
close(timerFired)
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for idle timer callback to fire")
}
select {
case <-enforcer.enterIdleCh:
t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period")
case <-time.After(defaultTestShortTimeout):
}

// Since the unary RPC terminated and we have no other active RPCs, the
// channel must move to idle eventually.
select {
case <-enforcer.enterIdleCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout waiting for channel to move to idle")
}
}

// TestIdlenessManager_Enabled_ExitIdleOnRPC tests the case where the idle
// manager is enabled. Ensures that the channel moves out of idle when an RPC is
// initiated.
func (s) TestIdlenessManager_Enabled_ExitIdleOnRPC(t *testing.T) {
for _, name := range []string{"mutex", "atomic"} {
t.Run(name, func(t *testing.T) {
overrideNewTimer(t)

enforcer := newTestIdlenessEnforcer()
var mgr idlenessManager
if name == "mutex" {
mgr = newMutexIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
} else if name == "atomic" {
mgr = newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
}
defer mgr.close()
overrideNewTimer(t)

// Ensure that the channel moves to idle since there are no RPCs.
select {
case <-enforcer.enterIdleCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for channel to move to idle mode")
}
enforcer := newTestIdlenessEnforcer()
mgr := newAtomicIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout))
defer mgr.close()

for i := 0; i < 100; i++ {
// A call to onCallBegin and onCallEnd simulates an RPC.
go func() {
if err := mgr.onCallBegin(); err != nil {
t.Errorf("onCallBegin() failed: %v", err)
}
mgr.onCallEnd()
}()
}
// Ensure that the channel moves to idle since there are no RPCs.
select {
case <-enforcer.enterIdleCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for channel to move to idle mode")
}

// Ensure that the channel moves out of idle as a result of the above RPC.
select {
case <-enforcer.exitIdleCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for channel to move out of idle mode")
for i := 0; i < 100; i++ {
// A call to onCallBegin and onCallEnd simulates an RPC.
go func() {
if err := mgr.onCallBegin(); err != nil {
t.Errorf("onCallBegin() failed: %v", err)
}
mgr.onCallEnd()
}()
}

// Ensure that only one call to exit idle mode is made to the CC.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
select {
case <-enforcer.exitIdleCh:
t.Fatal("More than one call to exit idle mode on the ClientConn; only one expected")
case <-sCtx.Done():
}
})
// Ensure that the channel moves out of idle as a result of the above RPC.
select {
case <-enforcer.exitIdleCh:
case <-time.After(2 * defaultTestIdleTimeout):
t.Fatal("Timeout waiting for channel to move out of idle mode")
}

// Ensure that only one call to exit idle mode is made to the CC.
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
select {
case <-enforcer.exitIdleCh:
t.Fatal("More than one call to exit idle mode on the ClientConn; only one expected")
case <-sCtx.Done():
}
}

Expand Down