Skip to content

Commit

Permalink
feat(storage): support single-shot uploads in gRPC (#8348)
Browse files Browse the repository at this point in the history
When ChunkSize is zero, use a single-shot upload (keeping the stream open between messages) rather than a resumable session. This more closely matches the behavior of JSON uploads. Also reduce the buffer size to the minimum (256 KiB).

Fixes #7798
  • Loading branch information
tritone committed Aug 1, 2023
1 parent e277213 commit 7de4a7d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
47 changes: 29 additions & 18 deletions storage/grpc_client.go
Expand Up @@ -1044,11 +1044,13 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
}

// The chunk buffer is full, but there is no end in sight. This
// means that a resumable upload will need to be used to send
// means that either:
// 1. A resumable upload will need to be used to send
// multiple chunks, until we are done reading data. Start a
// resumable upload if it has not already been started.
// Otherwise, all data will be sent over a single gRPC stream.
if !doneReading && gw.upid == "" {
// 2. ChunkSize of zero may also have a full buffer, but a resumable
// session should not be initiated in this case.
if !doneReading && gw.upid == "" && params.chunkSize != 0 {
err = gw.startResumableUpload()
if err != nil {
err = checkCanceled(err)
Expand All @@ -1065,11 +1067,15 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
pr.CloseWithError(err)
return
}

// At this point, the current buffer has been uploaded. For resumable
// uploads, capture the committed offset here in case the upload was not
// finalized and another chunk is to be uploaded.
if gw.upid != "" {
// uploads and chunkSize = 0, capture the committed offset here in case
// the upload was not finalized and another chunk is to be uploaded. Call
// the progress function for resumable uploads only.
if gw.upid != "" || gw.chunkSize == 0 {
offset = off
}
if gw.upid != "" {
progress(offset)
}

Expand Down Expand Up @@ -1485,14 +1491,11 @@ func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader)
size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize)
}

// A completely bufferless upload is not possible as it is in JSON because
// the buffer must be provided to the message. However use the minimum size
// possible in this case.
if params.chunkSize == 0 {
// TODO: Should we actually use the minimum of 256 KB here when the user
// indicates they want minimal memory usage? We cannot do a zero-copy,
// bufferless upload like HTTP/JSON can.
// TODO: We need to determine if we can avoid starting a
// resumable upload when the user *plans* to send more than bufSize but
// with a bufferless upload.
size = maxPerMessageWriteSize
size = googleapi.MinUploadChunkSize
}

return &gRPCWriter{
Expand All @@ -1505,6 +1508,7 @@ func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader)
conds: params.conds,
encryptionKey: params.encryptionKey,
sendCRC32C: params.sendCRC32C,
chunkSize: params.chunkSize,
}
}

Expand All @@ -1524,6 +1528,7 @@ type gRPCWriter struct {
settings *settings

sendCRC32C bool
chunkSize int

// The gRPC client-stream used for sending buffers.
stream storagepb.Storage_WriteObjectClient
Expand Down Expand Up @@ -1589,7 +1594,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
offset := start
toWrite := w.buf[:recvd]
for {
first := sent == 0
// This indicates that this is the last message and the remaining
// data fits in one message.
belowLimit := recvd-sent <= limit
Expand All @@ -1612,10 +1616,10 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
FinishWrite: finishWrite,
}

// Open a new stream and set the first_message field on the request.
// The first message on the WriteObject stream must either be the
// Object or the Resumable Upload ID.
if first {
// Open a new stream if necessary and set the first_message field on
// the request. The first message on the WriteObject stream must either
// be the Object or the Resumable Upload ID.
if w.stream == nil {
ctx := gapic.InsertMetadata(w.ctx, metadata.Pairs("x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))))
w.stream, err = w.c.raw.WriteObject(ctx)
if err != nil {
Expand Down Expand Up @@ -1678,6 +1682,13 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
continue
}

// The buffer has been uploaded and there is still more data to be
// uploaded, but this is not a resumable upload session. Therefore
// keep the stream open and don't commit yet.
if !finishWrite && w.chunkSize == 0 {
return nil, offset, false, nil
}

// Done sending data. Close the stream to "commit" the data sent.
resp, finalized, err := w.commit()
// Retriable errors mean we should start over and attempt to
Expand Down
9 changes: 9 additions & 0 deletions storage/integration_test.go
Expand Up @@ -2393,6 +2393,15 @@ func TestIntegration_WriterChunksize(t *testing.T) {
if callbacks != test.wantCallbacks {
t.Errorf("ProgressFunc was called %d times, expected %d", callbacks, test.wantCallbacks)
}

// Confirm all bytes were uploaded.
attrs, err := obj.Attrs(ctx)
if err != nil {
t.Fatalf("obj.Attrs: %v", err)
}
if attrs.Size != int64(objSize) {
t.Errorf("incorrect number of bytes written; got %v, want %v", attrs.Size, objSize)
}
})
}
})
Expand Down

0 comments on commit 7de4a7d

Please sign in to comment.