Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: stop always closing connections when loopy returns #6110

Merged
merged 4 commits into from Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 30 additions & 18 deletions internal/transport/controlbuf.go
Expand Up @@ -22,6 +22,7 @@ import (
"bytes"
"errors"
"fmt"
"net"
"runtime"
"strconv"
"sync"
Expand Down Expand Up @@ -486,12 +487,13 @@ type loopyWriter struct {
hEnc *hpack.Encoder // HPACK encoder.
bdpEst *bdpEstimator
draining bool
conn net.Conn

// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
Expand All @@ -504,6 +506,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
}
return l
}
Expand All @@ -521,15 +524,27 @@ const minBatchSize = 1000
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list. This results in writing of HTTP2 frames into an
// underlying write buffer. When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer. As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes and closes the underlying connection. Otherwise, the connection is
// left open to allow the I/O error to be encountered by the reader instead.
func (l *loopyWriter) run() (err error) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
// Always flush the writer before exiting in case there are pending frames
// to be sent.
defer l.framer.writer.Flush()
defer func() {
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exiting with error: %v", err)
}
if !isIOError(err) {
l.framer.writer.Flush()
l.conn.Close()
}
l.cbuf.finish()
}()
for {
it, err := l.cbuf.get(true)
if err != nil {
Expand Down Expand Up @@ -757,6 +772,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.draining && len(l.estdStreams) == 0 {
// Flush and close the connection; we are done with it.
return errors.New("finished processing active streams while in draining mode")
}
return nil
Expand Down Expand Up @@ -792,6 +808,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
// Flush and close the connection; we are done with it.
return errors.New("received GOAWAY with no active streams")
}
}
Expand All @@ -810,13 +827,6 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}

func (l *loopyWriter) closeConnectionHandler() error {
// Exit loopyWriter entirely by returning an error here. This will lead to
// the transport closing the connection, and, ultimately, transport
// closure.
return ErrConnClosing
}

func (l *loopyWriter) handle(i interface{}) error {
switch i := i.(type) {
case *incomingWindowUpdate:
Expand Down Expand Up @@ -846,7 +856,9 @@ func (l *loopyWriter) handle(i interface{}) error {
case *outFlowControlSizeRequest:
l.outFlowControlSizeRequestHandler(i)
case closeConnection:
return l.closeConnectionHandler()
// Just return a non-I/O error and run() will flush and close the
// connection.
return ErrConnClosing
default:
return fmt.Errorf("transport: unknown control message type %T", i)
}
Expand Down Expand Up @@ -905,7 +917,7 @@ func (l *loopyWriter) processData() (bool, error) {
return false, err
}
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
return false, nil
return false, err
}
} else {
l.activeStreams.enqueue(str)
Expand Down
11 changes: 2 additions & 9 deletions internal/transport/http2_client.go
Expand Up @@ -444,15 +444,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
// Do not close the transport. Let reader goroutine handle it since
// there might be data in the buffers.
t.conn.Close()
t.controlBuf.finish()
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
t.loopy.run()
close(t.writerDone)
}()
return t, nil
Expand Down
12 changes: 2 additions & 10 deletions internal/transport/http2_server.go
Expand Up @@ -331,14 +331,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)

go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
err := t.loopy.run()
if logger.V(logLevel) {
logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
}
t.conn.Close()
t.controlBuf.finish()
t.loopy.run()
close(t.writerDone)
}()
go t.keepalive()
Expand Down Expand Up @@ -1355,9 +1350,6 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, err
}
if retErr != nil {
// Abruptly close the connection following the GoAway (via
// loopywriter). But flush out what's inside the buffer first.
t.framer.writer.Flush()
return false, retErr
}
return true, nil
Expand Down
24 changes: 23 additions & 1 deletion internal/transport/http_util.go
Expand Up @@ -21,6 +21,7 @@ package transport
import (
"bufio"
"encoding/base64"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -330,7 +331,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
return w.conn.Write(b)
n, err = w.conn.Write(b)
return n, toIOError(err)
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
Expand All @@ -352,10 +354,30 @@ func (w *bufWriter) Flush() error {
return nil
}
_, w.err = w.conn.Write(w.buf[:w.offset])
w.err = toIOError(w.err)
w.offset = 0
return w.err
}

type ioError struct {
error
}

func (i ioError) Unwrap() error {
return i.error
}

func isIOError(err error) bool {
return errors.As(err, &ioError{})
}

func toIOError(err error) error {
if err == nil {
return nil
}
return ioError{error: err}
}

type framer struct {
writer *bufWriter
fr *http2.Framer
Expand Down