Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): retry errors from last recv on uploads #9616

Merged
merged 4 commits into from
Mar 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 14 additions & 8 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1872,6 +1872,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st

// Send a request with as many bytes as possible.
// Loop until all bytes are sent.
sendBytes: // label this loop so that we can use a continue statement from a nested block
for {
bytesNotYetSent := recvd - sent
remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize
Expand Down Expand Up @@ -1949,10 +1950,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// we retry.
w.stream = nil

// Drop the stream reference as a new one will need to be created if
// we can retry the upload
w.stream = nil

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received.
Expand All @@ -1966,7 +1963,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st

// Continue sending requests, opening a new stream and resending
// any bytes not yet persisted as per QueryWriteStatus
continue
continue sendBytes
}
}
if err != nil {
Expand All @@ -1981,7 +1978,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Not done sending data, do not attempt to commit it yet, loop around
// and send more data.
if recvd-sent > 0 {
continue
continue sendBytes
}

// The buffer has been uploaded and there is still more data to be
Expand Down Expand Up @@ -2012,7 +2009,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Drop the stream reference as a new one will need to be created.
w.stream = nil

continue
continue sendBytes
}
if err != nil {
return nil, 0, err
Expand All @@ -2022,7 +2019,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Retry if not all bytes were persisted.
writeOffset = resp.GetPersistedSize()
sent = int(writeOffset) - int(start)
continue
continue sendBytes
}
} else {
// If the object is done uploading, close the send stream to signal
Expand All @@ -2042,6 +2039,15 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if shouldRetry(err) {
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)
w.stream = nil
continue sendBytes
}
if err != nil {
return nil, 0, err
}
Expand Down