Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: drain client transport when streamID approaches maxStreamID #5889

Merged
merged 20 commits into from Jan 11, 2023

Conversation

arvindbr8
Copy link
Member

@arvindbr8 arvindbr8 commented Dec 22, 2022

From #5358 (comment), we noticed that users may be working around the max stream ID issue by using max connection age, which is not really an intended use case of that feature. This change sets MaxStreamID to 75% of math.uint32 and drain client transport when next stream ID > MaxStreamID.

Fixes #5600

RELEASE NOTES:

  • transport: drain client transport when streamID approaches MaxStreamID

@arvindbr8 arvindbr8 added the Type: Behavior Change Behavior changes not categorized as bugs label Dec 22, 2022
@arvindbr8 arvindbr8 added this to the 1.53 Release milestone Dec 22, 2022
@arvindbr8 arvindbr8 changed the title transport: drain client transport when streamId approaches maxStreamId transport: drain client transport when streamID approaches maxStreamID Dec 22, 2022
@arvindbr8 arvindbr8 assigned easwars, dfawley and arvindbr8 and unassigned easwars and dfawley Dec 22, 2022
// MaxStreamIDForTesting is the upper bound for the stream ID before the current
// transport gracefully closes and a new transport is created for subsequent RPCs.
// This is set to 75% of math.MaxUint32. It's exported so that tests can override it.
var MaxStreamIDForTesting = uint32(float32(math.MaxUint32) * 0.75)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: functions that do stuff for testing are ForTesting. This can be just MaxStreamID, and some tests happen to override it, which seems fine to me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg! i'm updating this

Comment on lines 829 to 830
// 75% of math.MaxUint32, which then signals gRPC to restart transport
// for subsequent RPCs.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe "which signals gRPC that the connection is closed and a new one must be created for subsequent RPCs"? "Restart transport" doesn't really imply the right things, and also this doesn't happen automatically (the LB policy would need to request it).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 831 to 833
if t.nextID > MaxStreamIDForTesting {
transportDrainRequired = true
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shorten: transportDrainRequired = t.nextID > MaxStreamIDForTesting

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -862,6 +869,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
sh.HandleRPC(s.ctx, outHeader)
}
}
if transportDrainRequired {
if logger.V(logLevel) {
logger.Infof("t.nextID > MaxStreamID. transport: draining")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"transport" is supposed to be the prefix of the whole log message. How about transport: t.nextID > MaxStreamID. Draining?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes! that makes sense

@@ -536,6 +536,40 @@ func (s) TestInflightStreamClosing(t *testing.T) {
}
}

// TestClientTransportDrainsAfterStreamIdExhausted tests that when
// streamID > MaxStreamId, the current client transport drains.
func (s) TestClientTransportDrainsAfterStreamIdExhausted(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: in Go style we write "ID" not "Id".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

transport.MaxStreamIDForTesting = originalMaxStreamID
}()

// setting up StubServer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar nits.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing...

return nil
},
}
testpb.RegisterTestServiceServer(s, ss)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unfortunate you have to have a ton of the boilerplate that StubServer is intended to help with.

Two options:

  1. use WithContextDialer on the client to detect when we're (re)connecting.
  2. Add some stuff to StubServer to facilitate wrapping the listener.

This will save you manually creating the server, registering the service, dialing the client, wrapping the client with the grpc stub, and dealing with extra shutdown tasks. The resulting test code will be the more interesting code which will make it easier to read and understand what it's doing that's unique.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also bouncing around different things to get ListenerWrapper to work. I'm doing ahead with option 2 to facilitate StubServer with ListenerWrapper.. let me know what you think of the approach @dfawley

Comment on lines 7081 to 6999
if _, err = stream.Recv(); err != nil && err != io.EOF {
t.Fatalf("stream.Recv() = _, %v want: nil or EOF", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we know what we should receive? Shouldn't it always be nil since the server sends 10 messages?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, should never error

ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
for i := 0; i < 10; i++ {
m := &grpc.PreparedMsg{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a PreparedMsg? That seems unnecessarily complex. Just do stream.SendMsg(testpb.WhateverThisIs{}).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. ive updating this in the rev

t.Fatal("timeout expired when waiting to create new conn channel")
}

// verifying the connection to the old one is drained and closed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interestingly you haven't fully received off the streams that were active. So this only works because the flow control is enough that the server finished the stream.

I'd recommend avoiding "coincidences" like this (even if they're literally never going to not be true).

Instead you should make the client & server behavior align.

It also could be interesting to test that the stream is still active even after the new connection is created, which is how "draining" behavior should work. E.g.

  1. Create a stream that ping-pongs back and forth.
  2. Use it and make sure it works.
  3. Create a stream on the next connection that does the same. (Verify that the new connection was used for it somehow.)
  4. Use it and make sure it works.
  5. Verify that the stream from (1) still works.
  6. Close streams from (1) and (3). Client-side context cancelation should be easy and fine here.
  7. Expect the original connection to close.

One way to validate which connection the stream is using: you could use a custom credentials (its AuthInfo ends up in the Peer) that counts connections and increments some number for each one. Creds could also be used to determine when connections are closed, since they return a wrapped net.Conn back to us that is what we really close.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the detailed comment. I've updated the test to align to your comment. lemme know what you think.

Comment on lines 831 to 833
if t.nextID > MaxStreamIDForTesting {
transportDrainRequired = true
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh whoops. You have a race. This bool needs to be set inside checkForStreamQuota then read here (or below as is and ignore here).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im made the change now

// MaxStreamIDForTesting is the upper bound for the stream ID before the current
// transport gracefully closes and a new transport is created for subsequent RPCs.
// This is set to 75% of math.MaxUint32. It's exported so that tests can override it.
var MaxStreamIDForTesting = uint32(float32(math.MaxUint32) * 0.75)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a nit: I'd simplify this to something like uint32(3*1024*1024*1024) or 3_221_225_472 or 0xc0000000. (Basically, extra casts and floats (in general) should be avoided.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i missed this one.. updating this now

@arvindbr8
Copy link
Member Author

arvindbr8 commented Dec 28, 2022

I just merged 3 of my commits by

git reset --soft HEAD^1 x3
and
git commit && git push

but now it shows up as forced push on this PR 😢 .. im going to refrain for any commit amends in the future. Luckily i see all the past comments here.

@arvindbr8 arvindbr8 assigned easwars and dfawley and unassigned arvindbr8 Dec 28, 2022
internal/transport/transport_test.go Outdated Show resolved Hide resolved
internal/transport/transport_test.go Outdated Show resolved Hide resolved
internal/transport/http2_client.go Outdated Show resolved Hide resolved
internal/transport/transport_test.go Show resolved Hide resolved
internal/transport/http2_client.go Outdated Show resolved Hide resolved
test/end2end_test.go Outdated Show resolved Hide resolved
test/end2end_test.go Outdated Show resolved Hide resolved
test/end2end_test.go Outdated Show resolved Hide resolved
test/end2end_test.go Outdated Show resolved Hide resolved
test/end2end_test.go Outdated Show resolved Hide resolved
Copy link
Contributor

@easwars easwars left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly looks good. Just some minor comments this time around.

// connWrapperWithCloseCh wraps a net.Conn and pushes on a channel when closed.
type connWrapperWithCloseCh struct {
net.Conn
CloseCh *testutils.Channel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one needs to unexported as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm.. isn't this unexported right now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CloseCh field is still exported, which is immaterial given that the struct is unexported now. But it is better to not have this anomaly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it. thanks. updating this

test/end2end_test.go Outdated Show resolved Hide resolved
test/end2end_test.go Outdated Show resolved Hide resolved
test/end2end_test.go Outdated Show resolved Hide resolved

// Setting up 3 streams and calling Send() and Recv() once on each.
for i := 0; i < 3; i++ {
var p peer.Peer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: Since p is always used as a pointer, you could instead do p := new(peer.Peer) here and replace &p with p down below.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. updating this

test/end2end_test.go Outdated Show resolved Hide resolved
test/end2end_test.go Outdated Show resolved Hide resolved
@arvindbr8 arvindbr8 requested a review from easwars January 3, 2023 16:27
@arvindbr8 arvindbr8 assigned easwars and unassigned arvindbr8 Jan 3, 2023
@arvindbr8 arvindbr8 assigned dfawley and unassigned arvindbr8 and dfawley Jan 10, 2023
@@ -0,0 +1,198 @@
/*
*
* Copyright 2022 gRPC authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2023 now!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol. In my head, i'm still stuck in 2020

@@ -0,0 +1,198 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Just transport_test.go; it's cleaner.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 57 to 63
for {
select {
case cw.closeCh <- nil:
return err
case <-cw.closeCh:
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't you just do close(cw.closeCh)?

Close shouldn't be called multiple times -- is it? If so, use a grpcsync.Event for simplicity?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. updated

func (c *transportRestartCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
conn := &connWrapperWithCloseCh{Conn: rawConn, closeCh: make(chan interface{}, 1)}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing the channel means you don't need a buffer on it here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

c.mu.Lock()
defer c.mu.Unlock()
conn := &connWrapperWithCloseCh{Conn: rawConn, closeCh: make(chan interface{}, 1)}
c.connections = append(c.connections, *conn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make connections a []*connWrapperWithCloseCh to avoid copies?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didnt know that before. Updating.

t.Fatalf("Creating FullDuplex stream: %v", err)
}

streams = append(streams, s)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the connection guaranteed to be established even without this?

I think so, as gRPC won't return from NewStream unless there's at least a connection for the RPC. But maybe not, because it could be that we see the first connection as okay and return a stream that uses it

So maybe we don't need this? Or if we do need it, please add a comment explaining why.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heavily offline-d this with Doug. Turns out there was a bug in the way this implemented restarted the stream. The test was also checking the true end2end behavior incorrectly.

Like in this example, 3 streams are created (s1,s2,s3) while MaxStreamId is set to 5. Which means there should not be any transport restarts.

However, it was perceived that s3 was created on a new connection. this is because, since we are creating 3 streams on the same connection, the 3rd stream makes transport call graceClose. Now when a worker picks up an item from the controlbuf (which is a headerFrame for the stream), it would see that the stream is already in draining (checked in headerFrame.initStream). This is a race, and the test would have been flakey.

This bug would not have immediate regression due to the 3B buffer in streamIDs, but thanks @dfawley for calling this out!

// expectedNumConns when each stream is created.
expectedNumConns := []int{1, 1, 2}

// Set up 3 streams and call sendAndReceive() once on each.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no more sendAndReceive()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol yea.. sad old function. This PR has had this function created and deleted multiple times. Sucks to be sendAndReceive()


var connPerStream []net.Conn

// The peer passed via the call option is set up only after the RPC is complete.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. nixing it

connPerStream = append(connPerStream, p.AuthInfo.(*authInfoWithConn).conn)
}

// Verifying the first and second RPCs were made on the same connection.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not even sure we need any of this anymore, since you're verifying the number of connections after each stream, above. Please consider and simplify as much as possible (without compromising the validity/coverage of the test).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are correct. This doesnt seem to be important anymore. I can also get rid of the AuthInfo stuff along with this.

connPerStream = append(connPerStream, p.AuthInfo.(*authInfoWithConn).conn)
}

// Verifying the first and second RPCs were made on the same connection.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Verify and below 2x.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. and noted.

@dfawley dfawley assigned arvindbr8 and unassigned dfawley Jan 10, 2023
@arvindbr8 arvindbr8 removed their assignment Jan 10, 2023
Copy link
Member

@dfawley dfawley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM modulo nits/cleanups!

func (l *loopyWriter) originateStream(str *outStream) error {
hdr := str.itl.dequeue().(*headerFrame)
// originateStreamWithHeaderFrame calls the initStream function on the headerFrame and
// called writeHeader. If write succeeds the streamID is added to l.estdStreams
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

called->calls?

And end the last sentence with a period, please.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I nixed the comment itself. Seems like i really just wrote exactly what the below 10 lines do

// called writeHeader. If write succeeds the streamID is added to l.estdStreams
func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
// l.draining is set for an incomingGoAway. In which case, we want to avoid further
// writes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.

Also... we want to avoid creating new streams instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// l.draining is set for an incomingGoAway. In which case, we want to avoid further
// writes
if l.draining {
hdr.onOrphaned(errStreamDrain)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// TODO: provide a better error with the reason we are in draining. e.g.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -742,15 +742,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
endStream: false,
initStream: func(id uint32) error {
t.mu.Lock()
if state := t.state; state != reachable {
// we want initStream to cleanup and return an error when transport is closing.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: capitalize "We".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nixing this comment as well. Seems like it doesnt add value when i read it now

if state := t.state; state != reachable {
// we want initStream to cleanup and return an error when transport is closing.
// initStream is never called when transport is draining.
if t.state == closing {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// TODO: handle transport closure in loopy instead and remove this. e.g.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// connWrapperWithCloseCh wraps a net.Conn and pushes on a channel when closed.
type connWrapperWithCloseCh struct {
net.Conn
closeCh chan interface{}
close *grpcsync.Event
}

// Close closes the connection and sends a value on the close channel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and fires the close event. now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 45 to 47
err := cw.Conn.Close()
for {
select {
case cw.closeCh <- nil:
return err
case <-cw.closeCh:
}
}
cw.close.Fire()
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit/optional:

cw.close.Fire()
return cw.Conn.Close()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@dfawley dfawley assigned arvindbr8 and unassigned dfawley Jan 10, 2023
@arvindbr8 arvindbr8 removed their assignment Jan 10, 2023
@easwars easwars assigned arvindbr8 and unassigned easwars Jan 11, 2023
@arvindbr8 arvindbr8 merged commit 6de8f50 into grpc:master Jan 11, 2023
1 check passed
@arvindbr8 arvindbr8 deleted the transportRestart branch January 11, 2023 20:58
@hexfusion
Copy link
Contributor

thanks @dfawley @arvindbr8

// MaxStreamID is the upper bound for the stream ID before the current
// transport gracefully closes and new transport is created for subsequent RPCs.
// This is set to 75% of math.MaxUint32. It's exported so that tests can override it.
var MaxStreamID = uint32(3_221_225_472)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamID is a 31-bit unsigned integer. The max value a streamID could be before overflow is 2^31-1 = 2,147,483,647.

I'm not sure this will actually prevent this issue from happening, as the overflow would occur before 3_221_225_472. You can read more about this in RFC-7540: https://www.rfc-editor.org/rfc/rfc7540#section-5.1.1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec:

   Streams are identified with an unsigned 31-bit integer.  Streams
   initiated by a client MUST use odd-numbered stream identifiers; those
   initiated by the server MUST use even-numbered stream identifiers.  A
   stream identifier of zero (0x0) is used for connection control
   messages; the stream identifier of zero cannot be used to establish a
   new stream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @patrick-ogrady! Great catch. Im going to fix that

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Type: Behavior Change Behavior changes not categorized as bugs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Drain client transport when stream ID approaches the max stream ID
5 participants