Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: add a draining state check before creating streams #6142

Merged
merged 3 commits into from Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/transport/http2_client.go
Expand Up @@ -782,7 +782,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.activeStreams == nil { // Can be niled from Close().
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
Expand Down
28 changes: 17 additions & 11 deletions internal/transport/transport_test.go
Expand Up @@ -802,6 +802,9 @@ func (s) TestLargeMessageWithDelayRead(t *testing.T) {
}
}

// TestGracefulClose ensures that GracefulClose allows in-flight streams to
// proceed until they complete naturally, while not allowing creation of new
// streams during this window.
func (s) TestGracefulClose(t *testing.T) {
server, ct, cancel := setUp(t, 0, math.MaxUint32, pingpong)
defer cancel()
Expand All @@ -817,6 +820,9 @@ func (s) TestGracefulClose(t *testing.T) {
}()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
defer cancel()

// Create a stream that will exist for this whole test and confirm basic
// functionality.
s, err := ct.NewStream(ctx, &CallHdr{})
if err != nil {
t.Fatalf("NewStream(_, _) = _, %v, want _, <nil>", err)
Expand All @@ -837,31 +843,31 @@ func (s) TestGracefulClose(t *testing.T) {
if _, err := s.Read(recvMsg); err != nil {
t.Fatalf("Error while reading: %v", err)
}

// Gracefully close the transport, which should not affect the existing
// stream.
ct.GracefulClose()

var wg sync.WaitGroup
// Expect the failure for all the follow-up streams because ct has been closed gracefully.
// Expect errors creating new streams because the client transport has been
// gracefully closed.
for i := 0; i < 200; i++ {
wg.Add(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can you switch the docstring for this code block from "Expect the failure for all the follow-up streams because ct has been closed gracefully." to something like "expect errors trying to create new streams after the client transport has been Gracefully Closed (and is in draining)".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

go func() {
defer wg.Done()
str, err := ct.NewStream(ctx, &CallHdr{})
if err != nil && err.(*NewStreamError).Err == ErrConnClosing {
_, err := ct.NewStream(ctx, &CallHdr{})
if err != nil && err.(*NewStreamError).Err == ErrConnClosing && err.(*NewStreamError).AllowTransparentRetry {
return
} else if err != nil {
t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing)
return
}
ct.Write(str, nil, nil, &Options{Last: true})
if _, err := str.Read(make([]byte, 8)); err != errStreamDrain && err != ErrConnClosing {
t.Errorf("_.Read(_) = _, %v, want _, %v or %v", err, errStreamDrain, ErrConnClosing)
}
t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing)
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can you add a top level docstring to this test explaining what this is actually testing (I know it wasn't there previously? I see ct.GracefulClose(), new streams not being able to be created, and a write and a read (expecting io.EOF) from the already created stream. I'm assuming move "// The stream which was created before graceful close can still proceed." from before the wg.Wait (which is waiting on the failed streams for loop) to before the ct.Write. Please add top level docstring that explains this test is gracefully closing a client transport and expecting these two downstream effects (new stream creation fails, and also previously created streams still continue to be able to be operated on and are still functional (all the time or in certain conditions/timebound etc.?)).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

// Confirm the existing stream still functions as expected.
ct.Write(s, nil, nil, &Options{Last: true})
if _, err := s.Read(incomingHeader); err != io.EOF {
t.Fatalf("Client expected EOF from the server. Got: %v", err)
}
// The stream which was created before graceful close can still proceed.
wg.Wait()
}

Expand Down