Skip to content

Commit

Permalink
[ADDED] UpdateObjectStore and CreateOrUpdateObjectStore methods
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Feb 8, 2024
1 parent 1c24aa7 commit 3664988
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 15 deletions.
85 changes: 71 additions & 14 deletions jetstream/object.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 The NATS Authors
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -41,6 +41,10 @@ type (
ObjectStore(ctx context.Context, bucket string) (ObjectStore, error)
// CreateObjectStore will create an object store.
CreateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)
// UpdateObjectStore will update an existing object store.
UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)
// CreateOrUpdateObjectStore will create or update an object store.
CreateOrUpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error)
// DeleteObjectStore will delete the underlying stream for the named object.
DeleteObjectStore(ctx context.Context, bucket string) error
// ObjectStoreNames is used to retrieve a list of bucket names
Expand Down Expand Up @@ -253,10 +257,73 @@ const (
objDigestTmpl = objDigestType + "%s"
)

// CreateObjectStore will create an object store.
func (js *jetStream) CreateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) {
scfg, err := js.prepareObjectStoreConfig(ctx, cfg)
if err != nil {
return nil, err
}

stream, err := js.CreateStream(ctx, scfg)
if err != nil {
if errors.Is(err, ErrStreamNameAlreadyInUse) {
// errors are joined so that backwards compatibility is retained
// and previous checks for ErrStreamNameAlreadyInUse will still work.
err = errors.Join(fmt.Errorf("%w: %s", ErrBucketExists, cfg.Bucket), err)
}
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
return nil, err
}

return mapStreamToObjectStore(js, pushJS, cfg.Bucket, stream), nil
}

func (js *jetStream) UpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) {
scfg, err := js.prepareObjectStoreConfig(ctx, cfg)
if err != nil {
return nil, err
}

// Attempt to update the stream.
// If the stream does not exist, create it.
stream, err := js.UpdateStream(ctx, scfg)
if err != nil {
if errors.Is(err, ErrStreamNotFound) {
return nil, fmt.Errorf("%w: %s", ErrBucketNotFound, cfg.Bucket)
}
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
return nil, err
}

return mapStreamToObjectStore(js, pushJS, cfg.Bucket, stream), nil
}

func (js *jetStream) CreateOrUpdateObjectStore(ctx context.Context, cfg ObjectStoreConfig) (ObjectStore, error) {
scfg, err := js.prepareObjectStoreConfig(ctx, cfg)
if err != nil {
return nil, err
}

stream, err := js.CreateOrUpdateStream(ctx, scfg)
if err != nil {
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
return nil, err
}

return mapStreamToObjectStore(js, pushJS, cfg.Bucket, stream), nil
}

func (js *jetStream) prepareObjectStoreConfig(ctx context.Context, cfg ObjectStoreConfig) (StreamConfig, error) {
if !validBucketRe.MatchString(cfg.Bucket) {
return nil, ErrInvalidStoreName
return StreamConfig{}, ErrInvalidStoreName
}

name := cfg.Bucket
Expand Down Expand Up @@ -294,17 +361,7 @@ func (js *jetStream) CreateObjectStore(ctx context.Context, cfg ObjectStoreConfi
Compression: compression,
}

// Create our stream.
stream, err := js.CreateStream(ctx, scfg)
if err != nil {
return nil, err
}
pushJS, err := js.legacyJetStream()
if err != nil {
return nil, err
}

return mapStreamToObjectStore(js, pushJS, name, stream), nil
return scfg, nil
}

// ObjectStore will look up and bind to an existing object store instance.
Expand Down
65 changes: 64 additions & 1 deletion jetstream/test/object_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 The NATS Authors
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -129,6 +129,69 @@ func TestObjectBasics(t *testing.T) {
expectErr(t, err, jetstream.ErrBucketNotFound)
}

func TestCreateObjectStore(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

// invalid bucket name
_, err := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST.", Description: "Test store"})
expectErr(t, err, jetstream.ErrInvalidStoreName)

_, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "Test store"})
expectOk(t, err)

// Check that we can't overwrite existing bucket.
_, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "New store"})
expectErr(t, err, jetstream.ErrBucketExists)

// assert that we're backwards compatible
expectErr(t, err, jetstream.ErrStreamNameAlreadyInUse)
}

func TestUpdateObjectStore(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

// cannot update a non-existing bucket
_, err := js.UpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "Test store"})
expectErr(t, err, jetstream.ErrBucketNotFound)

_, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "Test store"})
expectOk(t, err)

// update the bucket
_, err = js.UpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "New store"})
expectOk(t, err)
}

func TestCreateOrUpdateObjectStore(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

// invalid bucket name
_, err := js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST.", Description: "Test store"})
expectErr(t, err, jetstream.ErrInvalidStoreName)

_, err = js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "Test store"})
expectOk(t, err)

// update the bucket
_, err = js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{Bucket: "TEST", Description: "New store"})
expectOk(t, err)
}

func TestGetObjectDigestMismatch(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit 3664988

Please sign in to comment.