Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable ordered delivery and processing of messages per stream by NATS in a seamless way to the applications #4855

Open
geekox86 opened this issue Dec 6, 2023 · 1 comment
Assignees
Labels
proposal Enhancement idea or proposal stale This issue has had no activity in a while

Comments

@geekox86
Copy link

geekox86 commented Dec 6, 2023

Proposed change

Hi everyone.

This is a proposal that I would like to discuss with you before detailing the technical aspects.
The goal is to enable ordered delivery and processing of messages per stream by NATS in a seamless way to the applications.

I have quickly went through the relevant issues on NATS GitHub and discussions around similar requests by other users.
Below is a summary of the mentioned problems that need to be solved efficiently for such proposal to be accepted.
Please, let me know if there is more that I missed in order to try to tackle in this discussion.

1- Partial failures and concurrency in the publishers may cause out-of-order delivery of messages
2- Partial failures and concurrency in the subscribers may cause out-of-order processing of messages
3- Seamless ordering by NATS requires complex rebalancing of subscribers when added/removed
4- Sequential message delivery or processing may harm the overall availability/scalability

Note, the context of use cases related to this discussion is mostly SOA, DDD, ES, and CQRS.
When I mention a stream, I mostly mean event messages that represent the changes to an aggregate root (AR) representing a domain model (DM), usually due to the processing of command requests. Though, it could very well represent anything else that is sensitive to message ordering.

In NATS, such a stream will be represented by a subject made of AR/DM type name + the stream ID being the AR/DM ID.
So when talking about message ordering here, I really mean ordering messages of the same stream, AR/DM. In other words, ordered messages per stream. Messages from other streams may interleave.

Problem #1:

Concurrent publishers acting at different speeds or due to partial failures may publish messages of the same stream out-of-order.

Proposed solution:
Have publishers specify a message sequence number when publishing. Usually, this will be the version of the AR/DM at the time of publishing. Publishing multiple messages in one batch will simply increment the given message sequence number by the number of published messages in the batch minus 1.

The NATS servers will then simply delay the delivery of out-of-order messages of a single stream (as identified by the subject) to order-sensitive subscribers until all delayed messages are contiguous (i.e. no gaps in their sequence numbers).
For order-insensitive subscribers, message delivery will be business as usual.

Problem #2:

Concurrent subscribers acting at different speeds or due to partial failures may receive or process messages of the same stream out-of-order.

Proposed solution:
Have subscribers indicate that they are sensitive to message ordering, hence streams will be assigned by NATS servers to subscribers using consistent hashing of the subject. In case of partial failures in message processing, no further messages are processed from the same stream until previous ones are successfully processed. Messages from other streams can still be processed as usual.

Alternative solutions:
Have the applications handle the stream assignments to subscribers either via dedicated streams or subject transforms. Scaling such solutions is usually disruptive.

Problem #3:

When subscribers go down or when scaled up/down, streams need to be reassigned to available subscribers based on their new count, in order to maintain in-order delivery/processing.

Proposed solution:
When a subscriber is added/removed, request previous/remaining subscribers to inform the NATS servers when they have processed (successfully or not) all messages in their memory queues. As soon all have informed NATS servers, new messages can be delivered and processed by all subscribers based on the new stream assignments.

Another option is to request previous/remaining subscribers to drop all messages in their memory queues and start delivering and processing dropped and new messages immediately based on the new stream assignments.

NATS servers can wait for a configured time to allow for any down subscribers to start again then continue as is, if was not requested gracefully by the admin.

Problem #4

Usually, load is evenly distributed due to consistent hashing. In cases where particular streams receive higher load, some subscribers may be overloaded while others are underloaded.

Proposed solution:
Monitor the load (throughput and latency) of top N streams (where N is a configured number) for each subscriber and reassign them to other subscribers as necessary to better optimize the load distribution over the subscribers. This requires two kind of stream assignments: range and individual. Range-stream assignment uses consistent hashing whereas individual-stream assignment uses inclusion/exclusion lists of stream subjects.

Use case

Ordered delivery and processing of messages per stream is a common use case and repeatedly requested feature, specially to integrate systems and services.

Contribution

Yes, although I will need to learn Go and get some support to digest the existing source code base.

@geekox86 geekox86 added the proposal Enhancement idea or proposal label Dec 6, 2023
@jnmoyne
Copy link
Contributor

jnmoyne commented Dec 28, 2023

Problem #1: you could do this in your application by putting that sequence number in a header of the message, and then on the receiving end have the application buffer messages if it detects a gap in the sequence numbers stored in that header and only pass them to the callback/message processing logic if those sequence numbers are contiguous.

Problem #2: you can do this using subject mapping and transformation, for example as part of the Stream configuration such that a token containing a partition number is inserted in the subject name of the messages recorded in the stream and such that then individual client applications can create filtered consumers to only consume messages (in a strict order (e.g. max acks pending=1)) on one (or more) of those partitions using a subject filter that filters on the partition number in the subject name.

Problem #3: you can leverage the 'working queue' storage policy to ensure that as you change the number of consumers (e.g. change the number of partitions that each consumer consumes on) or the number of partitions, it will ensure that the same message can not be consumed more than once even if the number of consumers (or what partitions they consume on) changes.

Problem #4: If you use consistent hashing properly there should not be any 'hot spots' in the distribution of the subjects between the buckets, if you get some imbalance then you should just use more partitions, or put more data in, consistent hashing works in a way that it will 'tend' towards 'perfectly balanced' distribution of the keys over the buckets as the number of keys increases.

@github-actions github-actions bot added the stale This issue has had no activity in a while label Mar 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
proposal Enhancement idea or proposal stale This issue has had no activity in a while
Projects
None yet
Development

No branches or pull requests

3 participants