Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve jetstream documentation #1532

Merged
merged 6 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
118 changes: 97 additions & 21 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 The NATS Authors
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -25,30 +25,107 @@ import (

type (

// Consumer contains methods for fetching/processing messages from a stream, as well as fetching consumer info
// Consumer contains methods for fetching/processing messages from a stream,
// as well as fetching consumer info.
//
// This package provides two implementations of Consumer interface:
//
// - Standard named/ephemeral pull consumers. These consumers are created using
// CreateConsumer method on Stream or JetStream interface. They can be
// explicitly configured (using [ConsumerConfig]) and managed by the user,
Jarema marked this conversation as resolved.
Show resolved Hide resolved
// either from this package or externally.
//
// - Ordered consumers. These consumers are created using OrderedConsumer
// method on Stream or JetStream interface. They are managed by the library
// and provide a simple way to consume messages from a stream. Ordered
// consumers are ephemeral in-memory pull consumers and are resilient to
// deletes and restarts. They provide limited configuration options
// using [OrderedConsumerConfig].
ripienaar marked this conversation as resolved.
Show resolved Hide resolved
//
// Consumer provides method for optimized continuous consumption of messages
// using Consume and Messages methods, as well as simple one-off messages
// retrieval using Fetch and Next methods.
Consumer interface {
// Fetch is used to retrieve up to a provided number of messages from a stream.
// This method will always send a single request and wait until either all messages are retrieved
// or request times out.
// Fetch is used to retrieve up to a provided number of messages from a
// stream. This method will send a single request and deliver either all
// requested messages unless time out is met earlier. Fetch timeout
// defaults to 30 seconds and can be configured using FetchMaxWait
// option.
//
// Fetch is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
// FetchBytes is used to retrieve up to a provided bytes from the stream.
// This method will always send a single request and wait until provided number of bytes is
// exceeded or request times out.

// FetchBytes is used to retrieve up to a provided bytes from the
Jarema marked this conversation as resolved.
Show resolved Hide resolved
// stream. This method will send a single request and deliver the
// provided number of bytes unless time out is met earlier. FetchBytes
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
//
// FetchBytes is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error)
// FetchNoWait is used to retrieve up to a provided number of messages from a stream.
// This method will always send a single request and immediately return up to a provided number of messages.

// FetchNoWait is used to retrieve up to a provided number of messages
// from a stream. Unlike Fetch, FetchNoWait will only deliver messages
// that are currently available in the stream and will not wait for new
// messages to arrive, even if batch size is not met. FetchNoWait
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
//
// FetchNoWait is non-blocking and returns MessageBatch, exposing a
// channel for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
FetchNoWait(batch int) (MessageBatch, error)
// Consume can be used to continuously receive messages and handle them with the provided callback function

// Consume will continuously receive messages and handle them
// with the provided callback function. Consume can be configured using
// PullConsumeOpt options:
//
// - Error handling and monitoring can be configured using ConsumeErrHandler
// option, which provides information about errors encountered during
// consumption (both transient and terminal)
// - Consume can be configured to stop after a certain number of
// messages is received using StopAfter option.
// - Consume can be optimized for throughput or memory usage using
// PullExpiry, PullMaxMessages, PullMaxBytes and PullHeartbeat options.
// Unless there is a specific use case, these options should not be used.
//
// Consume returns a ConsumeContext, which can be used to stop or drain
// the consumer.
Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error)
// Messages returns [MessagesContext], allowing continuously iterating over messages on a stream.

// Messages returns MessagesContext, allowing continuously iterating
// over messages on a stream. Messages can be configured using
// PullMessagesOpt options:
//
// - Messages can be optimized for throughput or memory usage using
// PullExpiry, PullMaxMessages, PullMaxBytes and PullHeartbeat options.
// Unless there is a specific use case, these options should not be used.
// - WithMessagesErrOnMissingHeartbeat can be used to enable/disable
// erroring out on MessagesContext.Next when a heartbeat is missing.
// This option is enabled by default.
Messages(opts ...PullMessagesOpt) (MessagesContext, error)
// Next is used to retrieve the next message from the stream.
// This method will block until the message is retrieved or timeout is reached.

// Next is used to retrieve the next message from the consumer. This
// method will block until the message is retrieved or timeout is
// reached.
Next(opts ...FetchOpt) (Msg, error)

// Info returns Consumer details
// Info fetches current ConsumerInfo from the server.
Info(context.Context) (*ConsumerInfo, error)
// CachedInfo returns [*ConsumerInfo] cached on a consumer struct

// CachedInfo returns ConsumerInfo currently cached on this consumer.
// This method does not perform any network requests. The cached
// ConsumerInfo is updated on every call to Info and Update.
CachedInfo() *ConsumerInfo
}

Expand All @@ -59,7 +136,7 @@ type (
}
)

// Info returns [ConsumerInfo] for a given consumer
// Info fetches current ConsumerInfo from the server.
func (p *pullConsumer) Info(ctx context.Context) (*ConsumerInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
Expand All @@ -85,10 +162,9 @@ func (p *pullConsumer) Info(ctx context.Context) (*ConsumerInfo, error) {
return resp.ConsumerInfo, nil
}

// CachedInfo returns [ConsumerInfo] fetched when initializing/updating a consumer
//
// NOTE: The returned object might not be up to date with the most recent updates on the server
// For up-to-date information, use [Info]
// CachedInfo returns ConsumerInfo currently cached on this consumer.
// This method does not perform any network requests. The cached
// ConsumerInfo is updated on every call to Info and Update.
func (p *pullConsumer) CachedInfo() *ConsumerInfo {
return p.info
}
Expand Down