transport: stop always closing connections when loopy returns #6110

Merged 4 commits on Mar 14, 2023
Changes from 1 commit
31 changes: 21 additions & 10 deletions internal/transport/controlbuf.go
@@ -22,6 +22,7 @@ import (
"bytes"
"errors"
"fmt"
+ "net"
"runtime"
"strconv"
"sync"
@@ -486,12 +487,13 @@ type loopyWriter struct {
hEnc *hpack.Encoder // HPACK encoder.
bdpEst *bdpEstimator
draining bool
+ conn net.Conn

// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}

- func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
+ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
@@ -504,6 +506,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
+ conn: conn,
}
return l
}
@@ -527,12 +530,16 @@ const minBatchSize = 1000
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
- // Always flush the writer before exiting in case there are pending frames
- // to be sent.
- defer l.framer.writer.Flush()
+ defer func() {
+ if logger.V(logLevel) {
+ logger.Infof("transport: loopyWriter exiting with error: %v", err)
+ }
+ l.cbuf.finish()
+ }()
for {
it, err := l.cbuf.get(true)
if err != nil {
+ l.closeConnection()
return err
}
if err = l.handle(it); err != nil {
Contributor

handle() returns an error only when it encounters an unknown control message type. Is this an I/O error? Shouldn't we close the connection here? Same applies to the handle() call down below.

Contributor

Also, I see that processData() today returns an error only when writing data or headers fails. But how can we guarantee that in the future? Should we at least document that handle() and processData() should return errors only for I/O-related events, and also document that loopy.run() will close the connection only when it sees a non-I/O error?

Member Author

handle returns errors from the handlers themselves, which have a lot of I/O error possibilities. We'll need to do that closing in handle itself. Or we could make the errors carry a type with a bool to indicate whether they are I/O errors, but that felt messier.

It seems there are two ways to do this. Commit 1 wraps in a lot of different places, which feels finicky, and commit 2 wraps in the writer, which might not work if the http2 framer decides to start wrapping errors without supporting Unwrap (but that seems very unlikely, and we could deal with it if it ever happens).

Contributor

I prefer the ioError option as well, and the PR looks good to me.

@@ -546,6 +553,7 @@ func (l *loopyWriter) run() (err error) {
for {
it, err := l.cbuf.get(false)
if err != nil {
+ l.closeConnection()
return err
}
if it != nil {
@@ -757,6 +765,8 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.draining && len(l.estdStreams) == 0 {
+ // Flush and close the connection; we are done with it.
+ l.closeConnection()
return errors.New("finished processing active streams while in draining mode")
}
return nil
@@ -792,6 +802,8 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
+ // Flush and close the connection; we are done with it.
+ l.closeConnection()
return errors.New("received GOAWAY with no active streams")
}
}
@@ -810,11 +822,9 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}

- func (l *loopyWriter) closeConnectionHandler() error {
- // Exit loopyWriter entirely by returning an error here. This will lead to
- // the transport closing the connection, and, ultimately, transport
- // closure.
- return ErrConnClosing
+ func (l *loopyWriter) closeConnection() {
+ l.framer.writer.Flush()
+ l.conn.Close()
}

func (l *loopyWriter) handle(i interface{}) error {
@@ -846,7 +856,8 @@ func (l *loopyWriter) handle(i interface{}) error {
case *outFlowControlSizeRequest:
l.outFlowControlSizeRequestHandler(i)
case closeConnection:
- return l.closeConnectionHandler()
+ l.closeConnection()
Contributor

The only current usage of the closeConnection type is from the server-side keepalive code (when the grace period expires). Why do we need this separate type? Why can't we instead simply close the transport and let it call Close() on the underlying connection? Is this intended to be a way to close the connection after completing all pending tasks in the controlbuf, whereas closing the transport will immediately close the underlying connection without completing pending tasks?

Member Author

Yes, we need to flush any pending writes that may have made it legal to close the transport at this time (vs waiting longer for streams to finish). This was added recently: #5821
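
To make the flush-before-close point concrete, here is a minimal standalone sketch. It does not use the transport's types: flushAndClose and sendThenClose are hypothetical helpers, and net.Pipe with a bufio.Writer stands in for the real connection and framer writer. It shows that bytes still sitting in the write buffer reach the peer only because Flush runs before Close.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
)

// flushAndClose (hypothetical helper) writes out anything still buffered in w
// and then closes conn. The Flush error is ignored on purpose here: the
// connection is going away either way.
func flushAndClose(w *bufio.Writer, conn net.Conn) error {
	w.Flush()
	return conn.Close()
}

// sendThenClose buffers msg on one end of an in-memory pipe, flushes and
// closes that end, and returns what the peer received.
func sendThenClose(msg string) string {
	client, server := net.Pipe()
	defer client.Close()

	received := make(chan string)
	go func() {
		data, _ := io.ReadAll(client) // returns once the server end is closed
		received <- string(data)
	}()

	w := bufio.NewWriter(server)
	w.WriteString(msg) // still sitting in the buffer, not yet on the wire
	flushAndClose(w, server)
	return <-received
}

func main() {
	fmt.Printf("%q\n", sendThenClose("GOAWAY")) // prints "GOAWAY"
}
```

Calling conn.Close() alone in this sketch would leave the peer with an empty read, which is the behavior the separate closeConnection control message is meant to avoid.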

+ return ErrConnClosing
default:
return fmt.Errorf("transport: unknown control message type %T", i)
}
11 changes: 2 additions & 9 deletions internal/transport/http2_client.go
@@ -444,15 +444,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
return nil, err
}
go func() {
- t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
- err := t.loopy.run()
- if logger.V(logLevel) {
- logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
- }
- // Do not close the transport. Let reader goroutine handle it since
- // there might be data in the buffers.
- t.conn.Close()
- t.controlBuf.finish()
+ t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
+ t.loopy.run()
close(t.writerDone)
}()
return t, nil
14 changes: 5 additions & 9 deletions internal/transport/http2_server.go
@@ -331,14 +331,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)

go func() {
- t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
+ t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
- err := t.loopy.run()
- if logger.V(logLevel) {
- logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
- }
- t.conn.Close()
- t.controlBuf.finish()
+ t.loopy.run()
close(t.writerDone)
}()
go t.keepalive()
@@ -1355,9 +1350,10 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, err
}
if retErr != nil {
- // Abruptly close the connection following the GoAway (via
- // loopywriter). But flush out what's inside the buffer first.
+ // Abruptly close the connection following the GoAway, but flush
+ // out what's inside the buffer first.
t.framer.writer.Flush()
+ t.conn.Close()
return false, retErr
}
return true, nil