Skip to content

Commit

Permalink
Apply review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Jan 26, 2024
1 parent b5ae4f4 commit 48e18b2
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 255 deletions.
48 changes: 36 additions & 12 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ type (
//
// This package provides two implementations of Consumer interface:
//
// - Basic named/ephemeral pull consumers. These consumers are created using
// CreateConsumer method on Stream or JetStream interface. They have to be
// - Standard named/ephemeral pull consumers. These consumers are created using
// CreateConsumer method on Stream or JetStream interface. They can be
// explicitly configured (using [ConsumerConfig]) and managed by the user,
// either from this package or externally.
//
// - Ordered consumers. These consumers are created using OrderedConsumer
// method on Stream or JetStream interface. They are managed by the library
// and provide a simple way to consume messages from a stream in order. Ordered
// and provide a simple way to consume messages from a stream. Ordered
// consumers are ephemeral in-memory pull consumers and are resilient to
// deletes and restarts. They provide limited configuration options
// using [OrderedConsumerConfig].
Expand All @@ -47,22 +47,46 @@ type (
// retrieval using Fetch and Next methods.
Consumer interface {
// Fetch is used to retrieve up to a provided number of messages from a
// stream. This method will always send a single request and wait until
// either all messages are retrieved or request times out.
// stream. This method will send a single request and deliver either all
// requested messages unless time out is met earlier. Fetch timeout
// defaults to 30 seconds and can be configured using FetchMaxWait
// option.
//
// Fetch is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)

// FetchBytes is used to retrieve up to a provided bytes from the
// stream. This method will always send a single request and wait until
// provided number of bytes is exceeded or request times out.
// stream. This method will send a single request and deliver the
// provided number of bytes unless time out is met earlier. FetchBytes
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
//
// FetchBytes is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error)

// FetchNoWait is used to retrieve up to a provided number of messages
// from a stream. This method will send a single request and
// immediately return up to a provided number of messages or wait until
// at least one message is available or request times out.
// from a stream. Unlike Fetch, FetchNoWait will only deliver messages
// that are currently available in the stream and will not wait for new
// messages to arrive, even if batch size is not met. FetchNoWait
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
//
// FetchNoWait is non-blocking and returns MessageBatch, exposing a
// channel for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
FetchNoWait(batch int) (MessageBatch, error)

// Consume can be used to continuously receive messages and handle them
// Consume will continuously receive messages and handle them
// with the provided callback function. Consume can be configured using
// PullConsumeOpt options:
//
Expand Down Expand Up @@ -91,7 +115,7 @@ type (
// This option is enabled by default.
Messages(opts ...PullMessagesOpt) (MessagesContext, error)

// Next is used to retrieve the next message from the stream. This
// Next is used to retrieve the next message from the consumer. This
// method will block until the message is retrieved or timeout is
// reached.
Next(opts ...FetchOpt) (Msg, error)
Expand Down
78 changes: 48 additions & 30 deletions jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type (
// message, including its sequence numbers and timestamp.
Delivered SequenceInfo `json:"delivered"`

// AckFloor tracks the highest sequence number of a message that has
// been acknowledged.
// AckFloor indicates the message before the first unacknowledged
// message.
AckFloor SequenceInfo `json:"ack_floor"`

// NumAckPending is the number of messages that have been delivered but
Expand All @@ -60,8 +60,8 @@ type (
// for pull-based consumers.
NumWaiting int `json:"num_waiting"`

// NumPending is the number of messages that are pending to be delivered
// to this consumer.
// NumPending is the number of messages that match the consumer's
// filter, but have not been delivered yet.
NumPending uint64 `json:"num_pending"`

// Cluster contains information about the cluster to which this consumer
Expand All @@ -78,12 +78,17 @@ type (
ConsumerConfig struct {
// Name is an optional name for the consumer. If not set, one is
// generated automatically.
//
// Name cannot contain whitespace, ., *, >, path separators (forward or
// backwards slash), and non-printable characters.
Name string `json:"name,omitempty"`

// Durable is an optional durable name for the consumer. If both Durable
// and Name are set, they have to be equal. If Durable is set and Name
// is not, Name is set to Durable. Unless InactiveThreshold is set, a
// and Name are set, they have to be equal. Unless InactiveThreshold is set, a
// durable consumer will not be cleaned up automatically.
//
// Durable cannot contain whitespace, ., *, >, path separators (forward or
// backwards slash), and non-printable characters.
Durable string `json:"durable_name,omitempty"`

// Description provides an optional description of the consumer.
Expand All @@ -108,33 +113,36 @@ type (
AckPolicy AckPolicy `json:"ack_policy"`

// AckWait defines how long the server will wait for an acknowledgement
// before resending a message. Defaults to 30 seconds.
// before resending a message. If not set, server default is 30 seconds.
AckWait time.Duration `json:"ack_wait,omitempty"`

// MaxDeliver defines the maximum number of delivery attempts for a
// message. Applies to any message that is re-sent due to ack policy.
// Defaults to -1 (unlimited).
// If not set, server default is -1 (unlimited).
MaxDeliver int `json:"max_deliver,omitempty"`

// BackOff specifies the optional back-off intervals for retrying
// message delivery after a failed acknowledgement. It overrides
// AckWait.
//
// BackOff only applies to messages not acknowledged in specified time,
// not messages that were nack'ed.
//
// The number of intervals specified must be lower or equal to
// MaxDeliver. If the number of intervals is lower, the last interval is
// used for all remaining attempts.
BackOff []time.Duration `json:"backoff,omitempty"`

// FilterSubject can be used to filter messages delivered form the
// stream. It has to overlap with the subjects defined on the stream.
// FilterSubject is exclusive with FilterSubjects.
// FilterSubject can be used to filter messages delivered from the
// stream. FilterSubject is exclusive with FilterSubjects.
FilterSubject string `json:"filter_subject,omitempty"`

// ReplayPolicy defines the rate at which messages are sent to the
// consumer. If ReplayOriginalPolicy is set, messages are sent at the
// same rate as they were stored in the stream. If ReplayInstantPolicy
// is set, messages are sent as fast as possible. Defaults to
// ReplayInstantPolicy.
// consumer. If ReplayOriginalPolicy is set, messages are sent in the
// same intervals in which they were stored on stream. This can be used
// e.g. to simulate production traffic in development environments. If
// ReplayInstantPolicy is set, messages are sent as fast as possible.
// Defaults to ReplayInstantPolicy.
ReplayPolicy ReplayPolicy `json:"replay_policy"`

// RateLimit specifies an optional maximum rate of message delivery in
Expand All @@ -147,12 +155,15 @@ type (
SampleFrequency string `json:"sample_freq,omitempty"`

// MaxWaiting is a maximum number of pull requests waiting to be
// fulfilled. Defaults to 512.
// fulfilled. If not set, this will inherit settings from stream's
// ConsumerLimits or (if those are not set) from account settings. If
// neither are set, server default is 512.
MaxWaiting int `json:"max_waiting,omitempty"`

// MaxAckPending is a maximum number of outstanding unacknowledged
// messages. Once this limit is reached, the server will suspend sending
// messages to the consumer. Defaults to 1000. Set to -1 for unlimited.
// messages to the consumer. If not set, server default is 1000
// seconds. Set to -1 for unlimited.
MaxAckPending int `json:"max_ack_pending,omitempty"`

// HeadersOnly indicates whether only headers of messages should be sent
Expand All @@ -175,9 +186,15 @@ type (

// InactiveThreshold is a duration which instructs the server to clean
// up the consumer if it has been inactive for the specified duration.
// Defaults to 5s for non-durable consumers. Durable consumers will not
// be cleaned up by default, but if InactiveThreshold is set, they will
// be.
// Durable consumers will not be cleaned up by default, but if
// InactiveThreshold is set, they will be. If not set, this will inherit
// settings from stream's ConsumerLimits. If neither are set, server
// default is 5 seconds.
//
// A consumer is considered inactive there are not pull requests
// received by the server (for pull consumers), or no interest detected
// on deliver subject (for push consumers), not if there are no
// messages to be delivered.
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

// Replicas the number of replicas for the consumer's state. By default,
Expand All @@ -188,9 +205,9 @@ type (
// rather than inherit the storage type from the stream.
MemoryStorage bool `json:"mem_storage,omitempty"`

// FilterSubjects is a set of subjects that overlap with the subjects
// bound to the stream to filter delivered messages. This field is
// exclusive with FilterSubject. Requires nats-server v2.10.0 or later.
// FilterSubjects allows filtering messages from a stream by subject.
// This field is exclusive with FilterSubject. Requires nats-server
// v2.10.0 or later.
FilterSubjects []string `json:"filter_subjects,omitempty"`

// Metadata is a set of application-defined key-value pairs for
Expand All @@ -204,9 +221,9 @@ type (
//
// [Ordered Consumers]: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md#ordered-consumers
OrderedConsumerConfig struct {
// FilterSubjects is a set of subjects that overlap with the subjects
// bound to the stream to filter delivered messages. Requires
// nats-server v2.10.0 or later.
// FilterSubjects allows filtering messages from a stream by subject.
// This field is exclusive with FilterSubject. Requires nats-server
// v2.10.0 or later.
FilterSubjects []string `json:"filter_subjects,omitempty"`

// DeliverPolicy defines from which point to start delivering messages
Expand All @@ -224,10 +241,11 @@ type (
OptStartTime *time.Time `json:"opt_start_time,omitempty"`

// ReplayPolicy defines the rate at which messages are sent to the
// consumer. If ReplayOriginalPolicy is set, messages are sent at the
// same rate as they were stored in the stream. If ReplayInstantPolicy
// is set, messages are sent as fast as possible. Defaults to
// ReplayInstantPolicy.
// consumer. If ReplayOriginalPolicy is set, messages are sent in the
// same intervals in which they were stored on stream. This can be used
// e.g. to simulate production traffic in development environments. If
// ReplayInstantPolicy is set, messages are sent as fast as possible.
// Defaults to ReplayInstantPolicy.
ReplayPolicy ReplayPolicy `json:"replay_policy"`

// InactiveThreshold is a duration which instructs the server to clean
Expand Down

0 comments on commit 48e18b2

Please sign in to comment.