From 24b071ccf9fb03716d27f7a0df9ca7f773f261da Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Sun, 6 Nov 2022 00:53:26 -0400 Subject: [PATCH] Introduce a callback for rewriting broker addresses Use librdkafka's new DNS resolution callback to expose a rust-rdkafka callback for rewriting broker addresses. This allows tunneling of a Kafka connection over SSH or PrivateLink, by rewriting the broker addresses returned during bootstrapping to addresses that will route through the tunnel. --- .gitmodules | 2 +- rdkafka-sys/librdkafka | 2 +- rdkafka-sys/src/bindings.rs | 308 +++++++++++++++----------------- src/client.rs | 70 +++++++- src/producer/future_producer.rs | 6 +- 5 files changed, 213 insertions(+), 175 deletions(-) diff --git a/.gitmodules b/.gitmodules index 7d2f654d2..3943bcdf7 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "rdkafka-sys/librdkafka"] path = rdkafka-sys/librdkafka - url = https://github.com/edenhill/librdkafka + url = https://github.com/MaterializeInc/librdkafka diff --git a/rdkafka-sys/librdkafka b/rdkafka-sys/librdkafka index 9b72ca3aa..a61f71d36 160000 --- a/rdkafka-sys/librdkafka +++ b/rdkafka-sys/librdkafka @@ -1 +1 @@ -Subproject commit 9b72ca3aa6c49f8f57eea02f70aadb1453d3ba1f +Subproject commit a61f71d3613c12433dc1f9a6a47b6448b2d29529 diff --git a/rdkafka-sys/src/bindings.rs b/rdkafka-sys/src/bindings.rs index 8abaeb04a..e9f70591a 100644 --- a/rdkafka-sys/src/bindings.rs +++ b/rdkafka-sys/src/bindings.rs @@ -1,52 +1,9 @@ -/* automatically generated by rust-bindgen 0.60.1 */ +/* automatically generated by rust-bindgen 0.61.0 */ -use libc::{c_char, c_int, c_void, size_t, sockaddr, ssize_t, FILE}; +use libc::{addrinfo, c_char, c_int, c_void, sockaddr, FILE}; use num_enum::TryFromPrimitive; -#[repr(C)] -pub struct __BindgenUnionField(::std::marker::PhantomData); -impl __BindgenUnionField { - #[inline] - pub const fn new() -> Self { - __BindgenUnionField(::std::marker::PhantomData) - } - #[inline] - pub unsafe fn as_ref(&self) -> &T { - ::std::mem::transmute(self) - } - #[inline] - pub unsafe fn as_mut(&mut self) -> &mut T { - ::std::mem::transmute(self) - } -} -impl ::std::default::Default for __BindgenUnionField { - #[inline] - fn default() -> Self { - Self::new() - } -} -impl ::std::clone::Clone for __BindgenUnionField { - #[inline] - fn clone(&self) -> Self { - Self::new() - } -} -impl ::std::marker::Copy for __BindgenUnionField {} -impl ::std::fmt::Debug for __BindgenUnionField { - fn fmt(&self, fmt: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - fmt.write_str("__BindgenUnionField") - } -} -impl ::std::hash::Hash for __BindgenUnionField { - fn hash(&self, _state: &mut H) {} -} -impl ::std::cmp::PartialEq for __BindgenUnionField { - fn eq(&self, _other: &__BindgenUnionField) -> bool { - true - } -} -impl ::std::cmp::Eq for __BindgenUnionField {} -pub const RD_KAFKA_VERSION: u32 = 17367295; +pub const RD_KAFKA_VERSION: u32 = 17367807; pub const RD_KAFKA_DEBUG_CONTEXTS : & [u8 ; 138usize] = b"all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf\0" ; pub const RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE: u32 = 8; pub const RD_KAFKA_OFFSET_BEGINNING: i32 = -2; @@ -350,7 +307,7 @@ pub struct rd_kafka_err_desc { pub desc: *const c_char, } extern "C" { - pub fn rd_kafka_get_err_descs(errdescs: *mut *const rd_kafka_err_desc, cntp: *mut size_t); + pub fn rd_kafka_get_err_descs(errdescs: *mut *const rd_kafka_err_desc, cntp: *mut usize); } extern "C" { pub fn rd_kafka_err2str(err: rd_kafka_resp_err_t) -> *const c_char; @@ -371,7 +328,7 @@ extern "C" { pub fn rd_kafka_fatal_error( rk: *mut rd_kafka_t, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t; } extern "C" { @@ -410,12 +367,13 @@ extern "C" { ) -> *mut rd_kafka_error_t; } #[repr(C)] +#[derive(Debug, Copy, Clone)] pub struct rd_kafka_topic_partition_s { pub topic: *mut c_char, pub partition: i32, pub offset: i64, pub metadata: *mut c_void, - pub metadata_size: size_t, + pub metadata_size: usize, pub opaque: *mut c_void, pub err: rd_kafka_resp_err_t, pub _private: *mut c_void, @@ -515,38 +473,41 @@ pub enum rd_kafka_vtype_t { RD_KAFKA_VTYPE_HEADERS = 10, } #[repr(C)] +#[derive(Copy, Clone)] pub struct rd_kafka_vu_s { pub vtype: rd_kafka_vtype_t, pub u: rd_kafka_vu_s__bindgen_ty_1, } #[repr(C)] -pub struct rd_kafka_vu_s__bindgen_ty_1 { - pub cstr: __BindgenUnionField<*const c_char>, - pub rkt: __BindgenUnionField<*mut rd_kafka_topic_t>, - pub i: __BindgenUnionField, - pub i32_: __BindgenUnionField, - pub i64_: __BindgenUnionField, - pub mem: __BindgenUnionField, - pub header: __BindgenUnionField, - pub headers: __BindgenUnionField<*mut rd_kafka_headers_t>, - pub ptr: __BindgenUnionField<*mut c_void>, - pub _pad: __BindgenUnionField<[c_char; 64usize]>, - pub bindgen_union_field: [u64; 8usize], +#[derive(Copy, Clone)] +pub union rd_kafka_vu_s__bindgen_ty_1 { + pub cstr: *const c_char, + pub rkt: *mut rd_kafka_topic_t, + pub i: c_int, + pub i32_: i32, + pub i64_: i64, + pub mem: rd_kafka_vu_s__bindgen_ty_1__bindgen_ty_1, + pub header: rd_kafka_vu_s__bindgen_ty_1__bindgen_ty_2, + pub headers: *mut rd_kafka_headers_t, + pub ptr: *mut c_void, + pub _pad: [c_char; 64usize], } #[repr(C)] +#[derive(Debug, Copy, Clone)] pub struct rd_kafka_vu_s__bindgen_ty_1__bindgen_ty_1 { pub ptr: *mut c_void, - pub size: size_t, + pub size: usize, } #[repr(C)] +#[derive(Debug, Copy, Clone)] pub struct rd_kafka_vu_s__bindgen_ty_1__bindgen_ty_2 { pub name: *const c_char, pub val: *const c_void, - pub size: ssize_t, + pub size: isize, } pub type rd_kafka_vu_t = rd_kafka_vu_s; extern "C" { - pub fn rd_kafka_headers_new(initial_count: size_t) -> *mut rd_kafka_headers_t; + pub fn rd_kafka_headers_new(initial_count: usize) -> *mut rd_kafka_headers_t; } extern "C" { pub fn rd_kafka_headers_destroy(hdrs: *mut rd_kafka_headers_t); @@ -558,9 +519,9 @@ extern "C" { pub fn rd_kafka_header_add( hdrs: *mut rd_kafka_headers_t, name: *const c_char, - name_size: ssize_t, + name_size: isize, value: *const c_void, - value_size: ssize_t, + value_size: isize, ) -> rd_kafka_resp_err_t; } extern "C" { @@ -574,36 +535,37 @@ extern "C" { hdrs: *const rd_kafka_headers_t, name: *const c_char, valuep: *mut *const c_void, - sizep: *mut size_t, + sizep: *mut usize, ) -> rd_kafka_resp_err_t; } extern "C" { pub fn rd_kafka_header_get( hdrs: *const rd_kafka_headers_t, - idx: size_t, + idx: usize, name: *const c_char, valuep: *mut *const c_void, - sizep: *mut size_t, + sizep: *mut usize, ) -> rd_kafka_resp_err_t; } extern "C" { pub fn rd_kafka_header_get_all( hdrs: *const rd_kafka_headers_t, - idx: size_t, + idx: usize, namep: *mut *const c_char, valuep: *mut *const c_void, - sizep: *mut size_t, + sizep: *mut usize, ) -> rd_kafka_resp_err_t; } #[repr(C)] +#[derive(Debug, Copy, Clone)] pub struct rd_kafka_message_s { pub err: rd_kafka_resp_err_t, pub rkt: *mut rd_kafka_topic_t, pub partition: i32, pub payload: *mut c_void, - pub len: size_t, + pub len: usize, pub key: *mut c_void, - pub key_len: size_t, + pub key_len: usize, pub offset: i64, pub _private: *mut c_void, } @@ -645,7 +607,7 @@ extern "C" { ); } extern "C" { - pub fn rd_kafka_header_cnt(hdrs: *const rd_kafka_headers_t) -> size_t; + pub fn rd_kafka_header_cnt(hdrs: *const rd_kafka_headers_t) -> usize; } #[repr(u32)] #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] @@ -676,7 +638,7 @@ extern "C" { extern "C" { pub fn rd_kafka_conf_dup_filter( conf: *const rd_kafka_conf_t, - filter_cnt: size_t, + filter_cnt: usize, filter: *mut *const c_char, ) -> *mut rd_kafka_conf_t; } @@ -689,7 +651,7 @@ extern "C" { name: *const c_char, value: *const c_char, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_conf_res_t; } extern "C" { @@ -714,7 +676,7 @@ extern "C" { unsafe extern "C" fn( rk: *mut rd_kafka_t, payload: *mut c_void, - len: size_t, + len: usize, err: rd_kafka_resp_err_t, opaque: *mut c_void, msg_opaque: *mut c_void, @@ -815,7 +777,7 @@ extern "C" { unsafe extern "C" fn( rk: *mut rd_kafka_t, json: *mut c_char, - json_len: size_t, + json_len: usize, opaque: *mut c_void, ) -> c_int, >, @@ -869,6 +831,20 @@ extern "C" { closesocket_cb: Option c_int>, ); } +extern "C" { + pub fn rd_kafka_conf_set_resolve_cb( + conf: *mut rd_kafka_conf_t, + resolve_cb: Option< + unsafe extern "C" fn( + node: *const c_char, + service: *const c_char, + hints: *const addrinfo, + res: *mut *mut addrinfo, + opaque: *mut c_void, + ) -> c_int, + >, + ); +} extern "C" { pub fn rd_kafka_conf_set_ssl_cert_verify_cb( conf: *mut rd_kafka_conf_t, @@ -880,9 +856,9 @@ extern "C" { x509_error: *mut c_int, depth: c_int, buf: *const c_char, - size: size_t, + size: usize, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, opaque: *mut c_void, ) -> c_int, >, @@ -910,9 +886,9 @@ extern "C" { cert_type: rd_kafka_cert_type_t, cert_enc: rd_kafka_cert_enc_t, buffer: *const c_void, - size: size_t, + size: usize, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_conf_res_t; } extern "C" { @@ -943,7 +919,7 @@ extern "C" { conf: *const rd_kafka_conf_t, name: *const c_char, dest: *mut c_char, - dest_size: *mut size_t, + dest_size: *mut usize, ) -> rd_kafka_conf_res_t; } extern "C" { @@ -951,20 +927,20 @@ extern "C" { conf: *const rd_kafka_topic_conf_t, name: *const c_char, dest: *mut c_char, - dest_size: *mut size_t, + dest_size: *mut usize, ) -> rd_kafka_conf_res_t; } extern "C" { - pub fn rd_kafka_conf_dump(conf: *mut rd_kafka_conf_t, cntp: *mut size_t) -> *mut *const c_char; + pub fn rd_kafka_conf_dump(conf: *mut rd_kafka_conf_t, cntp: *mut usize) -> *mut *const c_char; } extern "C" { pub fn rd_kafka_topic_conf_dump( conf: *mut rd_kafka_topic_conf_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const c_char; } extern "C" { - pub fn rd_kafka_conf_dump_free(arr: *mut *const c_char, cnt: size_t); + pub fn rd_kafka_conf_dump_free(arr: *mut *const c_char, cnt: usize); } extern "C" { pub fn rd_kafka_conf_properties_show(fp: *mut FILE); @@ -989,7 +965,7 @@ extern "C" { name: *const c_char, value: *const c_char, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_conf_res_t; } extern "C" { @@ -1005,7 +981,7 @@ extern "C" { unsafe extern "C" fn( rkt: *const rd_kafka_topic_t, keydata: *const c_void, - keylen: size_t, + keylen: usize, partition_cnt: i32, rkt_opaque: *mut c_void, msg_opaque: *mut c_void, @@ -1034,7 +1010,7 @@ extern "C" { pub fn rd_kafka_msg_partitioner_random( rkt: *const rd_kafka_topic_t, key: *const c_void, - keylen: size_t, + keylen: usize, partition_cnt: i32, rkt_opaque: *mut c_void, msg_opaque: *mut c_void, @@ -1044,7 +1020,7 @@ extern "C" { pub fn rd_kafka_msg_partitioner_consistent( rkt: *const rd_kafka_topic_t, key: *const c_void, - keylen: size_t, + keylen: usize, partition_cnt: i32, rkt_opaque: *mut c_void, msg_opaque: *mut c_void, @@ -1054,7 +1030,7 @@ extern "C" { pub fn rd_kafka_msg_partitioner_consistent_random( rkt: *const rd_kafka_topic_t, key: *const c_void, - keylen: size_t, + keylen: usize, partition_cnt: i32, rkt_opaque: *mut c_void, msg_opaque: *mut c_void, @@ -1064,7 +1040,7 @@ extern "C" { pub fn rd_kafka_msg_partitioner_murmur2( rkt: *const rd_kafka_topic_t, key: *const c_void, - keylen: size_t, + keylen: usize, partition_cnt: i32, rkt_opaque: *mut c_void, msg_opaque: *mut c_void, @@ -1074,7 +1050,7 @@ extern "C" { pub fn rd_kafka_msg_partitioner_murmur2_random( rkt: *const rd_kafka_topic_t, key: *const c_void, - keylen: size_t, + keylen: usize, partition_cnt: i32, rkt_opaque: *mut c_void, msg_opaque: *mut c_void, @@ -1084,7 +1060,7 @@ extern "C" { pub fn rd_kafka_msg_partitioner_fnv1a( rkt: *const rd_kafka_topic_t, key: *const c_void, - keylen: size_t, + keylen: usize, partition_cnt: i32, rkt_opaque: *mut c_void, msg_opaque: *mut c_void, @@ -1094,7 +1070,7 @@ extern "C" { pub fn rd_kafka_msg_partitioner_fnv1a_random( rkt: *const rd_kafka_topic_t, key: *const c_void, - keylen: size_t, + keylen: usize, partition_cnt: i32, rkt_opaque: *mut c_void, msg_opaque: *mut c_void, @@ -1105,7 +1081,7 @@ extern "C" { type_: rd_kafka_type_t, conf: *mut rd_kafka_conf_t, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> *mut rd_kafka_t; } extern "C" { @@ -1190,10 +1166,10 @@ extern "C" { ) -> rd_kafka_resp_err_t; } extern "C" { - pub fn rd_kafka_mem_calloc(rk: *mut rd_kafka_t, num: size_t, size: size_t) -> *mut c_void; + pub fn rd_kafka_mem_calloc(rk: *mut rd_kafka_t, num: usize, size: usize) -> *mut c_void; } extern "C" { - pub fn rd_kafka_mem_malloc(rk: *mut rd_kafka_t, size: size_t) -> *mut c_void; + pub fn rd_kafka_mem_malloc(rk: *mut rd_kafka_t, size: usize) -> *mut c_void; } extern "C" { pub fn rd_kafka_mem_free(rk: *mut rd_kafka_t, ptr: *mut c_void); @@ -1236,14 +1212,14 @@ extern "C" { ) -> rd_kafka_resp_err_t; } extern "C" { - pub fn rd_kafka_queue_length(rkqu: *mut rd_kafka_queue_t) -> size_t; + pub fn rd_kafka_queue_length(rkqu: *mut rd_kafka_queue_t) -> usize; } extern "C" { pub fn rd_kafka_queue_io_event_enable( rkqu: *mut rd_kafka_queue_t, fd: c_int, payload: *const c_void, - size: size_t, + size: usize, ); } extern "C" { @@ -1299,8 +1275,8 @@ extern "C" { partition: i32, timeout_ms: c_int, rkmessages: *mut *mut rd_kafka_message_t, - rkmessages_size: size_t, - ) -> ssize_t; + rkmessages_size: usize, + ) -> isize; } extern "C" { pub fn rd_kafka_consume_callback( @@ -1324,8 +1300,8 @@ extern "C" { rkqu: *mut rd_kafka_queue_t, timeout_ms: c_int, rkmessages: *mut *mut rd_kafka_message_t, - rkmessages_size: size_t, - ) -> ssize_t; + rkmessages_size: usize, + ) -> isize; } extern "C" { pub fn rd_kafka_consume_callback_queue( @@ -1481,14 +1457,14 @@ extern "C" { pub fn rd_kafka_consumer_group_metadata_write( cgmd: *const rd_kafka_consumer_group_metadata_t, bufferp: *mut *mut c_void, - sizep: *mut size_t, + sizep: *mut usize, ) -> *mut rd_kafka_error_t; } extern "C" { pub fn rd_kafka_consumer_group_metadata_read( cgmdp: *mut *mut rd_kafka_consumer_group_metadata_t, buffer: *const c_void, - size: size_t, + size: usize, ) -> *mut rd_kafka_error_t; } extern "C" { @@ -1497,9 +1473,9 @@ extern "C" { partition: i32, msgflags: c_int, payload: *mut c_void, - len: size_t, + len: usize, key: *const c_void, - keylen: size_t, + keylen: usize, msg_opaque: *mut c_void, ) -> c_int; } @@ -1510,7 +1486,7 @@ extern "C" { pub fn rd_kafka_produceva( rk: *mut rd_kafka_t, vus: *const rd_kafka_vu_t, - cnt: size_t, + cnt: usize, ) -> *mut rd_kafka_error_t; } extern "C" { @@ -1697,11 +1673,11 @@ extern "C" { pub fn rd_kafka_event_message_array( rkev: *mut rd_kafka_event_t, rkmessages: *mut *const rd_kafka_message_t, - size: size_t, - ) -> size_t; + size: usize, + ) -> usize; } extern "C" { - pub fn rd_kafka_event_message_count(rkev: *mut rd_kafka_event_t) -> size_t; + pub fn rd_kafka_event_message_count(rkev: *mut rd_kafka_event_t) -> usize; } extern "C" { pub fn rd_kafka_event_config_string(rkev: *mut rd_kafka_event_t) -> *const c_char; @@ -1730,7 +1706,7 @@ extern "C" { pub fn rd_kafka_event_debug_contexts( rkev: *mut rd_kafka_event_t, dst: *mut c_char, - dstsize: size_t, + dstsize: usize, ) -> c_int; } extern "C" { @@ -1826,7 +1802,7 @@ pub type rd_kafka_plugin_f_conf_init_t = Option< conf: *mut rd_kafka_conf_t, plug_opaquep: *mut *mut c_void, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t, >; pub type rd_kafka_interceptor_f_on_conf_set_t = Option< @@ -1835,7 +1811,7 @@ pub type rd_kafka_interceptor_f_on_conf_set_t = Option< name: *const c_char, val: *const c_char, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ic_opaque: *mut c_void, ) -> rd_kafka_conf_res_t, >; @@ -1843,7 +1819,7 @@ pub type rd_kafka_interceptor_f_on_conf_dup_t = Option< unsafe extern "C" fn( new_conf: *mut rd_kafka_conf_t, old_conf: *const rd_kafka_conf_t, - filter_cnt: size_t, + filter_cnt: usize, filter: *mut *const c_char, ic_opaque: *mut c_void, ) -> rd_kafka_resp_err_t, @@ -1856,7 +1832,7 @@ pub type rd_kafka_interceptor_f_on_new_t = Option< conf: *const rd_kafka_conf_t, ic_opaque: *mut c_void, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t, >; pub type rd_kafka_interceptor_f_on_destroy_t = Option< @@ -1900,7 +1876,7 @@ pub type rd_kafka_interceptor_f_on_request_sent_t = Option< ApiKey: i16, ApiVersion: i16, CorrId: i32, - size: size_t, + size: usize, ic_opaque: *mut c_void, ) -> rd_kafka_resp_err_t, >; @@ -1913,7 +1889,7 @@ pub type rd_kafka_interceptor_f_on_response_received_t = Option< ApiKey: i16, ApiVersion: i16, CorrId: i32, - size: size_t, + size: usize, rtt: i64, err: rd_kafka_resp_err_t, ic_opaque: *mut c_void, @@ -2102,7 +2078,7 @@ extern "C" { options: *mut rd_kafka_AdminOptions_t, timeout_ms: c_int, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t; } extern "C" { @@ -2110,7 +2086,7 @@ extern "C" { options: *mut rd_kafka_AdminOptions_t, timeout_ms: c_int, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t; } extern "C" { @@ -2118,7 +2094,7 @@ extern "C" { options: *mut rd_kafka_AdminOptions_t, true_or_false: c_int, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t; } extern "C" { @@ -2126,7 +2102,7 @@ extern "C" { options: *mut rd_kafka_AdminOptions_t, broker_id: i32, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t; } extern "C" { @@ -2147,7 +2123,7 @@ extern "C" { num_partitions: c_int, replication_factor: c_int, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> *mut rd_kafka_NewTopic_t; } extern "C" { @@ -2156,7 +2132,7 @@ extern "C" { extern "C" { pub fn rd_kafka_NewTopic_destroy_array( new_topics: *mut *mut rd_kafka_NewTopic_t, - new_topic_cnt: size_t, + new_topic_cnt: usize, ); } extern "C" { @@ -2164,9 +2140,9 @@ extern "C" { new_topic: *mut rd_kafka_NewTopic_t, partition: i32, broker_ids: *mut i32, - broker_id_cnt: size_t, + broker_id_cnt: usize, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t; } extern "C" { @@ -2180,7 +2156,7 @@ extern "C" { pub fn rd_kafka_CreateTopics( rk: *mut rd_kafka_t, new_topics: *mut *mut rd_kafka_NewTopic_t, - new_topic_cnt: size_t, + new_topic_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2188,7 +2164,7 @@ extern "C" { extern "C" { pub fn rd_kafka_CreateTopics_result_topics( result: *const rd_kafka_CreateTopics_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_topic_result_t; } #[repr(C)] @@ -2206,14 +2182,14 @@ extern "C" { extern "C" { pub fn rd_kafka_DeleteTopic_destroy_array( del_topics: *mut *mut rd_kafka_DeleteTopic_t, - del_topic_cnt: size_t, + del_topic_cnt: usize, ); } extern "C" { pub fn rd_kafka_DeleteTopics( rk: *mut rd_kafka_t, del_topics: *mut *mut rd_kafka_DeleteTopic_t, - del_topic_cnt: size_t, + del_topic_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2221,7 +2197,7 @@ extern "C" { extern "C" { pub fn rd_kafka_DeleteTopics_result_topics( result: *const rd_kafka_DeleteTopics_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_topic_result_t; } #[repr(C)] @@ -2233,9 +2209,9 @@ pub type rd_kafka_NewPartitions_t = rd_kafka_NewPartitions_s; extern "C" { pub fn rd_kafka_NewPartitions_new( topic: *const c_char, - new_total_cnt: size_t, + new_total_cnt: usize, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> *mut rd_kafka_NewPartitions_t; } extern "C" { @@ -2244,7 +2220,7 @@ extern "C" { extern "C" { pub fn rd_kafka_NewPartitions_destroy_array( new_parts: *mut *mut rd_kafka_NewPartitions_t, - new_parts_cnt: size_t, + new_parts_cnt: usize, ); } extern "C" { @@ -2252,16 +2228,16 @@ extern "C" { new_parts: *mut rd_kafka_NewPartitions_t, new_partition_idx: i32, broker_ids: *mut i32, - broker_id_cnt: size_t, + broker_id_cnt: usize, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t; } extern "C" { pub fn rd_kafka_CreatePartitions( rk: *mut rd_kafka_t, new_parts: *mut *mut rd_kafka_NewPartitions_t, - new_parts_cnt: size_t, + new_parts_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2269,7 +2245,7 @@ extern "C" { extern "C" { pub fn rd_kafka_CreatePartitions_result_topics( result: *const rd_kafka_CreatePartitions_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_topic_result_t; } #[repr(u32)] @@ -2318,7 +2294,7 @@ extern "C" { extern "C" { pub fn rd_kafka_ConfigEntry_synonyms( entry: *const rd_kafka_ConfigEntry_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_ConfigEntry_t; } #[repr(u32)] @@ -2367,7 +2343,7 @@ extern "C" { extern "C" { pub fn rd_kafka_ConfigResource_destroy_array( config: *mut *mut rd_kafka_ConfigResource_t, - config_cnt: size_t, + config_cnt: usize, ); } extern "C" { @@ -2380,7 +2356,7 @@ extern "C" { extern "C" { pub fn rd_kafka_ConfigResource_configs( config: *const rd_kafka_ConfigResource_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_ConfigEntry_t; } extern "C" { @@ -2405,7 +2381,7 @@ extern "C" { pub fn rd_kafka_AlterConfigs( rk: *mut rd_kafka_t, configs: *mut *mut rd_kafka_ConfigResource_t, - config_cnt: size_t, + config_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2413,14 +2389,14 @@ extern "C" { extern "C" { pub fn rd_kafka_AlterConfigs_result_resources( result: *const rd_kafka_AlterConfigs_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_ConfigResource_t; } extern "C" { pub fn rd_kafka_DescribeConfigs( rk: *mut rd_kafka_t, configs: *mut *mut rd_kafka_ConfigResource_t, - config_cnt: size_t, + config_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2428,7 +2404,7 @@ extern "C" { extern "C" { pub fn rd_kafka_DescribeConfigs_result_resources( result: *const rd_kafka_DescribeConfigs_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_ConfigResource_t; } #[repr(C)] @@ -2448,14 +2424,14 @@ extern "C" { extern "C" { pub fn rd_kafka_DeleteRecords_destroy_array( del_records: *mut *mut rd_kafka_DeleteRecords_t, - del_record_cnt: size_t, + del_record_cnt: usize, ); } extern "C" { pub fn rd_kafka_DeleteRecords( rk: *mut rd_kafka_t, del_records: *mut *mut rd_kafka_DeleteRecords_t, - del_record_cnt: size_t, + del_record_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2480,14 +2456,14 @@ extern "C" { extern "C" { pub fn rd_kafka_DeleteGroup_destroy_array( del_groups: *mut *mut rd_kafka_DeleteGroup_t, - del_group_cnt: size_t, + del_group_cnt: usize, ); } extern "C" { pub fn rd_kafka_DeleteGroups( rk: *mut rd_kafka_t, del_groups: *mut *mut rd_kafka_DeleteGroup_t, - del_group_cnt: size_t, + del_group_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2495,7 +2471,7 @@ extern "C" { extern "C" { pub fn rd_kafka_DeleteGroups_result_groups( result: *const rd_kafka_DeleteGroups_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_group_result_t; } #[repr(C)] @@ -2518,14 +2494,14 @@ extern "C" { extern "C" { pub fn rd_kafka_DeleteConsumerGroupOffsets_destroy_array( del_grpoffsets: *mut *mut rd_kafka_DeleteConsumerGroupOffsets_t, - del_grpoffset_cnt: size_t, + del_grpoffset_cnt: usize, ); } extern "C" { pub fn rd_kafka_DeleteConsumerGroupOffsets( rk: *mut rd_kafka_t, del_grpoffsets: *mut *mut rd_kafka_DeleteConsumerGroupOffsets_t, - del_grpoffsets_cnt: size_t, + del_grpoffsets_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2533,7 +2509,7 @@ extern "C" { extern "C" { pub fn rd_kafka_DeleteConsumerGroupOffsets_result_groups( result: *const rd_kafka_DeleteConsumerGroupOffsets_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_group_result_t; } #[repr(C)] @@ -2593,7 +2569,7 @@ extern "C" { operation: rd_kafka_AclOperation_t, permission_type: rd_kafka_AclPermissionType_t, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> *mut rd_kafka_AclBinding_t; } extern "C" { @@ -2606,7 +2582,7 @@ extern "C" { operation: rd_kafka_AclOperation_t, permission_type: rd_kafka_AclPermissionType_t, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> *mut rd_kafka_AclBindingFilter_t; } extern "C" { @@ -2647,20 +2623,20 @@ extern "C" { extern "C" { pub fn rd_kafka_AclBinding_destroy_array( acl_bindings: *mut *mut rd_kafka_AclBinding_t, - acl_bindings_cnt: size_t, + acl_bindings_cnt: usize, ); } extern "C" { pub fn rd_kafka_CreateAcls_result_acls( result: *const rd_kafka_CreateAcls_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_acl_result_t; } extern "C" { pub fn rd_kafka_CreateAcls( rk: *mut rd_kafka_t, new_acls: *mut *mut rd_kafka_AclBinding_t, - new_acls_cnt: size_t, + new_acls_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2668,7 +2644,7 @@ extern "C" { extern "C" { pub fn rd_kafka_DescribeAcls_result_acls( result: *const rd_kafka_DescribeAcls_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_AclBinding_t; } extern "C" { @@ -2688,7 +2664,7 @@ pub type rd_kafka_DeleteAcls_result_response_t = rd_kafka_DeleteAcls_result_resp extern "C" { pub fn rd_kafka_DeleteAcls_result_responses( result: *const rd_kafka_DeleteAcls_result_t, - cntp: *mut size_t, + cntp: *mut usize, ) -> *mut *const rd_kafka_DeleteAcls_result_response_t; } extern "C" { @@ -2699,14 +2675,14 @@ extern "C" { extern "C" { pub fn rd_kafka_DeleteAcls_result_response_matching_acls( result_response: *const rd_kafka_DeleteAcls_result_response_t, - matching_acls_cntp: *mut size_t, + matching_acls_cntp: *mut usize, ) -> *mut *const rd_kafka_AclBinding_t; } extern "C" { pub fn rd_kafka_DeleteAcls( rk: *mut rd_kafka_t, del_acls: *mut *mut rd_kafka_AclBindingFilter_t, - del_acls_cnt: size_t, + del_acls_cnt: usize, options: *const rd_kafka_AdminOptions_t, rkqu: *mut rd_kafka_queue_t, ); @@ -2718,9 +2694,9 @@ extern "C" { md_lifetime_ms: i64, md_principal_name: *const c_char, extensions: *mut *const c_char, - extension_size: size_t, + extension_size: usize, errstr: *mut c_char, - errstr_size: size_t, + errstr_size: usize, ) -> rd_kafka_resp_err_t; } extern "C" { diff --git a/src/client.rs b/src/client.rs index 91e25f089..6838e9a6e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -21,6 +21,7 @@ use std::slice; use std::string::ToString; use std::sync::Arc; +use libc::addrinfo; use rdkafka_sys as rdsys; use rdkafka_sys::types::*; @@ -115,6 +116,19 @@ pub trait ClientContext: Send + Sync { error!("librdkafka: {}: {}", error, reason); } + /// Rewrites a broker address for DNS resolution. + /// + /// This method is invoked before performing DNS resolution on a broker + /// address. The returned address is used in place of the original address. + /// It is useful to allow connecting to a Kafka cluster over a tunnel (e.g., + /// SSH or AWS PrivateLink), where the broker addresses returned by the + /// bootstrap server need to be rewritten to be routed through the tunnel. + /// + /// The default implementation returns the address unchanged. + fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { + addr + } + /// Generates an OAuth token from the provided configuration. /// /// Override with an appropriate implementation when using the `OAUTHBEARER` @@ -229,13 +243,12 @@ impl Client { Arc::as_ptr(&context) as *mut c_void, ) }; - unsafe { rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::)) }; unsafe { - rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::)) - }; - unsafe { - rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::)) - }; + rdsys::rd_kafka_conf_set_log_cb(native_config.ptr(), Some(native_log_cb::)); + rdsys::rd_kafka_conf_set_stats_cb(native_config.ptr(), Some(native_stats_cb::)); + rdsys::rd_kafka_conf_set_error_cb(native_config.ptr(), Some(native_error_cb::)); + rdsys::rd_kafka_conf_set_resolve_cb(native_config.ptr(), Some(native_resolve_cb::)); + } if C::ENABLE_REFRESH_OAUTH_TOKEN { unsafe { rdsys::rd_kafka_conf_set_oauthbearer_token_refresh_cb( @@ -494,6 +507,51 @@ pub(crate) unsafe extern "C" fn native_error_cb( context.error(error, reason.trim()); } +pub(crate) unsafe extern "C" fn native_resolve_cb( + node: *const c_char, + service: *const c_char, + hints: *const addrinfo, + res: *mut *mut addrinfo, + opaque: *mut c_void, +) -> i32 { + // Convert host and port to Rust strings. + let host = match CStr::from_ptr(node).to_str() { + Ok(host) => host.into(), + Err(_) => return libc::EAI_FAIL, + }; + let port = match CStr::from_ptr(service).to_str() { + Ok(port) => port.into(), + Err(_) => return libc::EAI_FAIL, + }; + + // Apply the rewrite in the context. + let context = &mut *(opaque as *mut C); + let addr = context.rewrite_broker_addr(BrokerAddr { host, port }); + + // Convert host and port back to C strings. + let node = match CString::new(addr.host) { + Ok(node) => node, + Err(_) => return libc::EAI_FAIL, + }; + let service = match CString::new(addr.port) { + Ok(service) => service, + Err(_) => return libc::EAI_FAIL, + }; + + // Perform DNS resolution. + unsafe { libc::getaddrinfo(node.as_ptr(), service.as_ptr(), hints, res) } +} + +/// Describes the address of a broker in a Kafka cluster. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct BrokerAddr { + /// The host name. + pub host: String, + /// The port, either as a decimal number or the name of a service in + /// the services database. + pub port: String, +} + /// A generated OAuth token and its associated metadata. /// /// When using the `OAUTHBEARER` SASL authentication method, this type is diff --git a/src/producer/future_producer.rs b/src/producer/future_producer.rs index 5aa0e9860..f7565a35b 100644 --- a/src/producer/future_producer.rs +++ b/src/producer/future_producer.rs @@ -14,7 +14,7 @@ use std::time::{Duration, Instant}; use futures_channel::oneshot; use futures_util::FutureExt; -use crate::client::{Client, ClientContext, DefaultClientContext, OAuthToken}; +use crate::client::{BrokerAddr, Client, ClientContext, DefaultClientContext, OAuthToken}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}; use crate::consumer::ConsumerGroupMetadata; use crate::error::{KafkaError, KafkaResult, RDKafkaErrorCode}; @@ -156,6 +156,10 @@ impl ClientContext for FutureProducerContext { self.wrapped_context.error(error, reason); } + fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr { + self.wrapped_context.rewrite_broker_addr(addr) + } + fn generate_oauth_token( &self, oauthbearer_config: Option<&str>,