Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing wrong release consumers. refactor some releated logs #39

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/services/pub_sub/src/relay.rs
Expand Up @@ -103,17 +103,17 @@

pub fn on_source_added(&self, channel: ChannelUuid, source: NodeId) {
if let Some(subs) = self.source_binding.write().on_source_added(channel, source) {
log::debug!("[PubsubRelay] channel {} added source {} => auto sub for local subs {:?}", channel, source, subs);
for sub in subs {
log::debug!("[PubsubRelay] sub channel {} with source {} for local sub {}", channel, source, sub);
self.logic.write().on_local_sub(ChannelIdentify::new(channel, source), sub);
}
}
}

pub fn on_source_removed(&self, channel: ChannelUuid, source: NodeId) {
if let Some(subs) = self.source_binding.write().on_source_removed(channel, source) {
log::debug!("[PubsubRelay] channel {} removed source {} => auto unsub for local subs {:?}", channel, source, subs);

Check warning on line 115 in packages/services/pub_sub/src/relay.rs

View check run for this annotation

Codecov / codecov/patch

packages/services/pub_sub/src/relay.rs#L115

Added line #L115 was not covered by tests
for sub in subs {
log::debug!("[PubsubRelay] unsub channel {} with source {} for local sub {}", channel, source, sub);
self.logic.write().on_local_unsub(ChannelIdentify::new(channel, source), sub);
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/services/pub_sub/src/relay/local.rs
Expand Up @@ -83,7 +83,7 @@
log::trace!("[LocalRelay] relay to local {}", uuid);
sender.try_send((*uuid, source, channel, data.clone())).print_error("Should send data");
} else {
log::warn!("[LocalRelay] relay to local {} failed", uuid);
log::warn!("[LocalRelay] relay channel {} from {} to local {} consumer not found", channel, source, uuid);

Check warning on line 86 in packages/services/pub_sub/src/relay/local.rs

View check run for this annotation

Codecov / codecov/patch

packages/services/pub_sub/src/relay/local.rs#L86

Added line #L86 was not covered by tests
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/services/pub_sub/src/relay/logic.rs
Expand Up @@ -217,10 +217,10 @@
if let Some(slot) = self.channels.get_mut(&channel) {
slot.feedback_processor.on_unsub(FeedbackConsumerId::Local(handler));
if let Some(index) = slot.local_subscribers.iter().position(|&x| x == handler) {
log::info!("[PubsubRelayLogic {}] unsub {} event from {} removed from list", self.node_id, channel, handler);
log::info!("[PubsubRelayLogic {}] local unsub {} event from {} removed from list", self.node_id, channel, handler);
slot.local_subscribers.swap_remove(index);
} else {
log::info!("[PubsubRelayLogic {}] unsub {} event from {} allready removed from list", self.node_id, channel, handler);
log::info!("[PubsubRelayLogic {}] local unsub {} event from {} allready removed from list", self.node_id, channel, handler);

Check warning on line 223 in packages/services/pub_sub/src/relay/logic.rs

View check run for this annotation

Codecov / codecov/patch

packages/services/pub_sub/src/relay/logic.rs#L223

Added line #L223 was not covered by tests
}

if slot.remote_subscribers.len() == 0 && slot.local_subscribers.len() == 0 {
Expand Down
4 changes: 4 additions & 0 deletions packages/services/pub_sub/src/relay/source_binding.rs
Expand Up @@ -131,6 +131,10 @@ impl SourceBinding {
self.channels.get(&channel).map(|x| x.sources.clone()).unwrap_or_default()
}

pub fn consumers_for(&self, channel: ChannelUuid) -> Vec<LocalSubId> {
self.channels.get(&channel).map(|x| x.subs.clone()).unwrap_or_default()
}

pub fn pop_action(&mut self) -> Option<SourceBindingAction> {
self.actions.pop_front()
}
Expand Down
43 changes: 42 additions & 1 deletion packages/services/pub_sub/src/sdk/consumer.rs
Expand Up @@ -80,11 +80,52 @@ impl Consumer {
impl Drop for Consumer {
fn drop(&mut self) {
self.local.write().on_local_unsub(self.uuid);
if let Some(sources) = self.source_binding.write().on_local_sub(self.channel, self.uuid) {
if let Some(sources) = self.source_binding.write().on_local_unsub(self.channel, self.uuid) {
for source in sources {
let channel = ChannelIdentify::new(self.channel, source);
self.logic.write().on_local_unsub(channel, self.uuid);
}
}
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use parking_lot::RwLock;
use utils::MockTimer;

use crate::{
relay::{local::LocalRelay, logic::PubsubRelayLogic, source_binding::SourceBinding},
ChannelIdentify, Consumer,
};
#[test]
fn correct_create_and_destroy() {
let node_id = 1;
let channel = 1111;
let channel_source = 2;
let source_binding = Arc::new(RwLock::new(SourceBinding::new()));

source_binding.write().on_source_added(channel, channel_source);

let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id)));
let local = Arc::new(RwLock::new(LocalRelay::new()));
let timer = Arc::new(MockTimer::default());
let sub_uuid = 10000;
let consumer = Consumer::new(sub_uuid, channel, logic, local, source_binding, 100, timer);

assert_eq!(
consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)),
Some((vec![].as_slice(), vec![sub_uuid].as_slice()))
);
assert_eq!(consumer.source_binding.read().consumers_for(channel), vec![sub_uuid]);

let sb = consumer.source_binding.clone();
let logic = consumer.logic.clone();
drop(consumer);

assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None);
assert_eq!(sb.read().consumers_for(channel), vec![]);
}
}
44 changes: 43 additions & 1 deletion packages/services/pub_sub/src/sdk/consumer_raw.rs
Expand Up @@ -73,11 +73,53 @@ impl ConsumerRaw {
impl Drop for ConsumerRaw {
fn drop(&mut self) {
self.local.write().on_local_unsub(self.sub_uuid);
if let Some(sources) = self.source_binding.write().on_local_sub(self.channel, self.sub_uuid) {
if let Some(sources) = self.source_binding.write().on_local_unsub(self.channel, self.sub_uuid) {
for source in sources {
let channel = ChannelIdentify::new(self.channel, source);
self.logic.write().on_local_unsub(channel, self.sub_uuid);
}
}
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use parking_lot::RwLock;
use utils::MockTimer;

use crate::{
relay::{local::LocalRelay, logic::PubsubRelayLogic, source_binding::SourceBinding},
ChannelIdentify, ConsumerRaw,
};
#[test]
fn correct_create_and_destroy() {
let node_id = 1;
let channel = 1111;
let channel_source = 2;
let source_binding = Arc::new(RwLock::new(SourceBinding::new()));

source_binding.write().on_source_added(channel, channel_source);

let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id)));
let local = Arc::new(RwLock::new(LocalRelay::new()));
let timer = Arc::new(MockTimer::default());
let sub_uuid = 10000;
let (tx, _rx) = async_std::channel::bounded(100);
let consumer = ConsumerRaw::new(sub_uuid, channel, logic, local, source_binding, tx, timer);

assert_eq!(
consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)),
Some((vec![].as_slice(), vec![sub_uuid].as_slice()))
);
assert_eq!(consumer.source_binding.read().consumers_for(channel), vec![sub_uuid]);

let sb = consumer.source_binding.clone();
let logic = consumer.logic.clone();
drop(consumer);

assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None);
assert_eq!(sb.read().consumers_for(channel), vec![]);
}
}
36 changes: 36 additions & 0 deletions packages/services/pub_sub/src/sdk/consumer_single.rs
Expand Up @@ -63,3 +63,39 @@ impl Drop for ConsumerSingle {
self.local.write().on_local_unsub(self.uuid);
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use parking_lot::RwLock;
use utils::MockTimer;

use crate::{
relay::{local::LocalRelay, logic::PubsubRelayLogic},
ChannelIdentify, ConsumerSingle,
};

#[test]
fn correct_create_and_destroy() {
let node_id = 1;
let channel = 1111;
let channel_source = 2;

let logic = Arc::new(RwLock::new(PubsubRelayLogic::new(node_id)));
let local = Arc::new(RwLock::new(LocalRelay::new()));
let timer = Arc::new(MockTimer::default());
let sub_uuid = 10000;
let consumer = ConsumerSingle::new(sub_uuid, ChannelIdentify::new(channel, channel_source), logic, local, 100, timer);

assert_eq!(
consumer.logic.read().relay(ChannelIdentify::new(channel, channel_source)),
Some((vec![].as_slice(), vec![sub_uuid].as_slice()))
);

let logic = consumer.logic.clone();
drop(consumer);

assert_eq!(logic.read().relay(ChannelIdentify::new(channel, channel_source)), None);
}
}