feat(storage): support single-shot uploads in gRPC #8348

Merged
4 commits merged on Aug 1, 2023
47 changes: 29 additions & 18 deletions storage/grpc_client.go
@@ -1044,11 +1044,13 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
 			}
 
 			// The chunk buffer is full, but there is no end in sight. This
-			// means that a resumable upload will need to be used to send
+			// means that either:
+			// 1. A resumable upload will need to be used to send
 			// multiple chunks, until we are done reading data. Start a
 			// resumable upload if it has not already been started.
-			// Otherwise, all data will be sent over a single gRPC stream.
-			if !doneReading && gw.upid == "" {
+			// 2. ChunkSize of zero may also have a full buffer, but a resumable
+			// session should not be initiated in this case.
+			if !doneReading && gw.upid == "" && params.chunkSize != 0 {
 				err = gw.startResumableUpload()
 				if err != nil {
 					err = checkCanceled(err)
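For context: after this change, a caller opts into the single-shot path by setting ChunkSize to zero on the storage.Writer. A minimal sketch, assuming the public cloud.google.com/go/storage API; the bucket and object names are placeholders:

package main

import (
	"context"
	"log"

	"cloud.google.com/go/storage"
)

func main() {
	ctx := context.Background()

	// NewGRPCClient sends requests over gRPC rather than HTTP/JSON.
	client, err := storage.NewGRPCClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// ChunkSize = 0 requests a single-shot upload: with this change, no
	// resumable session is started and the data is sent over a single
	// WriteObject stream. ("my-bucket" and "my-object" are placeholders.)
	w := client.Bucket("my-bucket").Object("my-object").NewWriter(ctx)
	w.ChunkSize = 0
	if _, err := w.Write([]byte("hello world")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}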
@@ -1065,11 +1067,15 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
 				pr.CloseWithError(err)
 				return
 			}
+
 			// At this point, the current buffer has been uploaded. For resumable
-			// uploads, capture the committed offset here in case the upload was not
-			// finalized and another chunk is to be uploaded.
-			if gw.upid != "" {
+			// uploads and chunkSize = 0, capture the committed offset here in case
+			// the upload was not finalized and another chunk is to be uploaded. Call
+			// the progress function for resumable uploads only.
+			if gw.upid != "" || gw.chunkSize == 0 {
 				offset = off
+			}
+			if gw.upid != "" {
 				progress(offset)
 			}
 
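Since progress() maps to the public Writer.ProgressFunc, the change above means callbacks fire only for resumable uploads (ChunkSize > 0). A short sketch of that surface, with placeholder names and the imports from the previous example assumed:

// uploadWithProgress uses a chunked (resumable) upload so that
// ProgressFunc is invoked with the committed offset after each chunk.
// With ChunkSize = 0 the offset is still tracked internally, but no
// callbacks are made.
func uploadWithProgress(ctx context.Context, client *storage.Client, data []byte) error {
	w := client.Bucket("my-bucket").Object("my-object").NewWriter(ctx) // placeholders
	w.ChunkSize = 16 * 1024 * 1024 // 16 MiB chunks => resumable upload
	w.ProgressFunc = func(committed int64) {
		log.Printf("%d bytes committed", committed)
	}
	if _, err := w.Write(data); err != nil {
		return err
	}
	return w.Close()
}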
@@ -1485,14 +1491,11 @@ func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader)
 		size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize)
 	}
 
+	// A completely bufferless upload is not possible as it is in JSON because
+	// the buffer must be provided to the message. However use the minimum size
+	// possible in this case.
 	if params.chunkSize == 0 {
-		// TODO: Should we actually use the minimum of 256 KB here when the user
-		// indicates they want minimal memory usage? We cannot do a zero-copy,
-		// bufferless upload like HTTP/JSON can.
-		// TODO: We need to determine if we can avoid starting a
-		// resumable upload when the user *plans* to send more than bufSize but
-		// with a bufferless upload.
-		size = maxPerMessageWriteSize
+		size = googleapi.MinUploadChunkSize
 	}
 
 	return &gRPCWriter{
@@ -1505,6 +1508,7 @@ func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader)
 		conds:          params.conds,
 		encryptionKey:  params.encryptionKey,
 		sendCRC32C:     params.sendCRC32C,
+		chunkSize:      params.chunkSize,
 	}
 }
 
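To make the sizing concrete: googleapi.MinUploadChunkSize is 256 KiB, so a non-zero chunkSize is rounded up to the next 256 KiB multiple, while chunkSize == 0 now gets the smallest legal buffer rather than maxPerMessageWriteSize. A standalone sketch of that arithmetic; the guard around the rounding is an assumption from the surrounding code, not shown in this diff:

const minChunk = 256 * 1024 // value of googleapi.MinUploadChunkSize

// bufferSize mirrors the sizing logic in newGRPCWriter.
func bufferSize(chunkSize int) int {
	if chunkSize == 0 {
		// Single-shot uploads still need a buffer for the proto
		// message, so use the minimum size possible.
		return minChunk
	}
	size := chunkSize
	if size%minChunk != 0 {
		size += minChunk - (size % minChunk)
	}
	return size
}

// For example: bufferSize(300000) == 524288 (2 * 256 KiB), and
// bufferSize(0) == 262144 (1 * 256 KiB).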

@@ -1524,6 +1528,7 @@ type gRPCWriter struct {
 	settings *settings
 
 	sendCRC32C bool
+	chunkSize  int
 
 	// The gRPC client-stream used for sending buffers.
 	stream storagepb.Storage_WriteObjectClient
@@ -1589,7 +1594,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
 	offset := start
 	toWrite := w.buf[:recvd]
 	for {
-		first := sent == 0
 		// This indicates that this is the last message and the remaining
 		// data fits in one message.
 		belowLimit := recvd-sent <= limit
@@ -1612,10 +1616,10 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
 			FinishWrite: finishWrite,
 		}
 
-		// Open a new stream and set the first_message field on the request.
-		// The first message on the WriteObject stream must either be the
-		// Object or the Resumable Upload ID.
-		if first {
+		// Open a new stream if necessary and set the first_message field on
+		// the request. The first message on the WriteObject stream must either
+		// be the Object or the Resumable Upload ID.
+		if w.stream == nil {
 			ctx := gapic.InsertMetadata(w.ctx, metadata.Pairs("x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))))
 			w.stream, err = w.c.raw.WriteObject(ctx)
 			if err != nil {
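A rough sketch of what the comment above means at the proto level: the WriteObjectRequest first_message oneof carries either a WriteObjectSpec (single-shot) or an upload ID (resumable). This is illustrative only, assuming the generated storagepb types for google.storage.v2; it is not the PR's own code:

// setFirstMessage is a hypothetical helper showing the two legal shapes
// of the first request on a WriteObject stream.
func setFirstMessage(req *storagepb.WriteObjectRequest, spec *storagepb.WriteObjectSpec, upid string) {
	if upid != "" {
		// Resumable session: identify the upload to append to.
		req.FirstMessage = &storagepb.WriteObjectRequest_UploadId{UploadId: upid}
		return
	}
	// Single-shot: describe the destination object directly.
	req.FirstMessage = &storagepb.WriteObjectRequest_WriteObjectSpec{WriteObjectSpec: spec}
}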
@@ -1678,6 +1682,13 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
 			continue
 		}
 
+		// The buffer has been uploaded and there is still more data to be
+		// uploaded, but this is not a resumable upload session. Therefore
+		// keep the stream open and don't commit yet.
+		if !finishWrite && w.chunkSize == 0 {
+			return nil, offset, false, nil
+		}
+
 		// Done sending data. Close the stream to "commit" the data sent.
 		resp, finalized, err := w.commit()
 		// Retriable errors mean we should start over and attempt to
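The early return added above is what keeps a single-shot upload on one stream across multiple Write calls; only Close (doneReading, hence finishWrite) triggers the commit. A caller-level sketch, reusing the placeholder names and imports from the earlier examples:

// singleShotUpload writes several chunks with ChunkSize = 0. Each Write
// lands on the same WriteObject stream; nothing is committed until
// Close sends the finish_write message.
func singleShotUpload(ctx context.Context, client *storage.Client, chunks [][]byte) error {
	w := client.Bucket("my-bucket").Object("my-object").NewWriter(ctx) // placeholders
	w.ChunkSize = 0
	for _, c := range chunks {
		if _, err := w.Write(c); err != nil {
			return err
		}
	}
	return w.Close() // commits the stream
}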
9 changes: 9 additions & 0 deletions storage/integration_test.go
@@ -2393,6 +2393,15 @@ func TestIntegration_WriterChunksize(t *testing.T) {
 				if callbacks != test.wantCallbacks {
 					t.Errorf("ProgressFunc was called %d times, expected %d", callbacks, test.wantCallbacks)
 				}
+
+				// Confirm all bytes were uploaded.
+				attrs, err := obj.Attrs(ctx)
+				if err != nil {
+					t.Fatalf("obj.Attrs: %v", err)
+				}
+				if attrs.Size != int64(objSize) {
+					t.Errorf("incorrect number of bytes written; got %v, want %v", attrs.Size, objSize)
+				}
 			})
 		}
 	})