Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] KeyValue documentation #1537

Merged
merged 3 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
201 changes: 197 additions & 4 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ This doc covers the basic usage of the `jetstream` package in `nats.go` client.
- [Publishing on stream](#publishing-on-stream)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [KeyValue Store](#keyvalue-store)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Examples](#examples)

## Overview

Expand Down Expand Up @@ -53,11 +58,12 @@ JetStream API. Key differences between `jetstream` and `nats` packages include:
- `Msg` - used for message-specific operations - reading data, headers and
metadata, as well as performing various types of acknowledgements

> __NOTE__: `jetstream` requires nats-server >= 2.9.0 to work correctly.
Additionally, `jetstream` exposes [KeyValue Store](#keyvalue-store) and
[ObjectStore](#object-store) capabilities. KV and Object stores are abstraction
layers on top of JetStream Streams, simplifying key value and large data
storage on Streams.

> __WARNING__: The new API is currently provided as a _preview_, and will
> deprecate previous JetStream subscribe APIs. It is encouraged to start
experimenting with the new APIs as soon as possible.
> __NOTE__: `jetstream` requires nats-server >= 2.9.0 to work correctly.

## Basic usage

Expand Down Expand Up @@ -603,6 +609,193 @@ ackF, err = js.PublishAsync("ORDERS.new", []byte("hello"))
Just as for synchronous publish, `PublishAsync()` and `PublishMsgAsync()` accept
options for setting headers.

## KeyValue Store

JetStream KeyValue Stores offer a straightforward method for storing key-value
pairs within JetStream. These stores are supported by a specially configured
stream, designed to efficiently and compactly store these pairs. This structure
ensures rapid and convenient access to the data.

The KV Store, also known as a bucket, enables the execution of various operations:

- create/update a value for a given key
- get a value for a given key
- delete a value for a given key
- purge all values from a bucket
- list all keys in a bucket
- watch for changes on given key set or the whole bucket
- retrieve history of changes for a given key

### Basic usage of KV bucket

The most basic usage of KV bucket is to create or retrieve a bucket and perform
basic CRUD operations on keys.

```go
js, _ := jetstream.New(nc)
ctx := context.Background()

// Create a new bucket. Bucket name is required and has to be unique within a stream.
Jarema marked this conversation as resolved.
Show resolved Hide resolved
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

// Set a value for a given key
// Put will either create or update a value for a given key
kv.Put(ctx, "sue.color", []byte("blue"))

// Get an entry for a given key
// Entry contains key/value, but also metadata (revision, timestamp, etc.))
entry, _ := kv.Get(ctx, "sue.color")

// Prints `sue.color @ 1 -> "blue"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

// Update a value for a given key
// Update will fail if the key does not exist or the revision is not up to date
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, "or the revision changed"?

kv.Update(ctx, "sue.color", []byte("red"), 1)

// Create will fail if the key already exists
_, err := kv.Create(ctx, "sue.color", []byte("purple"))
fmt.Println(err) // prints `nats: key exists`

// Delete a value for a given key
Jarema marked this conversation as resolved.
Show resolved Hide resolved
kv.Delete(ctx, "sue.color")

// getting a deleted key will return an error
_, err = kv.Get(ctx, "sue.color")
fmt.Println(err) // prints `nats: key not found`

// A bucket can be deleted once it is no longer needed
js.DeleteKeyValue(ctx, "profiles")
```

### Watching for changes on a bucket

KV buckets support Watchers, which can be used to watch for changes on a given
key or the whole bucket. Watcher will receive a notification on a channel when a
change occurs. By default, watcher will return initial values for all matching
keys. After sending all initial values, watcher will send nil on the channel to
signal that all initial values have been sent and it will start sending updates when
changes occur.

Watcher supports several configuration options:

- `IncludeHistory` will have the key watcher send all historical values
for each key (up to KeyValueMaxHistory).
- `IgnoreDeletes` will have the key watcher not pass any keys with
delete markers.
- `UpdatesOnly` will have the key watcher only pass updates on values
(without latest values when started).
Jarema marked this conversation as resolved.
Show resolved Hide resolved
- `MetaOnly` will have the key watcher retrieve only the entry meta
data, not the entry value.
Jarema marked this conversation as resolved.
Show resolved Hide resolved
- `ResumeFromRevision` instructs the key watcher to resume from a
specific revision number.

```go
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))

// A watcher can be created to watch for changes on a given key or the whole bucket
// Watcher will receive a notification on a channel when a change occurs
// By default, watcher will return initial values for all matching keys.
Jarema marked this conversation as resolved.
Show resolved Hide resolved
// Watcher can be configured to only return updates by using jetstream.UpdatesOnly() option.
watcher, _ := kv.Watch(ctx, "sue.*")
defer watcher.Stop()

kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "sue.color", []byte("red"))

// First, the watcher sends initial values for all matching keys
Jarema marked this conversation as resolved.
Show resolved Hide resolved
// In this case, it will send a single entry for `sue.color`.
entry := <-watcher.Updates()
// Prints `sue.color @ 1 -> "blue"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

// After all initial values have been sent, watcher will send nil on the channel.
Jarema marked this conversation as resolved.
Show resolved Hide resolved
entry = <-watcher.Updates()
if entry != nil {
fmt.Println("Unexpected entry received")
}

// After that, watcher will send updates when changes occur
// In this case, it will send an entry for `sue.color` and `sue.age`.

entry = <-watcher.Updates()
// Prints `sue.age @ 2 -> "43"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))

entry = <-watcher.Updates()
// Prints `sue.color @ 3 -> "red"`
fmt.Printf("%s @ %d -> %q\n", entry.Key(), entry.Revision(), string(entry.Value()))
```

### Additional operations on a bucket

In addition to basic CRUD operations and watching for changes, KV buckets
support several additional operations:

- `ListKeys` will return all keys in a bucket"

```go
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

keys, _ := kv.ListKeys(ctx)

// Prints all 3 keys
for key := range keys.Keys() {
fmt.Println(key)
}
```

- `Purge` and `PurgeDeletes` for removing all keys from a bucket

```go
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

// Purge will remove all keys from a bucket
// The latest revision of each key will be kept
// with a delete marker, all previous revisions will be removed
Jarema marked this conversation as resolved.
Show resolved Hide resolved
kv.Purge(ctx)

// PurgeDeletes will remove all keys from a bucket
// with a delete marker.
kv.PurgeDeletes(ctx)
```

- `Status` will return the current status of a bucket

```go
js, _ := jetstream.New(nc)
ctx := context.Background()
kv, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "profiles"})

kv.Put(ctx, "sue.color", []byte("blue"))
kv.Put(ctx, "sue.age", []byte("43"))
kv.Put(ctx, "bucket", []byte("profiles"))

status, _ := kv.Status(ctx)

fmt.Println(status.Bucket()) // prints `profiles`
fmt.Println(status.Values()) // prints `3`
fmt.Println(status.Bytes()) // prints the size of all values in bytes
```

## Object Store

## Examples

You can find more examples of `jetstream` usage [here](https://github.com/nats-io/nats.go/tree/main/examples/jetstream).