Skip to content

Commit

Permalink
[ADDED] Compression option for object store buckets (#1544)
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Bareuther <40851416+johbar@users.noreply.github.com>
  • Loading branch information
johbar committed Feb 8, 2024
1 parent 547cafa commit 1c24aa7
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 2 deletions.
12 changes: 11 additions & 1 deletion jetstream/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type (
Storage StorageType `json:"storage,omitempty"`
Replicas int `json:"num_replicas,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Compression bool `json:"compression,omitempty"`

// Bucket-specific metadata
// NOTE: Metadata requires nats-server v2.10.0+
Expand Down Expand Up @@ -151,6 +152,8 @@ type (
BackingStore() string
// Metadata is the user supplied metadata for the bucket
Metadata() map[string]string
// IsCompressed indicates if the data is compressed on disk
IsCompressed() bool
}

// ObjectMetaOptions
Expand Down Expand Up @@ -271,7 +274,10 @@ func (js *jetStream) CreateObjectStore(ctx context.Context, cfg ObjectStoreConfi
if maxBytes == 0 {
maxBytes = -1
}

var compression StoreCompression
if cfg.Compression {
compression = S2Compression
}
scfg := StreamConfig{
Name: fmt.Sprintf(objNameTmpl, name),
Description: cfg.Description,
Expand All @@ -285,6 +291,7 @@ func (js *jetStream) CreateObjectStore(ctx context.Context, cfg ObjectStoreConfi
AllowRollup: true,
AllowDirect: true,
Metadata: cfg.Metadata,
Compression: compression,
}

// Create our stream.
Expand Down Expand Up @@ -1101,6 +1108,9 @@ func (s *ObjectBucketStatus) Metadata() map[string]string { return s.nfo.Config.
// StreamInfo is the stream info retrieved to create the status
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo }

// IsCompressed indicates if the data is compressed on disk
func (s *ObjectBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }

// Status retrieves run-time status about a bucket
func (obs *obs) Status(ctx context.Context) (ObjectStoreStatus, error) {
nfo, err := obs.stream.Info(ctx)
Expand Down
35 changes: 35 additions & 0 deletions jetstream/test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,3 +1106,38 @@ func TestObjectStoreGetObjectContextTimeout(t *testing.T) {
expectErr(t, err, nats.ErrTimeout)
r.Close()
}

func TestObjectStoreCompression(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx := context.Background()

objCompressed, err := js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{
Bucket: "A",
Compression: true,
})
if err != nil {
t.Fatalf("Error creating object store: %v", err)
}

status, err := objCompressed.Status(ctx)
if err != nil {
t.Fatalf("Error getting bucket status: %v", err)
}

if !status.IsCompressed() {
t.Fatalf("Expected bucket to be compressed")
}

objStream, err := js.Stream(ctx, "OBJ_A")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}

if objStream.CachedInfo().Config.Compression != jetstream.S2Compression {
t.Fatalf("Expected stream to be compressed with S2")
}
}
14 changes: 13 additions & 1 deletion object.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ type ObjectStoreConfig struct {
// Bucket-specific metadata
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
// Enable underlying stream compression.
// NOTE: Compression is supported for nats-server 2.10.0+
Compression bool
}

type ObjectStoreStatus interface {
Expand All @@ -174,6 +177,8 @@ type ObjectStoreStatus interface {
BackingStore() string
// Metadata is the user supplied metadata for the bucket
Metadata() map[string]string
// IsCompressed indicates if the data is compressed on disk
IsCompressed() bool
}

// ObjectMetaOptions
Expand Down Expand Up @@ -266,7 +271,10 @@ func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) {
if maxBytes == 0 {
maxBytes = -1
}

var compression StoreCompression
if cfg.Compression {
compression = S2Compression
}
scfg := &StreamConfig{
Name: fmt.Sprintf(objNameTmpl, name),
Description: cfg.Description,
Expand All @@ -280,6 +288,7 @@ func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) {
AllowRollup: true,
AllowDirect: true,
Metadata: cfg.Metadata,
Compression: compression,
}

// Create our stream.
Expand Down Expand Up @@ -1224,6 +1233,9 @@ func (s *ObjectBucketStatus) Metadata() map[string]string { return s.nfo.Config.
// StreamInfo is the stream info retrieved to create the status
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo }

// IsCompressed indicates if the data is compressed on disk
func (s *ObjectBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }

// Status retrieves run-time status about a bucket
func (obs *obs) Status() (ObjectStoreStatus, error) {
nfo, err := obs.js.StreamInfo(obs.stream)
Expand Down
34 changes: 34 additions & 0 deletions test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,3 +1087,37 @@ func TestObjectStoreGetObjectContextTimeout(t *testing.T) {
expectErr(t, err, nats.ErrTimeout)
r.Close()
}

func TestObjectStoreCompression(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

obj, err := js.CreateObjectStore(&nats.ObjectStoreConfig{
Bucket: "A",
Compression: true,
})
if err != nil {
t.Fatalf("Error creating object store: %v", err)
}

status, err := obj.Status()
if err != nil {
t.Fatalf("Error getting bucket status: %v", err)
}

if !status.IsCompressed() {
t.Fatalf("Expected bucket to be compressed")
}

objStream, err := js.StreamInfo("OBJ_A")
if err != nil {
t.Fatalf("Error getting stream info: %v", err)
}

if objStream.Config.Compression != nats.S2Compression {
t.Fatalf("Expected stream to be compressed with S2")
}
}

0 comments on commit 1c24aa7

Please sign in to comment.