Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PartitionQueue forwarding #558

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

matyama
Copy link

@matyama matyama commented Mar 14, 2023

Hi!

This PR is proposing a small extension to the PartitionQueue, or rather it just exposes the unsafe rd_kafka_queue_forward binding in a safe manner on the PartitionQueue.

Currently, the BaseConsumer can split its main queue into partition queues, but it comes quite handy to have an option to "join" these splitted queues back. For instance, one can have a pool of worker threads, each assigned with a subset of partitions of a topic, and then it comes quite handy to have the worker just forward all the queues into one and poll it (this is previewed in the tests).

To prevent misuse, the added forward function first checks that both involved consumers (of the forwarded and destination queues) point to the same underlying client. In case they don't, a newly introduced KafkaError::ClientMismatch is returned to inform about this problem. I've decided to add a new type because none of the existing ones seem quite fitting for this.

@matyama matyama force-pushed the partition-queue-forward branch 2 times, most recently from 82f4b44 to b53a337 Compare April 13, 2023 12:22
@matyama
Copy link
Author

matyama commented Jul 3, 2023

Hi, I've added a small update that introduces a PartitionQueue::reset_forwarding method. This one is complementary to the forward one and forwards a partition queue back to the main consumer queue.

Also, would there be a chance to fit this into one of the next releases (after a review etc. of course)? I'm currently depending on the fork, which is not ideal :)

This commit extends the `PartitionQueue` with new `forward`
functionality which simply exposes the unsafe `rd_kafka_queue_forward`
binding in a safe manner.

To prevent misuse, the function first checks that both involved
consumers (of the forwarded and destination queues) point to the same
underlying client. In case they don't, a newly introduced
`KafkaError::ClientMismatch` is returned to inform about this problem.
This commit introduces new `PartitionQueue::reset_forwarding` method and
updates an integration test for queue forwarding.

The `reset_forwarding` method will forward a partition queue back to the
main queue of its underlying consumer.
/// Forwards this queue back into the consumer queue.
pub fn reset_forwarding(&mut self) {
if let Some(consumer_queue) = self.consumer.client().consumer_queue() {
unsafe { rdsys::rd_kafka_queue_forward(self.queue.ptr(), consumer_queue.ptr()) };
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it might be surprising to have reset_forwarding send this queue back to the main consumer queue? I would have expected it to be the inverse of forward, where any previous forwarding would be removed and this queue will resume receiving messages. This is the behavior when calling rd_kafka_queue_forward with a NULL pointer for the dst queue.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the naming might be unfortunate and misleading. IIRC the motivation here was to provide a method to go back ("reset") to the main consumer queue as the current public API does not expose the consumer queue (i.e., it can't be passed to forward as the dst).

But it totally makes sense to have a way how to clear the forwarding as well. I think there are several ways how to approach this:

  1. There could be two methods to reset/clear forwarding - reset being the current one and clear_forwarding would use the null ptr.
  2. Expose a ConsumerQueue and change reset_forwarding as you propose (and generalize forward). But I'm not quite sure if there's enough value in adding such a public wrapper for the main queue.
  3. Stick closer to the bindings and overload forward for all the forwarding modes. Something along these lines:
    // impl PartitionQueue<C>
    pub fn forward<'a>(&mut self, mode: impl Into<ForwardMode<'a, C>>) -> KafkaResult<()> {
        match mode.into() {
            ForwardMode::Clear => todo!("forward using null ptr"),
            ForwardMode::Consumer => todo!("forward to consumer queue"),
            ForwardMode::Queue(dst) => todo!("forward to dst"),
        }
    }
    
    // describes how/where to forward
    pub enum ForwardMode<'a, C: ConsumerContext> {
        // -> ptr::null_mut()
        Clear,
        // -> self.consumer.client().consumer_queue()
        Consumer,
        // -> this PartitionQueue
        // with an impl From<&mut 'a PartitionQueue<C>>
        Queue(&'a mut PartitionQueue<C>),
    }
    I quite like this option, but what's a bit sad here is that Clear and Consumer would still return a KafkaResult even though they should/could be infallible.
  4. Remove reset_forwarding altogether and leave this feature for future PRs. My use-case was rather static (i.e., forward and forget), I just thought that some form of reset might be useful in a more dynamic setup.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the 3rd option the best. If you don't have a specific use case for it however, I'm wondering if a simpler API would be to just update the Drop implementation for PartitionQueue to forward the queue back to the main consumer queue?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've considered implementing the reset inside Drop before, but the issue here is that it may produce unexpected side-effects when one wants to just forward a queue and then drop it - for instance:

start with a 3-way split (q0, q1, q2)
forward q0 -> q1  // now we expect q1 to receive (q0 + q1)
drop(q0)          // but here q0 returns back to main due to the implicit reset
// now we have (q0 -> main, q1, q2), which might be confusing

This could be prevented by keeping these "source" queues around, but that's not great either.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I went with the 3rd option (the generalized forward) in the latest commit and added couple more tests ;)

This commit removes `PartitionQueue::reset_forwarding` and instead
generalizes `PartitionQueue::forward` over different forwarding _modes_.

These modes are defined by an enum `ForwardMode` and mimic the
underlying `librdkafka` binding behavior. Specifically:
 - `Clear` removes any forwarding on the target queue
 - `Consumer` forwards the queue back to the main consumer queue (note:
   this replaces the `reset_forwarding` method)
 - `Queue` triggers the previous implementation of `forward` (i.e., it
   forwards `self -> Queue(dst)`)

Additionally, there are new unit test scenarios covering the
forward/clear dynamics.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants