From 68abdd79fdefd5e3903baaff7c672135c5cb3100 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Fri, 28 Jul 2023 21:39:29 +0000 Subject: [PATCH 1/2] feat(storage): support single-shot uploads in gRPC When ChunkSize is zero, use a single shot upload (leaving the stream open between messages) rather than a resumable session. This matches better with what we do in JSON. Also reduce the buffer size to the minimum (256k). Fixes #7798 --- storage/grpc_client.go | 47 +++++++++++++++++++++++-------------- storage/integration_test.go | 9 +++++++ 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 4dc69d1eadb..7aec4aa2917 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -1044,11 +1044,13 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage } // The chunk buffer is full, but there is no end in sight. This - // means that a resumable upload will need to be used to send + // means that either: + // 1. A resumable upload will need to be used to send // multiple chunks, until we are done reading data. Start a // resumable upload if it has not already been started. - // Otherwise, all data will be sent over a single gRPC stream. - if !doneReading && gw.upid == "" { + // 2. ChunkSize of zero may also have a full buffer, but a resumable + // session should not be initiated in this case. + if !doneReading && gw.upid == "" && params.chunkSize != 0 { err = gw.startResumableUpload() if err != nil { err = checkCanceled(err) @@ -1065,11 +1067,15 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage pr.CloseWithError(err) return } + // At this point, the current buffer has been uploaded. For resumable - // uploads, capture the committed offset here in case the upload was not - // finalized and another chunk is to be uploaded. - if gw.upid != "" { + // uploads and chunkSize = 0, capture the committed offset here in case + // the upload was not finalized and another chunk is to be uploaded. Call + // the progress function for resumable uploads only. + if gw.upid != "" || gw.chunkSize == 0 { offset = off + } + if gw.upid != "" { progress(offset) } @@ -1485,14 +1491,11 @@ func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize) } + // A completely bufferless upload is not possible as it is in JSON because + // the buffer must be provided to the message. However use the minimum size + // possible in this case. if params.chunkSize == 0 { - // TODO: Should we actually use the minimum of 256 KB here when the user - // indicates they want minimal memory usage? We cannot do a zero-copy, - // bufferless upload like HTTP/JSON can. - // TODO: We need to determine if we can avoid starting a - // resumable upload when the user *plans* to send more than bufSize but - // with a bufferless upload. - size = maxPerMessageWriteSize + size = googleapi.MinUploadChunkSize } return &gRPCWriter{ @@ -1505,6 +1508,7 @@ func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) conds: params.conds, encryptionKey: params.encryptionKey, sendCRC32C: params.sendCRC32C, + chunkSize: params.chunkSize, } } @@ -1524,6 +1528,7 @@ type gRPCWriter struct { settings *settings sendCRC32C bool + chunkSize int // The gRPC client-stream used for sending buffers. stream storagepb.Storage_WriteObjectClient @@ -1589,7 +1594,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st offset := start toWrite := w.buf[:recvd] for { - first := sent == 0 // This indicates that this is the last message and the remaining // data fits in one message. belowLimit := recvd-sent <= limit @@ -1612,10 +1616,10 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st FinishWrite: finishWrite, } - // Open a new stream and set the first_message field on the request. - // The first message on the WriteObject stream must either be the - // Object or the Resumable Upload ID. - if first { + // Open a new stream if necessary and set the first_message field on + // the request. The first message on the WriteObject stream must either + // be the Object or the Resumable Upload ID. + if w.stream == nil { ctx := gapic.InsertMetadata(w.ctx, metadata.Pairs("x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket)))) w.stream, err = w.c.raw.WriteObject(ctx) if err != nil { @@ -1678,6 +1682,13 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st continue } + // The buffer has been uploaded and there is still more data to be + // uploaded, but this is not a resumable upload session. Therefore + // keep the stream open and don't commit yet. + if !finishWrite && w.chunkSize == 0 { + return nil, offset, false, nil + } + // Done sending data. Close the stream to "commit" the data sent. resp, finalized, err := w.commit() // Retriable errors mean we should start over and attempt to diff --git a/storage/integration_test.go b/storage/integration_test.go index bca8951c598..5abc7f118cc 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -2393,6 +2393,15 @@ func TestIntegration_WriterChunksize(t *testing.T) { if callbacks != test.wantCallbacks { t.Errorf("ProgressFunc was called %d times, expected %d", callbacks, test.wantCallbacks) } + + // Confirm all bytes were uploaded. + attrs, err := obj.Attrs(ctx) + if err != nil { + t.Errorf("obj.Attrs: %v", err) + } + if attrs.Size != int64(objSize) { + t.Errorf("incorrect number of bytes written; got %v, want %v", attrs.Size, objSize) + } }) } }) From 105766b0669460087c612f40e9c4b5b8098104fb Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 1 Aug 2023 20:28:39 +0000 Subject: [PATCH 2/2] use Fatalf in test where appropriate --- storage/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/integration_test.go b/storage/integration_test.go index 5abc7f118cc..947581ca931 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -2397,7 +2397,7 @@ func TestIntegration_WriterChunksize(t *testing.T) { // Confirm all bytes were uploaded. attrs, err := obj.Attrs(ctx) if err != nil { - t.Errorf("obj.Attrs: %v", err) + t.Fatalf("obj.Attrs: %v", err) } if attrs.Size != int64(objSize) { t.Errorf("incorrect number of bytes written; got %v, want %v", attrs.Size, objSize)