Skip to content

Commit

Permalink
Support delete groups in the admin client
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewinci authored and benesch committed Nov 13, 2022
1 parent 65520c8 commit 2cd37b9
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 4 deletions.
10 changes: 10 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

## Unreleased

* Add the [`AdminClient::delete_groups`] method, which deletes consumer groups
from a Kafka cluster ([#510]).

Thanks, [@andrewinci].

[@andrewinci]: https://github.com/andrewinci
[#510]: https://github.com/fede1024/rust-rdkafka/issues/510

## 0.29.0 (2022-10-29)

* **Breaking change.** Pass through errors from librdkafka in
Expand Down
5 changes: 5 additions & 0 deletions rdkafka-sys/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## Unreleased

* Add the `RDKafkaDeleteGroup` and `RDKafkaGroupResult` type aliases to the
`types` module.

## v4.3.0+1.9.2 (2022-10-29)

* Upgrade to librdkafka v1.9.2.
Expand Down
6 changes: 6 additions & 0 deletions rdkafka-sys/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
/// Native rdkafka delete topic object.
pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;

/// Native rdkafka delete group object.
pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;

/// Native rdkafka new partitions object.
pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;

Expand All @@ -85,6 +88,9 @@ pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t;
/// Native rdkafka topic result.
pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;

/// Native rdkafka group result.
pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t;

// ENUMS

/// Client types.
Expand Down
96 changes: 96 additions & 0 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,46 @@ impl<C: ClientContext> AdminClient<C> {
Ok(rx)
}

/// Deletes the named groups.
pub fn delete_groups(
&self,
group_names: &[&str],
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
match self.delete_groups_inner(group_names, opts) {
Ok(rx) => Either::Left(DeleteGroupsFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}

fn delete_groups_inner(
&self,
group_names: &[&str],
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
let mut native_groups = Vec::new();
let mut err_buf = ErrBuf::new();
for gn in group_names {
let gn_t = CString::new(*gn)?;
let native_group = unsafe {
NativeDeleteGroup::from_ptr(rdsys::rd_kafka_DeleteGroup_new(gn_t.as_ptr())).unwrap()
};
native_groups.push(native_group);
}
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;

unsafe {
rdsys::rd_kafka_DeleteGroups(
self.client.native_ptr(),
native_groups.as_c_array(),
native_groups.len(),
native_opts.ptr(),
self.queue.ptr(),
)
}
Ok(rx)
}

/// Adds additional partitions to existing topics according to the provided
/// `NewPartitions` specifications.
///
Expand Down Expand Up @@ -561,6 +601,27 @@ fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Ve
out
}

/// The result of a DeleteGroup operation.
pub type GroupResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_group_results(groups: *const *const RDKafkaGroupResult, n: usize) -> Vec<GroupResult> {
let mut out = Vec::with_capacity(n);
for i in 0..n {
let group = unsafe { *groups.add(i) };
let name = unsafe { cstr_to_owned(rdsys::rd_kafka_group_result_name(group)) };
let err = unsafe {
let err = rdsys::rd_kafka_group_result_error(group);
rdsys::rd_kafka_error_code(err)
};
if err.is_error() {
out.push(Err((name, err.into())));
} else {
out.push(Ok(name));
}
}
out
}

//
// Create topic handling
//
Expand Down Expand Up @@ -740,6 +801,41 @@ impl Future for DeleteTopicsFuture {
}
}

//
// Delete group handling
//

type NativeDeleteGroup = NativePtr<RDKafkaDeleteGroup>;

unsafe impl KafkaDrop for RDKafkaDeleteGroup {
const TYPE: &'static str = "delete group";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteGroup_destroy;
}

struct DeleteGroupsFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteGroupsFuture {
type Output = KafkaResult<Vec<GroupResult>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;
let res = unsafe { rdsys::rd_kafka_event_DeleteGroups_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"delete groups request received response of incorrect type ({})",
typ
))));
}
let mut n = 0;
let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) };
Poll::Ready(Ok(build_group_results(groups, n)))
}
}

//
// Create partitions handling
//
Expand Down
97 changes: 93 additions & 4 deletions tests/test_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use std::time::Duration;
use backoff::{ExponentialBackoff, Operation};

use rdkafka::admin::{
AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, NewPartitions, NewTopic,
OwnedResourceSpecifier, ResourceSpecifier, TopicReplication,
AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, GroupResult, NewPartitions,
NewTopic, OwnedResourceSpecifier, ResourceSpecifier, TopicReplication,
};
use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext};
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::metadata::Metadata;
use rdkafka::ClientConfig;
use rdkafka::{ClientConfig, TopicPartitionList};

use crate::utils::*;

Expand All @@ -30,6 +30,45 @@ fn create_admin_client() -> AdminClient<DefaultClientContext> {
.expect("admin client creation failed")
}

async fn create_consumer_group(consumer_group_name: &str) {
let admin_client = create_admin_client();
let topic_name = &rand_test_topic();
let consumer: BaseConsumer = create_config()
.set("group.id", consumer_group_name.clone())
.create()
.expect("create consumer failed");

admin_client
.create_topics(
&[NewTopic {
name: topic_name,
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
}],
&AdminOptions::default(),
)
.await
.expect("topic creation failed");
let topic_partition_list = {
let mut lst = TopicPartitionList::new();
lst.add_partition(topic_name, 0);
lst
};
consumer
.assign(&topic_partition_list)
.expect("assign topic partition list failed");
consumer
.fetch_metadata(None, Duration::from_secs(3))
.expect("unable to fetch metadata");
consumer
.store_offset(topic_name, 0, -1)
.expect("store offset failed");
consumer
.commit_consumer_state(CommitMode::Sync)
.expect("commit the consumer state failed");
}

fn fetch_metadata(topic: &str) -> Metadata {
let consumer: BaseConsumer<DefaultConsumerContext> =
create_config().create().expect("consumer creation failed");
Expand Down Expand Up @@ -380,6 +419,56 @@ async fn test_configs() {
assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]);
}

#[tokio::test]
async fn test_groups() {
let admin_client = create_admin_client();

// Verify that a valid group can be deleted.
{
let group_name = rand_test_group();
create_consumer_group(&group_name).await;
let res = admin_client
.delete_groups(&[&group_name], &AdminOptions::default())
.await;
assert_eq!(res, Ok(vec![Ok(group_name.to_string())]));
}

// Verify that attempting to delete an unknown group returns a "group not
// found" error.
{
let unknown_group_name = rand_test_group();
let res = admin_client
.delete_groups(&[&unknown_group_name], &AdminOptions::default())
.await;
let expected: GroupResult = Err((unknown_group_name, RDKafkaErrorCode::GroupIdNotFound));
assert_eq!(res, Ok(vec![expected]));
}

// Verify that deleting a valid and invalid group results in a mixed result
// set.
{
let group_name = rand_test_group();
let unknown_group_name = rand_test_group();
create_consumer_group(&group_name).await;
let res = admin_client
.delete_groups(
&[&group_name, &unknown_group_name],
&AdminOptions::default(),
)
.await;
assert_eq!(
res,
Ok(vec![
Ok(group_name.to_string()),
Err((
unknown_group_name.to_string(),
RDKafkaErrorCode::GroupIdNotFound
))
])
);
}
}

// Tests whether each admin operation properly reports an error if the entire
// request fails. The original implementations failed to check this, resulting
// in confusing situations where a failed admin request would return Ok([]).
Expand Down

0 comments on commit 2cd37b9

Please sign in to comment.