Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): change gRPC writes to use bi-directional streams #8930

Merged
merged 6 commits into from Nov 8, 2023

Conversation

BrennaEpp
Copy link
Contributor

@BrennaEpp BrennaEpp commented Oct 27, 2023

Emulated test is failing as bidi writes are not implemented in the emulator yet, so I added a quick skip for that test.

@BrennaEpp BrennaEpp requested review from a team as code owners October 27, 2023 23:01
@product-auto-label product-auto-label bot added size: m Pull request size is medium. api: storage Issues related to the Cloud Storage API. labels Oct 27, 2023
Comment on lines 1674 to 1677
err = w.stream.CloseSend()

// Drop the stream reference as a new one will need to be created
w.stream = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we 100% sure we can drop the stream reference here? If w.stream.Send gets an io.EOF, it means the backend wants to send an error. The error status won't come from CloseSend(), I think the client side still needs to Recv to get the actual Status (will send a link offline). I think we need to call Recv or CloseAndRecv if the bidi stream still has it, before Closing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, we'd have to get clarification here and test it out in practice...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Noah, you are 100% correct. In this case, it would be a Recv() (the bidi stream does not have CloseAndRecv). Updated the code.

As for testing it out in practice, I agree. I think the best is to do this via the conformance tests, pending testbench implementation.

Copy link
Contributor

@tritone tritone left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks pretty good, but I had some questions

storage/grpc_client.go Show resolved Hide resolved
storage/grpc_client.go Outdated Show resolved Hide resolved
Comment on lines 1674 to 1677
err = w.stream.CloseSend()

// Drop the stream reference as a new one will need to be created
w.stream = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, we'd have to get clarification here and test it out in practice...

storage/grpc_client.go Show resolved Hide resolved
storage/grpc_client.go Show resolved Hide resolved

// Stream receives do not block once send is closed, but we may not
// receive the response with the object right away; loop until we
// receive the object or error out
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to do this without backoff? Wondering if this is a typical pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering that too, but I can't find any resources on this because it seems that for the typical use case bidirectional clients receive and send in separate goroutines.
Here is what the typical pattern looks like:

for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		got = append(got, resp)
	}

All the resources I am looking at do say to receive from the stream until we get an error and I believe it is the same for response. For certain I know that the code as-is written may receive several responses (~1-4) before it will receive the object. Once we call CloseSend(), Recv() won't block and the server will always send, so this loop is just waiting for the server side to finish writing the object.
Either way, I've also added another loop following to make sure the stream is properly garbage collected, as seems to be required. I've also seen the following for example:

	// Keep reading to drain the stream.
	for {
		if _, err := stream.Recv(); err != nil {
			break
		}
	}

@BrennaEpp BrennaEpp changed the title feat(storage): change gRPC writes to use bi-direcional streams feat(storage): change gRPC writes to use bi-directional streams Nov 7, 2023
@@ -1688,6 +1691,9 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
}
sent = int(writeOffset) - int(start)

// Drop the stream reference as a new one will need to be created.
w.stream = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this happen outside of the if ShouldRetry() conditional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it doesn't matter too much either way here. If we don't go into the conditional (ie. shouldRetry == false) it'll return from uploadBuffer with the err and we won't reuse the stream. If we do go into the conditional, I don't think it matters if it happened before or during. Unless I'm missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right but it seems more readable to drop the stream reference immediately after the error if that's the reason we are doing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this line

storage/grpc_client.go Show resolved Hide resolved
// Even though we received the object response, continue reading
// until we receive a non-nil error, to ensure the stream does not
// leak even if the context isn't cancelled. See:
// https://github.com/grpc/grpc-go/commit/365770fcbd7dfb9d921cb44827ede770f33be44f
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there docs that we can link to that officially document the "stream protocol" rather than just a commit message?

If not I think it'd be good to get a review from someone on the gRPC Go team if possible, given that these contracts are a little convoluted seemingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual docs: https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream

I'll update it in the comment as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at these docs I'm wondering if we can use the CloseAndRecv helper instead to avoid writing these loops? We already do this here:

resp, err := w.stream.CloseAndRecv()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have that helper function for the bidi stream

err = nil
for err == nil {
_, err = w.stream.Recv()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in another comment, I don't see why we would make this change; stream.CloseAndRecv should do what we want?

@@ -1688,6 +1691,9 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
}
sent = int(writeOffset) - int(start)

// Drop the stream reference as a new one will need to be created.
w.stream = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right but it seems more readable to drop the stream reference immediately after the error if that's the reason we are doing it.

storage/grpc_client.go Show resolved Hide resolved
@BrennaEpp BrennaEpp merged commit 3e23a36 into googleapis:main Nov 8, 2023
9 checks passed
gcf-merge-on-green bot pushed a commit that referenced this pull request Nov 9, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: storage Issues related to the Cloud Storage API. size: m Pull request size is medium.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants