Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): fix gRPC generation/condition issues #8396

Merged
merged 2 commits into from Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
188 changes: 188 additions & 0 deletions storage/client_test.go
Expand Up @@ -1120,6 +1120,194 @@ func TestHMACKeyCRUDEmulated(t *testing.T) {
})
}

func TestObjectConditionsEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
ctx := context.Background()

// Create test bucket
if _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{Name: bucket}); err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}

cases := []struct {
name string
call func() error
}{
{
name: "Object update generation",
call: func() error {
objName, gen, _, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
uattrs := &ObjectAttrsToUpdate{CustomTime: time.Now()}
_, err = client.UpdateObject(ctx, bucket, objName, uattrs, gen, nil, nil)
return err
},
},
{
name: "Object update ifMetagenerationMatch",
call: func() error {
objName, gen, metaGen, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
uattrs := &ObjectAttrsToUpdate{CustomTime: time.Now()}
conds := &Conditions{
GenerationMatch: gen,
MetagenerationMatch: metaGen,
}
_, err = client.UpdateObject(ctx, bucket, objName, uattrs, gen, nil, conds)
return err
},
},
{
name: "Object write ifGenerationMatch",
call: func() error {
var err error
_, err = client.OpenWriter(&openWriterParams{
ctx: ctx,
chunkSize: 256 * 1024,
chunkRetryDeadline: 0,
bucket: bucket,
attrs: &ObjectAttrs{},
conds: &Conditions{DoesNotExist: true},
encryptionKey: nil,
sendCRC32C: false,
donec: nil,
setError: func(e error) {
if e != nil {
err = e
}
},
progress: nil,
setObj: nil,
})
return err
},
},
{
name: "Object rewrite ifMetagenerationMatch",
call: func() error {
objName, gen, metaGen, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
_, err = client.RewriteObject(ctx, &rewriteObjectRequest{
srcObject: sourceObject{
name: objName,
bucket: bucket,
gen: gen,
conds: &Conditions{
GenerationMatch: gen,
MetagenerationMatch: metaGen,
},
},
dstObject: destinationObject{
name: fmt.Sprintf("%d-object", time.Now().Nanosecond()),
bucket: bucket,
conds: &Conditions{
DoesNotExist: true,
},
attrs: &ObjectAttrs{},
},
})
return err
},
},
{
name: "Object compose ifGenerationMatch",
call: func() error {
obj1, obj1Gen, _, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
obj2, obj2Gen, _, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
_, err = client.ComposeObject(ctx, &composeObjectRequest{
dstBucket: bucket,
dstObject: destinationObject{
name: fmt.Sprintf("%d-object", time.Now().Nanosecond()),
bucket: bucket,
conds: &Conditions{DoesNotExist: true},
attrs: &ObjectAttrs{},
},
srcs: []sourceObject{
{
name: obj1,
bucket: bucket,
gen: obj1Gen,
conds: &Conditions{
GenerationMatch: obj1Gen,
},
},
{
name: obj2,
bucket: bucket,
conds: &Conditions{
GenerationMatch: obj2Gen,
},
},
},
})
return err
},
},
{
name: "Object delete ifGenerationMatch",
call: func() error {
objName, gen, _, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
err = client.DeleteObject(ctx, bucket, objName, gen, &Conditions{GenerationMatch: gen})
return err
},
},
{
name: "Object get ifMetagenerationMatch",
call: func() error {
objName, gen, metaGen, err := createObject(ctx, bucket)
if err != nil {
return fmt.Errorf("creating object: %w", err)
}
_, err = client.GetObject(ctx, bucket, objName, gen, nil, &Conditions{GenerationMatch: gen, MetagenerationMatch: metaGen})
return err
},
},
}
for _, c := range cases {
t.Run(c.name, func(r *testing.T) {
if err := c.call(); err != nil {
r.Errorf("error: %v", err)
}
})
}
})
}

// createObject creates an object in the emulator and returns its name, generation, and
// metageneration.
func createObject(ctx context.Context, bucket string) (string, int64, int64, error) {
prefix := time.Now().Nanosecond()
objName := fmt.Sprintf("%d-object", prefix)

w := veneerClient.Bucket(bucket).Object(objName).NewWriter(ctx)
if _, err := w.Write(randomBytesToWrite); err != nil {
return "", 0, 0, fmt.Errorf("failed to populate test data: %w", err)
}
if err := w.Close(); err != nil {
return "", 0, 0, fmt.Errorf("closing object: %v", err)
}
attrs, err := veneerClient.Bucket(bucket).Object(objName).Attrs(ctx)
if err != nil {
return "", 0, 0, fmt.Errorf("update object: %v", err)
}
return objName, attrs.Generation, attrs.Metageneration, nil
}

// transportClienttest executes the given function with a sub-test, a project name
// based on the transport, a unique bucket name also based on the transport, and
// the transport-specific client to run the test with. It also checks the environment
Expand Down
20 changes: 14 additions & 6 deletions storage/grpc_client.go
Expand Up @@ -512,11 +512,15 @@ func (c *grpcStorageClient) GetObject(ctx context.Context, bucket, object string
func (c *grpcStorageClient) UpdateObject(ctx context.Context, bucket, object string, uattrs *ObjectAttrsToUpdate, gen int64, encryptionKey []byte, conds *Conditions, opts ...storageOption) (*ObjectAttrs, error) {
s := callSettings(c.settings, opts...)
o := uattrs.toProtoObject(bucketResourceName(globalProjectAlias, bucket), object)
// For Update, generation is passed via the object message rather than a field on the request.
if gen >= 0 {
o.Generation = gen
}
req := &storagepb.UpdateObjectRequest{
Object: o,
PredefinedAcl: uattrs.PredefinedACL,
}
if err := applyCondsProto("grpcStorageClient.UpdateObject", gen, conds, req); err != nil {
if err := applyCondsProto("grpcStorageClient.UpdateObject", defaultGen, conds, req); err != nil {
return nil, err
}
if s.userProject != "" {
Expand Down Expand Up @@ -783,17 +787,18 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec

dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket)
dstObjPb.Name = req.dstObject.name
if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, dstObjPb); err != nil {
return nil, err
}

if req.sendCRC32C {
dstObjPb.Checksums.Crc32C = &req.dstObject.attrs.CRC32C
}

srcs := []*storagepb.ComposeObjectRequest_SourceObject{}
for _, src := range req.srcs {
srcObjPb := &storagepb.ComposeObjectRequest_SourceObject{Name: src.name}
if err := applyCondsProto("ComposeObject source", src.gen, src.conds, srcObjPb); err != nil {
srcObjPb := &storagepb.ComposeObjectRequest_SourceObject{Name: src.name, ObjectPreconditions: &storagepb.ComposeObjectRequest_SourceObject_ObjectPreconditions{}}
if src.gen >= 0 {
srcObjPb.Generation = src.gen
}
if err := applyCondsProto("ComposeObject source", defaultGen, src.conds, srcObjPb.ObjectPreconditions); err != nil {
return nil, err
}
srcs = append(srcs, srcObjPb)
Expand All @@ -803,6 +808,9 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec
Destination: dstObjPb,
SourceObjects: srcs,
}
if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, rawReq); err != nil {
tritone marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
if req.predefinedACL != "" {
rawReq.DestinationPredefinedAcl = req.predefinedACL
}
Expand Down