From a3771e2dc143dada325a080e473005ccf97b7fe2 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 5 May 2023 16:28:32 -0700 Subject: [PATCH] grpc: support channel idleness --- balancer_conn_wrappers.go | 236 ++++++++-- call.go | 12 +- clientconn.go | 184 ++++++-- dialoptions.go | 22 + idle.go | 161 +++++++ idle_test.go | 263 +++++++++++ internal/grpcsync/callback_serializer.go | 56 ++- internal/grpcsync/callback_serializer_test.go | 50 ++- picker_wrapper.go | 26 +- resolver_conn_wrapper.go | 177 ++++++-- stream.go | 13 +- test/idleness_test.go | 408 ++++++++++++++++++ 12 files changed, 1462 insertions(+), 146 deletions(-) create mode 100644 idle.go create mode 100644 idle_test.go create mode 100644 test/idleness_test.go diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 1865a3f09c2b..87d7f2a37a47 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -32,6 +32,15 @@ import ( "google.golang.org/grpc/resolver" ) +type ccbMode int + +const ( + ccbModeActive = iota + ccbModeIdle + ccbModeClosed + ccbModeExitingIdle +) + // ccBalancerWrapper sits between the ClientConn and the Balancer. // // ccBalancerWrapper implements methods corresponding to the ones on the @@ -46,16 +55,25 @@ import ( // It uses the gracefulswitch.Balancer internally to ensure that balancer // switches happen in a graceful manner. type ccBalancerWrapper struct { - cc *ClientConn + // The following fields are initialized when the wrapper is created and are + // read-only afterwards, and therefore can be accessed without a mutex. + cc *ClientConn + opts balancer.BuildOptions // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a - // mutually exclusive manner as they are scheduled on the - // CallbackSerializer. Fields accessed *only* in serializer callbacks, can - // therefore be accessed without a mutex. - serializer *grpcsync.CallbackSerializer - serializerCancel context.CancelFunc - balancer *gracefulswitch.Balancer - curBalancerName string + // mutually exclusive manner as they are scheduled in the serializer. Fields + // accessed *only* in these serializer callbacks, can therefore be accessed + // without a mutex. + balancer *gracefulswitch.Balancer + curBalancerName string + + // mu guards access to the below fields. Access to the serializer and its + // cancel function needs to be mutex protected because they are overwritten + // when the wrapper exits idle mode. + mu sync.Mutex + serializer *grpcsync.CallbackSerializer // To serialize all outoing calls. + serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time. + mode ccbMode // Tracks the current mode of the wrapper. } // newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer @@ -64,6 +82,7 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc ctx, cancel := context.WithCancel(context.Background()) ccb := &ccBalancerWrapper{ cc: cc, + opts: bopts, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, } @@ -74,8 +93,12 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc // updateClientConnState is invoked by grpc to push a ClientConnState update to // the underlying balancer. func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { + ccb.mu.Lock() errCh := make(chan error, 1) - ccb.serializer.Schedule(func(_ context.Context) { + // Here and everywhere else where Schedule() is called, it is done with the + // lock held. 
But the lock guards only the scheduling part. The actual + // callback is called asynchronously without the lock being held. + ok := ccb.serializer.Schedule(func(_ context.Context) { // If the addresses specified in the update contain addresses of type // "grpclb" and the selected LB policy is not "grpclb", these addresses // will be filtered out and ccs will be modified with the updated @@ -92,16 +115,19 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat } errCh <- ccb.balancer.UpdateClientConnState(*ccs) }) - - // If the balancer wrapper is closed when waiting for this state update to - // be handled, the callback serializer will be closed as well, and we can - // rely on its Done channel to ensure that we don't block here forever. - select { - case err := <-errCh: - return err - case <-ccb.serializer.Done: - return nil + if !ok { + // If we are unable to schedule a function with the serializer, it + // indicates that it has been closed. A serializer is only closed when + // the wrapper is closed or is in idle. + ccb.mu.Unlock() + return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer") } + ccb.mu.Unlock() + + // We get here only if the above call to Schedule succeeds, in which case it + // is guaranteed that the scheduled function will run. Therefore it is safe + // to block on this channel. + return <-errCh } // updateSubConnState is invoked by grpc to push a subConn state update to the @@ -120,21 +146,19 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti if sc == nil { return } + ccb.mu.Lock() ccb.serializer.Schedule(func(_ context.Context) { ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) }) -} - -func (ccb *ccBalancerWrapper) exitIdle() { - ccb.serializer.Schedule(func(_ context.Context) { - ccb.balancer.ExitIdle() - }) + ccb.mu.Unlock() } func (ccb *ccBalancerWrapper) resolverError(err error) { + ccb.mu.Lock() ccb.serializer.Schedule(func(_ context.Context) { ccb.balancer.ResolverError(err) }) + ccb.mu.Unlock() } // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the @@ -148,42 +172,142 @@ func (ccb *ccBalancerWrapper) resolverError(err error) { // the ccBalancerWrapper keeps track of the current LB policy name, and skips // the graceful balancer switching process if the name does not change. func (ccb *ccBalancerWrapper) switchTo(name string) { + ccb.mu.Lock() ccb.serializer.Schedule(func(_ context.Context) { // TODO: Other languages use case-sensitive balancer registries. We should // switch as well. See: https://github.com/grpc/grpc-go/issues/5288. if strings.EqualFold(ccb.curBalancerName, name) { return } + ccb.buildLoadBalancingPolicy(name) + }) + ccb.mu.Unlock() +} - // Use the default LB policy, pick_first, if no LB policy with name is - // found in the registry. - builder := balancer.Get(name) - if builder == nil { - channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) - builder = newPickfirstBuilder() - } else { - channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) - } +// buildLoadBalancingPolicy performs the following: +// - retrieve a balancer builder for the given name. Use the default LB +// policy, pick_first, if no LB policy with name is found in the registry. +// - instruct the gracefulswitch balancer to switch to the above builder. 
This +// will actually build the new balancer. +// - update the `curBalancerName` field +// +// Must be called from a serializer callback. +func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { + builder := balancer.Get(name) + if builder == nil { + channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) + builder = newPickfirstBuilder() + } else { + channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) + } + + if err := ccb.balancer.SwitchTo(builder); err != nil { + channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) + return + } + ccb.curBalancerName = builder.Name() +} + +func (ccb *ccBalancerWrapper) close() { + channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") + ccb.handleCloseAndEnterIdle(ccbModeClosed) +} + +// enterIdleMode is invoked by grpc when the channel enters idle mode upon +// expiry of idle_timeout. This call blocks until the balancer is closed. +func (ccb *ccBalancerWrapper) enterIdleMode() { + channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode") + ccb.handleCloseAndEnterIdle(ccbModeIdle) +} + +// handleCloseAndEnterIdle is invoked when the channel is being closed or when +// it enters idle mode upon expiry of idle_timeout. +// +// This call is not scheduled on the serializer because we need to ensure that +// the current serializer is completely shutdown before the next one is created +// (when exiting idle). +func (ccb *ccBalancerWrapper) handleCloseAndEnterIdle(m ccbMode) { + ccb.mu.Lock() + if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle { + ccb.mu.Unlock() + return + } + + // Close the serializer to ensure that no more calls from gRPC are sent + // to the balancer. + ccb.serializerCancel() + ccb.mode = m + done := ccb.serializer.Done + b := ccb.balancer + ccb.mu.Unlock() + + // Give enqueued callbacks a chance to finish before closing the balancer. + <-done + b.Close() +} + +// exitIdleMode is invoked by grpc when the channel exits idle mode either +// because of an RPC or because of an invocation of the Connect() API. This +// recreates the balancer that was closed previously when entering idle mode. +// +// If the channel is not in idle mode, we know for a fact that we are here as a +// result of the user calling the Connect() method on the ClientConn. In this +// case, we can simply forward the call to the underlying balancer, instructing +// it to reconnect to the backends. +func (ccb *ccBalancerWrapper) exitIdleMode() { + ccb.mu.Lock() + if ccb.mode == ccbModeClosed { + // Request to exit idle is a no-op when wrapper is already closed. + ccb.mu.Unlock() + return + } + + if ccb.mode == ccbModeIdle { + // Recreate the serializer which was closed when we entered idle. + ctx, cancel := context.WithCancel(context.Background()) + ccb.serializer = grpcsync.NewCallbackSerializer(ctx) + ccb.serializerCancel = cancel + } - if err := ccb.balancer.SwitchTo(builder); err != nil { - channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) + // The ClientConn guarantees that mutual exclusion between close() and + // exitIdleMode(), and since we just created a new serializer, we can be + // sure that the below function will be scheduled. 
+ done := make(chan struct{}) + ccb.serializer.Schedule(func(_ context.Context) { + defer close(done) + + ccb.mu.Lock() + defer ccb.mu.Unlock() + + if ccb.mode != ccbModeIdle { + ccb.balancer.ExitIdle() return } - ccb.curBalancerName = builder.Name() + + // Gracefulswitch balancer does not support a switchTo operation after + // being closed. Hence we need to create a new one here. + ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) + ccb.buildLoadBalancingPolicy(ccb.curBalancerName) + ccb.mode = ccbModeActive + channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") + }) + ccb.mu.Unlock() + + <-done } -func (ccb *ccBalancerWrapper) close() { - // Close the serializer to ensure that no more calls from gRPC are sent to - // the balancer. We don't have to worry about suppressing calls from a - // closed balancer because these are handled by the ClientConn (balancer - // wrapper is only ever closed when the ClientConn is closed). - ccb.serializerCancel() - <-ccb.serializer.Done - ccb.balancer.Close() +func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { + ccb.mu.Lock() + defer ccb.mu.Unlock() + return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + if ccb.isIdleOrClosed() { + return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") + } + if len(addrs) <= 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } @@ -200,6 +324,18 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer } func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { + if ccb.isIdleOrClosed() { + // It it safe to ignore this call when the balancer is closed or in idle + // because the ClientConn takes care of closing the connections. + // + // Not returning early from here when the balancer is closed or in idle + // leads to a deadlock though, because of the following sequence of + // calls when holding cc.mu: + // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> + // ccb.RemoveAddrConn --> cc.removeAddrConn + return + } + acbw, ok := sc.(*acBalancerWrapper) if !ok { return @@ -208,6 +344,10 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { + if ccb.isIdleOrClosed() { + return + } + acbw, ok := sc.(*acBalancerWrapper) if !ok { return @@ -216,6 +356,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol } func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { + if ccb.isIdleOrClosed() { + return + } + // Update picker before updating state. Even though the ordering here does // not matter, it can lead to multiple calls of Pick in the common start-up // case where we wait for ready and then perform an RPC. If the picker is @@ -226,6 +370,10 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { } func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { + if ccb.isIdleOrClosed() { + return + } + ccb.cc.resolveNow(o) } diff --git a/call.go b/call.go index 9e20e4d385f9..e1fc6c14826c 100644 --- a/call.go +++ b/call.go @@ -27,14 +27,22 @@ import ( // // All errors returned by Invoke are compatible with the status package. 
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { + if err := cc.idlenessMgr.onCallBegin(); err != nil { + return err + } + // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts) + var err error if cc.dopts.unaryInt != nil { - return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) + err = cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) + } else { + err = invoke(ctx, method, args, reply, cc, opts...) } - return invoke(ctx, method, args, reply, cc, opts...) + cc.idlenessMgr.onCallEnd() + return err } func combine(o1 []CallOption, o2 []CallOption) []CallOption { diff --git a/clientconn.go b/clientconn.go index 50d08a49a205..a20ac4f5ee41 100644 --- a/clientconn.go +++ b/clientconn.go @@ -69,6 +69,9 @@ var ( errConnDrain = errors.New("grpc: the connection is drained") // errConnClosing indicates that the connection is closing. errConnClosing = errors.New("grpc: the connection is closing") + // errConnIdling indicates the the connection is being closed as the channel + // is moving to an idle mode due to inactivity. + errConnIdling = errors.New("grpc: the connection is closing due to channel idleness") // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default // service config. invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid" @@ -145,6 +148,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.retryThrottler.Store((*retryThrottler)(nil)) cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) + cc.exitIdleCond = sync.NewCond(&cc.mu) + cc.isExitingIdle = false // Channels don't start in idle mode. disableGlobalOpts := false for _, opt := range opts { @@ -243,6 +248,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * go cc.scWatcher() } + // Initialize the balancer wrapper. var credsClone credentials.TransportCredentials if creds := cc.dopts.copts.TransportCredentials; creds != nil { credsClone = creds.Clone() @@ -257,8 +263,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * Target: cc.parsedTarget, }) - // Build the resolver. - rWrapper, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ + // Initialize the resolver wrapper. + rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ target: cc.parsedTarget, builder: cc.resolverBuilder, bOpts: resolver.BuildOptions{ @@ -272,38 +278,130 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if err != nil { return nil, fmt.Errorf("failed to build resolver: %v", err) } + // Resolver implementations may report state update or error inline when + // built (or right after), and this is handled in cc.updateResolverState. + // Also, an error from the resolver might lead to a re-resolution request + // from the balancer, which is handled in resolveNow() where + // `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here. cc.mu.Lock() - cc.resolverWrapper = rWrapper + cc.resolverWrapper = rw cc.mu.Unlock() + // Configure idleness support with configured idle_timeout or default. + cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout) + + // Return early for non-blocking dials. 
+ if !cc.dopts.block { + return cc, nil + } + // A blocking dial blocks until the clientConn is ready. - if cc.dopts.block { - for { - cc.Connect() - s := cc.GetState() - if s == connectivity.Ready { - break - } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { - if err = cc.connectionError(); err != nil { - terr, ok := err.(interface { - Temporary() bool - }) - if ok && !terr.Temporary() { - return nil, err - } - } - } - if !cc.WaitForStateChange(ctx, s) { - // ctx got timeout or canceled. - if err = cc.connectionError(); err != nil && cc.dopts.returnLastError { + for { + cc.Connect() + s := cc.GetState() + if s == connectivity.Ready { + return cc, nil + } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { + if err = cc.connectionError(); err != nil { + terr, ok := err.(interface { + Temporary() bool + }) + if ok && !terr.Temporary() { return nil, err } - return nil, ctx.Err() } } + if !cc.WaitForStateChange(ctx, s) { + // ctx got timeout or canceled. + if err = cc.connectionError(); err != nil && cc.dopts.returnLastError { + return nil, err + } + return nil, ctx.Err() + } } +} - return cc, nil +// addTraceEvent is a helper method to add a trace event on the channel. If the +// channel is a nested one, the same event is also added on the parent channel. +func (cc *ClientConn) addTraceEvent(msg string) { + ted := &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Channel %s", msg), + Severity: channelz.CtInfo, + } + if cc.dopts.channelzParentID != nil { + ted.Parent = &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelzID.Int(), msg), + Severity: channelz.CtInfo, + } + } + channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) +} + +// exitIdleMode moves the channel out of idle mode by recreating the name +// resolver and load balancer. +func (cc *ClientConn) exitIdleMode() error { + cc.mu.Lock() + if cc.conns == nil { + cc.mu.Unlock() + return errConnClosing + } + cc.isExitingIdle = true + + cc.blockingpicker.exitIdleMode() + cc.balancerWrapper.exitIdleMode() + cc.firstResolveEvent = grpcsync.NewEvent() + cc.mu.Unlock() + + // This needs to be called without cc.mu because this builds a new resolver + // which might update state or report error inline which needs to be handled + // by cc.updateResolverState() which also grabs cc.mu. + if err := cc.resolverWrapper.exitIdleMode(); err != nil { + return err + } + + // When Close() and exitIdleMode() race against each other, one of the + // following two can happen: + // - Close() wins the race and runs first. exitIdleMode() runs after, and + // sees that the ClientConn is already closed and hence returns early. + // - exitIdleMode() wins the race and runs first and recreates the balancer + // and releases the lock before recreating the resolver. If Close() runs + // in this window, it will wait for exitIdleMode to complete. + // + // We achieve this synchronization using the below condition variable. + cc.mu.Lock() + cc.isExitingIdle = false + cc.exitIdleCond.Signal() + cc.mu.Unlock() + cc.addTraceEvent("exiting idle mode") + return nil +} + +// enterIdleMode puts the channel in idle mode, and as part of it shuts down the +// name resolver, load balancer and any subchannels. +func (cc *ClientConn) enterIdleMode() error { + cc.mu.Lock() + if cc.conns == nil { + cc.mu.Unlock() + return ErrClientConnClosing + } + + // cc.conns == nil is a proxy for the ClientConn being closed. So, instead + // of setting it to nil here, we recreate the map. 
This also means that we + // don't have to do this when exiting idle mode. + conns := cc.conns + cc.conns = make(map[*addrConn]struct{}) + cc.csMgr.updateState(connectivity.Idle) + + cc.resolverWrapper.enterIdleMode() + cc.blockingpicker.enterIdleMode() + cc.balancerWrapper.enterIdleMode() + cc.mu.Unlock() + + cc.addTraceEvent("entering idle mode") + for ac := range conns { + ac.tearDown(errConnIdling) + } + return nil } // validateTransportCredentials performs a series of checks on the configured @@ -350,17 +448,7 @@ func (cc *ClientConn) validateTransportCredentials() error { // Doesn't grab cc.mu as this method is expected to be called only at Dial time. func (cc *ClientConn) channelzRegistration(target string) { cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) - ted := &channelz.TraceEventDesc{ - Desc: "Channel created", - Severity: channelz.CtInfo, - } - if cc.dopts.channelzParentID != nil { - ted.Parent = &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()), - Severity: channelz.CtInfo, - } - } - channelz.AddTraceEvent(logger, cc.channelzID, 1, ted) + cc.addTraceEvent("created") cc.csMgr.channelzID = cc.channelzID } @@ -509,6 +597,7 @@ type ClientConn struct { channelzID *channelz.Identifier // Channelz identifier for the channel. resolverBuilder resolver.Builder // See parseTargetAndFindResolver(). balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. + idlenessMgr *idlenessManager // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. @@ -529,6 +618,8 @@ type ClientConn struct { sc *ServiceConfig // Latest service config received from the resolver. conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. + isExitingIdle bool // True when channel is exiting idle. + exitIdleCond *sync.Cond // Signalled when channel exits idle. lceMu sync.Mutex // protects lastConnectionError lastConnectionError error @@ -573,7 +664,7 @@ func (cc *ClientConn) GetState() connectivity.State { // Notice: This API is EXPERIMENTAL and may be changed or removed in a later // release. 
func (cc *ClientConn) Connect() { - cc.balancerWrapper.exitIdle() + cc.balancerWrapper.exitIdleMode() } func (cc *ClientConn) scWatcher() { @@ -1061,6 +1152,11 @@ func (cc *ClientConn) Close() error { cc.mu.Unlock() return ErrClientConnClosing } + + for cc.isExitingIdle { + cc.exitIdleCond.Wait() + } + conns := cc.conns cc.conns = nil cc.csMgr.updateState(connectivity.Shutdown) @@ -1068,6 +1164,7 @@ func (cc *ClientConn) Close() error { rWrapper := cc.resolverWrapper cc.resolverWrapper = nil bWrapper := cc.balancerWrapper + idlenessMgr := cc.idlenessMgr cc.mu.Unlock() // The order of closing matters here since the balancer wrapper assumes the @@ -1079,21 +1176,14 @@ func (cc *ClientConn) Close() error { if rWrapper != nil { rWrapper.close() } + if idlenessMgr != nil { + idlenessMgr.close() + } for ac := range conns { ac.tearDown(ErrClientConnClosing) } - ted := &channelz.TraceEventDesc{ - Desc: "Channel deleted", - Severity: channelz.CtInfo, - } - if cc.dopts.channelzParentID != nil { - ted.Parent = &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()), - Severity: channelz.CtInfo, - } - } - channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) + cc.addTraceEvent("deleted") // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add // trace reference to the entity being deleted, and thus prevent it from being // deleted right away. diff --git a/dialoptions.go b/dialoptions.go index cdc8263bda65..51c8997d5d18 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -77,6 +77,7 @@ type dialOptions struct { defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string resolvers []resolver.Builder + idleTimeout time.Duration } // DialOption configures how we set up the connection. @@ -627,6 +628,7 @@ func defaultDialOptions() dialOptions { ReadBufferSize: defaultReadBufSize, UseProxy: true, }, + idleTimeout: 30 * time.Minute, } } @@ -655,3 +657,23 @@ func WithResolvers(rs ...resolver.Builder) DialOption { o.resolvers = append(o.resolvers, rs...) }) } + +// WithIdleTimeout returns a DialOption that configures an idle timeout for the +// channel. If the channel is idle for the configured timeout, i.e there are no +// ongoing RPCs and no new RPCs are initiated, the channel will enter idle mode +// and as a result the name resolver and load balancer will be shut down. The +// channel will exit idle mode when the Connect() method is called or when an +// RPC is initiated. +// +// A default timeout of 30 min will be used if this dial option is not set at +// dial time and idleness can be disabled by passing a timeout of zero. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func WithIdleTimeout(d time.Duration) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.idleTimeout = d + }) +} diff --git a/idle.go b/idle.go new file mode 100644 index 000000000000..642835342dbc --- /dev/null +++ b/idle.go @@ -0,0 +1,161 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "sync" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/status" +) + +// For overriding in unit tests. +var newTimer = func(d time.Duration, f func()) *time.Timer { + return time.AfterFunc(d, f) +} + +// idlenessEnforcer is the functionality provided by grpc.ClientConn to enter +// and exit from idle mode. +type idlenessEnforcer interface { + exitIdleMode() error + enterIdleMode() error +} + +// idlenessManager contains functionality to track RPC activity on the channel +// and uses this to instruct the channel to enter or exit idle mode as +// appropriate. +type idlenessManager struct { + // The following fields are set when the manager is created and are never + // written to after that. Therefore these can be accessed without a mutex. + + enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn. + timeout int64 // Idle timeout duration nanos stored as an int64. + isDisabled bool // Disabled if idle_timeout is set to 0. + + // All state maintained by the manager is guarded by this mutex. + mu sync.Mutex + activeCallsCount int // Count of active RPCs. + activeSinceLastTimerCheck bool // True if there was an RPC since the last timer callback. + lastCallEndTime int64 // Time when the most recent RPC finished, stored as unix nanos. + isIdle bool // True if the channel is in idle mode. + timer *time.Timer // Expires when the idle_timeout fires. +} + +// newIdlenessManager creates a new idleness state manager which attempts to put +// the channel in idle mode when there is no RPC activity for the configured +// idleTimeout. +// +// Idleness support can be disabled by passing a value of 0 for idleTimeout. +func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) *idlenessManager { + if idleTimeout == 0 { + logger.Infof("Channel idleness support explicitly disabled") + return &idlenessManager{isDisabled: true} + } + + i := &idlenessManager{ + enforcer: enforcer, + timeout: int64(idleTimeout), + } + i.timer = newTimer(idleTimeout, i.handleIdleTimeout) + return i +} + +// handleIdleTimeout is the timer callback when idle_timeout expires. +func (i *idlenessManager) handleIdleTimeout() { + i.mu.Lock() + defer i.mu.Unlock() + + // If there are ongoing RPCs, it means the channel is active. Reset the + // timer to fire after a duration of idle_timeout, and return early. + if i.activeCallsCount > 0 { + i.timer.Reset(time.Duration(i.timeout)) + return + } + + // There were some RPCs made since the last time we were here. So, the + // channel is still active. Reschedule the timer to fire after a duration + // of idle_timeout from the time the last call ended. + if i.activeSinceLastTimerCheck { + i.activeSinceLastTimerCheck = false + // It is safe to ignore the return value from Reset() because we are + // already in the timer callback and this is only place from where we + // reset the timer. 
+ i.timer.Reset(time.Duration(i.lastCallEndTime + i.timeout - time.Now().UnixNano())) + return + } + + // There are no ongoing RPCs, and there were no RPCs since the last time we + // were here, we are all set to enter idle mode. + if err := i.enforcer.enterIdleMode(); err != nil { + logger.Warningf("Failed to enter idle mode: %v", err) + return + } + i.isIdle = true +} + +// onCallBegin is invoked by the ClientConn at the start of every RPC. If the +// channel is currently in idle mode, the manager asks the ClientConn to exit +// idle mode, and restarts the timer. The active calls count is incremented and +// the activeness bit is set to true. +func (i *idlenessManager) onCallBegin() error { + if i.isDisabled { + return nil + } + + i.mu.Lock() + defer i.mu.Unlock() + + if i.isIdle { + if err := i.enforcer.exitIdleMode(); err != nil { + return status.Errorf(codes.Internal, "grpc: ClientConn failed to exit idle mode: %v", err) + } + i.timer = newTimer(time.Duration(i.timeout), i.handleIdleTimeout) + i.isIdle = false + } + i.activeCallsCount++ + i.activeSinceLastTimerCheck = true + return nil +} + +// onCallEnd is invoked by the ClientConn at the end of every RPC. The active +// calls count is decremented and `i.lastCallEndTime` is updated. +func (i *idlenessManager) onCallEnd() { + if i.isDisabled { + return + } + + i.mu.Lock() + defer i.mu.Unlock() + + i.activeCallsCount-- + if i.activeCallsCount < 0 { + logger.Errorf("Number of active calls tracked by idleness manager is negative: %d", i.activeCallsCount) + } + i.lastCallEndTime = time.Now().UnixNano() +} + +func (i *idlenessManager) close() { + if i.isDisabled { + return + } + i.mu.Lock() + i.timer.Stop() + i.mu.Unlock() +} diff --git a/idle_test.go b/idle_test.go new file mode 100644 index 000000000000..3fbd32e99f30 --- /dev/null +++ b/idle_test.go @@ -0,0 +1,263 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "testing" + "time" +) + +const ( + defaultTestIdleTimeout = 500 * time.Millisecond // A short idle_timeout for tests. + defaultTestShortTimeout = 10 * time.Millisecond // A small deadline to wait for events expected to not happen. +) + +type testIdlenessEnforcer struct { + exitIdleCh chan struct{} + enterIdleCh chan struct{} +} + +func (ti *testIdlenessEnforcer) exitIdleMode() error { + ti.exitIdleCh <- struct{}{} + return nil + +} + +func (ti *testIdlenessEnforcer) enterIdleMode() error { + ti.enterIdleCh <- struct{}{} + return nil + +} + +func newTestIdlenessEnforcer() *testIdlenessEnforcer { + return &testIdlenessEnforcer{ + exitIdleCh: make(chan struct{}, 1), + enterIdleCh: make(chan struct{}, 1), + } +} + +// overrideNewTimer overrides the new timer creation function by ensuring that a +// message is pushed on the returned channel everytime the timer fires. 
+func overrideNewTimer(t *testing.T) <-chan struct{} { + t.Helper() + + ch := make(chan struct{}, 1) + origNewTimer := newTimer + newTimer = func(d time.Duration, callback func()) *time.Timer { + return time.AfterFunc(d, func() { + select { + case ch <- struct{}{}: + default: + } + callback() + }) + } + t.Cleanup(func() { newTimer = origNewTimer }) + return ch +} + +// TestIdlenessManager_Disabled tests the case where the idleness manager is +// disabled by passing an idle_timeout of 0. Verifies the following things: +// - timer callback does not fire +// - an RPC does not trigger a call to exitIdleMode on the ClientConn +// - more calls to RPC termination (as compared to RPC initiation) does not +// result in an error log +func (s) TestIdlenessManager_Disabled(t *testing.T) { + callbackCh := overrideNewTimer(t) + + // Create an idleness manager that is disabled because of idleTimeout being + // set to `0`. + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(0)) + + // Ensure that the timer callback does not fire within a short deadline. + select { + case <-callbackCh: + t.Fatal("Idle timer callback fired when manager is disabled") + case <-time.After(defaultTestShortTimeout): + } + + // The first invocation of onCallBegin() would lead to a call to + // exitIdleMode() on the enforcer, unless the idleness manager is disabled. + mgr.onCallBegin() + select { + case <-enforcer.exitIdleCh: + t.Fatalf("exitIdleMode() called on enforcer when manager is disabled") + case <-time.After(defaultTestShortTimeout): + } + + // If the number of calls to onCallEnd() exceeds the number of calls to + // onCallBegin(), the idleness manager is expected to throw an error log + // (which will cause our TestLogger to fail the test). But since the manager + // is disabled, this should not happen. + mgr.onCallEnd() + mgr.onCallEnd() + + // The idleness manager is explicitly not closed here. But since the manager + // is disabled, it will not start the run goroutine, and hence we expect the + // leakchecker to not find any leaked goroutines. +} + +// TestIdlenessManager_Enabled_TimerFires tests the case where the idle manager +// is enabled. Ensures that when there are no RPCs, the timer callback is +// invoked and the enterIdleMode() method is invoked on the enforcer. +func (s) TestIdlenessManager_Enabled_TimerFires(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + defer mgr.close() + + // Ensure that the timer callback fires within a appropriate amount of time. + select { + case <-callbackCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } + + // Ensure that the channel moves to idle mode eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } +} + +// TestIdlenessManager_Enabled_OngoingCall tests the case where the idle manager +// is enabled. Ensures that when there is an ongoing RPC, the channel does not +// enter idle mode. 
+func (s) TestIdlenessManager_Enabled_OngoingCall(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + defer mgr.close() + + // Fire up a goroutine that simulates an ongoing RPC that is terminated + // after the timer callback fires for the first time. + timerFired := make(chan struct{}) + go func() { + mgr.onCallBegin() + <-timerFired + mgr.onCallEnd() + }() + + // Ensure that the timer callback fires and unblock the above goroutine. + select { + case <-callbackCh: + close(timerFired) + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } + + // The invocation of the timer callback should not put the channel in idle + // mode since we had an ongoing RPC. + select { + case <-enforcer.enterIdleCh: + t.Fatalf("enterIdleMode() called on enforcer when active RPC exists") + case <-time.After(defaultTestShortTimeout): + } + + // Since we terminated the ongoing RPC and we have no other active RPCs, the + // channel must move to idle eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } +} + +// TestIdlenessManager_Enabled_ActiveSinceLastCheck tests the case where the +// idle manager is enabled. Ensures that when there are active RPCs in the last +// period (even though there is no active call when the timer fires), the +// channel does not enter idle mode. +func (s) TestIdlenessManager_Enabled_ActiveSinceLastCheck(t *testing.T) { + callbackCh := overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + defer mgr.close() + + // Fire up a goroutine that simulates unary RPCs until the timer callback + // fires. + timerFired := make(chan struct{}) + go func() { + for ; ; <-time.After(defaultTestShortTimeout) { + mgr.onCallBegin() + mgr.onCallEnd() + + select { + case <-timerFired: + return + default: + } + } + }() + + // Ensure that the timer callback fires, and that we don't enter idle as + // part of this invocation of the timer callback, since we had some RPCs in + // this period. + select { + case <-callbackCh: + close(timerFired) + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for idle timer callback to fire") + } + select { + case <-enforcer.enterIdleCh: + t.Fatalf("enterIdleMode() called on enforcer when one RPC completed in the last period") + case <-time.After(defaultTestShortTimeout): + } + + // Since the unrary RPC terminated and we have no other active RPCs, the + // channel must move to idle eventually. + select { + case <-enforcer.enterIdleCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout waiting for channel to move to idle") + } +} + +// TestIdlenessManager_Enabled_ExitIdleOnRPC tests the case where the idle +// manager is enabled. Ensures that the channel moves out of idle when an RPC is +// initiated. +func (s) TestIdlenessManager_Enabled_ExitIdleOnRPC(t *testing.T) { + overrideNewTimer(t) + + enforcer := newTestIdlenessEnforcer() + mgr := newIdlenessManager(enforcer, time.Duration(defaultTestIdleTimeout)) + defer mgr.close() + + // Ensure that the channel moves to idle since there are no RPCs. 
+ select { + case <-enforcer.enterIdleCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for channel to move to idle mode") + } + + mgr.onCallBegin() + mgr.onCallEnd() + + // Ensure that the channel moves out of idle as a result of the above RPC. + select { + case <-enforcer.exitIdleCh: + case <-time.After(2 * defaultTestIdleTimeout): + t.Fatal("Timeout waiting for channel to move out of idle mode") + } +} diff --git a/internal/grpcsync/callback_serializer.go b/internal/grpcsync/callback_serializer.go index d91f92463542..37b8d4117e77 100644 --- a/internal/grpcsync/callback_serializer.go +++ b/internal/grpcsync/callback_serializer.go @@ -20,6 +20,7 @@ package grpcsync import ( "context" + "sync" "google.golang.org/grpc/internal/buffer" ) @@ -31,19 +32,21 @@ import ( // // This type is safe for concurrent access. type CallbackSerializer struct { - // Done is closed once the serializer is shut down completely, i.e a - // scheduled callback, if any, that was running when the context passed to - // NewCallbackSerializer is cancelled, has completed and the serializer has - // deallocated all its resources. + // Done is closed once the serializer is shut down completely, i.e all + // scheduled callbacks are executed and the serializer has deallocated all + // its resources. Done chan struct{} callbacks *buffer.Unbounded + closedMu sync.Mutex + closed bool } // NewCallbackSerializer returns a new CallbackSerializer instance. The provided // context will be passed to the scheduled callbacks. Users should cancel the // provided context to shutdown the CallbackSerializer. It is guaranteed that no -// callbacks will be executed once this context is canceled. +// callbacks will be added once this context is canceled, and any pending un-run +// callbacks will be executed before the serializer is shut down. func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { t := &CallbackSerializer{ Done: make(chan struct{}), @@ -57,17 +60,30 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { // // Callbacks are expected to honor the context when performing any blocking // operations, and should return early when the context is canceled. -func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) { +// +// Return value indicates if the callback was successfully added to the list of +// callbacks to be executed by the serializer. It is not possible to add +// callbacks once the context passed to NewCallbackSerializer is cancelled. +func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { + t.closedMu.Lock() + defer t.closedMu.Unlock() + + if t.closed { + return false + } t.callbacks.Put(f) + return true } func (t *CallbackSerializer) run(ctx context.Context) { + var backlog []func(context.Context) + defer close(t.Done) for ctx.Err() == nil { select { case <-ctx.Done(): - t.callbacks.Close() - return + // Do nothing here. Next iteration of the for loop will not happen, + // since ctx.Err() would be non-nil. case callback, ok := <-t.callbacks.Get(): if !ok { return @@ -76,4 +92,28 @@ func (t *CallbackSerializer) run(ctx context.Context) { callback.(func(ctx context.Context))(ctx) } } + + // Fetch pending callbacks if any, and execute them before returning from + // this method and closing t.Done. 
+	t.closedMu.Lock()
+	t.closed = true
+	backlog = t.fetchPendingCallbacks()
+	t.callbacks.Close()
+	t.closedMu.Unlock()
+	for _, b := range backlog {
+		b(ctx)
+	}
+}
+
+func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
+	var backlog []func(context.Context)
+	for {
+		select {
+		case b := <-t.callbacks.Get():
+			backlog = append(backlog, b.(func(context.Context)))
+			t.callbacks.Load()
+		default:
+			return backlog
+		}
+	}
+}
diff --git a/internal/grpcsync/callback_serializer_test.go b/internal/grpcsync/callback_serializer_test.go
index 8c465af66aea..cdbd446f8101 100644
--- a/internal/grpcsync/callback_serializer_test.go
+++ b/internal/grpcsync/callback_serializer_test.go
@@ -20,7 +20,6 @@ package grpcsync
 import (
 	"context"
-	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -141,7 +140,10 @@ func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
 // are not executed once Close() returns.
 func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	cs := NewCallbackSerializer(ctx)
+	defer cancel()
+
+	serializerCtx, serializerCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	cs := NewCallbackSerializer(serializerCtx)
 
 	// Schedule a callback which blocks until the context passed to it is
 	// canceled. It also closes a channel to signal that it has started.
@@ -151,36 +153,54 @@ func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
 		<-ctx.Done()
 	})
 
-	// Schedule a bunch of callbacks. These should not be exeuted since the first
-	// one started earlier is blocked.
+	// Schedule a bunch of callbacks. These should be executed since they are
+	// scheduled before the serializer is closed.
 	const numCallbacks = 10
-	errCh := make(chan error, numCallbacks)
+	callbackCh := make(chan int, numCallbacks)
 	for i := 0; i < numCallbacks; i++ {
-		cs.Schedule(func(_ context.Context) {
-			errCh <- fmt.Errorf("callback %d executed when not expected to", i)
-		})
+		num := i
+		if !cs.Schedule(func(context.Context) { callbackCh <- num }) {
+			t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed")
+		}
 	}
 
 	// Ensure that none of the newer callbacks are executed at this point.
 	select {
 	case <-time.After(defaultTestShortTimeout):
-	case err := <-errCh:
-		t.Fatal(err)
+	case <-callbackCh:
+		t.Fatal("Newer callback executed when older one is still executing")
 	}
 
 	// Wait for the first callback to start before closing the scheduler.
 	<-firstCallbackStartedCh
 
-	// Cancel the context which will unblock the first callback. None of the
+	// Cancel the context which will unblock the first callback. All of the
 	// other callbacks (which have not started executing at this point) should
 	// be executed after this.
-	cancel()
+	serializerCancel()
+
+	// Ensure that the newer callbacks are executed.
+	for i := 0; i < numCallbacks; i++ {
+		select {
+		case <-ctx.Done():
+			t.Fatal("Timeout when waiting for callback scheduled before close to be executed")
+		case num := <-callbackCh:
+			if num != i {
+				t.Fatalf("Executing callback %d, want %d", num, i)
+			}
+		}
+	}
 	<-cs.Done
 
-	// Ensure that the newer callbacks are not executed.
+	done := make(chan struct{})
+	if cs.Schedule(func(context.Context) { close(done) }) {
+		t.Fatal("Scheduled a callback after closing the serializer")
+	}
+
+	// Ensure that the latest callback is not executed at this point.
 	select {
 	case <-time.After(defaultTestShortTimeout):
-	case err := <-errCh:
-		t.Fatal(err)
+	case <-done:
+		t.Fatal("Newer callback executed when scheduled after closing serializer")
 	}
 }
diff --git a/picker_wrapper.go b/picker_wrapper.go
index c525dc070fc6..8e24d864986d 100644
--- a/picker_wrapper.go
+++ b/picker_wrapper.go
@@ -36,6 +36,7 @@ type pickerWrapper struct {
 	mu         sync.Mutex
 	done       bool
+	idle       bool
 	blockingCh chan struct{}
 	picker     balancer.Picker
 }
@@ -47,7 +48,11 @@ func newPickerWrapper() *pickerWrapper {
 // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
 func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
 	pw.mu.Lock()
-	if pw.done {
+	if pw.done || pw.idle {
+		// There is a small window where a picker update from the LB policy can
+		// race with the channel going to idle mode. If the picker is idle here,
+		// it is because the channel asked it to do so, and therefore it is safe
+		// to ignore the update from the LB policy.
 		pw.mu.Unlock()
 		return
 	}
@@ -187,6 +192,25 @@ func (pw *pickerWrapper) close() {
 	close(pw.blockingCh)
 }
 
+func (pw *pickerWrapper) enterIdleMode() {
+	pw.mu.Lock()
+	defer pw.mu.Unlock()
+	if pw.done {
+		return
+	}
+	pw.idle = true
+}
+
+func (pw *pickerWrapper) exitIdleMode() {
+	pw.mu.Lock()
+	defer pw.mu.Unlock()
+	if pw.done {
+		return
+	}
+	pw.blockingCh = make(chan struct{})
+	pw.idle = false
+}
+
 // dropError is a wrapper error that indicates the LB policy wishes to drop the
 // RPC and not retry it.
 type dropError struct {
diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go
index ce12b52ecdc0..297d51762eae 100644
--- a/resolver_conn_wrapper.go
+++ b/resolver_conn_wrapper.go
@@ -20,7 +20,9 @@ package grpc
 import (
 	"context"
+	"fmt"
 	"strings"
+	"sync"
 
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/internal/channelz"
@@ -36,6 +38,14 @@ type resolverStateUpdater interface {
 	updateResolverState(s resolver.State, err error) error
 }
 
+type ccrMode int
+
+const (
+	ccrModeActive = iota
+	ccrModeIdleOrClosed
+	ccrModeExitingIdle
+)
+
 // ccResolverWrapper is a wrapper on top of cc for resolvers.
 // It implements resolver.ClientConn interface.
 type ccResolverWrapper struct {
@@ -44,15 +54,23 @@ type ccResolverWrapper struct {
 	cc                  resolverStateUpdater
 	channelzID          *channelz.Identifier
 	ignoreServiceConfig bool
+	opts                ccResolverWrapperOpts
 
-	// Outgoing (gRPC --> resolver) and incoming (resolver --> gRPC) calls are
-	// guaranteed to execute in a mutually exclusive manner as they are
-	// scheduled on the CallbackSerializer. Fields accessed *only* in serializer
-	// callbacks, can therefore be accessed without a mutex.
-	serializer       *grpcsync.CallbackSerializer
-	serializerCancel context.CancelFunc
-	resolver         resolver.Resolver
-	curState         resolver.State
+	// All incoming (resolver --> gRPC) calls are guaranteed to execute in a
+	// mutually exclusive manner as they are scheduled on the serializer.
+	// Fields accessed *only* in these serializer callbacks, can therefore be
+	// accessed without a mutex.
+	curState resolver.State
+
+	// mu guards access to the below fields. Access to the serializer and its
+	// cancel function needs to be mutex protected because they are overwritten
+	// when the wrapper exits idle mode.
+	mu               sync.Mutex
+	serializer       *grpcsync.CallbackSerializer // To serialize all incoming calls.
+	serializerCancel context.CancelFunc           // To close the serializer at close/enterIdle time.
+	mode             ccrMode                      // Tracks the current mode of the wrapper.
+	resolver         resolver.Resolver            // Accessed only from outgoing calls.
+	pendingResolve   *resolver.ResolveNowOptions  // Set when there is a pending call to ResolveNow().
 }
 
 // ccResolverWrapperOpts wraps the arguments to be passed when creating a new
@@ -72,38 +90,151 @@ func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (
 		cc:                  cc,
 		channelzID:          opts.channelzID,
 		ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
+		opts:                opts,
 		serializer:          grpcsync.NewCallbackSerializer(ctx),
 		serializerCancel:    cancel,
 	}
 
+	// Cannot hold the lock at build time because the resolver can send an
+	// update or error inline and these incoming calls grab the lock to schedule
+	// a callback in the serializer.
 	r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)
 	if err != nil {
 		cancel()
 		return nil, err
 	}
+
+	// Any error reported by the resolver at build time that leads to a
+	// re-resolution request from the balancer is dropped by grpc until we
+	// return from this function. So, we don't have to handle pending resolveNow
+	// requests here.
+	ccr.mu.Lock()
 	ccr.resolver = r
+	ccr.mu.Unlock()
+
 	return ccr, nil
 }
 
 func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
-	ccr.serializer.Schedule(func(_ context.Context) {
+	ccr.mu.Lock()
+	defer ccr.mu.Unlock()
+
+	switch ccr.mode {
+	case ccrModeActive:
 		ccr.resolver.ResolveNow(o)
-	})
+	case ccrModeIdleOrClosed:
+		// Do nothing.
+	case ccrModeExitingIdle:
+		// Set the `pendingResolve` field here so that the re-resolution request
+		// is handled properly after we exit idle mode.
+		ccr.pendingResolve = &o
+	}
 }
 
 func (ccr *ccResolverWrapper) close() {
+	channelz.Info(logger, ccr.channelzID, "ccResolverWrapper: closing")
+	ccr.handleCloseAndEnterIdle()
+}
+
+func (ccr *ccResolverWrapper) enterIdleMode() {
+	channelz.Info(logger, ccr.channelzID, "ccResolverWrapper: entering idle mode")
+	ccr.handleCloseAndEnterIdle()
+}
+
+// handleCloseAndEnterIdle is invoked when the channel is being closed or when
+// it enters idle mode upon expiry of idle_timeout.
+func (ccr *ccResolverWrapper) handleCloseAndEnterIdle() {
+	ccr.mu.Lock()
+	if ccr.mode != ccrModeActive {
+		ccr.mu.Unlock()
+		return
+	}
+
 	// Close the serializer to ensure that no more calls from the resolver are
-	// handled, before closing the resolver.
+	// handled, before actually closing the resolver.
 	ccr.serializerCancel()
-	<-ccr.serializer.Done
-	ccr.resolver.Close()
+	ccr.mode = ccrModeIdleOrClosed
+	done := ccr.serializer.Done
+	r := ccr.resolver
+	ccr.mu.Unlock()
+
+	// Give enqueued callbacks a chance to finish before closing the resolver.
+	<-done
+
+	// Resolver close needs to be called outside the lock because these methods
+	// are generally blocking and don't return until they stop any pending
+	// goroutines and cleanup allocated resources. And since the main goroutine
+	// of the resolver might be reporting an error or state update at the same
+	// time as close, and the former needs to grab the lock to schedule a
+	// callback on the serializer, it will lead to a deadlock if we hold the
+	// lock here.
+	r.Close()
+}
+
+// exitIdleMode is invoked by grpc when the channel exits idle mode either
+// because of an RPC or because of an invocation of the Connect() API. This
+// recreates the resolver that was closed previously when entering idle mode.
+func (ccr *ccResolverWrapper) exitIdleMode() error { + channelz.Info(logger, ccr.channelzID, "ccResolverWrapper: exiting idle mode") + + ccr.mu.Lock() + ccr.mode = ccrModeExitingIdle + ctx, cancel := context.WithCancel(context.Background()) + ccr.serializer = grpcsync.NewCallbackSerializer(ctx) + ccr.serializerCancel = cancel + ccr.mu.Unlock() + + // See newCCResolverWrapper() to see why we cannot hold the lock here. + r, err := ccr.opts.builder.Build(ccr.opts.target, ccr, ccr.opts.bOpts) + if err != nil { + cancel() + return fmt.Errorf("failed to build resolver when exiting idle mode: %v", err) + } + + ccr.mu.Lock() + ccr.mode = ccrModeActive + ccr.resolver = r + if ccr.pendingResolve != nil { + ccr.resolver.ResolveNow(*ccr.pendingResolve) + ccr.pendingResolve = nil + } + ccr.mu.Unlock() + return nil +} + +// serializerScheduleLocked is a convenience method to schedule a function to be +// run on the serializer while holding ccr.mu. +func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) { + ccr.mu.Lock() + ccr.serializer.Schedule(f) + ccr.mu.Unlock() } // UpdateState is called by resolver implementations to report new state to gRPC // which includes addresses and service config. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { + // We cannot use serializerScheduleLocked() here because we need to ensure + // that these two operations execute atomically: + // - checking that the wrapper is not closed or idle, and + // - scheduling the function in the serializer + // + // If we do those steps in a non atomic way, i.e grab the lock, check the + // mode, release the lock, then use the serializerScheduleLocked() method, + // we could run into a race where the wrapper is active when we check the + // mode, but enters idle or is closed before we schedule the function on the + // serializer. This would lead to the scheduled function never executing, + // and since we block on the error value returned by that function, we would + // end up blocking forever. This requirement does not exist for other + // incoming calls because we don't have to return a value to the caller, and + // therefore it is fine if the scheduled function never executes (because + // the wrapper is closed or enters idle before it gets to run). + ccr.mu.Lock() + if ccr.mode == ccrModeIdleOrClosed { + ccr.mu.Unlock() + return nil + } errCh := make(chan error, 1) - ccr.serializer.Schedule(func(_ context.Context) { + ccr.serializer.Schedule(func(context.Context) { ccr.addChannelzTraceEvent(s) ccr.curState = s if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { @@ -112,22 +243,14 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { } errCh <- nil }) - - // If the resolver wrapper is closed when waiting for this state update to - // be handled, the callback serializer will be closed as well, and we can - // rely on its Done channel to ensure that we don't block here forever. - select { - case err := <-errCh: - return err - case <-ccr.serializer.Done: - return nil - } + ccr.mu.Unlock() + return <-errCh } // ReportError is called by resolver implementations to report errors // encountered during name resolution to gRPC. 
func (ccr *ccResolverWrapper) ReportError(err error) { - ccr.serializer.Schedule(func(_ context.Context) { + ccr.serializerScheduleLocked(func(_ context.Context) { channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) ccr.cc.updateResolverState(resolver.State{}, err) }) @@ -136,7 +259,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) { // NewAddress is called by the resolver implementation to send addresses to // gRPC. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { - ccr.serializer.Schedule(func(_ context.Context) { + ccr.serializerScheduleLocked(func(_ context.Context) { ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) ccr.curState.Addresses = addrs ccr.cc.updateResolverState(ccr.curState, nil) @@ -146,7 +269,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { // NewServiceConfig is called by the resolver implementation to send service // configs to gRPC. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { - ccr.serializer.Schedule(func(_ context.Context) { + ccr.serializerScheduleLocked(func(_ context.Context) { channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc) if ccr.ignoreServiceConfig { channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config") diff --git a/stream.go b/stream.go index 06ec22cd0a9d..73c4900e7efb 100644 --- a/stream.go +++ b/stream.go @@ -155,14 +155,23 @@ type ClientStream interface { // If none of the above happen, a goroutine and a context will be leaked, and grpc // will not call the optionally-configured stats handler with a stats.End message. func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { + if err := cc.idlenessMgr.onCallBegin(); err != nil { + return nil, err + } + // allow interceptor to see all applicable call options, which means those // configured as defaults from dial option as well as per-call options opts = combine(cc.dopts.callOptions, opts) + var cs ClientStream + var err error if cc.dopts.streamInt != nil { - return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) + cs, err = cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) + } else { + cs, err = newClientStream(ctx, desc, cc, method, opts...) } - return newClientStream(ctx, desc, cc, method, opts...) + cc.idlenessMgr.onCallEnd() + return cs, err } // NewClientStream is a wrapper for ClientConn.NewStream. diff --git a/test/idleness_test.go b/test/idleness_test.go new file mode 100644 index 000000000000..e4160aa05da1 --- /dev/null +++ b/test/idleness_test.go @@ -0,0 +1,408 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/test/idleness_test.go b/test/idleness_test.go
new file mode 100644
index 000000000000..e4160aa05da1
--- /dev/null
+++ b/test/idleness_test.go
@@ -0,0 +1,408 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package test
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/internal/channelz"
+	"google.golang.org/grpc/internal/stubserver"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/resolver/manual"
+	"google.golang.org/grpc/status"
+
+	testgrpc "google.golang.org/grpc/interop/grpc_testing"
+	testpb "google.golang.org/grpc/interop/grpc_testing"
+)
+
+const defaultTestShortIdleTimeout = 500 * time.Millisecond
+
+// channelzTraceEventFound looks up the top-channels in channelz (expects a
+// single one), and checks if there is a trace event on the channel matching the
+// provided description string.
+func channelzTraceEventFound(ctx context.Context, wantDesc string) error {
+	for ctx.Err() == nil {
+		tcs, _ := channelz.GetTopChannels(0, 0)
+		if l := len(tcs); l != 1 {
+			return fmt.Errorf("when looking for channelz trace event with description %q, found %d top-level channels, want 1", wantDesc, l)
+		}
+		if tcs[0].Trace == nil {
+			return fmt.Errorf("when looking for channelz trace event with description %q, no trace events found for top-level channel", wantDesc)
+		}
+
+		for _, e := range tcs[0].Trace.Events {
+			if strings.Contains(e.Desc, wantDesc) {
+				return nil
+			}
+		}
+	}
+	return fmt.Errorf("when looking for channelz trace event with description %q, %w", wantDesc, ctx.Err())
+}
+
+// channelzTraceEventNotFound looks up the top-channels in channelz (expects a
+// single one), and verifies that there is no trace event on the channel
+// matching the provided description string.
+func channelzTraceEventNotFound(ctx context.Context, wantDesc string) error {
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+
+	err := channelzTraceEventFound(sCtx, wantDesc)
+	if err == nil {
+		return fmt.Errorf("found channelz trace event with description %q, when expected not to", wantDesc)
+	}
+	if !errors.Is(err, context.DeadlineExceeded) {
+		return err
+	}
+	return nil
+}
+
+// Tests the case where channel idleness is disabled by passing an idle_timeout
+// of 0. Verifies that a READY channel with no RPCs does not move to IDLE.
+func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
+	// Set up channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Create a ClientConn with idle_timeout set to 0.
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(0), // Disable idleness.
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Start a test backend and push an address update via the resolver.
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(func() { backend.Stop() })
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	for state := cc.GetState(); state != connectivity.Ready; state = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, state) {
+			t.Fatal("Timeout when waiting for channel to switch to READY")
+		}
+	}
+
+	// Verify that the ClientConn stays in READY.
+	sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
+	defer sCancel()
+	if cc.WaitForStateChange(sCtx, connectivity.Ready) {
+		t.Fatalf("Connectivity state changed to %q when expected to stay in READY", cc.GetState())
+	}
+
+	// Verify that there are no idleness related channelz events.
+	if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
+		t.Fatal(err)
+	}
+	if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE.
+func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
+	// Set up channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Create a ClientConn with a short idle_timeout.
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Start a test backend and push an address update via the resolver.
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(func() { backend.Stop() })
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	for state := cc.GetState(); state != connectivity.Ready; state = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, state) {
+			t.Fatal("Timeout when waiting for channel to switch to READY")
+		}
+	}
+
+	// Verify that the ClientConn moves to IDLE as there is no activity.
+	for state := connectivity.Ready; state != connectivity.Idle; state = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, state) {
+			t.Fatal("Timeout when waiting for channel to switch to IDLE")
+		}
+	}
+
+	// Verify idleness related channelz events.
+	if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
+func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
+	// Set up channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Create a ClientConn with a short idle_timeout.
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Start a test backend which keeps a unary RPC call active by blocking on a
+	// channel that is closed by the test later on. Also push an address update
+	// via the resolver.
+	blockCh := make(chan struct{})
+	backend := &stubserver.StubServer{
+		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+			<-blockCh
+			return &testpb.Empty{}, nil
+		},
+	}
+	if err := backend.StartServer(); err != nil {
+		t.Fatalf("Failed to start backend: %v", err)
+	}
+	t.Cleanup(func() { backend.Stop() })
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	for state := cc.GetState(); state != connectivity.Ready; state = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, state) {
+			t.Fatal("Timeout when waiting for channel to switch to READY")
+		}
+	}
+
+	// Spawn a goroutine which checks expected state transitions and idleness
+	// channelz trace events. It eventually closes `blockCh`, thereby unblocking
+	// the server RPC handler and the unary call below.
+	errCh := make(chan error, 1)
+	go func() {
+		// Verify that the ClientConn stays in READY.
+		sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
+		defer sCancel()
+		if cc.WaitForStateChange(sCtx, connectivity.Ready) {
+			errCh <- fmt.Errorf("Connectivity state changed to %q when expected to stay in READY", cc.GetState())
+			return
+		}
+
+		// Verify that there are no idleness related channelz events.
+		if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
+			errCh <- err
+			return
+		}
+		if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
+			errCh <- err
+			return
+		}
+
+		// Unblock the unary RPC on the server.
+		close(blockCh)
+		errCh <- nil
+	}()
+
+	// Make a unary RPC that blocks on the server, thereby ensuring that the
+	// count of active RPCs on the client is non-zero.
+	client := testgrpc.NewTestServiceClient(cc)
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+		t.Errorf("EmptyCall RPC failed: %v", err)
+	}
+
+	select {
+	case err := <-errCh:
+		if err != nil {
+			t.Fatal(err)
+		}
+	case <-ctx.Done():
+		t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Verifies that activity on a READY channel (frequent and short
+// RPCs) keeps it from moving to IDLE.
+func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
+	// Set up channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Create a ClientConn with a short idle_timeout.
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Start a test backend and push an address update via the resolver.
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(func() { backend.Stop() })
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	for state := cc.GetState(); state != connectivity.Ready; state = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, state) {
+			t.Fatal("Timeout when waiting for channel to switch to READY")
+		}
+	}
+
+	// For a duration of three times the configured idle timeout, make RPCs
+	// every now and then and ensure that the channel does not move out of
+	// READY.
+	sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
+	defer sCancel()
+	go func() {
+		for ; sCtx.Err() == nil; <-time.After(defaultTestShortTimeout) {
+			client := testgrpc.NewTestServiceClient(cc)
+			if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); err != nil {
+				// At some point during this loop the context deadline will
+				// expire. It is safe to ignore the resulting DeadlineExceeded
+				// error.
+				if status.Code(err) != codes.DeadlineExceeded {
+					t.Errorf("EmptyCall RPC failed: %v", err)
+					return
+				}
+			}
+		}
+	}()
+
+	// Verify that the ClientConn stays in READY.
+	if cc.WaitForStateChange(sCtx, connectivity.Ready) {
+		t.Fatalf("Connectivity state changed to %q when expected to stay in READY", cc.GetState())
+	}
+
+	// Verify that there are no idleness related channelz events.
+	if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil {
+		t.Fatal(err)
+	}
+	if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests the case where channel idleness is enabled by passing a small value for
+// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE. Also
+// verifies that a subsequent RPC on the IDLE channel kicks it out of IDLE.
+func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
+	// Set up channelz for testing.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	// Start a test backend and set the bootstrap state of the resolver to
+	// include this address. This ensures that when the resolver is restarted
+	// upon exiting idle, it pushes the same address to grpc again.
+	r := manual.NewBuilderWithScheme("whatever")
+	backend := stubserver.StartTestService(t, nil)
+	t.Cleanup(func() { backend.Stop() })
+	r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
+
+	// Create a ClientConn with a short idle_timeout.
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
+		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// Verify that the ClientConn moves to READY.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	for state := cc.GetState(); state != connectivity.Ready; state = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, state) {
+			t.Fatal("Timeout when waiting for channel to switch to READY")
+		}
+	}
+
+	// Verify that the ClientConn moves to IDLE as there is no activity.
+ for state := connectivity.Ready; state != connectivity.Idle; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatal("Timeout when waiting for channel to switch to IDLE") + } + } + + // Verify idleness related channelz events. + if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil { + t.Fatal(err) + } + + // Make an RPC and ensure that it succeeds and moves the channel back to + // READY. + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall RPC failed: %v", err) + } + for state := cc.GetState(); state != connectivity.Ready; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatal("Timeout when waiting for channel to switch to READY") + } + } + if err := channelzTraceEventFound(ctx, "exiting idle mode"); err != nil { + t.Fatal(err) + } +}
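For reference, a minimal client showing how an application would enable the idleness behavior exercised by the tests above via grpc.WithIdleTimeout(). The target address, the ten-minute timeout, and the assumption that a TestService backend is listening there are placeholders, not values taken from this patch.

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

func main() {
	cc, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Move the channel to IDLE after 10 minutes without RPC activity.
		// Passing 0 disables idleness, as in TestChannelIdleness_Disabled_NoActivity.
		grpc.WithIdleTimeout(10*time.Minute),
	)
	if err != nil {
		log.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer cc.Close()

	// The first RPC after the channel has gone idle transparently kicks it
	// back out of idle, as verified by TestChannelIdleness_Enabled_ExitIdleOnRPC.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	client := testgrpc.NewTestServiceClient(cc)
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		log.Printf("EmptyCall RPC failed: %v", err)
	}
}
```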