grpc: eliminate panics in server worker implementation #6856

Merged · 5 commits · Dec 15, 2023
Changes from 1 commit
22 changes: 13 additions & 9 deletions server.go
@@ -144,7 +144,8 @@ type Server struct {
 	channelzID *channelz.Identifier
 	czData     *channelzData
 
-	serverWorkerChannel chan func()
+	serverWorkerChannel      chan func()
+	serverWorkerChannelClose func()
 }
 
 type serverOptions struct {
@@ -623,15 +624,14 @@ func (s *Server) serverWorker() {
 // connections to reduce the time spent overall on runtime.morestack.
 func (s *Server) initServerWorkers() {
 	s.serverWorkerChannel = make(chan func())
+	s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
+		close(s.serverWorkerChannel)
+	})
 	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
 		go s.serverWorker()
 	}
 }
 
-func (s *Server) stopServerWorkers() {
-	close(s.serverWorkerChannel)
-}
-
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
@@ -1898,15 +1898,19 @@ func (s *Server) stop(graceful bool) {
 		s.closeServerTransportsLocked()
 	}
 
-	if s.opts.numServerWorkers > 0 {
-		s.stopServerWorkers()
-	}
-
 	for len(s.conns) != 0 {
 		s.cv.Wait()
 	}
 	s.conns = nil
 
+	if s.opts.numServerWorkers > 0 {
+		// Closing the channel (only once, via grpcsync.OnceFunc) after all the
+		// connections have been closed above ensures that there are no
+		// goroutines executing the callback passed to st.HandleStreams (where
+		// the channel is written to).
+		s.serverWorkerChannelClose()
+	}
+
 	if s.events != nil {
 		s.events.Finish()
 		s.events = nil
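For readers following the fix, here is a minimal, self-contained Go sketch of the pattern adopted above. It is not gRPC code; onceFunc is a stand-in written here to mirror the behavior of grpcsync.OnceFunc. Workers drain a channel of func(), the close of that channel is wrapped in a sync.Once so two racing shutdown paths cannot close it twice, and the close is deferred until all producers are done so nothing can write to a closed channel.

package main

import (
	"fmt"
	"sync"
)

// onceFunc mirrors the behavior of grpcsync.OnceFunc: the returned function
// runs f on the first call only; every later call is a no-op.
func onceFunc(f func()) func() {
	var once sync.Once
	return func() { once.Do(f) }
}

func main() {
	tasks := make(chan func())
	closeTasks := onceFunc(func() { close(tasks) })

	// Worker pool: each worker drains the channel until it is closed.
	var workers sync.WaitGroup
	for i := 0; i < 4; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for task := range tasks {
				task()
			}
		}()
	}

	// Producers: loosely analogous to the connection goroutines that hand
	// work to the server workers via the channel.
	var producers sync.WaitGroup
	for i := 0; i < 8; i++ {
		producers.Add(1)
		go func(id int) {
			defer producers.Done()
			tasks <- func() { fmt.Println("handled task from producer", id) }
		}(i)
	}

	// Close the channel only after every producer is done (mirroring how the
	// server waits for s.conns to drain before closing serverWorkerChannel),
	// and only once, even if two shutdown paths race to this point.
	producers.Wait()
	closeTasks()
	closeTasks() // No-op thanks to the sync.Once wrapper; no double-close panic.
	workers.Wait()
}

In the server itself, the producers correspond to the transport goroutines running the callback passed to st.HandleStreams, which is why the close now happens only after s.conns has drained.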
64 changes: 64 additions & 0 deletions test/server_test.go
Member: These probably truly belong in /server_ext_test.go instead.

Contributor (author): What is the concept of these _ext_test.go files?

Member: The main point is that they're not in the grpc package, but they're adjacent to the code they're testing in the grpc package.

The test package is too big. It has many tests of many sub-components, and we should be looking to move tests closer to the code they're testing. Growing this one package also doesn't scale, since the tests within it can't run in parallel.
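To illustrate the convention under discussion: an _ext_test.go file lives in the same directory as the code it tests, but in an external test package (package grpc_test), so it exercises only the exported API and builds as a separate test package from the large top-level test package. A minimal, hypothetical sketch; the file and test names below are illustrative and not part of this PR:

// server_ext_test.go (hypothetical illustration): the file sits in the grpc
// directory but declares the external test package grpc_test, so it can only
// reach the package through its exported API.
package grpc_test

import (
	"testing"

	"google.golang.org/grpc"
)

// A hypothetical test exercising the behavior this PR fixes: with stream
// workers enabled, stopping the server repeatedly must not panic.
func TestStopAfterStopWithStreamWorkers(t *testing.T) {
	s := grpc.NewServer(grpc.NumStreamWorkers(1))
	s.Stop()
	s.Stop() // Second Stop must be a no-op, not a close of a closed channel.
}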

@@ -21,17 +21,81 @@ package test
import (
	"context"
	"io"
	"runtime"
	"sync"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/status"

	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

// Tests the case where the server worker goroutine option is enabled, and a
// number of RPCs are initiated around the same time that Stop() is called. This
// used to result in a write to a closed channel. This test verifies that there
// is no panic.
func (s) TestServerWorkers_RPCsAndStop(t *testing.T) {
	ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
	// This deferred stop takes care of stopping the server when one of the
	// grpc.Dials below fails and the test exits early.
	defer ss.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	const numChannels = 20
	const numRPCLoops = 20
	var wg sync.WaitGroup
	for i := 0; i < numChannels; i++ {
		cc, err := grpc.Dial(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			t.Fatalf("[iteration: %d] grpc.Dial(%s) failed: %v", i, ss.Address, err)
		}
		defer cc.Close()

		client := testgrpc.NewTestServiceClient(cc)
		for j := 0; j < numRPCLoops; j++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for {
					if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
						t.Logf("EmptyCall() failed: %v", err)
Member: We expect some to fail with UNAVAILABLE, right? So maybe silently ignore those and use t.Error() for the other errors?

Contributor (author): So, what I see is that once the server listener is closed, the client channel moves to IDLE --> CONNECTING --> TRANSIENT_FAILURE. So, the RPCs fail with DEADLINE_EXCEEDED.

Contributor (author): Actually, some fail with UNAVAILABLE as well.

Member: That seems wrong, since they're not wait-for-ready RPCs. They should all fail immediately with UNAVAILABLE if there is no working connection to a server.

Contributor (author): Makes sense now. Switched to the test deadline for the RPCs, which ensures there is enough time for the LB policy to kick the subConn into CONNECTING (after it goes IDLE on transport closure) and then into TRANSIENT_FAILURE.

						return
					}
				}
			}()
		}
	}
	// Call Stop() concurrently with the above RPC attempts.
	ss.Stop()
	wg.Wait()
}
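For reference, a sketch of the alternative error handling suggested in the review thread above: ignore the two failure codes expected once the server is being stopped, and report anything else. This is how the RPC loop's inner error check could look, assuming the codes and status imports already shown in this file; the commit as shown keeps the simpler t.Logf.

if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
	switch status.Code(err) {
	case codes.Unavailable, codes.DeadlineExceeded:
		// Expected once Stop() has closed the listener and existing transports.
	default:
		t.Errorf("EmptyCall() failed with unexpected error: %v", err)
	}
	return
}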

// Tests the case where the server worker goroutine option is enabled, and both
// Stop() and GracefulStop() are called. This used to result in a close of a
// closed channel. This test verifies that there is no panic.
func (s) TestServerGracefulStopAndStop(t *testing.T) {
	ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
	defer ss.Stop()

	if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
		t.Fatalf("Failed to create client to stub server: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	client := testgrpc.NewTestServiceClient(ss.CC)
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}

	ss.S.GracefulStop()
}

type ctxKey string

// TestServerReturningContextError verifies that if a context error is returned