feat(storage): change gRPC writes to use bi-directional streams #8930

Merged
Merged 6 commits on Nov 8, 2023
5 changes: 5 additions & 0 deletions storage/client_test.go
@@ -20,6 +20,7 @@ import (
"log"
"os"
"strconv"
"strings"
"testing"
"time"

@@ -790,6 +791,10 @@ func TestOpenReaderEmulated(t *testing.T) {

func TestOpenWriterEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
if strings.Contains(project, "grpc") {
t.Skip("Implementation in testbench pending: https://github.com/googleapis/storage-testbench/issues/568")
}

// Populate test data.
_, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{
Name: bucket,
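The grpc_client.go diff below swaps the client-streaming WriteObject stub for the bi-directional BidiWriteObject stub. For orientation, here is a simplified sketch of the two generated stream interfaces; the method sets follow standard gRPC-Go codegen and are assumptions rather than a verbatim copy of storagepb, and both stubs also embed grpc.ClientStream.

// Client-streaming stub: the single response is only available via
// CloseAndRecv, after the client has closed its half of the stream, so
// persisted progress cannot be checked mid-upload.
type Storage_WriteObjectClient interface {
	Send(*WriteObjectRequest) error
	CloseAndRecv() (*WriteObjectResponse, error)
	// grpc.ClientStream methods omitted
}

// Bi-directional stub: Send and Recv can interleave, so the writer can flush
// a buffer, look up the persisted size, and keep the stream open for more
// data before finalizing the object.
type Storage_BidiWriteObjectClient interface {
	Send(*BidiWriteObjectRequest) error
	Recv() (*BidiWriteObjectResponse, error)
	// CloseSend and other methods come from the embedded grpc.ClientStream
}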
217 changes: 135 additions & 82 deletions storage/grpc_client.go
@@ -1066,7 +1066,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
}
}

o, off, finalized, err := gw.uploadBuffer(recvd, offset, doneReading)
o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
if err != nil {
err = checkCanceled(err)
errorf(err)
@@ -1085,9 +1085,9 @@
progress(offset)
}

// When we are done reading data and the chunk has been finalized,
// we are done.
if doneReading && finalized {
// When we are done reading data without errors, set the object and
// finish.
if doneReading {
// Build Object from server's response.
setObj(newObjectFromProto(o))
return
@@ -1537,7 +1537,7 @@ type gRPCWriter struct {
chunkSize int

// The gRPC client-stream used for sending buffers.
stream storagepb.Storage_WriteObjectClient
stream storagepb.Storage_BidiWriteObjectClient

// The Resumable Upload ID started by a gRPC-based Writer.
upid string
@@ -1581,45 +1581,56 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
return persistedSize, err
}

// uploadBuffer opens a Write stream and uploads the buffer at the given offset (if
// uploading a chunk for a resumable uploadBuffer), and will mark the write as
// finished if we are done receiving data from the user. The resulting write
// offset after uploading the buffer is returned, as well as a boolean
// indicating if the Object has been finalized. If it has been finalized, the
// final Object will be returned as well. Finalizing the upload is primarily
// important for Resumable Uploads. A simple or multi-part upload will always
// be finalized once the entire buffer has been written.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, bool, error) {
var err error
var finishWrite bool
var sent, limit int = 0, maxPerMessageWriteSize
// uploadBuffer uploads the buffer at the given offset using a bi-directional
// Write stream. It will open a new stream if necessary (on the first call or
// after resuming from failure). The resulting write offset after uploading the
// buffer is returned, as well as the final Object if the upload is
// completed.
//
// Returns object, persisted size, and any error that is not retriable.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
var shouldRetry = ShouldRetry
if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
shouldRetry = w.settings.retry.shouldRetry
}
offset := start

var err error
var lastWriteOfEntireObject bool

sent := 0
writeOffset := start

toWrite := w.buf[:recvd]

// Send a request with as many bytes as possible.
// Loop until all bytes are sent.
for {
// This indicates that this is the last message and the remaining
// data fits in one message.
belowLimit := recvd-sent <= limit
if belowLimit {
limit = recvd - sent
bytesNotYetSent := recvd - sent
remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize

if remainingDataFitsInSingleReq && doneReading {
lastWriteOfEntireObject = true
}
if belowLimit && doneReading {
finishWrite = true

// Send the maximum amount of bytes we can, unless we don't have that many.
bytesToSendInCurrReq := maxPerMessageWriteSize
if remainingDataFitsInSingleReq {
bytesToSendInCurrReq = bytesNotYetSent
}

// Prepare chunk section for upload.
data := toWrite[sent : sent+limit]
req := &storagepb.WriteObjectRequest{
Data: &storagepb.WriteObjectRequest_ChecksummedData{
data := toWrite[sent : sent+bytesToSendInCurrReq]

req := &storagepb.BidiWriteObjectRequest{
Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
ChecksummedData: &storagepb.ChecksummedData{
Content: data,
},
},
WriteOffset: offset,
FinishWrite: finishWrite,
WriteOffset: writeOffset,
FinishWrite: lastWriteOfEntireObject,
Flush: remainingDataFitsInSingleReq,
StateLookup: remainingDataFitsInSingleReq,
}

// Open a new stream if necessary and set the first_message field on
@@ -1628,19 +1639,20 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
if w.stream == nil {
hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
w.stream, err = w.c.raw.WriteObject(ctx)

w.stream, err = w.c.raw.BidiWriteObject(ctx)
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

if w.upid != "" {
req.FirstMessage = &storagepb.WriteObjectRequest_UploadId{UploadId: w.upid}
} else {
if w.upid != "" { // resumable upload
req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: w.upid}
} else { // non-resumable
spec, err := w.writeObjectSpec()
if err != nil {
return nil, 0, false, err
return nil, 0, err
}
req.FirstMessage = &storagepb.WriteObjectRequest_WriteObjectSpec{
req.FirstMessage = &storagepb.BidiWriteObjectRequest_WriteObjectSpec{
WriteObjectSpec: spec,
}
req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
@@ -1650,42 +1662,53 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// on the *last* message of the stream (instead of the first).
req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
}

}

err = w.stream.Send(req)
if err == io.EOF {
// err was io.EOF. The client-side of a stream only gets an EOF on Send
// when the backend closes the stream and wants to return an error
// status. Closing the stream receives the status as an error.
_, err = w.stream.CloseAndRecv()
// status.

// Receive from the stream Recv() until it returns a non-nil error
// to receive the server's status as an error. We may get multiple
// messages before the error due to buffering.
err = nil
for err == nil {
_, err = w.stream.Recv()
}
Contributor:
As I mentioned in another comment, I don't see why we would make this change; stream.CloseAndRecv should do what we want?
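
A possible illustration of the distinction (a sketch, not part of the PR): CloseAndRecv is the shape of a client-streaming stub, where the single response only arrives after the client closes its side. The generated bi-directional stub is assumed to expose Send, Recv, and CloseSend instead, so once Send fails with io.EOF the server's status has to be pulled from the receive side.

// Illustrative helper (an assumption, not the PR's code): after Send returns
// io.EOF on a bi-directional stream, drain Recv until it fails to obtain the
// server's status. Buffered responses may be delivered before the error.
func drainStatus(stream storagepb.Storage_BidiWriteObjectClient) error {
	var err error
	for err == nil {
		_, err = stream.Recv()
	}
	return err
}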

// Drop the stream reference as a new one will need to be created if
// we retry.
w.stream = nil

// Drop the stream reference as a new one will need to be created if
// we can retry the upload
w.stream = nil

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
// If not retriable, falling through will return the error received.
if shouldRetry(err) {
sent = 0
finishWrite = false
// TODO: Add test case for failure modes of querying progress.
offset, err = w.determineOffset(start)
if err == nil {
continue
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)

// Continue sending requests, opening a new stream and resending
// any bytes not yet persisted as per QueryWriteStatus
continue
}
}
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

// Update the immediate stream's sent total and the upload offset with
// the data sent.
sent += len(data)
offset += int64(len(data))
writeOffset += int64(len(data))

// Not done sending data, do not attempt to commit it yet, loop around
// and send more data.
@@ -1694,31 +1717,81 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
}

// The buffer has been uploaded and there is still more data to be
// uploaded, but this is not a resumable upload session. Therefore
// keep the stream open and don't commit yet.
if !finishWrite && w.chunkSize == 0 {
return nil, offset, false, nil
// uploaded, but this is not a resumable upload session. Therefore,
// don't check persisted data.
if !lastWriteOfEntireObject && w.chunkSize == 0 {
return nil, writeOffset, nil
}

// Done sending data. Close the stream to "commit" the data sent.
resp, finalized, err := w.commit()
// Done sending data (remainingDataFitsInSingleReq should == true if we
// reach this code). Receive from the stream to confirm the persisted data.
resp, err := w.stream.Recv()

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
sent = 0
finishWrite = false
offset, err = w.determineOffset(start)
if err == nil {
continue
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)

// Drop the stream reference as a new one will need to be created.
w.stream = nil

continue
}
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

return resp.GetResource(), offset, finalized, nil
// Confirm the persisted data if we have not finished uploading the object.
if !lastWriteOfEntireObject {
if resp.GetPersistedSize() != writeOffset {
// Retry if not all bytes were persisted.
writeOffset = resp.GetPersistedSize()
sent = int(writeOffset) - int(start)
continue
}
} else {
// If the object is done uploading, close the send stream to signal
// to the server that we are done sending so that we can receive
// from the stream without blocking.
err = w.stream.CloseSend()
if err != nil {
// CloseSend() retries the send internally. It never returns an
// error in the current implementation, but we check it anyway in
// case it does in the future.
return nil, 0, err
}

// Stream receives do not block once send is closed, but we may not
// receive the response with the object right away; loop until we
// receive the object or error out.
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if err != nil {
return nil, 0, err
}

obj = resp.GetResource()
}

// Even though we received the object response, continue reading
// until we receive a non-nil error, to ensure the stream does not
// leak even if the context isn't cancelled. See:
// https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
for err == nil {
_, err = w.stream.Recv()
}

return obj, writeOffset, nil
}

return nil, writeOffset, nil
}
}

@@ -1738,26 +1811,6 @@ func (w *gRPCWriter) determineOffset(offset int64) (int64, error) {
return offset, nil
}

// commit closes the stream to commit the data sent and potentially receive
// the finalized object if finished uploading. If the last request sent
// indicated that writing was finished, the Object will be finalized and
// returned. If not, then the Object will be nil, and the boolean returned will
// be false.
func (w *gRPCWriter) commit() (*storagepb.WriteObjectResponse, bool, error) {
finalized := true
resp, err := w.stream.CloseAndRecv()
if err == io.EOF {
// Closing a stream for a resumable upload finish_write = false results
// in an EOF which can be ignored, as we aren't done uploading yet.
finalized = false
err = nil
}
// Drop the stream reference as it has been closed.
w.stream = nil

return resp, finalized, err
}

// writeObjectSpec constructs a WriteObjectSpec proto using the Writer's
// ObjectAttrs and applies its Conditions. This is only used for gRPC.
func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
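Putting the pieces of the new uploadBuffer logic together, here is a minimal end-to-end sketch of one buffered chunk over the bi-directional stream. It is an illustration under assumptions: the storagepb import path is the repo-internal generated package, and the helper simplifies away retries, per-message size limits, checksums, and the non-resumable no-flush case that the PR handles.

package example

import (
	"io"

	"cloud.google.com/go/storage/internal/apiv2/storagepb"
)

// sendChunk uploads one buffered chunk on an already-open bi-directional
// stream. It flushes and checks the persisted size for intermediate chunks,
// and finalizes the object when done is true.
func sendChunk(stream storagepb.Storage_BidiWriteObjectClient, buf []byte, offset int64, done bool) (*storagepb.Object, int64, error) {
	req := &storagepb.BidiWriteObjectRequest{
		Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
			ChecksummedData: &storagepb.ChecksummedData{Content: buf},
		},
		WriteOffset: offset,
		FinishWrite: done,
		Flush:       true,
		StateLookup: true,
	}
	if err := stream.Send(req); err != nil {
		return nil, offset, err
	}

	if !done {
		// Intermediate chunk: ask the server how much it has persisted so the
		// caller can resend from that offset if the server is behind.
		resp, err := stream.Recv()
		if err != nil {
			return nil, offset, err
		}
		return nil, resp.GetPersistedSize(), nil
	}

	// Final chunk: close the send side, then read until the finalized object
	// arrives, and keep reading until an error so the stream does not leak.
	if err := stream.CloseSend(); err != nil {
		return nil, offset, err
	}
	var obj *storagepb.Object
	var err error
	for err == nil {
		var resp *storagepb.BidiWriteObjectResponse
		if resp, err = stream.Recv(); err == nil && resp.GetResource() != nil {
			obj = resp.GetResource()
		}
	}
	if err != io.EOF {
		return nil, offset, err
	}
	return obj, offset + int64(len(buf)), nil
}

A caller would loop over buffers, feeding the returned persisted size back in as the next write offset, which mirrors how OpenWriter drives uploadBuffer in the diff above.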