Skip to content

Commit

Permalink
chore(storage): round up Chunksize to nearest multiple of 256kib [GRP…
Browse files Browse the repository at this point in the history
…C] (#7799)

Fixes #7754

The test is skipped for gRPC because of #7798, which affects the last test case ("chunksize 0 uploads everything").

Also cleans up a typo in ProgressFunc docs
  • Loading branch information
BrennaEpp committed May 1, 2023
1 parent 49a1621 commit ca738ab
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 1 deletion.
7 changes: 7 additions & 0 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
gapic "cloud.google.com/go/storage/internal/apiv2"
"cloud.google.com/go/storage/internal/apiv2/storagepb"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
Expand Down Expand Up @@ -1471,6 +1472,12 @@ func (r *gRPCReader) reopenStream() (*storagepb.ReadObjectResponse, error) {

func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) *gRPCWriter {
size := params.chunkSize

// Round up chunksize to nearest 256KiB
if size%googleapi.MinUploadChunkSize != 0 {
size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize)
}

if params.chunkSize == 0 {
// TODO: Should we actually use the minimum of 256 KB here when the user
// indicates they want minimal memory usage? We cannot do a zero-copy,
Expand Down
87 changes: 87 additions & 0 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2250,6 +2250,93 @@ func TestIntegration_WriterContentType(t *testing.T) {
})
}

// TestIntegration_WriterChunksize uploads an object of 1 MiB + 1 byte using a
// range of Writer.ChunkSize values. Through Writer.ProgressFunc it checks how
// many bytes every non-final progress report accounts for and how many
// progress reports are made in total.
func TestIntegration_WriterChunksize(t *testing.T) {
	const (
		kib     = 1 << 10
		mib     = 1 << 20
		objSize = mib + 1 // 1 MiB + 1 byte
	)

	type chunkCase struct {
		desc             string
		chunksize        int
		wantBytesPerCall int64
		wantCallbacks    int
	}
	cases := []chunkCase{
		{
			desc:             "default chunksize",
			chunksize:        16 * mib,
			wantBytesPerCall: 16 * mib,
			wantCallbacks:    0,
		},
		{
			desc:             "small chunksize rounds up to 256kib",
			chunksize:        1,
			wantBytesPerCall: 256 * kib,
			wantCallbacks:    5,
		},
		{
			desc:             "chunksize of 256kib",
			chunksize:        256 * kib,
			wantBytesPerCall: 256 * kib,
			wantCallbacks:    5,
		},
		{
			desc:             "chunksize of just over 256kib rounds up",
			chunksize:        256*kib + 1,
			wantBytesPerCall: 512 * kib,
			wantCallbacks:    3,
		},
		{
			desc:             "multiple of 256kib",
			chunksize:        768 * kib,
			wantBytesPerCall: 768 * kib,
			wantCallbacks:    2,
		},
		{
			desc:             "chunksize 0 uploads everything",
			chunksize:        0,
			wantBytesPerCall: objSize,
			wantCallbacks:    0,
		},
	}

	ctx := skipJSONReads(skipGRPC("https://github.com/googleapis/google-cloud-go/issues/7839"), "no reads in test")
	multiTransportTest(ctx, t, func(t *testing.T, ctx context.Context, bucket, _ string, client *Client) {
		obj := client.Bucket(bucket).Object("writer-chunksize-test" + uidSpaceObjects.New())
		payload := bytes.Repeat([]byte("a"), objSize)

		for _, tc := range cases {
			t.Run(tc.desc, func(t *testing.T) {
				// Best-effort removal of the uploaded object; the result is ignored.
				t.Cleanup(func() { _ = obj.Delete(ctx) })

				w := obj.Retryer(WithPolicy(RetryAlways)).NewWriter(ctx)
				w.ChunkSize = tc.chunksize

				var (
					reportedTotal int64 // cumulative byte count from the previous progress report
					reports       int   // number of ProgressFunc invocations seen so far
				)
				w.ProgressFunc = func(total int64) {
					delta := total - reportedTotal
					isFinal := total == int64(objSize)

					// Only the final report may cover something other than
					// exactly wantBytesPerCall bytes.
					if !isFinal && delta != tc.wantBytesPerCall {
						t.Errorf("unexpected number of bytes written by call; wanted: %d, written: %d", tc.wantBytesPerCall, delta)
					}

					reportedTotal = total
					reports++
				}

				_, writeErr := w.Write(payload)
				if writeErr != nil {
					// Close the writer before aborting; its error is deliberately dropped.
					_ = w.Close()
					t.Fatalf("writer.Write: %v", writeErr)
				}
				if closeErr := w.Close(); closeErr != nil {
					t.Fatalf("writer.Close: %v", closeErr)
				}

				if reports != tc.wantCallbacks {
					t.Errorf("ProgressFunc was called %d times, expected %d", reports, tc.wantCallbacks)
				}
			})
		}
	})
}

func TestIntegration_ZeroSizedObject(t *testing.T) {
t.Parallel()
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket, _ string, client *Client) {
Expand Down
2 changes: 1 addition & 1 deletion storage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Writer struct {
// cancellation.
ChunkRetryDeadline time.Duration

// ProgressFunc can be used to monitor the progress of a large write.
// ProgressFunc can be used to monitor the progress of a large write
// operation. If ProgressFunc is not nil and writing requires multiple
// calls to the underlying service (see
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload),
Expand Down

0 comments on commit ca738ab

Please sign in to comment.