From 09d5d9c0f9bcc6dd2e17da6976f09b9e41f80d55 Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Fri, 3 Nov 2023 12:01:24 +0700 Subject: [PATCH] fixing wrong release consumers. refactor some releated logs (#39) --- packages/services/pub_sub/src/relay.rs | 4 +- packages/services/pub_sub/src/relay/local.rs | 2 +- packages/services/pub_sub/src/relay/logic.rs | 4 +- .../pub_sub/src/relay/source_binding.rs | 4 ++ packages/services/pub_sub/src/sdk/consumer.rs | 43 +++++++++++++++++- .../services/pub_sub/src/sdk/consumer_raw.rs | 44 ++++++++++++++++++- .../pub_sub/src/sdk/consumer_single.rs | 36 +++++++++++++++ 7 files changed, 130 insertions(+), 7 deletions(-) diff --git a/packages/services/pub_sub/src/relay.rs b/packages/services/pub_sub/src/relay.rs index fd201528..25af8c14 100644 --- a/packages/services/pub_sub/src/relay.rs +++ b/packages/services/pub_sub/src/relay.rs @@ -103,8 +103,8 @@ impl PubsubRelay { pub fn on_source_added(&self, channel: ChannelUuid, source: NodeId) { if let Some(subs) = self.source_binding.write().on_source_added(channel, source) { + log::debug!("[PubsubRelay] channel {} added source {} => auto sub for local subs {:?}", channel, source, subs); for sub in subs { - log::debug!("[PubsubRelay] sub channel {} with source {} for local sub {}", channel, source, sub); self.logic.write().on_local_sub(ChannelIdentify::new(channel, source), sub); } } @@ -112,8 +112,8 @@ impl PubsubRelay { pub fn on_source_removed(&self, channel: ChannelUuid, source: NodeId) { if let Some(subs) = self.source_binding.write().on_source_removed(channel, source) { + log::debug!("[PubsubRelay] channel {} removed source {} => auto unsub for local subs {:?}", channel, source, subs); for sub in subs { - log::debug!("[PubsubRelay] unsub channel {} with source {} for local sub {}", channel, source, sub); self.logic.write().on_local_unsub(ChannelIdentify::new(channel, source), sub); } } diff --git a/packages/services/pub_sub/src/relay/local.rs b/packages/services/pub_sub/src/relay/local.rs index 9a8811c0..2b910ee4 100644 --- a/packages/services/pub_sub/src/relay/local.rs +++ b/packages/services/pub_sub/src/relay/local.rs @@ -83,7 +83,7 @@ impl LocalRelay { log::trace!("[LocalRelay] relay to local {}", uuid); sender.try_send((*uuid, source, channel, data.clone())).print_error("Should send data"); } else { - log::warn!("[LocalRelay] relay to local {} failed", uuid); + log::warn!("[LocalRelay] relay channel {} from {} to local {} consumer not found", channel, source, uuid); } } } diff --git a/packages/services/pub_sub/src/relay/logic.rs b/packages/services/pub_sub/src/relay/logic.rs index 7fdcf0e0..4ce2c1cb 100644 --- a/packages/services/pub_sub/src/relay/logic.rs +++ b/packages/services/pub_sub/src/relay/logic.rs @@ -217,10 +217,10 @@ impl PubsubRelayLogic { if let Some(slot) = self.channels.get_mut(&channel) { slot.feedback_processor.on_unsub(FeedbackConsumerId::Local(handler)); if let Some(index) = slot.local_subscribers.iter().position(|&x| x == handler) { - log::info!("[PubsubRelayLogic {}] unsub {} event from {} removed from list", self.node_id, channel, handler); + log::info!("[PubsubRelayLogic {}] local unsub {} event from {} removed from list", self.node_id, channel, handler); slot.local_subscribers.swap_remove(index); } else { - log::info!("[PubsubRelayLogic {}] unsub {} event from {} allready removed from list", self.node_id, channel, handler); + log::info!("[PubsubRelayLogic {}] local unsub {} event from {} allready removed from list", self.node_id, channel, handler); } if slot.remote_subscribers.len() == 0 && slot.local_subscribers.len() == 0 { diff --git a/packages/services/pub_sub/src/relay/source_binding.rs b/packages/services/pub_sub/src/relay/source_binding.rs index 260814c2..80263088 100644 --- a/packages/services/pub_sub/src/relay/source_binding.rs +++ b/packages/services/pub_sub/src/relay/source_binding.rs @@ -131,6 +131,10 @@ impl SourceBinding { self.channels.get(&channel).map(|x| x.sources.clone()).unwrap_or_default() } + pub fn consumers_for(&self, channel: ChannelUuid) -> Vec { + self.channels.get(&channel).map(|x| x.subs.clone()).unwrap_or_default() + } + pub fn pop_action(&mut self) -> Option { self.actions.pop_front() } diff --git a/packages/services/pub_sub/src/sdk/consumer.rs b/packages/services/pub_sub/src/sdk/consumer.rs index 87715de3..eddae00d 100644 --- a/packages/services/pub_sub/src/sdk/consumer.rs +++ b/packages/services/pub_sub/src/sdk/consumer.rs @@ -80,7 +80,7 @@ impl Consumer { impl Drop for Consumer { fn drop(&mut self) { self.local.write().on_local_unsub(self.uuid); - if let Some(sources) = self.source_binding.write().on_local_sub(self.channel, self.uuid) { + if let Some(sources) = self.source_binding.write().on_local_unsub(self.channel, self.uuid) { for source in sources { let channel = ChannelIdentify::new(self.channel, source); self.logic.write().on_local_unsub(channel, self.uuid); @@ -88,3 +88,44 @@ impl Drop for Consumer { } } } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use parking_lot::RwLock; + use utils::MockTimer; + + use crate::{ + relay::{local::LocalRelay, logic::PubsubRelayLogic, source_binding::SourceBinding}, + ChannelIdentify, Consumer, + }; + #[test] + fn correct_create_and_destroy() { + let node_id = 1; + let channel = 1111; + let channel_source = 2; + let source_binding = Arc::new(RwLock::new(SourceBinding::new())); + + source_binding.write().on_source_added(channel, channel_source); + + let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id))); + let local = Arc::new(RwLock::new(LocalRelay::new())); + let timer = Arc::new(MockTimer::default()); + let sub_uuid = 10000; + let consumer = Consumer::new(sub_uuid, channel, logic, local, source_binding, 100, timer); + + assert_eq!( + consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)), + Some((vec![].as_slice(), vec![sub_uuid].as_slice())) + ); + assert_eq!(consumer.source_binding.read().consumers_for(channel), vec![sub_uuid]); + + let sb = consumer.source_binding.clone(); + let logic = consumer.logic.clone(); + drop(consumer); + + assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None); + assert_eq!(sb.read().consumers_for(channel), vec![]); + } +} diff --git a/packages/services/pub_sub/src/sdk/consumer_raw.rs b/packages/services/pub_sub/src/sdk/consumer_raw.rs index bb931b78..a09cfea1 100644 --- a/packages/services/pub_sub/src/sdk/consumer_raw.rs +++ b/packages/services/pub_sub/src/sdk/consumer_raw.rs @@ -73,7 +73,7 @@ impl ConsumerRaw { impl Drop for ConsumerRaw { fn drop(&mut self) { self.local.write().on_local_unsub(self.sub_uuid); - if let Some(sources) = self.source_binding.write().on_local_sub(self.channel, self.sub_uuid) { + if let Some(sources) = self.source_binding.write().on_local_unsub(self.channel, self.sub_uuid) { for source in sources { let channel = ChannelIdentify::new(self.channel, source); self.logic.write().on_local_unsub(channel, self.sub_uuid); @@ -81,3 +81,45 @@ impl Drop for ConsumerRaw { } } } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use parking_lot::RwLock; + use utils::MockTimer; + + use crate::{ + relay::{local::LocalRelay, logic::PubsubRelayLogic, source_binding::SourceBinding}, + ChannelIdentify, ConsumerRaw, + }; + #[test] + fn correct_create_and_destroy() { + let node_id = 1; + let channel = 1111; + let channel_source = 2; + let source_binding = Arc::new(RwLock::new(SourceBinding::new())); + + source_binding.write().on_source_added(channel, channel_source); + + let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id))); + let local = Arc::new(RwLock::new(LocalRelay::new())); + let timer = Arc::new(MockTimer::default()); + let sub_uuid = 10000; + let (tx, _rx) = async_std::channel::bounded(100); + let consumer = ConsumerRaw::new(sub_uuid, channel, logic, local, source_binding, tx, timer); + + assert_eq!( + consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)), + Some((vec![].as_slice(), vec![sub_uuid].as_slice())) + ); + assert_eq!(consumer.source_binding.read().consumers_for(channel), vec![sub_uuid]); + + let sb = consumer.source_binding.clone(); + let logic = consumer.logic.clone(); + drop(consumer); + + assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None); + assert_eq!(sb.read().consumers_for(channel), vec![]); + } +} diff --git a/packages/services/pub_sub/src/sdk/consumer_single.rs b/packages/services/pub_sub/src/sdk/consumer_single.rs index 2c37b82e..97d652cc 100644 --- a/packages/services/pub_sub/src/sdk/consumer_single.rs +++ b/packages/services/pub_sub/src/sdk/consumer_single.rs @@ -63,3 +63,39 @@ impl Drop for ConsumerSingle { self.local.write().on_local_unsub(self.uuid); } } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use parking_lot::RwLock; + use utils::MockTimer; + + use crate::{ + relay::{local::LocalRelay, logic::PubsubRelayLogic}, + ChannelIdentify, ConsumerSingle, + }; + + #[test] + fn correct_create_and_destroy() { + let node_id = 1; + let channel = 1111; + let channel_source = 2; + + let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id))); + let local = Arc::new(RwLock::new(LocalRelay::new())); + let timer = Arc::new(MockTimer::default()); + let sub_uuid = 10000; + let consumer = ConsumerSingle::new(sub_uuid, ChannelIdentify::new(channel, channel_source), logic, local, 100, timer); + + assert_eq!( + consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)), + Some((vec![].as_slice(), vec![sub_uuid].as_slice())) + ); + + let logic = consumer.logic.clone(); + drop(consumer); + + assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None); + } +}