server: wait to close connection until incoming socket is drained (with timeout) #6977

Merged
merged 2 commits into from Feb 12, 2024

5 changes: 2 additions & 3 deletions internal/transport/controlbuf.go
@@ -535,16 +535,15 @@ const minBatchSize = 1000
 // size is too low to give stream goroutines a chance to fill it up.
 //
 // Upon exiting, if the error causing the exit is not an I/O error, run()
-// flushes and closes the underlying connection. Otherwise, the connection is
-// left open to allow the I/O error to be encountered by the reader instead.
+// flushes the underlying connection. The connection is always left open to
+// allow different closing behavior on the client and server.
 func (l *loopyWriter) run() (err error) {
 	defer func() {
 		if l.logger.V(logLevel) {
 			l.logger.Infof("loopyWriter exiting with error: %v", err)
 		}
 		if !isIOError(err) {
 			l.framer.writer.Flush()
-			l.conn.Close()
 		}
 		l.cbuf.finish()
 	}()
8 changes: 7 additions & 1 deletion internal/transport/http2_client.go
@@ -451,7 +451,13 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
 	}
 	go func() {
 		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
-		t.loopy.run()
+		if err := t.loopy.run(); !isIOError(err) {
+			// Immediately close the connection, as the loopy writer returns
+			// when there are no more active streams and we were draining (the
+			// server sent a GOAWAY). For I/O errors, the reader will hit it
+			// after draining any remaining incoming data.
+			t.conn.Close()
+		}
 		close(t.writerDone)
 	}()
 	return t, nil
23 changes: 20 additions & 3 deletions internal/transport/http2_server.go
@@ -322,8 +322,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
 	go func() {
 		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
 		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
-		t.loopy.run()
+		err := t.loopy.run()
 		close(t.loopyWriterDone)
+		if !isIOError(err) {
+			// Close the connection if a non-I/O error occurs (for I/O errors
+			// the reader will also encounter the error and close). Wait 1
+			// second before closing the connection, or when the reader is done
+			// (i.e. the client already closed the connection or a connection
+			// error occurred). This avoids the potential problem where there
+			// is unread data on the receive side of the connection, which, if
+			// closed, would lead to a TCP RST instead of FIN, and the client
+			// encountering errors. For more info:
+			// https://github.com/grpc/grpc-go/issues/5358
+			select {
+			case <-t.readerDone:
+			case <-time.After(time.Second):
@mtekeli mtekeli Feb 14, 2024

Note that nothing can stop this scheduled timer when the reader is done first (which would probably happen most of the time). I would suggest changing it to a timer object so that it can be stopped manually (and therefore released) when the reader is done first.

In a scenario where this happens repeatedly with many clients (or the same client), wouldn't you just schedule many timers that cannot be stopped until they fire (i.e., a temporary memory leak)?

Member Author

True that it isn't stopped, but it is fixed for 1 second, so I don't believe this could be a real problem for anyone.

Even when many connections have non-I/O errors? Do you think that is unlikely, or even impossible?

Member Author

I think the key here is that it's a non-I/O error, which means it would be initiated by the server itself. If the server is terminating already-handshaked connections at a rate of >1 per second, then that seems like a real problem.

That said, I think it's OK to just forbid the use of time.After in our repo (outside of tests) since it's always a potential concern. I'll add a vet.sh check for this and change it on master.

+			}
+			t.conn.Close()
+		}
 	}()
 	go t.keepalive()
 	return t, nil
@@ -609,8 +625,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
 // traceCtx attaches trace to ctx and returns the new context.
 func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
 	defer func() {
-		<-t.loopyWriterDone
 		close(t.readerDone)
+		<-t.loopyWriterDone
 	}()
 	for {
 		t.controlBuf.throttle()
@@ -1325,6 +1341,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
 		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
 			return false, err
 		}
+		t.framer.writer.Flush()
 		if retErr != nil {
 			return false, retErr
 		}
@@ -1345,7 +1362,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
 		return false, err
 	}
 	go func() {
-		timer := time.NewTimer(time.Minute)
+		timer := time.NewTimer(5 * time.Second)
 		defer timer.Stop()
 		select {
 		case <-t.drainEvent.Done():