feat(storage): change gRPC writes to use bi-directional streams #8930

Merged: 6 commits, Nov 8, 2023
Changes from 2 commits
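
For orientation, here is the shape of the bi-directional write flow this PR adopts, sketched against the storagepb types that appear in the diff below. This is an illustration, not code from this PR: bidiUploadSketch, raw, spec, and chunks are hypothetical names, and storagepb denotes the generated GCS v2 protos that the storage package uses internally (imports such as "context" are elided).

func bidiUploadSketch(ctx context.Context, raw storagepb.StorageClient, spec *storagepb.WriteObjectSpec, chunks [][]byte) (*storagepb.Object, error) {
	// Open one bi-directional stream for the entire upload.
	stream, err := raw.BidiWriteObject(ctx)
	if err != nil {
		return nil, err
	}
	var offset int64
	for i, chunk := range chunks {
		req := &storagepb.BidiWriteObjectRequest{
			Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
				ChecksummedData: &storagepb.ChecksummedData{Content: chunk},
			},
			WriteOffset: offset,
			// The last message of the object is flagged explicitly rather
			// than signalled by closing the stream.
			FinishWrite: i == len(chunks)-1,
		}
		if i == 0 {
			// The first message identifies the destination; a resumable
			// session would set an upload ID here instead.
			req.FirstMessage = &storagepb.BidiWriteObjectRequest_WriteObjectSpec{WriteObjectSpec: spec}
		}
		if err := stream.Send(req); err != nil {
			return nil, err
		}
		offset += int64(len(chunk))
	}
	if err := stream.CloseSend(); err != nil {
		return nil, err
	}
	// After CloseSend, keep receiving until the server returns the
	// finalized object; it may send interim persisted-size responses first.
	for {
		resp, err := stream.Recv()
		if err != nil {
			return nil, err
		}
		if obj := resp.GetResource(); obj != nil {
			return obj, nil
		}
	}
}
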
5 changes: 5 additions & 0 deletions storage/client_test.go
@@ -20,6 +20,7 @@ import (
"log"
"os"
"strconv"
"strings"
"testing"
"time"

@@ -790,6 +791,10 @@ func TestOpenReaderEmulated(t *testing.T) {

func TestOpenWriterEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
if strings.Contains(project, "grpc") {
t.Skip("Implementation in testbench pending: https://github.com/googleapis/storage-testbench/issues/568")
}

// Populate test data.
_, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{
Name: bucket,
187 changes: 108 additions & 79 deletions storage/grpc_client.go
@@ -1068,7 +1068,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
}
}

o, off, finalized, err := gw.uploadBuffer(recvd, offset, doneReading)
o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
if err != nil {
err = checkCanceled(err)
errorf(err)
@@ -1087,9 +1087,9 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
progress(offset)
}

// When we are done reading data and the chunk has been finalized,
// we are done.
if doneReading && finalized {
// When we are done reading data without errors, set the object and
// finish.
if doneReading {
// Build Object from server's response.
setObj(newObjectFromProto(o))
return
@@ -1539,7 +1539,7 @@ type gRPCWriter struct {
chunkSize int

// The gRPC client-stream used for sending buffers.
stream storagepb.Storage_WriteObjectClient
stream storagepb.Storage_BidiWriteObjectClient

// The Resumable Upload ID started by a gRPC-based Writer.
upid string
@@ -1583,45 +1583,56 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
return persistedSize, err
}

// uploadBuffer opens a Write stream and uploads the buffer at the given offset (if
// uploading a chunk for a resumable uploadBuffer), and will mark the write as
// finished if we are done receiving data from the user. The resulting write
// offset after uploading the buffer is returned, as well as a boolean
// indicating if the Object has been finalized. If it has been finalized, the
// final Object will be returned as well. Finalizing the upload is primarily
// important for Resumable Uploads. A simple or multi-part upload will always
// be finalized once the entire buffer has been written.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, bool, error) {
var err error
var finishWrite bool
var sent, limit int = 0, maxPerMessageWriteSize
// uploadBuffer opens a bi-directional Write stream and uploads the buffer at
// the given offset, and will mark the write as finished if we are done
// receiving data from the user. The resulting write offset after uploading the
// buffer is returned, as well as the final Object if the upload is
// completed.
//
// Returns object, persisted size, and any error that is not retriable.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
var shouldRetry = ShouldRetry
if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
shouldRetry = w.settings.retry.shouldRetry
}
offset := start

var err error
var lastWriteOfEntireObject bool

sent := 0
writeOffset := start

toWrite := w.buf[:recvd]

// Send a request with as many bytes as possible
// Loop until all bytes are sent
for {
// This indicates that this is the last message and the remaining
// data fits in one message.
belowLimit := recvd-sent <= limit
if belowLimit {
limit = recvd - sent
bytesNotYetSent := recvd - sent
remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize

if remainingDataFitsInSingleReq && doneReading {
lastWriteOfEntireObject = true
}
if belowLimit && doneReading {
finishWrite = true

// Send the maximum amount of bytes we can, unless we don't have that many
bytesToSendInCurrReq := maxPerMessageWriteSize
if remainingDataFitsInSingleReq {
bytesToSendInCurrReq = bytesNotYetSent
}

// Prepare chunk section for upload.
data := toWrite[sent : sent+limit]
req := &storagepb.WriteObjectRequest{
Data: &storagepb.WriteObjectRequest_ChecksummedData{
data := toWrite[sent : sent+bytesToSendInCurrReq]

req := &storagepb.BidiWriteObjectRequest{
Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
ChecksummedData: &storagepb.ChecksummedData{
Content: data,
},
},
WriteOffset: offset,
FinishWrite: finishWrite,
WriteOffset: writeOffset,
FinishWrite: lastWriteOfEntireObject,
Flush: remainingDataFitsInSingleReq,
StateLookup: remainingDataFitsInSingleReq,
}

// Open a new stream if necessary and set the first_message field on
@@ -1630,19 +1641,20 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
if w.stream == nil {
hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
w.stream, err = w.c.raw.WriteObject(ctx)

w.stream, err = w.c.raw.BidiWriteObject(ctx)
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

if w.upid != "" {
req.FirstMessage = &storagepb.WriteObjectRequest_UploadId{UploadId: w.upid}
} else {
if w.upid != "" { // resumable upload
req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: w.upid}
} else { // non-resumable
spec, err := w.writeObjectSpec()
if err != nil {
return nil, 0, false, err
return nil, 0, err
}
req.FirstMessage = &storagepb.WriteObjectRequest_WriteObjectSpec{
req.FirstMessage = &storagepb.BidiWriteObjectRequest_WriteObjectSpec{
WriteObjectSpec: spec,
}
req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
@@ -1652,38 +1664,43 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// on the *last* message of the stream (instead of the first).
req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
}

}

err = w.stream.Send(req)
if err == io.EOF {
// err was io.EOF. The client-side of a stream only gets an EOF on Send
// when the backend closes the stream and wants to return an error
// status. Closing the stream receives the status as an error.
_, err = w.stream.CloseAndRecv()
err = w.stream.CloseSend()

// Drop the stream reference as a new one will need to be created
w.stream = nil
Contributor:

Are we 100% sure we can drop the stream reference here? If w.stream.Send gets an io.EOF, it means the backend wants to send an error. The error status won't come from CloseSend(); I think the client side still needs to Recv to get the actual Status (will send a link offline). I think we need to call Recv, or CloseAndRecv if the bidi stream still has it, before closing.

Contributor:

+1, we'd have to get clarification here and test it out in practice...

Contributor Author:

Thanks Noah, you are 100% correct. In this case, it would be a Recv() (the bidi stream does not have CloseAndRecv). Updated the code.

As for testing it out in practice, I agree. I think the best approach is to do this via the conformance tests, pending the testbench implementation.
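
For reference, a minimal sketch of the pattern settled on above, using the same stream type as the diff; the function name is hypothetical.

// On a gRPC client stream, Send returning io.EOF means the server has
// already closed the stream with an error status. On a bidi stream that
// status is surfaced by Recv (there is no CloseAndRecv), not by CloseSend.
func sendOrSurfaceStatus(stream storagepb.Storage_BidiWriteObjectClient, req *storagepb.BidiWriteObjectRequest) error {
	err := stream.Send(req)
	if err == io.EOF {
		// Recv returns the status the server closed the stream with.
		_, err = stream.Recv()
	}
	return err
}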


// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
sent = 0
finishWrite = false
// TODO: Add test case for failure modes of querying progress.
offset, err = w.determineOffset(start)
if err == nil {
continue
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)

// Continue sending requests, opening a new stream and resending
// any bytes not yet persisted as per QueryWriteStatus
continue
}
}
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

// Update the immediate stream's sent total and the upload offset with
// the data sent.
sent += len(data)
offset += int64(len(data))
writeOffset += int64(len(data))

// Not done sending data, do not attempt to commit it yet, loop around
// and send more data.
@@ -1692,31 +1709,63 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
}

// The buffer has been uploaded and there is still more data to be
// uploaded, but this is not a resumable upload session. Therefore
// keep the stream open and don't commit yet.
if !finishWrite && w.chunkSize == 0 {
return nil, offset, false, nil
// uploaded, but this is not a resumable upload session. Therefore,
// don't check persisted data.
if !lastWriteOfEntireObject && w.chunkSize == 0 {
return nil, writeOffset, nil
}

// Done sending data. Close the stream to "commit" the data sent.
resp, finalized, err := w.commit()
// Done sending data (remainingDataFitsInSingleReq should == true if we
// reach this code). Receive from the stream to confirm the persisted data.
resp, err := w.stream.Recv()
// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
sent = 0
finishWrite = false
offset, err = w.determineOffset(start)
if err == nil {
continue
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)
continue
}
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

return resp.GetResource(), offset, finalized, nil
// Confirm the persisted data
if !lastWriteOfEntireObject {
if resp.GetPersistedSize() != writeOffset {
// retry
writeOffset = resp.GetPersistedSize()
sent = int(writeOffset) - int(start)
continue
}
} else {
// If the object is done uploading, close the send stream and check for errors.
err = w.stream.CloseSend()
if err != nil {
return nil, 0, err
}

// Stream receives do not block once send is closed, but we may not
// receive the response with the object right away; loop until we
// receive the object or error out
Contributor:

Is it okay to do this without backoff? Wondering if this is a typical pattern.

Contributor Author:

I was wondering that too, but I can't find any resources on this because it seems that for the typical use case bidirectional clients receive and send in separate goroutines.
Here is what the typical pattern looks like:

for {
	resp, err := stream.Recv()
	if err == io.EOF {
		break
	} else if err != nil {
		return err
	}
	got = append(got, resp)
}

All the resources I am looking at do say to receive from the stream until we get an error, and I believe the same holds for responses. For certain, I know that the code as written may receive several responses (~1-4) before it receives the object. Once we call CloseSend(), Recv() won't block and the server will always send, so this loop is just waiting for the server side to finish writing the object.
Either way, I've also added another loop afterwards to make sure the stream is properly garbage collected, as seems to be required. I've seen the following, for example:

// Keep reading to drain the stream.
for {
	if _, err := stream.Recv(); err != nil {
		break
	}
}

var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if err != nil {
return nil, 0, err
}

obj = resp.GetResource()
}

return obj, writeOffset, nil
}

return nil, writeOffset, nil
}
}

@@ -1736,26 +1785,6 @@ func (w *gRPCWriter) determineOffset(offset int64) (int64, error) {
return offset, nil
}
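
Aside: the "bytes not yet persisted as per QueryWriteStatus" comments above refer to the resumable-upload status lookup that determineOffset relies on. Here is a sketch of that lookup against the generated QueryWriteStatus RPC; the function name is hypothetical.

// persistedSizeSketch asks the service how many bytes of a resumable
// session it has durably persisted, so a retry can resume from there
// instead of resending the entire buffer.
func persistedSizeSketch(ctx context.Context, raw storagepb.StorageClient, uploadID string) (int64, error) {
	resp, err := raw.QueryWriteStatus(ctx, &storagepb.QueryWriteStatusRequest{
		UploadId: uploadID,
	})
	if err != nil {
		return 0, err
	}
	return resp.GetPersistedSize(), nil
}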

// commit closes the stream to commit the data sent and potentially receive
// the finalized object if finished uploading. If the last request sent
// indicated that writing was finished, the Object will be finalized and
// returned. If not, then the Object will be nil, and the boolean returned will
// be false.
func (w *gRPCWriter) commit() (*storagepb.WriteObjectResponse, bool, error) {
finalized := true
resp, err := w.stream.CloseAndRecv()
if err == io.EOF {
// Closing a stream for a resumable upload finish_write = false results
// in an EOF which can be ignored, as we aren't done uploading yet.
finalized = false
err = nil
}
// Drop the stream reference as it has been closed.
w.stream = nil

return resp, finalized, err
}

// writeObjectSpec constructs a WriteObjectSpec proto using the Writer's
// ObjectAttrs and applies its Conditions. This is only used for gRPC.
func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
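
The body of writeObjectSpec is collapsed in this diff view. As a rough sketch of what such a constructor involves (field names are from the v2 protos; the condition mapping shown is an assumption, and the real implementation derives everything from the Writer's ObjectAttrs and Conditions):

// writeObjectSpecSketch builds a WriteObjectSpec for a new object; only a
// couple of representative fields are shown. proto here is
// google.golang.org/protobuf/proto.
func writeObjectSpecSketch(bucket, name string, doesNotExist bool) *storagepb.WriteObjectSpec {
	spec := &storagepb.WriteObjectSpec{
		Resource: &storagepb.Object{
			Bucket: fmt.Sprintf("projects/_/buckets/%s", bucket),
			Name:   name,
		},
	}
	if doesNotExist {
		// The DoesNotExist precondition maps to if_generation_match == 0.
		spec.IfGenerationMatch = proto.Int64(0)
	}
	return spec
}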