Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support delete groups in the admin client #510

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions rdkafka-sys/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
/// Native rdkafka delete topic object.
pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;

/// Native rdkafka delete group object.
pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;

/// Native rdkafka new partitions object.
pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;

Expand All @@ -85,6 +88,9 @@ pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t;
/// Native rdkafka topic result.
pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;

/// Native rdkafka group result.
pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t;

// ENUMS

/// Client types.
Expand Down
95 changes: 95 additions & 0 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,46 @@ impl<C: ClientContext> AdminClient<C> {
Ok(rx)
}

/// Deletes the named groups.
pub fn delete_groups(
&self,
group_names: &[&str],
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
match self.delete_groups_inner(group_names, opts) {
Ok(rx) => Either::Left(DeleteGroupsFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}

fn delete_groups_inner(
&self,
group_names: &[&str],
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
let mut native_groups = Vec::new();
let mut err_buf = ErrBuf::new();
for gn in group_names {
let gn_t = CString::new(*gn)?;
let native_group = unsafe {
NativeDeleteGroup::from_ptr(rdsys::rd_kafka_DeleteGroup_new(gn_t.as_ptr())).unwrap()
};
native_groups.push(native_group);
}
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;

unsafe {
rdsys::rd_kafka_DeleteGroups(
self.client.native_ptr(),
native_groups.as_c_array(),
native_groups.len(),
native_opts.ptr(),
self.queue.ptr(),
)
}
Ok(rx)
}

/// Adds additional partitions to existing topics according to the provided
/// `NewPartitions` specifications.
///
Expand Down Expand Up @@ -561,6 +601,27 @@ fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Ve
out
}

/// The result of a DeleteGroup operation.
pub type GroupResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_group_results(groups: *const *const RDKafkaGroupResult, n: usize) -> Vec<GroupResult> {
let mut out = Vec::with_capacity(n);
for i in 0..n {
let group = unsafe { *groups.add(i) };
let name = unsafe { cstr_to_owned(rdsys::rd_kafka_group_result_name(group)) };
let err = unsafe {
let err = rdsys::rd_kafka_group_result_error(group);
rdsys::rd_kafka_error_code(err)
};
if err.is_error() {
out.push(Err((name, err.into())));
} else {
out.push(Ok(name));
}
}
out
}

//
// Create topic handling
//
Expand Down Expand Up @@ -740,6 +801,40 @@ impl Future for DeleteTopicsFuture {
}
}

//
// Delete group handling
//
type NativeDeleteGroup = NativePtr<RDKafkaDeleteGroup>;

unsafe impl KafkaDrop for RDKafkaDeleteGroup {
const TYPE: &'static str = "delete group";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteGroup_destroy;
}

struct DeleteGroupsFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteGroupsFuture {
type Output = KafkaResult<Vec<GroupResult>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;
let res = unsafe { rdsys::rd_kafka_event_DeleteGroups_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"delete groups request received response of incorrect type ({})",
typ
))));
}
let mut n = 0;
let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) };
Poll::Ready(Ok(build_group_results(groups, n)))
}
}

//
// Create partitions handling
//
Expand Down
100 changes: 96 additions & 4 deletions tests/test_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use std::time::Duration;
use backoff::{ExponentialBackoff, Operation};

use rdkafka::admin::{
AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, NewPartitions, NewTopic,
OwnedResourceSpecifier, ResourceSpecifier, TopicReplication,
AdminClient, AdminOptions, AlterConfig, ConfigEntry, ConfigSource, GroupResult, NewPartitions,
NewTopic, OwnedResourceSpecifier, ResourceSpecifier, TopicReplication,
};
use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext};
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::metadata::Metadata;
use rdkafka::ClientConfig;
use rdkafka::{ClientConfig, TopicPartitionList};

use crate::utils::*;

Expand Down Expand Up @@ -380,6 +380,98 @@ async fn test_configs() {
assert_eq!(res, &[Ok(OwnedResourceSpecifier::Broker(0))]);
}

#[tokio::test]
async fn test_groups() {
let admin_client = create_admin_client();

// Happy path: delete a consumer group returns Ok
{
let group_name = "test_group";
create_consumer_group(&group_name).await;
// act
let res = admin_client
.delete_groups(&[&group_name], &AdminOptions::default())
.await;

assert_eq!(res, Ok(vec![Ok(group_name.to_string())]));
}
// For each group returns the deletion result
{
let group_name = "test_group";
let unknown_group_name = "unknown_group";
create_consumer_group(&group_name).await;
// act
let res = admin_client
.delete_groups(
&[&group_name, &unknown_group_name],
&AdminOptions::default(),
)
.await;

assert_eq!(
res,
Ok(vec![
Ok(group_name.to_string()),
Err((
unknown_group_name.to_string(),
RDKafkaErrorCode::GroupIdNotFound
))
])
);
}
// Trying to delete a consumer group that doesn't exists returns GroupIdNotFound
{
let res = admin_client
.delete_groups(&[&"non_existing_group"], &AdminOptions::default())
.await;
let expected: GroupResult = Err((
"non_existing_group".to_string(),
RDKafkaErrorCode::GroupIdNotFound,
));
assert_eq!(res, Ok(vec![expected]));
}

// helper function to create a consumer group
async fn create_consumer_group(consumer_group_name: &str) {
let admin_client = create_admin_client();
let topic_name = "test_topic";
let consumer: BaseConsumer = create_config()
.set("group.id", consumer_group_name.clone())
.create()
.expect("create consumer failed");

admin_client
.create_topics(
&[NewTopic {
name: topic_name,
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
}],
&AdminOptions::default(),
)
.await
.expect("topic creation failed");
let topic_partition_list = {
let mut lst = TopicPartitionList::new();
lst.add_partition(topic_name, 0);
lst
};
consumer
.assign(&topic_partition_list)
.expect("assign topic partition list failed");
consumer
.fetch_metadata(None, Duration::from_secs(3))
.expect("unable to fetch metadata");
consumer
.store_offset(topic_name, 0, -1)
.expect("store offset failed");
consumer
.commit_consumer_state(CommitMode::Sync)
.expect("commit the consumer state failed");
}
}

// Tests whether each admin operation properly reports an error if the entire
// request fails. The original implementations failed to check this, resulting
// in confusing situations where a failed admin request would return Ok([]).
Expand Down