Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sync.Pool to share per-connection transport write buffer. #6309

Merged
merged 19 commits into from Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0f1058d
Release write buffer after Flush() in transport
s-matyukevich May 15, 2023
63f4ba3
Use sync.Pool
s-matyukevich May 15, 2023
4faff38
Use interface{} instead of any
s-matyukevich May 15, 2023
f594b29
use pool map
s-matyukevich May 16, 2023
ec75cb1
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich May 16, 2023
95bde71
fix flush()
s-matyukevich May 16, 2023
a243c24
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich May 17, 2023
a7ededc
Fix linter error
s-matyukevich May 17, 2023
16345bb
make variable names more descripptive
s-matyukevich May 23, 2023
c8036d0
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich May 23, 2023
09407f4
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich May 24, 2023
0b42642
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich Jun 28, 2023
e1bdb20
Add options to enable write buffer sharing
s-matyukevich Jun 28, 2023
9b97327
delete unsed variable
s-matyukevich Jun 28, 2023
d274f35
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich Jul 12, 2023
00e8de7
address review comments
s-matyukevich Jul 12, 2023
9aeb148
fix linter error
s-matyukevich Jul 13, 2023
195a467
Merge branch 'master' of github.com:grpc/grpc-go into dynamic-buffer-v1
s-matyukevich Jul 20, 2023
82a400e
Add Experimental tag, rename flush to flushKeepBuffer
s-matyukevich Jul 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions benchmark/benchmain/main.go
Expand Up @@ -115,6 +115,8 @@ var (
sleepBetweenRPCs = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a a comma-separated list")
connections = flag.Int("connections", 1, "The number of connections. Each connection will handle maxConcurrentCalls RPC streams")
recvBufferPool = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools)
sharedWriteBuffer = flags.StringWithAllowedValues("sharedWriteBuffer", toggleModeOff,
fmt.Sprintf("Configures both client and server to share write buffer - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)

logger = grpclog.Component("benchmark")
)
Expand Down Expand Up @@ -335,6 +337,10 @@ func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func())
if bf.ServerReadBufferSize >= 0 {
sopts = append(sopts, grpc.ReadBufferSize(bf.ServerReadBufferSize))
}
if bf.SharedWriteBuffer {
opts = append(opts, grpc.WithSharedWriteBuffer(true))
sopts = append(sopts, grpc.SharedWriteBuffer(true))
}
if bf.ServerWriteBufferSize >= 0 {
sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize))
}
Expand Down Expand Up @@ -603,6 +609,7 @@ type featureOpts struct {
serverWriteBufferSize []int
sleepBetweenRPCs []time.Duration
recvBufferPools []string
sharedWriteBuffer []bool
}

// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
Expand Down Expand Up @@ -651,6 +658,8 @@ func makeFeaturesNum(b *benchOpts) []int {
featuresNum[i] = len(b.features.sleepBetweenRPCs)
case stats.RecvBufferPool:
featuresNum[i] = len(b.features.recvBufferPools)
case stats.SharedWriteBuffer:
featuresNum[i] = len(b.features.sharedWriteBuffer)
default:
log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
}
Expand Down Expand Up @@ -720,6 +729,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]],
SleepBetweenRPCs: b.features.sleepBetweenRPCs[curPos[stats.SleepBetweenRPCs]],
RecvBufferPool: b.features.recvBufferPools[curPos[stats.RecvBufferPool]],
SharedWriteBuffer: b.features.sharedWriteBuffer[curPos[stats.SharedWriteBuffer]],
}
if len(b.features.reqPayloadCurves) == 0 {
f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
Expand Down Expand Up @@ -793,6 +803,7 @@ func processFlags() *benchOpts {
serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...),
sleepBetweenRPCs: append([]time.Duration(nil), *sleepBetweenRPCs...),
recvBufferPools: setRecvBufferPool(*recvBufferPool),
sharedWriteBuffer: setToggleMode(*sharedWriteBuffer),
},
}

Expand Down
9 changes: 7 additions & 2 deletions benchmark/stats/stats.go
Expand Up @@ -58,6 +58,7 @@ const (
ServerWriteBufferSize
SleepBetweenRPCs
RecvBufferPool
SharedWriteBuffer

// MaxFeatureIndex is a place holder to indicate the total number of feature
// indices we have. Any new feature indices should be added above this.
Expand Down Expand Up @@ -129,6 +130,8 @@ type Features struct {
SleepBetweenRPCs time.Duration
// RecvBufferPool represents the shared recv buffer pool used.
RecvBufferPool string
// SharedWriteBuffer configures whether both client and server share per-connection write buffer
SharedWriteBuffer bool
}

// String returns all the feature values as a string.
Expand All @@ -148,13 +151,13 @@ func (f Features) String() string {
"trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+
"compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+
"clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+
"sleepBetweenRPCs_%v-connections_%v-recvBufferPool_%v-",
"sleepBetweenRPCs_%v-connections_%v-recvBufferPool_%v-sharedWriteBuffer_%v",
f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace,
f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString,
respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader,
f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize,
f.ServerWriteBufferSize, f.SleepBetweenRPCs, f.Connections,
f.RecvBufferPool)
f.RecvBufferPool, f.SharedWriteBuffer)
}

// SharedFeatures returns the shared features as a pretty printable string.
Expand Down Expand Up @@ -230,6 +233,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim
b.WriteString(fmt.Sprintf("SleepBetweenRPCs%v%v%v", sep, f.SleepBetweenRPCs, delim))
case RecvBufferPool:
b.WriteString(fmt.Sprintf("RecvBufferPool%v%v%v", sep, f.RecvBufferPool, delim))
case SharedWriteBuffer:
b.WriteString(fmt.Sprintf("SharedWriteBuffer%v%v%v", sep, f.SharedWriteBuffer, delim))
default:
log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex)
}
Expand Down
14 changes: 14 additions & 0 deletions dialoptions.go
Expand Up @@ -139,6 +139,20 @@ func newJoinDialOption(opts ...DialOption) DialOption {
return &joinDialOption{opts: opts}
}

// WithSharedWriteBuffer allows reusing per-connection transport write buffer.
// If this option is set to true every connection will release the buffer after
// flushing the data on the wire.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithSharedWriteBuffer(val bool) DialOption {
s-matyukevich marked this conversation as resolved.
Show resolved Hide resolved
return newFuncDialOption(func(o *dialOptions) {
o.copts.SharedWriteBuffer = val
})
}

// WithWriteBufferSize determines how much data can be batched before doing a
// write on the wire. The corresponding memory allocation for this buffer will
// be twice the size to keep syscalls low. The default value for this buffer is
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/http2_client.go
Expand Up @@ -330,7 +330,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
activeStreams: make(map[uint32]*Stream),
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/http2_server.go
Expand Up @@ -165,7 +165,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
if config.MaxHeaderListSize != nil {
maxHeaderListSize = *config.MaxHeaderListSize
}
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
// Send initial settings as connection preface to client.
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
Expand Down
59 changes: 53 additions & 6 deletions internal/transport/http_util.go
Expand Up @@ -30,6 +30,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -309,19 +310,25 @@ func decodeGrpcMessageUnchecked(msg string) string {
}

type bufWriter struct {
pool *sync.Pool
buf []byte
offset int
batchSize int
conn net.Conn
err error
}

func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
return &bufWriter{
buf: make([]byte, batchSize*2),
func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
w := &bufWriter{
batchSize: batchSize,
conn: conn,
pool: pool,
}
// this indicates that we should use non shared buf
if pool == nil {
w.buf = make([]byte, batchSize)
}
return w
}

func (w *bufWriter) Write(b []byte) (n int, err error) {
Expand All @@ -332,19 +339,34 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
n, err = w.conn.Write(b)
return n, toIOError(err)
}
if w.buf == nil {
b := w.pool.Get().(*[]byte)
w.buf = *b
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
w.offset += nn
n += nn
if w.offset >= w.batchSize {
err = w.Flush()
err = w.flushKeepBuffer()
}
}
return n, err
}

func (w *bufWriter) Flush() error {
err := w.flushKeepBuffer()
// Only release the buffer if we are in a "shared" mode
if w.buf != nil && w.pool != nil {
b := w.buf
w.pool.Put(&b)
easwars marked this conversation as resolved.
Show resolved Hide resolved
w.buf = nil
}
return err
}

func (w *bufWriter) flushKeepBuffer() error {
if w.err != nil {
return w.err
}
Expand Down Expand Up @@ -381,15 +403,22 @@ type framer struct {
fr *http2.Framer
}

func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer {
var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool)
var writeBufferMutex sync.Mutex

func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
if writeBufferSize < 0 {
writeBufferSize = 0
}
var r io.Reader = conn
if readBufferSize > 0 {
r = bufio.NewReaderSize(r, readBufferSize)
}
w := newBufWriter(conn, writeBufferSize)
var pool *sync.Pool
if sharedWriteBuffer {
pool = getWriteBufferPool(writeBufferSize)
}
w := newBufWriter(conn, writeBufferSize, pool)
f := &framer{
writer: w,
fr: http2.NewFramer(w, r),
Expand All @@ -403,6 +432,24 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
return f
}

func getWriteBufferPool(writeBufferSize int) *sync.Pool {
writeBufferMutex.Lock()
defer writeBufferMutex.Unlock()
size := writeBufferSize * 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the significance of the * 2 here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC this matches the legacy behavior, which always doubled for reasons nobody knows but we don't want to change now due to it affecting existing applications.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. Do you mind expanding on the legacy behavior you are describing? It seems like if there is no shared buffer pool, the buffer is allocated per the input size. It only seems to be in the case of the buffer pool that we're allocating 2x the size (and this seems to be the PR the write buffer pool was added)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may be right.. The old code multiplied by 2 when doing this allocation, so it seems we have changed the behavior for legacy users already:

https://github.com/grpc/grpc-go/blame/d524b409462c601ef3f05a7e1fba19755a337c77/internal/transport/http_util.go#L321

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - it just felt somewhat bizarre because this is the very PR that also removed that same * 2 in the default path (see line 321 above), but added the * 2 for the buffer pool path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you think it's worth updating, I created #6983

pool, ok := writeBufferPoolMap[size]
if ok {
return pool
}
pool = &sync.Pool{
New: func() interface{} {
b := make([]byte, size)
return &b
},
}
writeBufferPoolMap[size] = pool
return pool
}

// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
net := "tcp"
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/keepalive_test.go
Expand Up @@ -191,7 +191,7 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
if n, err := conn.Write(clientPreface); err != nil || n != len(clientPreface) {
t.Fatalf("conn.Write(clientPreface) failed: n=%v, err=%v", n, err)
}
framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, 0)
framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, false, 0)
if err := framer.fr.WriteSettings(http2.Setting{}); err != nil {
t.Fatal("framer.WriteSettings(http2.Setting{}) failed:", err)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/transport/transport.go
Expand Up @@ -559,6 +559,7 @@ type ServerConfig struct {
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
SharedWriteBuffer bool
ChannelzParentID *channelz.Identifier
MaxHeaderListSize *uint32
HeaderTableSize *uint32
Expand Down Expand Up @@ -592,6 +593,8 @@ type ConnectOptions struct {
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
// SharedWriteBuffer indicates whether connections should reuse write buffer
SharedWriteBuffer bool
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID *channelz.Identifier
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
Expand Down
16 changes: 16 additions & 0 deletions server.go
Expand Up @@ -170,6 +170,7 @@ type serverOptions struct {
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
sharedWriteBuffer bool
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
Expand Down Expand Up @@ -235,6 +236,20 @@ func newJoinServerOption(opts ...ServerOption) ServerOption {
return &joinServerOption{opts: opts}
}

// SharedWriteBuffer allows reusing per-connection transport write buffer.
// If this option is set to true every connection will release the buffer after
// flushing the data on the wire.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SharedWriteBuffer(val bool) ServerOption {
s-matyukevich marked this conversation as resolved.
Show resolved Hide resolved
return newFuncServerOption(func(o *serverOptions) {
o.sharedWriteBuffer = val
})
}

// WriteBufferSize determines how much data can be batched before doing a write
// on the wire. The corresponding memory allocation for this buffer will be
// twice the size to keep syscalls low. The default value for this buffer is
Expand Down Expand Up @@ -938,6 +953,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
InitialConnWindowSize: s.opts.initialConnWindowSize,
WriteBufferSize: s.opts.writeBufferSize,
ReadBufferSize: s.opts.readBufferSize,
SharedWriteBuffer: s.opts.sharedWriteBuffer,
ChannelzParentID: s.channelzID,
MaxHeaderListSize: s.opts.maxHeaderListSize,
HeaderTableSize: s.opts.headerTableSize,
Expand Down