Skip to content

Commit

Permalink
fixing wrong release consumers. refactor some releated logs (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Nov 3, 2023
1 parent f178586 commit 09d5d9c
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 7 deletions.
4 changes: 2 additions & 2 deletions packages/services/pub_sub/src/relay.rs
Expand Up @@ -103,17 +103,17 @@ impl PubsubRelay {

pub fn on_source_added(&self, channel: ChannelUuid, source: NodeId) {
if let Some(subs) = self.source_binding.write().on_source_added(channel, source) {
log::debug!("[PubsubRelay] channel {} added source {} => auto sub for local subs {:?}", channel, source, subs);
for sub in subs {
log::debug!("[PubsubRelay] sub channel {} with source {} for local sub {}", channel, source, sub);
self.logic.write().on_local_sub(ChannelIdentify::new(channel, source), sub);
}
}
}

pub fn on_source_removed(&self, channel: ChannelUuid, source: NodeId) {
if let Some(subs) = self.source_binding.write().on_source_removed(channel, source) {
log::debug!("[PubsubRelay] channel {} removed source {} => auto unsub for local subs {:?}", channel, source, subs);
for sub in subs {
log::debug!("[PubsubRelay] unsub channel {} with source {} for local sub {}", channel, source, sub);
self.logic.write().on_local_unsub(ChannelIdentify::new(channel, source), sub);
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/services/pub_sub/src/relay/local.rs
Expand Up @@ -83,7 +83,7 @@ impl LocalRelay {
log::trace!("[LocalRelay] relay to local {}", uuid);
sender.try_send((*uuid, source, channel, data.clone())).print_error("Should send data");
} else {
log::warn!("[LocalRelay] relay to local {} failed", uuid);
log::warn!("[LocalRelay] relay channel {} from {} to local {} consumer not found", channel, source, uuid);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/services/pub_sub/src/relay/logic.rs
Expand Up @@ -217,10 +217,10 @@ impl PubsubRelayLogic {
if let Some(slot) = self.channels.get_mut(&channel) {
slot.feedback_processor.on_unsub(FeedbackConsumerId::Local(handler));
if let Some(index) = slot.local_subscribers.iter().position(|&x| x == handler) {
log::info!("[PubsubRelayLogic {}] unsub {} event from {} removed from list", self.node_id, channel, handler);
log::info!("[PubsubRelayLogic {}] local unsub {} event from {} removed from list", self.node_id, channel, handler);
slot.local_subscribers.swap_remove(index);
} else {
log::info!("[PubsubRelayLogic {}] unsub {} event from {} allready removed from list", self.node_id, channel, handler);
log::info!("[PubsubRelayLogic {}] local unsub {} event from {} allready removed from list", self.node_id, channel, handler);
}

if slot.remote_subscribers.len() == 0 && slot.local_subscribers.len() == 0 {
Expand Down
4 changes: 4 additions & 0 deletions packages/services/pub_sub/src/relay/source_binding.rs
Expand Up @@ -131,6 +131,10 @@ impl SourceBinding {
self.channels.get(&channel).map(|x| x.sources.clone()).unwrap_or_default()
}

pub fn consumers_for(&self, channel: ChannelUuid) -> Vec<LocalSubId> {
self.channels.get(&channel).map(|x| x.subs.clone()).unwrap_or_default()
}

pub fn pop_action(&mut self) -> Option<SourceBindingAction> {
self.actions.pop_front()
}
Expand Down
43 changes: 42 additions & 1 deletion packages/services/pub_sub/src/sdk/consumer.rs
Expand Up @@ -80,11 +80,52 @@ impl Consumer {
impl Drop for Consumer {
fn drop(&mut self) {
self.local.write().on_local_unsub(self.uuid);
if let Some(sources) = self.source_binding.write().on_local_sub(self.channel, self.uuid) {
if let Some(sources) = self.source_binding.write().on_local_unsub(self.channel, self.uuid) {
for source in sources {
let channel = ChannelIdentify::new(self.channel, source);
self.logic.write().on_local_unsub(channel, self.uuid);
}
}
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use parking_lot::RwLock;
use utils::MockTimer;

use crate::{
relay::{local::LocalRelay, logic::PubsubRelayLogic, source_binding::SourceBinding},
ChannelIdentify, Consumer,
};
#[test]
fn correct_create_and_destroy() {
let node_id = 1;
let channel = 1111;
let channel_source = 2;
let source_binding = Arc::new(RwLock::new(SourceBinding::new()));

source_binding.write().on_source_added(channel, channel_source);

let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id)));
let local = Arc::new(RwLock::new(LocalRelay::new()));
let timer = Arc::new(MockTimer::default());
let sub_uuid = 10000;
let consumer = Consumer::new(sub_uuid, channel, logic, local, source_binding, 100, timer);

assert_eq!(
consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)),
Some((vec![].as_slice(), vec![sub_uuid].as_slice()))
);
assert_eq!(consumer.source_binding.read().consumers_for(channel), vec![sub_uuid]);

let sb = consumer.source_binding.clone();
let logic = consumer.logic.clone();
drop(consumer);

assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None);
assert_eq!(sb.read().consumers_for(channel), vec![]);
}
}
44 changes: 43 additions & 1 deletion packages/services/pub_sub/src/sdk/consumer_raw.rs
Expand Up @@ -73,11 +73,53 @@ impl ConsumerRaw {
impl Drop for ConsumerRaw {
fn drop(&mut self) {
self.local.write().on_local_unsub(self.sub_uuid);
if let Some(sources) = self.source_binding.write().on_local_sub(self.channel, self.sub_uuid) {
if let Some(sources) = self.source_binding.write().on_local_unsub(self.channel, self.sub_uuid) {
for source in sources {
let channel = ChannelIdentify::new(self.channel, source);
self.logic.write().on_local_unsub(channel, self.sub_uuid);
}
}
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use parking_lot::RwLock;
use utils::MockTimer;

use crate::{
relay::{local::LocalRelay, logic::PubsubRelayLogic, source_binding::SourceBinding},
ChannelIdentify, ConsumerRaw,
};
#[test]
fn correct_create_and_destroy() {
let node_id = 1;
let channel = 1111;
let channel_source = 2;
let source_binding = Arc::new(RwLock::new(SourceBinding::new()));

source_binding.write().on_source_added(channel, channel_source);

let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id)));
let local = Arc::new(RwLock::new(LocalRelay::new()));
let timer = Arc::new(MockTimer::default());
let sub_uuid = 10000;
let (tx, _rx) = async_std::channel::bounded(100);
let consumer = ConsumerRaw::new(sub_uuid, channel, logic, local, source_binding, tx, timer);

assert_eq!(
consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)),
Some((vec![].as_slice(), vec![sub_uuid].as_slice()))
);
assert_eq!(consumer.source_binding.read().consumers_for(channel), vec![sub_uuid]);

let sb = consumer.source_binding.clone();
let logic = consumer.logic.clone();
drop(consumer);

assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None);
assert_eq!(sb.read().consumers_for(channel), vec![]);
}
}
36 changes: 36 additions & 0 deletions packages/services/pub_sub/src/sdk/consumer_single.rs
Expand Up @@ -63,3 +63,39 @@ impl Drop for ConsumerSingle {
self.local.write().on_local_unsub(self.uuid);
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use parking_lot::RwLock;
use utils::MockTimer;

use crate::{
relay::{local::LocalRelay, logic::PubsubRelayLogic},
ChannelIdentify, ConsumerSingle,
};

#[test]
fn correct_create_and_destroy() {
let node_id = 1;
let channel = 1111;
let channel_source = 2;

let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id)));
let local = Arc::new(RwLock::new(LocalRelay::new()));
let timer = Arc::new(MockTimer::default());
let sub_uuid = 10000;
let consumer = ConsumerSingle::new(sub_uuid, ChannelIdentify::new(channel, channel_source), logic, local, 100, timer);

assert_eq!(
consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)),
Some((vec![].as_slice(), vec![sub_uuid].as_slice()))
);

let logic = consumer.logic.clone();
drop(consumer);

assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None);
}
}

0 comments on commit 09d5d9c

Please sign in to comment.