transport: drain client transport when streamID approaches maxStreamID #5889

Merged: 20 commits merged on Jan 11, 2023
Changes from 16 commits
5 changes: 5 additions & 0 deletions internal/transport/defaults.go
@@ -47,3 +47,8 @@ const (
defaultClientMaxHeaderListSize = uint32(16 << 20)
defaultServerMaxHeaderListSize = uint32(16 << 20)
)

// MaxStreamID is the upper bound for the stream ID before the current
// transport gracefully closes and new transport is created for subsequent RPCs.
// This is set to 75% of math.MaxUint32. It's exported so that tests can override it.
var MaxStreamID = uint32(3_221_225_472)
StreamID is a 31-bit unsigned integer. The max value a streamID could be before overflow is 2^31-1 = 2,147,483,647.

I'm not sure this will actually prevent this issue from happening, as the overflow would occur before 3_221_225_472. You can read more about this in RFC-7540: https://www.rfc-editor.org/rfc/rfc7540#section-5.1.1

Contributor
The spec:

   Streams are identified with an unsigned 31-bit integer.  Streams
   initiated by a client MUST use odd-numbered stream identifiers; those
   initiated by the server MUST use even-numbered stream identifiers.  A
   stream identifier of zero (0x0) is used for connection control
   messages; the stream identifier of zero cannot be used to establish a
   new stream.
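
To make the overflow concern concrete, here is a small, standalone Go sketch (illustrative only, not part of this PR; the constant names are made up) comparing the value in this revision against the 31-bit stream ID limit from RFC 7540:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Value used in this revision: 75% of math.MaxUint32.
	const proposedMaxStreamID = uint32(3_221_225_472)
	// HTTP/2 stream IDs are unsigned 31-bit integers (RFC 7540 §5.1.1),
	// so the largest legal stream ID is 2^31-1.
	const maxHTTP2StreamID = uint32(math.MaxInt32) // 2_147_483_647

	// The configured bound exceeds the protocol limit, so stream IDs would
	// overflow the 31-bit space before the transport ever drained.
	fmt.Println(proposedMaxStreamID > maxHTTP2StreamID) // true
}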

Member Author

Thanks @patrick-ogrady! Great catch. I'm going to fix that.

18 changes: 18 additions & 0 deletions internal/transport/http2_client.go
@@ -768,6 +768,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
}
firstTry := true
var ch chan struct{}
transportDrainRequired := false
checkForStreamQuota := func(it interface{}) bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
@@ -783,6 +784,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
h := it.(*headerFrame)
h.streamID = t.nextID
t.nextID += 2

// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID

s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
@@ -862,6 +868,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
sh.HandleRPC(s.ctx, outHeader)
}
}
if transportDrainRequired {
if logger.V(logLevel) {
logger.Infof("transport: t.nextID > MaxStreamID. Draining")
}
t.GracefulClose()
}
return s, nil
}

@@ -1783,3 +1795,9 @@ func (t *http2Client) getOutFlowWindow() int64 {
return -2
}
}

func (t *http2Client) stateForTesting() transportState {
t.mu.Lock()
defer t.mu.Unlock()
return t.state
}
46 changes: 46 additions & 0 deletions internal/transport/transport_test.go
@@ -536,6 +536,52 @@ func (s) TestInflightStreamClosing(t *testing.T) {
}
}

// Tests that when streamID > MaxStreamID, the current client transport drains.
func (s) TestClientTransportDrainsAfterStreamIDExhausted(t *testing.T) {
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
defer server.stop()
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Small",
}

originalMaxStreamID := MaxStreamID
MaxStreamID = 3
defer func() {
MaxStreamID = originalMaxStreamID
}()

ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()

s, err := ct.NewStream(ctx, callHdr)
if err != nil {
t.Fatalf("ct.NewStream() = %v", err)
}
if s.id != 1 {
t.Fatalf("Stream id: %d, want: 1", s.id)
}

if got, want := ct.stateForTesting(), reachable; got != want {
t.Fatalf("Client transport state %v, want %v", got, want)
}

// The expected stream ID here is 3 since stream IDs are incremented by 2.
s, err = ct.NewStream(ctx, callHdr)
if err != nil {
t.Fatalf("ct.NewStream() = %v", err)
}
if s.id != 3 {
t.Fatalf("Stream id: %d, want: 3", s.id)
}

// Verifying that ct.state is draining when next stream ID > MaxStreamID.
if got, want := ct.stateForTesting(), draining; got != want {
t.Fatalf("Client transport state %v, want %v", got, want)
}
}

func (s) TestClientSendAndReceive(t *testing.T) {
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
198 changes: 198 additions & 0 deletions test/transport_end2end_test.go
@@ -0,0 +1,198 @@
/*
Member

Nit: Just transport_test.go; it's cleaner.

Member Author

done

*
* Copyright 2022 gRPC authors.
Member

2023 now!

Member Author

lol. In my head, I'm still stuck in 2020.

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test

import (
"context"
"io"
"net"
"sync"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)

// authInfoWithConn wraps the underlying net.Conn, and makes it available
// to the test as part of the Peer call option.
type authInfoWithConn struct {
credentials.CommonAuthInfo
conn net.Conn
}

func (ai *authInfoWithConn) AuthType() string {
return ""
}

// connWrapperWithCloseCh wraps a net.Conn and pushes on a channel when closed.
type connWrapperWithCloseCh struct {
net.Conn
closeCh chan interface{}
}

// Close closes the connection and sends a value on the close channel.
func (cw *connWrapperWithCloseCh) Close() error {
err := cw.Conn.Close()
for {
select {
case cw.closeCh <- nil:
return err
case <-cw.closeCh:
}
}
Member

Why can't you just do close(cw.closeCh)?

Close shouldn't be called multiple times -- is it? If so, use a grpcsync.Event for simplicity?

Member Author

done. updated

}
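
For reference, a minimal sketch of the simplification being suggested here, assuming Close is only ever called once so the channel can simply be closed instead of looping over a buffered send (this is not the code in this revision):

// Close closes the connection and signals any waiter by closing closeCh.
func (cw *connWrapperWithCloseCh) Close() error {
	err := cw.Conn.Close()
	close(cw.closeCh)
	return err
}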

// These custom creds are used for storing the connections made by the client.
// The closeCh in conn can be used to detect when conn is closed.
type transportRestartCheckCreds struct {
mu sync.Mutex
connections []connWrapperWithCloseCh
}

func (c *transportRestartCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
return rawConn, nil, nil
}
func (c *transportRestartCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
conn := &connWrapperWithCloseCh{Conn: rawConn, closeCh: make(chan interface{}, 1)}
Member

Closing the channel means you don't need a buffer on it here.

Member Author

makes sense

c.connections = append(c.connections, *conn)
Member

Why not make connections a []*connWrapperWithCloseCh to avoid copies?

Member Author

I didn't know that before. Updating.

commonAuthInfo := credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}
authInfo := &authInfoWithConn{commonAuthInfo, conn}
Member

IMO you should still name your fields even though it's an unexported type: {CommonAuthInfo: commonAuthInfo, conn: conn}

Member Author

Sure. Updating.

return conn, authInfo, nil
}
func (c *transportRestartCheckCreds) Info() credentials.ProtocolInfo {
return credentials.ProtocolInfo{}
}
func (c *transportRestartCheckCreds) Clone() credentials.TransportCredentials {
return &transportRestartCheckCreds{}
Member

return c?

Member Author

updated

}
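
As a sketch of the reviewer's suggestion, Clone would return the receiver itself, presumably so that every handshake appends to the same connections slice the test inspects:

func (c *transportRestartCheckCreds) Clone() credentials.TransportCredentials {
	// Returning the same instance keeps all recorded connections visible
	// to the single creds value created by the test.
	return c
}
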
func (c *transportRestartCheckCreds) OverrideServerName(s string) error {
return nil
}

// Tests that the client transport drains and restarts when next stream ID exceeds
// MaxStreamID. This test also verifies that subsequent RPCs use a new client
// transport and the old transport is closed.
func (s) TestClientTransportRestartsAfterStreamIDExhausted(t *testing.T) {
// Set the transport's MaxStreamID to 5 to cause connection to drain after 2 RPCs.
originalMaxStreamID := transport.MaxStreamID
transport.MaxStreamID = 5
defer func() {
transport.MaxStreamID = originalMaxStreamID
}()

ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
for i := 0; i < 2; i++ {
if _, err := stream.Recv(); err != nil {
return status.Errorf(codes.Internal, "unexpected error receiving: %v", err)
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil {
return status.Errorf(codes.Internal, "unexpected error sending: %v", err)
}
}
if recv, err := stream.Recv(); err != io.EOF {
return status.Errorf(codes.Internal, "Recv = %v, %v; want _, io.EOF", recv, err)
}
return nil
},
}

creds := &transportRestartCheckCreds{}
if err := ss.Start(nil, grpc.WithTransportCredentials(creds)); err != nil {
t.Fatalf("Starting stubServer: %v", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

var streams []testpb.TestService_FullDuplexCallClient

// expectedNumConns is the expected number of connections when each stream is created.
expectedNumConns := []int{1, 1, 2}

// Set up 3 streams and call sendAndReceive() once on each.
Member

There's no more sendAndReceive()

Member Author

lol yea.. sad old function. This PR has had this function created and deleted multiple times. Sucks to be sendAndReceive()

for i := 0; i < 3; i++ {
Member

Optional:

const numStreams = 3
expectedNumConns := [numStreams]int{1, 1, 2}
for i := 0; i < numStreams; i++ {
}

Member Author

done

s, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("Creating FullDuplex stream: %v", err)
}

streams = append(streams, s)
Member

Is the connection guaranteed to be established even without this?

I think so, as gRPC won't return from NewStream unless there's at least a connection for the RPC. But maybe not, because it could be that we see the first connection as okay and return a stream that uses it.

So maybe we don't need this? Or if we do need it, please add a comment explaining why.

Member Author

Heavily offline-d this with Doug. Turns out there was a bug in the way this implementation restarted streams. The test was also checking the true end-to-end behavior incorrectly.

In this example, 3 streams are created (s1, s2, s3) while MaxStreamID is set to 5, which means there should not be any transport restarts.

However, it was perceived that s3 was created on a new connection. This is because, since we are creating 3 streams on the same connection, the 3rd stream makes the transport call GracefulClose. Now when a worker picks up an item from the controlbuf (which is a headerFrame for the stream), it would see that the transport is already in draining (checked in headerFrame.initStream). This is a race, and the test would have been flaky.

This bug would not have caused an immediate regression due to the ~3 billion stream ID headroom, but thanks @dfawley for calling this out!

if err := s.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("Sending on stream %d: %v", i, err)
}
if _, err := s.Recv(); err != nil {
t.Fatalf("Receiving on stream %d: %v", i, err)
}

// Verify expected num of conns.
if len(creds.connections) != expectedNumConns[i] {
t.Fatalf("Number of connections created: %v, want: %v", len(creds.connections), expectedNumConns[i])
}
}

// Verify all streams still work.
for i, stream := range streams {
if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("Sending on stream %d: %v", i, err)
}
if _, err := stream.Recv(); err != nil {
t.Fatalf("Receiving on stream %d: %v", i, err)
}
}

var connPerStream []net.Conn

// The peer passed via the call option is set up only after the RPC is complete.
Member

This is no longer true.

Member Author

true. nixing it

// Conn used by the stream is available in authInfo.
for i, stream := range streams {
if err := stream.CloseSend(); err != nil {
t.Fatalf("CloseSend() on stream %d: %v", i, err)
}
p, ok := peer.FromContext(stream.Context())
if !ok {
t.Fatalf("Getting peer from stream context for stream %d", i)
}
connPerStream = append(connPerStream, p.AuthInfo.(*authInfoWithConn).conn)
}

// Verifying the first and second RPCs were made on the same connection.
Member

I'm not even sure we need any of this anymore, since you're verifying the number of connections after each stream, above. Please consider and simplify as much as possible (without compromising the validity/coverage of the test).

Member Author

You are correct. This doesn't seem to be important anymore. I can also get rid of the AuthInfo stuff along with this.

Member

Nit: Verify and below 2x.

Member Author

done. and noted.

if connPerStream[0] != connPerStream[1] {
t.Fatal("Got streams using different connections; want same.")
}
// Verifying the third and first/second RPCs were made on different connections.
if connPerStream[2] == connPerStream[0] {
t.Fatal("Got streams using same connections; want different.")
}

// Verifying first connection was closed.
select {
case <-creds.connections[0].closeCh:
case <-ctx.Done():
t.Fatal("Timeout expired when waiting for first client transport to close")
}
}