diff --git a/examples/examples/manual_node.rs b/examples/examples/manual_node.rs index 8fa9f778..9cd9bacd 100644 --- a/examples/examples/manual_node.rs +++ b/examples/examples/manual_node.rs @@ -74,7 +74,8 @@ async fn main() { }); let spreads_layer_router = LayersSpreadRouterSyncBehavior::new(router.clone()); - let (key_value, key_value_sdk) = key_value::KeyValueBehavior::new(args.node_id, timer.clone(), 10000); + let key_value_sdk = key_value::KeyValueSdk::new(); + let key_value = key_value::KeyValueBehavior::new(args.node_id, 10000, Some(Box::new(key_value_sdk.clone()))); if let Some(addr) = args.redis_addr { let mut redis_server = RedisServer::new(addr, key_value_sdk); diff --git a/packages/core/utils/src/hashmap.rs b/packages/core/utils/src/hashmap.rs index 4d92a025..c09b1819 100644 --- a/packages/core/utils/src/hashmap.rs +++ b/packages/core/utils/src/hashmap.rs @@ -165,9 +165,8 @@ mod tests { let mut map = HashMap::new(); map.insert("key1", "value1"); map.insert("key2", "value2"); - let mut iter = map.iter(); - assert_eq!(iter.next(), Some((&"key1", &"value1"))); - assert_eq!(iter.next(), Some((&"key2", &"value2"))); - assert_eq!(iter.next(), None); + let mut slots = map.iter().collect::>(); + slots.sort_by_key(|a| a.0); + assert_eq!(slots, vec![(&"key1", &"value1"), (&"key2", &"value2")]); } } diff --git a/packages/network/src/plane.rs b/packages/network/src/plane.rs index 9fda2156..efbb232e 100644 --- a/packages/network/src/plane.rs +++ b/packages/network/src/plane.rs @@ -189,7 +189,7 @@ where fn pop_actions(&mut self, now_ms: u64) { while let Some(action) = self.internal.pop_action() { match action { - PlaneInternalAction::SpawnConnection(spwd_conn) => { + PlaneInternalAction::SpawnConnection(spwd_conn) => { let SpawnedConnection { outgoing, sender, receiver, handlers } = spwd_conn; let internal_tx = self.internal_tx.clone(); diff --git a/packages/network/src/plane/bus_impl.rs b/packages/network/src/plane/bus_impl.rs index 277035b8..4f9df227 100644 --- a/packages/network/src/plane/bus_impl.rs +++ b/packages/network/src/plane/bus_impl.rs @@ -456,5 +456,5 @@ mod tests { let conn = Arc::new(sender); let _rx = bus.add_conn(conn).expect("Should have rx"); assert!(bus.to_net_conn(ConnId::from_in(1, 1), TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_some()); -} + } } diff --git a/packages/network/src/plane/internal.rs b/packages/network/src/plane/internal.rs index b2ce4151..07919869 100644 --- a/packages/network/src/plane/internal.rs +++ b/packages/network/src/plane/internal.rs @@ -273,7 +273,7 @@ mod tests { use crate::{ behaviour::MockNetworkBehavior, msg::TransportMsg, - transport::{ConnectionReceiver, ConnectionRejectReason, MockConnectionAcceptor, MockConnectionReceiver, MockConnectionSender, OutgoingConnectionError}, + transport::{ConnectionRejectReason, MockConnectionAcceptor, MockConnectionReceiver, MockConnectionSender, OutgoingConnectionError}, }; type BE = (); diff --git a/packages/runner/src/lib.rs b/packages/runner/src/lib.rs index 07b9ecb7..afe7ebbb 100644 --- a/packages/runner/src/lib.rs +++ b/packages/runner/src/lib.rs @@ -10,8 +10,8 @@ pub use network::{ convert_enum, }; pub use pub_sub::{ - ChannelIdentify, ChannelSourceHashmapReal, ChannelUuid, Consumer, ConsumerRaw, ConsumerSingle, Feedback, FeedbackType, LocalPubId, LocalSubId, NumberInfo, Publisher, PublisherRaw, PubsubSdk, - PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent, + ChannelIdentify, ChannelUuid, Consumer, ConsumerRaw, ConsumerSingle, Feedback, FeedbackType, LocalPubId, LocalSubId, NumberInfo, Publisher, PublisherRaw, PubsubSdk, PubsubServiceBehaviour, + PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent, }; pub use transport_udp::UdpTransport; pub use utils::{ diff --git a/packages/services/key_value/src/behavior.rs b/packages/services/key_value/src/behavior.rs index 75c89a7f..8a7254f1 100644 --- a/packages/services/key_value/src/behavior.rs +++ b/packages/services/key_value/src/behavior.rs @@ -1,15 +1,12 @@ use crate::handler::KeyValueConnectionHandler; use crate::msg::{KeyValueBehaviorEvent, KeyValueMsg, KeyValueSdkEvent}; -use crate::KEY_VALUE_SERVICE_ID; +use crate::{ExternalControl, KEY_VALUE_SERVICE_ID}; use bluesea_identity::{ConnId, NodeId}; use network::behaviour::{BehaviorContext, ConnectionHandler, NetworkBehavior, NetworkBehaviorAction}; use network::msg::{MsgHeader, TransportMsg}; use network::transport::{ConnectionRejectReason, ConnectionSender, OutgoingConnectionError, TransportOutgoingLocalUuid}; -use parking_lot::RwLock; use std::collections::VecDeque; use std::sync::Arc; -use utils::awaker::MockAwaker; -use utils::Timer; use self::hashmap_local::HashmapLocalStorage; use self::hashmap_remote::HashmapRemoteStorage; @@ -29,37 +26,30 @@ pub use sdk::KeyValueSdk; pub struct KeyValueBehavior { node_id: NodeId, simple_remote: SimpleRemoteStorage, - simple_local: Arc>, + simple_local: SimpleLocalStorage, hashmap_remote: HashmapRemoteStorage, - hashmap_local: Arc>, + hashmap_local: HashmapLocalStorage, outputs: VecDeque>, + external: Option>, } impl KeyValueBehavior where HE: Send + Sync + 'static, - SE: Send + Sync + 'static, + SE: From + TryInto + Send + Sync + 'static, { #[allow(unused)] - pub fn new(node_id: NodeId, timer: Arc, sync_each_ms: u64) -> (Self, sdk::KeyValueSdk) { + pub fn new(node_id: NodeId, sync_each_ms: u64, external: Option>) -> Self { log::info!("[KeyValueBehaviour {}] created with sync_each_ms {}", node_id, sync_each_ms); - let default_awake = Arc::new(MockAwaker::default()); - let simple_local = Arc::new(RwLock::new(SimpleLocalStorage::new(default_awake.clone(), sync_each_ms))); - let hashmap_local = Arc::new(RwLock::new(HashmapLocalStorage::new(default_awake.clone(), sync_each_ms))); - let sdk = sdk::KeyValueSdk::new(simple_local.clone(), hashmap_local.clone(), timer); - - let sdk_c = sdk.clone(); - ( - Self { - node_id, - simple_remote: SimpleRemoteStorage::new(), - simple_local, - hashmap_remote: HashmapRemoteStorage::new(node_id), - hashmap_local, - outputs: VecDeque::new(), - }, - sdk, - ) + Self { + node_id, + simple_remote: SimpleRemoteStorage::new(), + simple_local: SimpleLocalStorage::new(sync_each_ms), + hashmap_remote: HashmapRemoteStorage::new(node_id), + hashmap_local: HashmapLocalStorage::new(sync_each_ms), + outputs: VecDeque::new(), + external, + } } fn pop_all_events(&mut self, _ctx: &BehaviorContext) @@ -74,12 +64,37 @@ where .push_back(NetworkBehaviorAction::ToNet(TransportMsg::from_payload_bincode(header, &KeyValueMsg::SimpleLocal(action.0)))); } - while let Some(action) = self.simple_local.write().pop_action() { - log::debug!("[KeyValueBehavior {}] pop_all_events simple local: {:?}", self.node_id, action); - let mut header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, action.1, 0); - header.from_node = Some(self.node_id); - self.outputs - .push_back(NetworkBehaviorAction::ToNet(TransportMsg::from_payload_bincode(header, &KeyValueMsg::SimpleRemote(action.0)))); + while let Some(action) = self.simple_local.pop_action() { + match action { + simple_local::LocalStorageAction::SendNet(msg, route) => { + log::debug!("[KeyValueBehavior {}] pop_all_events simple local: {:?}", self.node_id, msg); + let mut header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, route, 0); + header.from_node = Some(self.node_id); + self.outputs + .push_back(NetworkBehaviorAction::ToNet(TransportMsg::from_payload_bincode(header, &KeyValueMsg::SimpleRemote(msg)))); + } + simple_local::LocalStorageAction::LocalOnChanged(service_id, uuid, key, value, version, source) => { + if service_id == KEY_VALUE_SERVICE_ID { + if let Some(external) = &self.external { + external.on_event(KeyValueSdkEvent::OnKeyChanged(uuid, key, value, version, source)); + } + } else { + self.outputs.push_back(NetworkBehaviorAction::ToSdkService( + service_id, + KeyValueSdkEvent::OnKeyChanged(uuid, key, value, version, source).into(), + )); + } + } + simple_local::LocalStorageAction::LocalOnGet(service_id, uuid, key, res) => { + if service_id == KEY_VALUE_SERVICE_ID { + if let Some(external) = &self.external { + external.on_event(KeyValueSdkEvent::OnGet(uuid, key, res)); + } + } else { + self.outputs.push_back(NetworkBehaviorAction::ToSdkService(service_id, KeyValueSdkEvent::OnGet(uuid, key, res).into())); + } + } + } } while let Some(action) = self.hashmap_remote.pop_action() { @@ -90,12 +105,37 @@ where .push_back(NetworkBehaviorAction::ToNet(TransportMsg::from_payload_bincode(header, &KeyValueMsg::HashmapLocal(action.0)))); } - while let Some(action) = self.hashmap_local.write().pop_action() { - log::debug!("[KeyValueBehavior {}] pop_all_events hashmap local: {:?}", self.node_id, action); - let mut header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, action.1, 0); - header.from_node = Some(self.node_id); - self.outputs - .push_back(NetworkBehaviorAction::ToNet(TransportMsg::from_payload_bincode(header, &KeyValueMsg::HashmapRemote(action.0)))); + while let Some(action) = self.hashmap_local.pop_action() { + match action { + hashmap_local::LocalStorageAction::SendNet(msg, route) => { + log::debug!("[KeyValueBehavior {}] pop_all_events hashmap local: {:?}", self.node_id, msg); + let mut header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, route, 0); + header.from_node = Some(self.node_id); + self.outputs + .push_back(NetworkBehaviorAction::ToNet(TransportMsg::from_payload_bincode(header, &KeyValueMsg::HashmapRemote(msg)))); + } + hashmap_local::LocalStorageAction::LocalOnChanged(service_id, uuid, key, sub_key, value, version, source) => { + if service_id == KEY_VALUE_SERVICE_ID { + if let Some(external) = &self.external { + external.on_event(KeyValueSdkEvent::OnKeyHChanged(uuid, key, sub_key, value, version, source)); + } + } else { + self.outputs.push_back(NetworkBehaviorAction::ToSdkService( + service_id, + KeyValueSdkEvent::OnKeyHChanged(uuid, key, sub_key, value, version, source).into(), + )); + } + } + hashmap_local::LocalStorageAction::LocalOnGet(service_id, uuid, key, res) => { + if service_id == KEY_VALUE_SERVICE_ID { + if let Some(external) = &self.external { + external.on_event(KeyValueSdkEvent::OnGetH(uuid, key, res)); + } + } else { + self.outputs.push_back(NetworkBehaviorAction::ToSdkService(service_id, KeyValueSdkEvent::OnGetH(uuid, key, res).into())); + } + } + } } } @@ -111,7 +151,7 @@ where } KeyValueMsg::SimpleLocal(msg) => { log::debug!("[KeyValueBehavior {}] process_key_value_msg simple local: {:?} from {}", self.node_id, msg, from); - self.simple_local.write().on_event(from, msg); + self.simple_local.on_event(from, msg); self.pop_all_events::(ctx); } KeyValueMsg::HashmapRemote(msg) => { @@ -121,11 +161,47 @@ where } KeyValueMsg::HashmapLocal(msg) => { log::debug!("[KeyValueBehavior {}] process_key_value_msg hashmap local: {:?} from {}", self.node_id, msg, from); - self.hashmap_local.write().on_event(from, msg); + self.hashmap_local.on_event(from, msg); self.pop_all_events::(ctx); } } } + + fn process_sdk_event(&mut self, _ctx: &BehaviorContext, now_ms: u64, from_service: u8, event: KeyValueSdkEvent) { + match event { + KeyValueSdkEvent::Get(req_id, key, timeout_ms) => { + self.simple_local.get(now_ms, key, req_id, from_service, timeout_ms); + } + KeyValueSdkEvent::GetH(req_id, key, timeout_ms) => { + self.hashmap_local.get(now_ms, key, req_id, from_service, timeout_ms); + } + KeyValueSdkEvent::Set(key, value, ex) => { + self.simple_local.set(now_ms, key, value, ex); + } + KeyValueSdkEvent::SetH(key, sub_key, value, ex) => { + self.hashmap_local.set(now_ms, key, sub_key, value, ex); + } + KeyValueSdkEvent::Del(key) => { + self.simple_local.del(key); + } + KeyValueSdkEvent::DelH(key, sub_key) => { + self.hashmap_local.del(key, sub_key); + } + KeyValueSdkEvent::Sub(uuid, key, ex) => { + self.simple_local.subscribe(key, ex, uuid, from_service); + } + KeyValueSdkEvent::SubH(uuid, key, ex) => { + self.hashmap_local.subscribe(key, ex, uuid, from_service); + } + KeyValueSdkEvent::Unsub(uuid, key) => { + self.simple_local.unsubscribe(key, uuid, from_service); + } + KeyValueSdkEvent::UnsubH(uuid, key) => { + self.hashmap_local.unsubscribe(key, uuid, from_service); + } + _ => {} + } + } } #[allow(unused)] @@ -142,13 +218,23 @@ where fn on_tick(&mut self, ctx: &BehaviorContext, now_ms: u64, interal_ms: u64) { log::trace!("[KeyValueBehavior {}] on_tick ts_ms {}, interal_ms {}", self.node_id, now_ms, interal_ms); self.simple_remote.tick(now_ms); - self.simple_local.write().tick(now_ms); + self.simple_local.tick(now_ms); self.hashmap_remote.tick(now_ms); - self.hashmap_local.write().tick(now_ms); + self.hashmap_local.tick(now_ms); self.pop_all_events::(ctx); } fn on_awake(&mut self, ctx: &BehaviorContext, now_ms: u64) { + loop { + if let Some(external) = &self.external { + while let Some(event) = external.pop_action() { + log::info!("[KeyValueBehavior {}] external event: {:?}", self.node_id, event); + self.process_sdk_event(ctx, now_ms, KEY_VALUE_SERVICE_ID, event); + break; + } + } + break; + } self.pop_all_events::(ctx); } @@ -175,10 +261,10 @@ where fn on_sdk_msg(&mut self, ctx: &BehaviorContext, now_ms: u64, from_service: u8, event: SE) { if let Ok(event) = event.try_into() { - match event { - KeyValueSdkEvent::Local(msg) => {} - KeyValueSdkEvent::FromNode(node_id, msg) => {} - } + self.process_sdk_event(ctx, now_ms, from_service, event); + self.pop_all_events::(ctx); + } else { + debug_assert!(false, "Invalid event") } } @@ -216,8 +302,9 @@ where fn on_started(&mut self, ctx: &BehaviorContext, now_ms: u64) { log::info!("[KeyValueBehavior {}] on_started", self.node_id); - self.simple_local.write().change_awake_notify(ctx.awaker.clone()); - self.hashmap_local.write().change_awake_notify(ctx.awaker.clone()); + if let Some(external) = &self.external { + external.set_awaker(ctx.awaker.clone()); + } } fn on_stopped(&mut self, ctx: &BehaviorContext, now_ms: u64) { @@ -239,10 +326,10 @@ mod tests { behaviour::{BehaviorContext, NetworkBehavior, NetworkBehaviorAction}, msg::{MsgHeader, TransportMsg}, }; - use utils::{awaker::MockAwaker, MockTimer, Timer}; + use utils::awaker::MockAwaker; use crate::{ - msg::{HashmapLocalEvent, HashmapRemoteEvent, SimpleLocalEvent, SimpleRemoteEvent, KeyValueSdkEvent}, + msg::{HashmapLocalEvent, HashmapRemoteEvent, KeyValueSdkEvent, SimpleLocalEvent, SimpleRemoteEvent}, KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueMsg, KEY_VALUE_SERVICE_ID, }; @@ -251,13 +338,12 @@ mod tests { type SE = KeyValueSdkEvent; #[test] - fn sdk_simple_set_del_should_fire_event() { + fn set_simple_key_send_to_net() { let local_node_id = 1; let remote_node_id = 2; let sync_ms = 10000; let key = 1000; - let timer = Arc::new(MockTimer::default()); - let (mut behaviour, sdk) = super::KeyValueBehavior::::new(local_node_id, timer.clone(), sync_ms); + let mut behaviour = super::KeyValueBehavior::::new(local_node_id, sync_ms, None); let behaviour: &mut dyn NetworkBehavior = &mut behaviour; let ctx = BehaviorContext { @@ -266,23 +352,18 @@ mod tests { awaker: Arc::new(MockAwaker::default()), }; - behaviour.on_started(&ctx, timer.now_ms()); + behaviour.on_started(&ctx, 0); - // now set key should be awake and output Set command - sdk.set(key, vec![1], None); - assert_eq!(ctx.awaker.pop_awake_count(), 1); + behaviour.on_sdk_msg(&ctx, 0, 0, KeyValueSdkEvent::Set(key, vec![1], None)); - behaviour.on_awake(&ctx, timer.now_ms()); - - // after awake this should send Set to Key(1000) + // after handle sdk this should send Set to Key(1000) let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); expected_header.from_node = Some(local_node_id); let expected_msg = TransportMsg::from_payload_bincode(expected_header, &KeyValueMsg::SimpleRemote(SimpleRemoteEvent::Set(0, key, vec![1], 0, None))); assert_eq!(behaviour.pop_action(), Some(NetworkBehaviorAction::ToNet(expected_msg))); // after tick without ack should resend - timer.fake(100); - behaviour.on_tick(&ctx, timer.now_ms(), 100); + behaviour.on_tick(&ctx, 100, 100); let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); expected_header.from_node = Some(local_node_id); @@ -292,21 +373,17 @@ mod tests { // after handle ack should not resend behaviour.on_handler_event( &ctx, - timer.now_ms(), + 200, remote_node_id, ConnId::from_in(0, 0), KeyValueBehaviorEvent::FromNode(remote_node_id, KeyValueMsg::SimpleLocal(SimpleLocalEvent::SetAck(1, 1000, 0, true))), ); - timer.fake(200); - behaviour.on_tick(&ctx, timer.now_ms(), 100); + behaviour.on_tick(&ctx, 200, 100); assert_eq!(behaviour.pop_action(), None); // after del should send Del to Key(1000) - sdk.del(key); - assert_eq!(ctx.awaker.pop_awake_count(), 1); - - behaviour.on_awake(&ctx, timer.now_ms()); + behaviour.on_sdk_msg(&ctx, 0, 0, KeyValueSdkEvent::Del(key)); // after awake this should send Del to Key(1000) let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); @@ -315,8 +392,7 @@ mod tests { assert_eq!(behaviour.pop_action(), Some(NetworkBehaviorAction::ToNet(expected_msg))); // after tick without ack should resend - timer.fake(300); - behaviour.on_tick(&ctx, timer.now_ms(), 100); + behaviour.on_tick(&ctx, 300, 100); let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); expected_header.from_node = Some(local_node_id); @@ -326,14 +402,13 @@ mod tests { // after handle ack should not resend behaviour.on_handler_event( &ctx, - timer.now_ms(), + 400, remote_node_id, ConnId::from_in(0, 0), KeyValueBehaviorEvent::FromNode(remote_node_id, KeyValueMsg::SimpleLocal(SimpleLocalEvent::DelAck(3, 1000, Some(0)))), ); - timer.fake(400); - behaviour.on_tick(&ctx, timer.now_ms(), 100); + behaviour.on_tick(&ctx, 400, 100); assert_eq!(behaviour.pop_action(), None); } @@ -344,8 +419,7 @@ mod tests { let sync_ms = 10000; let key = 1000; let sub_key = 111; - let timer = Arc::new(MockTimer::default()); - let (mut behaviour, sdk) = super::KeyValueBehavior::::new(local_node_id, timer.clone(), sync_ms); + let mut behaviour = super::KeyValueBehavior::::new(local_node_id, sync_ms, None); let behaviour: &mut dyn NetworkBehavior = &mut behaviour; let ctx = BehaviorContext { @@ -354,13 +428,10 @@ mod tests { awaker: Arc::new(MockAwaker::default()), }; - behaviour.on_started(&ctx, timer.now_ms()); + behaviour.on_started(&ctx, 0); // now set key should be awake and output Set command - sdk.hset(key, sub_key, vec![1], None); - assert_eq!(ctx.awaker.pop_awake_count(), 1); - - behaviour.on_awake(&ctx, timer.now_ms()); + behaviour.on_sdk_msg(&ctx, 0, 0, KeyValueSdkEvent::SetH(key, sub_key, vec![1], None)); // after awake this should send Set to Key(1000) let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); @@ -369,8 +440,7 @@ mod tests { assert_eq!(behaviour.pop_action(), Some(NetworkBehaviorAction::ToNet(expected_msg))); // after tick without ack should resend - timer.fake(100); - behaviour.on_tick(&ctx, timer.now_ms(), 100); + behaviour.on_tick(&ctx, 100, 100); let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); expected_header.from_node = Some(local_node_id); @@ -380,21 +450,17 @@ mod tests { // after handle ack should not resend behaviour.on_handler_event( &ctx, - timer.now_ms(), + 200, remote_node_id, ConnId::from_in(0, 0), KeyValueBehaviorEvent::FromNode(remote_node_id, KeyValueMsg::HashmapLocal(HashmapLocalEvent::SetAck(1, key, sub_key, 0, true))), ); - timer.fake(200); - behaviour.on_tick(&ctx, timer.now_ms(), 100); + behaviour.on_tick(&ctx, 200, 100); assert_eq!(behaviour.pop_action(), None); // after del should send Del to Key(1000) - sdk.hdel(key, sub_key); - assert_eq!(ctx.awaker.pop_awake_count(), 1); - - behaviour.on_awake(&ctx, timer.now_ms()); + behaviour.on_sdk_msg(&ctx, 0, 0, KeyValueSdkEvent::DelH(key, sub_key)); // after awake this should send Del to Key(1000) let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); @@ -403,8 +469,7 @@ mod tests { assert_eq!(behaviour.pop_action(), Some(NetworkBehaviorAction::ToNet(expected_msg))); // after tick without ack should resend - timer.fake(300); - behaviour.on_tick(&ctx, timer.now_ms(), 100); + behaviour.on_tick(&ctx, 200, 100); let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); expected_header.from_node = Some(local_node_id); @@ -414,14 +479,13 @@ mod tests { // after handle ack should not resend behaviour.on_handler_event( &ctx, - timer.now_ms(), + 400, remote_node_id, ConnId::from_in(0, 0), KeyValueBehaviorEvent::FromNode(remote_node_id, KeyValueMsg::HashmapLocal(HashmapLocalEvent::DelAck(3, key, sub_key, Some(0)))), ); - timer.fake(400); - behaviour.on_tick(&ctx, timer.now_ms(), 100); + behaviour.on_tick(&ctx, 400, 100); assert_eq!(behaviour.pop_action(), None); } @@ -431,8 +495,7 @@ mod tests { let remote_node_id = 2; let sync_ms = 10000; let key = 1000; - let timer = Arc::new(MockTimer::default()); - let (mut behaviour, _sdk) = super::KeyValueBehavior::::new(local_node_id, timer.clone(), sync_ms); + let mut behaviour = super::KeyValueBehavior::::new(local_node_id, sync_ms, None); let behaviour: &mut dyn NetworkBehavior = &mut behaviour; let ctx = BehaviorContext { @@ -441,12 +504,12 @@ mod tests { awaker: Arc::new(MockAwaker::default()), }; - behaviour.on_started(&ctx, timer.now_ms()); + behaviour.on_started(&ctx, 0); // received Simple Set behaviour.on_handler_event( &ctx, - timer.now_ms(), + 0, remote_node_id, ConnId::from_in(0, 0), KeyValueBehaviorEvent::FromNode(remote_node_id, KeyValueMsg::SimpleRemote(SimpleRemoteEvent::Set(0, key, vec![1], 0, None))), @@ -466,8 +529,7 @@ mod tests { let sync_ms = 10000; let key = 1000; let sub_key = 111; - let timer = Arc::new(MockTimer::default()); - let (mut behaviour, _sdk) = super::KeyValueBehavior::::new(local_node_id, timer.clone(), sync_ms); + let mut behaviour = super::KeyValueBehavior::::new(local_node_id, sync_ms, None); let behaviour: &mut dyn NetworkBehavior = &mut behaviour; let ctx = BehaviorContext { @@ -476,12 +538,12 @@ mod tests { awaker: Arc::new(MockAwaker::default()), }; - behaviour.on_started(&ctx, timer.now_ms()); + behaviour.on_started(&ctx, 0); // received Simple Set behaviour.on_handler_event( &ctx, - timer.now_ms(), + 0, remote_node_id, ConnId::from_in(0, 0), KeyValueBehaviorEvent::FromNode(remote_node_id, KeyValueMsg::HashmapRemote(HashmapRemoteEvent::Set(0, key, sub_key, vec![1], 0, None))), @@ -499,8 +561,7 @@ mod tests { let local_node_id = 1; let sync_ms = 10000; let key = 1000; - let timer = Arc::new(MockTimer::default()); - let (mut behaviour, sdk) = super::KeyValueBehavior::::new(local_node_id, timer.clone(), sync_ms); + let mut behaviour = super::KeyValueBehavior::::new(local_node_id, sync_ms, None); let behaviour: &mut dyn NetworkBehavior = &mut behaviour; let ctx = BehaviorContext { @@ -509,13 +570,10 @@ mod tests { awaker: Arc::new(MockAwaker::default()), }; - behaviour.on_started(&ctx, timer.now_ms()); + behaviour.on_started(&ctx, 0); // now set key should be awake and output Set command - let _sub = sdk.subscribe(key, None); - assert_eq!(ctx.awaker.pop_awake_count(), 1); - - behaviour.on_awake(&ctx, timer.now_ms()); + behaviour.on_sdk_msg(&ctx, 0, 0, KeyValueSdkEvent::Sub(0, key, None)); // after awake this should send Set to Key(1000) let mut expected_header = MsgHeader::build_reliable(KEY_VALUE_SERVICE_ID, RouteRule::ToKey(key as u32), 0); @@ -523,6 +581,4 @@ mod tests { let expected_msg = TransportMsg::from_payload_bincode(expected_header, &KeyValueMsg::SimpleRemote(SimpleRemoteEvent::Sub(0, key, None))); assert_eq!(behaviour.pop_action(), Some(NetworkBehaviorAction::ToNet(expected_msg))); } - - //TODO test after received sub event and set event should send OnSet event } diff --git a/packages/services/key_value/src/behavior/hashmap_local.rs b/packages/services/key_value/src/behavior/hashmap_local.rs index 78af3a0f..7cf44886 100644 --- a/packages/services/key_value/src/behavior/hashmap_local.rs +++ b/packages/services/key_value/src/behavior/hashmap_local.rs @@ -1,5 +1,5 @@ use crate::{ - msg::{HashmapLocalEvent, HashmapRemoteEvent}, + msg::{HashmapLocalEvent, HashmapRemoteEvent, KeyValueSdkEventError}, KeyId, KeySource, KeyVersion, ReqId, SubKeyId, ValueType, }; use bluesea_identity::NodeId; @@ -17,12 +17,8 @@ use bluesea_router::RouteRule; /// Same with subscribe/unsubscribe use std::{ collections::{HashMap, VecDeque}, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::atomic::{AtomicU64, Ordering}, }; -use utils::awaker::Awaker; struct KeySlotData { value: Option>, @@ -37,22 +33,29 @@ struct KeySlotSubscribe { last_sync: u64, sub: bool, acked: bool, - handler: Box>, KeyVersion, KeySource) + Send + Sync>, + handlers: HashMap<(u64, u8), ()>, } #[derive(Debug, PartialEq, Eq)] pub enum HashmapKeyValueGetError { NetworkError, Timeout, + InternalError, } struct KeySlotGetCallback { + key: KeyId, timeout_after_ts: u64, - callback: Box>, HashmapKeyValueGetError>) + Send + Sync>, + uuid: u64, + service_id: u8, } #[derive(Debug, Eq, PartialEq)] -pub struct LocalStorageAction(pub(crate) HashmapRemoteEvent, pub(crate) RouteRule); +pub enum LocalStorageAction { + SendNet(HashmapRemoteEvent, RouteRule), + LocalOnChanged(u8, u64, KeyId, SubKeyId, Option, KeyVersion, KeySource), + LocalOnGet(u8, u64, KeyId, Result>, KeyValueSdkEventError>), +} pub struct HashmapLocalStorage { req_id_seed: AtomicU64, @@ -62,12 +65,11 @@ pub struct HashmapLocalStorage { subscribe: HashMap, output_events: VecDeque, get_queue: HashMap, - awake_notify: Arc, } impl HashmapLocalStorage { /// create new local storage with provided timer and sync_each_ms. Sync_each_ms is used for sync data to remote storage incase of acked - pub fn new(awake_notify: Arc, sync_each_ms: u64) -> Self { + pub fn new(sync_each_ms: u64) -> Self { Self { req_id_seed: AtomicU64::new(0), version_seed: 0, @@ -76,14 +78,9 @@ impl HashmapLocalStorage { subscribe: HashMap::new(), output_events: VecDeque::new(), get_queue: HashMap::new(), - awake_notify, } } - pub fn change_awake_notify(&mut self, awake_notify: Arc) { - self.awake_notify = awake_notify; - } - fn gen_req_id(&self) -> u64 { return self.req_id_seed.fetch_add(1, Ordering::SeqCst); } @@ -102,14 +99,16 @@ impl HashmapLocalStorage { let req_id = self.gen_req_id(); if let Some(value) = &slot.value { log::debug!("[HashmapLocal] resend set key {} with version {}", key, slot.version); - self.output_events.push_back(LocalStorageAction( + self.output_events.push_back(LocalStorageAction::SendNet( HashmapRemoteEvent::Set(req_id, *key, *sub_key, value.clone(), slot.version, slot.ex.clone()), RouteRule::ToKey(*key as u32), )); } else { log::debug!("[HashmapLocal] resend del key {} with version {}", key, slot.version); - self.output_events - .push_back(LocalStorageAction(HashmapRemoteEvent::Del(req_id, *key, *sub_key, slot.version), RouteRule::ToKey(*key as u32))); + self.output_events.push_back(LocalStorageAction::SendNet( + HashmapRemoteEvent::Del(req_id, *key, *sub_key, slot.version), + RouteRule::ToKey(*key as u32), + )); } } } @@ -121,10 +120,11 @@ impl HashmapLocalStorage { if slot.sub { log::debug!("[HashmapLocal] resend sub key {}", key); self.output_events - .push_back(LocalStorageAction(HashmapRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32))); + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32))); } else { log::debug!("[HashmapLocal] resend unsub key {}", key); - self.output_events.push_back(LocalStorageAction(HashmapRemoteEvent::Unsub(req_id, *key), RouteRule::ToKey(*key as u32))); + self.output_events + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Unsub(req_id, *key), RouteRule::ToKey(*key as u32))); } } } @@ -136,7 +136,7 @@ impl HashmapLocalStorage { let req_id = self.gen_req_id(); if let Some(value) = &slot.value { log::debug!("[HashmapLocal] sync set key {} with version {}", key, slot.version); - self.output_events.push_back(LocalStorageAction( + self.output_events.push_back(LocalStorageAction::SendNet( HashmapRemoteEvent::Set(req_id, *key, *sub_key, value.clone(), slot.version, slot.ex.clone()), RouteRule::ToKey(*key as u32), )); @@ -163,7 +163,7 @@ impl HashmapLocalStorage { if slot.sub { log::debug!("[HashmapLocal] sync sub key {}", key); self.output_events - .push_back(LocalStorageAction(HashmapRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32))); + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32))); } else { log::debug!("[HashmapLocal] remove sub key {} after acked", key); // Just remove if acked and unsub @@ -191,7 +191,8 @@ impl HashmapLocalStorage { for req_id in timeout_gets { if let Some(slot) = self.get_queue.remove(&req_id) { log::debug!("[HashmapLocal] get key {} timeout", req_id); - (slot.callback)(Err(HashmapKeyValueGetError::Timeout)); + self.output_events + .push_back(LocalStorageAction::LocalOnGet(slot.service_id, slot.uuid, slot.key, Err(KeyValueSdkEventError::Timeout))); } } @@ -229,7 +230,7 @@ impl HashmapLocalStorage { } HashmapLocalEvent::GetAck(req_id, _key, value) => { if let Some(slot) = self.get_queue.remove(&req_id) { - (slot.callback)(Ok(value)) + self.output_events.push_back(LocalStorageAction::LocalOnGet(slot.service_id, slot.uuid, slot.key, Ok(value))); } else { } } @@ -263,18 +264,26 @@ impl HashmapLocalStorage { } } HashmapLocalEvent::OnKeySet(req_id, key, sub_key, value, version, source) => { - self.output_events.push_back(LocalStorageAction(HashmapRemoteEvent::OnKeySetAck(req_id), RouteRule::ToNode(from))); + self.output_events + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::OnKeySetAck(req_id), RouteRule::ToNode(from))); if let Some(slot) = self.subscribe.get_mut(&key) { if slot.sub { - (slot.handler)(key, sub_key, Some(value), version, source); + for ((uuid, service_id), _) in slot.handlers.iter() { + self.output_events + .push_back(LocalStorageAction::LocalOnChanged(*service_id, *uuid, key, sub_key, Some(value.clone()), version, source)); + } } } } HashmapLocalEvent::OnKeyDel(req_id, key, sub_key, version, source) => { - self.output_events.push_back(LocalStorageAction(HashmapRemoteEvent::OnKeyDelAck(req_id), RouteRule::ToNode(from))); + self.output_events + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::OnKeyDelAck(req_id), RouteRule::ToNode(from))); if let Some(slot) = self.subscribe.get_mut(&key) { if slot.sub { - (slot.handler)(key, sub_key, None, version, source); + for ((uuid, service_id), _) in slot.handlers.iter() { + self.output_events + .push_back(LocalStorageAction::LocalOnChanged(*service_id, *uuid, key, sub_key, None, version, source)); + } } } } @@ -300,29 +309,26 @@ impl HashmapLocalStorage { }, ); - self.output_events - .push_back(LocalStorageAction(HashmapRemoteEvent::Set(req_id, key, sub_key, value, version, ex), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + self.output_events.push_back(LocalStorageAction::SendNet( + HashmapRemoteEvent::Set(req_id, key, sub_key, value, version, ex), + RouteRule::ToKey(key as u32), + )); } - pub fn get( - &mut self, - now_ms: u64, - key: KeyId, - callback: Box>, HashmapKeyValueGetError>) + Send + Sync>, - timeout_ms: u64, - ) { + pub fn get(&mut self, now_ms: u64, key: KeyId, uuid: u64, service_id: u8, timeout_ms: u64) { let req_id = self.gen_req_id(); log::debug!("[HashmapLocal] get key {} with req_id {}", key, req_id); self.get_queue.insert( req_id, KeySlotGetCallback { + key, timeout_after_ts: now_ms + timeout_ms, - callback, + uuid, + service_id, }, ); - self.output_events.push_back(LocalStorageAction(HashmapRemoteEvent::Get(req_id, key), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + self.output_events + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Get(req_id, key), RouteRule::ToKey(key as u32))); } pub fn del(&mut self, key: KeyId, sub_key: SubKeyId) { @@ -334,14 +340,14 @@ impl HashmapLocalStorage { slot.acked = false; self.output_events - .push_back(LocalStorageAction(HashmapRemoteEvent::Del(req_id, key, sub_key, slot.version), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Del(req_id, key, sub_key, slot.version), RouteRule::ToKey(key as u32))); } } - pub fn subscribe(&mut self, key: KeyId, ex: Option, handler: Box>, KeyVersion, KeySource) + Send + Sync>) { - if self.subscribe.contains_key(&key) { + pub fn subscribe(&mut self, key: KeyId, ex: Option, uuid: u64, service_id: u8) { + if let Some(slot) = self.subscribe.get_mut(&key) { log::warn!("[HashmapLocal] subscribe key {} but already subscribed", key); + slot.handlers.insert((uuid, service_id), ()); return; } @@ -354,24 +360,28 @@ impl HashmapLocalStorage { last_sync: 0, sub: true, acked: false, - handler, + handlers: HashMap::from([((uuid, service_id), ())]), }, ); - self.output_events.push_back(LocalStorageAction(HashmapRemoteEvent::Sub(req_id, key, ex), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + self.output_events + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(req_id, key, ex), RouteRule::ToKey(key as u32))); } - pub fn unsubscribe(&mut self, key: KeyId) { + pub fn unsubscribe(&mut self, key: KeyId, uuid: u64, service_id: u8) { let req_id = self.gen_req_id(); if let Some(slot) = self.subscribe.get_mut(&key) { - slot.sub = false; - slot.last_sync = 0; - slot.acked = false; + slot.handlers.remove(&(uuid, service_id)); - log::debug!("[HashmapLocal] unsubscribe key {} with req_id {}", key, req_id); + if slot.handlers.is_empty() { + slot.sub = false; + slot.last_sync = 0; + slot.acked = false; - self.output_events.push_back(LocalStorageAction(HashmapRemoteEvent::Unsub(req_id, key), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + log::debug!("[HashmapLocal] unsubscribe key {} with req_id {}", key, req_id); + + self.output_events + .push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Unsub(req_id, key), RouteRule::ToKey(key as u32))); + } } else { log::warn!("[HashmapLocal] unsubscribe key {} but not subscribed", key); } @@ -380,28 +390,25 @@ impl HashmapLocalStorage { #[cfg(test)] mod tests { - use std::sync::Arc; - use bluesea_router::RouteRule; - use parking_lot::Mutex; - use utils::awaker::{Awaker, MockAwaker}; use crate::{ behavior::hashmap_local::LocalStorageAction, - msg::{HashmapLocalEvent, HashmapRemoteEvent}, + msg::{HashmapLocalEvent, HashmapRemoteEvent, KeyValueSdkEventError}, }; use super::HashmapLocalStorage; #[test] fn set_should_mark_after_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = HashmapLocalStorage::new(10000); storage.set(0, 1, 2, vec![1], None); - assert_eq!(awake_notify.pop_awake_count(), 1); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Set(0, 1, 2, vec![1], 0, None), RouteRule::ToKey(1)))); + assert_eq!( + storage.pop_action(), + Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Set(0, 1, 2, vec![1], 0, None), RouteRule::ToKey(1))) + ); assert_eq!(storage.pop_action(), None); storage.on_event(2, HashmapLocalEvent::SetAck(0, 1, 2, 0, true)); @@ -415,7 +422,7 @@ mod tests { // fn should_renegerate_set_event_if_ack_failed() { // let timer = Arc::new(utils::MockTimer::default()); // let awake_notify = Arc::new(MockAwaker::default()); - // let mut storage = LocalStorage::new(awake_notify, 10000); + // let mut storage = LocalStorage::new(10000); // storage.set(1, vec![1], None); // assert_eq!(storage.pop_action(), Some(LocalStorageAction(RemoteEvent::Set(0, 1, vec![1], 0, None), RouteRule::ToKey(1)))); @@ -435,8 +442,7 @@ mod tests { #[test] fn set_should_generate_new_version() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); + let mut storage = HashmapLocalStorage::new(10000); storage.set(0, 1, 2, vec![1], None); assert!(storage.pop_action().is_some()); @@ -445,7 +451,7 @@ mod tests { storage.set(1000, 1, 2, vec![2], None); assert_eq!( storage.pop_action(), - Some(LocalStorageAction(HashmapRemoteEvent::Set(1, 1, 2, vec![2], 65536001, None), RouteRule::ToKey(1))) + Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Set(1, 1, 2, vec![2], 65536001, None), RouteRule::ToKey(1))) ); assert_eq!(storage.pop_action(), None); @@ -458,8 +464,7 @@ mod tests { #[test] fn set_should_retry_after_tick_and_not_received_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); + let mut storage = HashmapLocalStorage::new(10000); storage.set(0, 1, 2, vec![1], None); assert!(storage.pop_action().is_some()); @@ -467,14 +472,16 @@ mod tests { //because dont received ack, should resend event storage.tick(0); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Set(1, 1, 2, vec![1], 0, None), RouteRule::ToKey(1)))); + assert_eq!( + storage.pop_action(), + Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Set(1, 1, 2, vec![1], 0, None), RouteRule::ToKey(1))) + ); assert_eq!(storage.pop_action(), None); } #[test] fn set_acked_should_resend_each_sync_each_ms() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); + let mut storage = HashmapLocalStorage::new(10000); storage.set(0, 1, 2, vec![1], None); assert!(storage.pop_action().is_some()); @@ -488,13 +495,15 @@ mod tests { //should resend if timer greater than sync_each_ms storage.tick(10001); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Set(1, 1, 2, vec![1], 0, None), RouteRule::ToKey(1)))); + assert_eq!( + storage.pop_action(), + Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Set(1, 1, 2, vec![1], 0, None), RouteRule::ToKey(1))) + ); } #[test] fn del_should_mark_after_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = HashmapLocalStorage::new(10000); storage.set(0, 1, 2, vec![1], None); assert!(storage.pop_action().is_some()); @@ -502,8 +511,7 @@ mod tests { storage.on_event(2, HashmapLocalEvent::SetAck(0, 1, 2, 0, true)); storage.del(1, 2); - assert_eq!(awake_notify.pop_awake_count(), 2); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Del(1, 1, 2, 0), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Del(1, 1, 2, 0), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //after received ack should not resend event @@ -514,8 +522,7 @@ mod tests { #[test] fn del_should_mark_after_ack_older() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = HashmapLocalStorage::new(10000); storage.set(0, 1, 2, vec![1], None); assert!(storage.pop_action().is_some()); @@ -528,7 +535,7 @@ mod tests { storage.on_event(2, HashmapLocalEvent::SetAck(0, 1, 2, 0, true)); storage.del(1, 2); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Del(2, 1, 2, 65536001), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Del(2, 1, 2, 65536001), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //after received ack should not resend event @@ -539,8 +546,7 @@ mod tests { #[test] fn del_should_retry_after_tick_and_not_received_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = HashmapLocalStorage::new(10000); storage.set(0, 1, 2, vec![1], None); assert!(storage.pop_action().is_some()); @@ -548,21 +554,19 @@ mod tests { storage.on_event(2, HashmapLocalEvent::SetAck(0, 1, 2, 0, true)); storage.del(1, 2); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Del(1, 1, 2, 0), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Del(1, 1, 2, 0), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.tick(0); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Del(2, 1, 2, 0), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Del(2, 1, 2, 0), RouteRule::ToKey(1)))); } #[test] fn sub_should_mark_after_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = HashmapLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _, _| {})); - assert_eq!(awake_notify.pop_awake_count(), 1); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); + storage.subscribe(1, None, 1111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.on_event(2, HashmapLocalEvent::SubAck(0, 1)); @@ -573,19 +577,10 @@ mod tests { #[test] fn sub_event_test() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); - let received_events = Arc::new(Mutex::new(Vec::new())); - - let received_events_clone = received_events.clone(); - storage.subscribe( - 1, - None, - Box::new(move |key, sub_key, value, version, source| { - received_events_clone.lock().push((key, sub_key, value, version, source)); - }), - ); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); + let mut storage = HashmapLocalStorage::new(10000); + + storage.subscribe(1, None, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.on_event(2, HashmapLocalEvent::SubAck(0, 1)); @@ -595,31 +590,33 @@ mod tests { // fake incoming event storage.on_event(2, HashmapLocalEvent::OnKeySet(0, 1, 2, vec![1], 0, 1000)); - storage.on_event(2, HashmapLocalEvent::OnKeyDel(0, 1, 2, 0, 1000)); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::OnKeySetAck(0), RouteRule::ToNode(2)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::LocalOnChanged(10, 11111, 1, 2, Some(vec![1]), 0, 1000))); - assert_eq!(*received_events.lock(), vec![(1, 2, Some(vec![1]), 0, 1000), (1, 2, None, 0, 1000),]); + storage.on_event(2, HashmapLocalEvent::OnKeyDel(1, 1, 2, 0, 1000)); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::OnKeyDelAck(1), RouteRule::ToNode(2)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::LocalOnChanged(10, 11111, 1, 2, None, 0, 1000))); + assert_eq!(storage.pop_action(), None); } #[test] fn sub_should_retry_after_tick_and_not_received_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); + let mut storage = HashmapLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _, _| {})); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); + storage.subscribe(1, None, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.tick(0); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Sub(1, 1, None), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(1, 1, None), RouteRule::ToKey(1)))); } #[test] fn sub_acked_should_resend_each_sync_each_ms() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); + let mut storage = HashmapLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _, _| {})); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); + storage.subscribe(1, None, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.on_event(2, HashmapLocalEvent::SubAck(0, 1)); @@ -629,24 +626,22 @@ mod tests { //should resend if timer greater than sync_each_ms storage.tick(10001); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Sub(1, 1, None), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(1, 1, None), RouteRule::ToKey(1)))); } #[test] fn unsub_should_mark_after_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = HashmapLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _, _| {})); + storage.subscribe(1, None, 11111, 10); assert!(storage.pop_action().is_some()); assert!(storage.pop_action().is_none()); storage.on_event(2, HashmapLocalEvent::SubAck(0, 1)); //sending unsub - storage.unsubscribe(1); - assert_eq!(awake_notify.pop_awake_count(), 2); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1)))); + storage.unsubscribe(1, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //after received ack should not resend event @@ -657,70 +652,52 @@ mod tests { #[test] fn unsub_should_retry_after_tick_if_not_received_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); + let mut storage = HashmapLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _, _| {})); + storage.subscribe(1, None, 11111, 10); assert!(storage.pop_action().is_some()); assert!(storage.pop_action().is_none()); storage.on_event(2, HashmapLocalEvent::SubAck(0, 1)); //sending unsub - storage.unsubscribe(1); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1)))); + storage.unsubscribe(1, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //if not received ack should resend event each tick storage.tick(0); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Unsub(2, 1), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Unsub(2, 1), RouteRule::ToKey(1)))); } #[test] fn get_should_callback_correct_value() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); - - let got_value = Arc::new(Mutex::new(None)); - let got_value_clone = got_value.clone(); - storage.get( - 0, - 1, - Box::new(move |result| { - *got_value_clone.lock() = Some(result); - }), - 1000, - ); + let mut storage = HashmapLocalStorage::new(10000); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Get(0, 1), RouteRule::ToKey(1)))); + storage.get(0, 1, 11111, 10, 1000); + + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Get(0, 1), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //fake received result storage.on_event(2, HashmapLocalEvent::GetAck(0, 1, Some(vec![(2, vec![1], 0, 1000)]))); - assert_eq!(*got_value.lock(), Some(Ok(Some(vec![(2, vec![1], 0, 1000)])))); + + assert_eq!(storage.pop_action(), Some(LocalStorageAction::LocalOnGet(10, 11111, 1, Ok(Some(vec![(2, vec![1], 0, 1000)]))))); + assert_eq!(storage.pop_action(), None); } #[test] fn get_should_timeout_after_no_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = HashmapLocalStorage::new(awake_notify, 10000); - - let got_value = Arc::new(Mutex::new(None)); - let got_value_clone = got_value.clone(); - storage.get( - 0, - 1, - Box::new(move |result| { - *got_value_clone.lock() = Some(result); - }), - 1000, - ); + let mut storage = HashmapLocalStorage::new(10000); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(HashmapRemoteEvent::Get(0, 1), RouteRule::ToKey(1)))); + storage.get(0, 1, 11111, 10, 1000); + + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Get(0, 1), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //after timeout should callback error storage.tick(1001); - assert_eq!(*got_value.lock(), Some(Err(super::HashmapKeyValueGetError::Timeout))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::LocalOnGet(10, 11111, 1, Err(KeyValueSdkEventError::Timeout)))); + assert_eq!(storage.pop_action(), None); } } diff --git a/packages/services/key_value/src/behavior/sdk.rs b/packages/services/key_value/src/behavior/sdk.rs index 0f78568c..23be82b9 100644 --- a/packages/services/key_value/src/behavior/sdk.rs +++ b/packages/services/key_value/src/behavior/sdk.rs @@ -1,114 +1,124 @@ -use std::sync::Arc; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; use async_std::channel::Sender; -use parking_lot::RwLock; -use utils::{error_handle::ErrorUtils, Timer}; +use parking_lot::{Mutex, RwLock}; +use utils::awaker::Awaker; -use crate::{KeyId, KeySource, KeyVersion, SubKeyId, ValueType}; +use crate::{msg::KeyValueSdkEventError, ExternalControl, KeyId, KeySource, KeyValueSdkEvent, KeyVersion, SubKeyId, ValueType}; -use super::{ - hashmap_local::{HashmapKeyValueGetError, HashmapLocalStorage}, - simple_local::{SimpleKeyValueGetError, SimpleLocalStorage}, -}; +use super::{hashmap_local::HashmapKeyValueGetError, simple_local::SimpleKeyValueGetError}; mod pub_sub; pub type SimpleKeyValueSubscriber = pub_sub::Subscriber, KeyVersion, KeySource)>; pub type HashmapKeyValueSubscriber = pub_sub::Subscriber, KeyVersion, KeySource)>; +static SDK_SUB_UUID: u64 = 0x11; + #[derive(Clone)] pub struct KeyValueSdk { - simple_local: Arc>, - hashmap_local: Arc>, + req_id_gen: Arc>, + awaker: Arc>>>, simple_publisher: Arc, KeyVersion, KeySource)>>, hashmap_publisher: Arc, KeyVersion, KeySource)>>, - timer: Arc, + simple_get_queue: Arc, SimpleKeyValueGetError>>>>>, + hashmap_get_queue: Arc>, HashmapKeyValueGetError>>>>>, + actions: Arc>>, } impl KeyValueSdk { - pub fn new(simple_local: Arc>, hashmap_local: Arc>, timer: Arc) -> Self { + pub fn new() -> Self { Self { - simple_local, - hashmap_local, + req_id_gen: Arc::new(Mutex::new(0)), + awaker: Arc::new(RwLock::new(None)), simple_publisher: Arc::new(pub_sub::PublisherManager::new()), hashmap_publisher: Arc::new(pub_sub::PublisherManager::new()), - timer, + actions: Arc::new(RwLock::new(VecDeque::new())), + simple_get_queue: Arc::new(Mutex::new(HashMap::new())), + hashmap_get_queue: Arc::new(Mutex::new(HashMap::new())), } } pub fn set(&self, key: KeyId, value: Vec, ex: Option) { - self.simple_local.write().set(self.timer.now_ms(), key, value, ex); + self.actions.write().push_back(crate::KeyValueSdkEvent::Set(key, value, ex)); + self.awaker.read().as_ref().unwrap().notify(); } pub async fn get(&self, key: KeyId, timeout_ms: u64) -> Result, SimpleKeyValueGetError> { - let (tx, rx) = async_std::channel::bounded(1); - let handler = move |res: Result, SimpleKeyValueGetError>| { - tx.try_send(res).print_error("Should send result"); + let req_id = { + let mut req_id_gen = self.req_id_gen.lock(); + *req_id_gen += 1; + *req_id_gen }; - self.simple_local.write().get(self.timer.now_ms(), key, Box::new(handler), timeout_ms); - rx.recv().await.unwrap_or(Err(SimpleKeyValueGetError::Timeout)) + self.actions.write().push_back(crate::KeyValueSdkEvent::Get(req_id, key, timeout_ms)); + self.awaker.read().as_ref().unwrap().notify(); + let (tx, rx) = async_std::channel::bounded(1); + self.simple_get_queue.lock().insert(req_id, tx); + rx.recv().await.map_err(|_| SimpleKeyValueGetError::InternalError)? } pub fn del(&self, key: KeyId) { - self.simple_local.write().del(key); + self.actions.write().push_back(crate::KeyValueSdkEvent::Del(key)); + self.awaker.read().as_ref().unwrap().notify(); } pub fn subscribe(&self, key: KeyId, ex: Option) -> SimpleKeyValueSubscriber { - let local = self.simple_local.clone(); + let actions = self.actions.clone(); + let awaker = self.awaker.clone(); let (subscriber, is_new) = self.simple_publisher.subscribe( key, Box::new(move || { - local.write().unsubscribe(key); + actions.write().push_back(crate::KeyValueSdkEvent::Unsub(SDK_SUB_UUID, key)); + awaker.read().as_ref().unwrap().notify(); }), ); if is_new { - let publisher = self.simple_publisher.clone(); - self.simple_local.write().subscribe( - key, - ex, - Box::new(move |key, value, version, source| { - publisher.publish(key, (key, value, version, source)); - }), - ); + self.actions.write().push_back(crate::KeyValueSdkEvent::Sub(SDK_SUB_UUID, key, ex)); + self.awaker.read().as_ref().unwrap().notify(); } subscriber } pub fn hset(&self, key: KeyId, sub_key: SubKeyId, value: Vec, ex: Option) { - self.hashmap_local.write().set(self.timer.now_ms(), key, sub_key, value, ex); + self.actions.write().push_back(crate::KeyValueSdkEvent::SetH(key, sub_key, value, ex)); + self.awaker.read().as_ref().unwrap().notify(); } pub async fn hget(&self, key: KeyId, timeout_ms: u64) -> Result>, HashmapKeyValueGetError> { - let (tx, rx) = async_std::channel::bounded(1); - let handler = move |res: Result>, HashmapKeyValueGetError>| { - tx.try_send(res).print_error("Should send result"); + let req_id = { + let mut req_id_gen = self.req_id_gen.lock(); + *req_id_gen += 1; + *req_id_gen }; - self.hashmap_local.write().get(self.timer.now_ms(), key, Box::new(handler), timeout_ms); - rx.recv().await.unwrap_or(Err(HashmapKeyValueGetError::Timeout)) + self.actions.write().push_back(crate::KeyValueSdkEvent::GetH(req_id, key, timeout_ms)); + self.awaker.read().as_ref().unwrap().notify(); + let (tx, rx) = async_std::channel::bounded(1); + self.hashmap_get_queue.lock().insert(req_id, tx); + rx.recv().await.map_err(|_| HashmapKeyValueGetError::InternalError)? } pub fn hdel(&self, key: KeyId, sub_key: SubKeyId) { - self.hashmap_local.write().del(key, sub_key); + self.actions.write().push_back(crate::KeyValueSdkEvent::DelH(key, sub_key)); + self.awaker.read().as_ref().unwrap().notify(); } pub fn hsubscribe(&self, key: u64, ex: Option) -> HashmapKeyValueSubscriber { - let local = self.hashmap_local.clone(); + let actions = self.actions.clone(); + let awaker = self.awaker.clone(); let (subscriber, is_new) = self.hashmap_publisher.subscribe( key, Box::new(move || { - local.write().unsubscribe(key); + actions.write().push_back(crate::KeyValueSdkEvent::UnsubH(SDK_SUB_UUID, key)); + awaker.read().as_ref().unwrap().notify(); }), ); if is_new { - let publisher = self.hashmap_publisher.clone(); - self.hashmap_local.write().subscribe( - key, - ex, - Box::new(move |key, sub_key, value, version, source| { - publisher.publish(key, (key, sub_key, value, version, source)); - }), - ); + self.actions.write().push_back(crate::KeyValueSdkEvent::SubH(SDK_SUB_UUID, key, ex)); + self.awaker.read().as_ref().unwrap().notify(); } subscriber @@ -116,20 +126,156 @@ impl KeyValueSdk { pub fn hsubscribe_raw(&self, key: u64, uuid: u64, ex: Option, tx: Sender<(KeyId, SubKeyId, Option, KeyVersion, KeySource)>) { if self.hashmap_publisher.sub_raw(key, uuid, tx) { - let publisher = self.hashmap_publisher.clone(); - self.hashmap_local.write().subscribe( - key, - ex, - Box::new(move |key, sub_key, value, version, source| { - publisher.publish(key, (key, sub_key, value, version, source)); - }), - ); + self.actions.write().push_back(crate::KeyValueSdkEvent::SubH(SDK_SUB_UUID, key, ex)); + self.awaker.read().as_ref().unwrap().notify(); } } pub fn hunsubscribe_raw(&self, key: u64, uuid: u64) { if self.hashmap_publisher.unsub_raw(key, uuid) { - self.hashmap_local.write().unsubscribe(key); + self.actions.write().push_back(crate::KeyValueSdkEvent::UnsubH(SDK_SUB_UUID, key)); + self.awaker.read().as_ref().unwrap().notify(); + } + } +} + +impl ExternalControl for KeyValueSdk { + fn set_awaker(&self, awaker: Arc) { + self.awaker.write().replace(awaker); + } + + fn on_event(&self, event: KeyValueSdkEvent) { + match event { + KeyValueSdkEvent::OnKeyChanged(_uuid, key, value, version, source) => { + self.simple_publisher.publish(key, (key, value, version, source)); + } + KeyValueSdkEvent::OnKeyHChanged(_uuid, key, sub_key, value, version, source) => { + self.hashmap_publisher.publish(key, (key, sub_key, value, version, source)); + } + KeyValueSdkEvent::OnGet(req_id, key, res) => { + if let Some(tx) = self.simple_get_queue.lock().remove(&req_id) { + if let Err(e) = tx.try_send(res.map_err(|e| match e { + KeyValueSdkEventError::NetworkError => SimpleKeyValueGetError::NetworkError, + KeyValueSdkEventError::Timeout => SimpleKeyValueGetError::Timeout, + KeyValueSdkEventError::InternalError => SimpleKeyValueGetError::InternalError, + })) { + log::error!("[KeyValueSdk] send get result request {req_id} for key {key} error: {:?}", e); + } + } + } + KeyValueSdkEvent::OnGetH(req_id, key, res) => { + if let Some(tx) = self.hashmap_get_queue.lock().remove(&req_id) { + if let Err(e) = tx.try_send(res.map_err(|e| match e { + KeyValueSdkEventError::NetworkError => HashmapKeyValueGetError::NetworkError, + KeyValueSdkEventError::Timeout => HashmapKeyValueGetError::Timeout, + KeyValueSdkEventError::InternalError => HashmapKeyValueGetError::InternalError, + })) { + log::error!("[KeyValueSdk] send get result request {req_id} for key {key} error: {:?}", e); + } + } + } + _ => {} } } + + fn pop_action(&self) -> Option { + self.actions.write().pop_front() + } +} + +#[cfg(test)] +mod test { + use std::{sync::Arc, time::Duration}; + + use utils::awaker::{Awaker, MockAwaker}; + + use crate::{behavior::sdk::SDK_SUB_UUID, ExternalControl, KeyValueSdk, KeyValueSdkEvent}; + + #[async_std::test] + async fn sdk_get_should_fire_awaker_and_action() { + let sdk = KeyValueSdk::new(); + let awaker = Arc::new(MockAwaker::default()); + + sdk.set_awaker(awaker.clone()); + + async_std::future::timeout(Duration::from_millis(100), sdk.get(1000, 100)).await.expect_err("Should timeout"); + assert_eq!(awaker.pop_awake_count(), 1); + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Get(1, 1000, 100))); + + async_std::future::timeout(Duration::from_millis(100), sdk.hget(1000, 100)).await.expect_err("Should timeout"); + assert_eq!(awaker.pop_awake_count(), 1); + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::GetH(2, 1000, 100))); + } + + #[test] + fn sdk_set_should_fire_awaker_and_action() { + let sdk = KeyValueSdk::new(); + let awaker = Arc::new(MockAwaker::default()); + + sdk.set_awaker(awaker.clone()); + + sdk.set(1000, vec![1], Some(20000)); + assert_eq!(awaker.pop_awake_count(), 1); + + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Set(1000, vec![1], Some(20000)))); + + sdk.del(1000); + assert_eq!(awaker.pop_awake_count(), 1); + + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Del(1000))) + } + + #[test] + fn sdk_sub_should_fire_awaker_and_action() { + let sdk = KeyValueSdk::new(); + let awaker = Arc::new(MockAwaker::default()); + + sdk.set_awaker(awaker.clone()); + + let handler = sdk.subscribe(1000, Some(20000)); + assert_eq!(awaker.pop_awake_count(), 1); + + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Sub(SDK_SUB_UUID, 1000, Some(20000)))); + + drop(handler); + assert_eq!(awaker.pop_awake_count(), 1); + + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::Unsub(SDK_SUB_UUID, 1000))) + } + + #[test] + fn sdk_hset_should_fire_awaker_and_action() { + let sdk = KeyValueSdk::new(); + let awaker = Arc::new(MockAwaker::default()); + + sdk.set_awaker(awaker.clone()); + + sdk.hset(1000, 11, vec![1], Some(20000)); + assert_eq!(awaker.pop_awake_count(), 1); + + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::SetH(1000, 11, vec![1], Some(20000)))); + + sdk.hdel(1000, 11); + assert_eq!(awaker.pop_awake_count(), 1); + + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::DelH(1000, 11))) + } + + #[test] + fn sdk_hsub_should_fire_awaker_and_action() { + let sdk = KeyValueSdk::new(); + let awaker = Arc::new(MockAwaker::default()); + + sdk.set_awaker(awaker.clone()); + + let handler = sdk.hsubscribe(1000, Some(20000)); + assert_eq!(awaker.pop_awake_count(), 1); + + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::SubH(SDK_SUB_UUID, 1000, Some(20000)))); + + drop(handler); + assert_eq!(awaker.pop_awake_count(), 1); + + assert_eq!(sdk.pop_action(), Some(KeyValueSdkEvent::UnsubH(SDK_SUB_UUID, 1000))) + } } diff --git a/packages/services/key_value/src/behavior/simple_local.rs b/packages/services/key_value/src/behavior/simple_local.rs index 5c2ae406..b0556380 100644 --- a/packages/services/key_value/src/behavior/simple_local.rs +++ b/packages/services/key_value/src/behavior/simple_local.rs @@ -1,5 +1,5 @@ use crate::{ - msg::{SimpleLocalEvent, SimpleRemoteEvent}, + msg::{KeyValueSdkEventError, SimpleLocalEvent, SimpleRemoteEvent}, KeyId, KeySource, KeyVersion, ReqId, ValueType, }; use bluesea_identity::NodeId; @@ -17,12 +17,8 @@ use bluesea_router::RouteRule; /// Same with subscribe/unsubscribe use std::{ collections::{HashMap, VecDeque}, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::atomic::{AtomicU64, Ordering}, }; -use utils::awaker::Awaker; struct KeySlotData { value: Option>, @@ -37,22 +33,29 @@ struct KeySlotSubscribe { last_sync: u64, sub: bool, acked: bool, - handler: Box>, KeyVersion, KeySource) + Send + Sync>, + handlers: HashMap<(u64, u8), ()>, } #[derive(Debug, PartialEq, Eq)] pub enum SimpleKeyValueGetError { NetworkError, Timeout, + InternalError, } struct KeySlotGetCallback { + key: KeyId, timeout_after_ts: u64, - callback: Box, SimpleKeyValueGetError>) + Send + Sync>, + uuid: u64, + service_id: u8, } #[derive(Debug, Eq, PartialEq)] -pub struct LocalStorageAction(pub(crate) SimpleRemoteEvent, pub(crate) RouteRule); +pub enum LocalStorageAction { + SendNet(SimpleRemoteEvent, RouteRule), + LocalOnChanged(u8, u64, KeyId, Option, KeyVersion, KeySource), + LocalOnGet(u8, u64, KeyId, Result, KeyValueSdkEventError>), +} pub struct SimpleLocalStorage { req_id_seed: AtomicU64, @@ -62,12 +65,11 @@ pub struct SimpleLocalStorage { subscribe: HashMap, output_events: VecDeque, get_queue: HashMap, - awake_notify: Arc, } impl SimpleLocalStorage { /// create new local storage with provided timer and sync_each_ms. Sync_each_ms is used for sync data to remote storage incase of acked - pub fn new(awake_notify: Arc, sync_each_ms: u64) -> Self { + pub fn new(sync_each_ms: u64) -> Self { Self { req_id_seed: AtomicU64::new(0), version_seed: 0, @@ -76,14 +78,9 @@ impl SimpleLocalStorage { subscribe: HashMap::new(), output_events: VecDeque::new(), get_queue: HashMap::new(), - awake_notify, } } - pub fn change_awake_notify(&mut self, awake_notify: Arc) { - self.awake_notify = awake_notify; - } - fn gen_req_id(&self) -> u64 { return self.req_id_seed.fetch_add(1, Ordering::SeqCst); } @@ -102,14 +99,14 @@ impl SimpleLocalStorage { let req_id = self.gen_req_id(); if let Some(value) = &slot.value { log::debug!("[SimpleLocal] resend set key {} with version {}", key, slot.version); - self.output_events.push_back(LocalStorageAction( + self.output_events.push_back(LocalStorageAction::SendNet( SimpleRemoteEvent::Set(req_id, *key, value.clone(), slot.version, slot.ex.clone()), RouteRule::ToKey(*key as u32), )); } else { log::debug!("[SimpleLocal] resend del key {} with version {}", key, slot.version); self.output_events - .push_back(LocalStorageAction(SimpleRemoteEvent::Del(req_id, *key, slot.version), RouteRule::ToKey(*key as u32))); + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Del(req_id, *key, slot.version), RouteRule::ToKey(*key as u32))); } } } @@ -121,10 +118,11 @@ impl SimpleLocalStorage { if slot.sub { log::debug!("[SimpleLocal] resend sub key {}", key); self.output_events - .push_back(LocalStorageAction(SimpleRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32))); + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32))); } else { log::debug!("[SimpleLocal] resend unsub key {}", key); - self.output_events.push_back(LocalStorageAction(SimpleRemoteEvent::Unsub(req_id, *key), RouteRule::ToKey(*key as u32))); + self.output_events + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Unsub(req_id, *key), RouteRule::ToKey(*key as u32))); } } } @@ -136,7 +134,7 @@ impl SimpleLocalStorage { let req_id = self.gen_req_id(); if let Some(value) = &slot.value { log::debug!("[SimpleLocal] sync set key {} with version {}", key, slot.version); - self.output_events.push_back(LocalStorageAction( + self.output_events.push_back(LocalStorageAction::SendNet( SimpleRemoteEvent::Set(req_id, *key, value.clone(), slot.version, slot.ex.clone()), RouteRule::ToKey(*key as u32), )); @@ -163,7 +161,7 @@ impl SimpleLocalStorage { if slot.sub { log::debug!("[SimpleLocal] sync sub key {}", key); self.output_events - .push_back(LocalStorageAction(SimpleRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32))); + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32))); } else { log::debug!("[SimpleLocal] remove sub key {} after acked", key); // Just remove if acked and unsub @@ -191,7 +189,8 @@ impl SimpleLocalStorage { for req_id in timeout_gets { if let Some(slot) = self.get_queue.remove(&req_id) { log::debug!("[SimpleLocal] get key {} timeout", req_id); - (slot.callback)(Err(SimpleKeyValueGetError::Timeout)); + self.output_events + .push_back(LocalStorageAction::LocalOnGet(slot.service_id, slot.uuid, slot.key, Err(KeyValueSdkEventError::Timeout))); } } @@ -229,7 +228,7 @@ impl SimpleLocalStorage { } SimpleLocalEvent::GetAck(req_id, _key, value) => { if let Some(slot) = self.get_queue.remove(&req_id) { - (slot.callback)(Ok(value)) + self.output_events.push_back(LocalStorageAction::LocalOnGet(slot.service_id, slot.uuid, slot.key, Ok(value))); } else { } } @@ -263,18 +262,25 @@ impl SimpleLocalStorage { } } SimpleLocalEvent::OnKeySet(req_id, key, value, version, source) => { - self.output_events.push_back(LocalStorageAction(SimpleRemoteEvent::OnKeySetAck(req_id), RouteRule::ToNode(from))); + self.output_events + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::OnKeySetAck(req_id), RouteRule::ToNode(from))); if let Some(slot) = self.subscribe.get_mut(&key) { if slot.sub { - (slot.handler)(key, Some(value), version, source); + for ((uuid, service_id), _) in slot.handlers.iter() { + self.output_events + .push_back(LocalStorageAction::LocalOnChanged(*service_id, *uuid, key, Some(value.clone()), version, source)); + } } } } SimpleLocalEvent::OnKeyDel(req_id, key, version, source) => { - self.output_events.push_back(LocalStorageAction(SimpleRemoteEvent::OnKeyDelAck(req_id), RouteRule::ToNode(from))); + self.output_events + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::OnKeyDelAck(req_id), RouteRule::ToNode(from))); if let Some(slot) = self.subscribe.get_mut(&key) { if slot.sub { - (slot.handler)(key, None, version, source); + for ((uuid, service_id), _) in slot.handlers.iter() { + self.output_events.push_back(LocalStorageAction::LocalOnChanged(*service_id, *uuid, key, None, version, source)); + } } } } @@ -301,22 +307,23 @@ impl SimpleLocalStorage { ); self.output_events - .push_back(LocalStorageAction(SimpleRemoteEvent::Set(req_id, key, value, version, ex), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Set(req_id, key, value, version, ex), RouteRule::ToKey(key as u32))); } - pub fn get(&mut self, now_ms: u64, key: KeyId, callback: Box, SimpleKeyValueGetError>) + Send + Sync>, timeout_ms: u64) { + pub fn get(&mut self, now_ms: u64, key: KeyId, uuid: u64, service_id: u8, timeout_ms: u64) { let req_id = self.gen_req_id(); log::debug!("[SimpleLocal] get key {} with req_id {}", key, req_id); self.get_queue.insert( req_id, KeySlotGetCallback { + key, timeout_after_ts: now_ms + timeout_ms, - callback, + uuid, + service_id, }, ); - self.output_events.push_back(LocalStorageAction(SimpleRemoteEvent::Get(req_id, key), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + self.output_events + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Get(req_id, key), RouteRule::ToKey(key as u32))); } pub fn del(&mut self, key: KeyId) { @@ -328,14 +335,14 @@ impl SimpleLocalStorage { slot.acked = false; self.output_events - .push_back(LocalStorageAction(SimpleRemoteEvent::Del(req_id, key, slot.version), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Del(req_id, key, slot.version), RouteRule::ToKey(key as u32))); } } - pub fn subscribe(&mut self, key: KeyId, ex: Option, handler: Box>, KeyVersion, KeySource) + Send + Sync>) { - if self.subscribe.contains_key(&key) { + pub fn subscribe(&mut self, key: KeyId, ex: Option, uuid: u64, service_id: u8) { + if let Some(slot) = self.subscribe.get_mut(&key) { log::warn!("[SimpleLocal] subscribe key {} but already subscribed", key); + slot.handlers.insert((uuid, service_id), ()); return; } @@ -348,24 +355,27 @@ impl SimpleLocalStorage { last_sync: 0, sub: true, acked: false, - handler, + handlers: HashMap::from([((uuid, service_id), ())]), }, ); - self.output_events.push_back(LocalStorageAction(SimpleRemoteEvent::Sub(req_id, key, ex), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + self.output_events + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(req_id, key, ex), RouteRule::ToKey(key as u32))); } - pub fn unsubscribe(&mut self, key: KeyId) { + pub fn unsubscribe(&mut self, key: KeyId, uuid: u64, service_id: u8) { let req_id = self.gen_req_id(); if let Some(slot) = self.subscribe.get_mut(&key) { - slot.sub = false; - slot.last_sync = 0; - slot.acked = false; + slot.handlers.remove(&(uuid, service_id)); + if slot.handlers.is_empty() { + slot.sub = false; + slot.last_sync = 0; + slot.acked = false; - log::debug!("[SimpleLocal] unsubscribe key {} with req_id {}", key, req_id); + log::debug!("[SimpleLocal] unsubscribe key {} with req_id {}", key, req_id); - self.output_events.push_back(LocalStorageAction(SimpleRemoteEvent::Unsub(req_id, key), RouteRule::ToKey(key as u32))); - self.awake_notify.notify(); + self.output_events + .push_back(LocalStorageAction::SendNet(SimpleRemoteEvent::Unsub(req_id, key), RouteRule::ToKey(key as u32))); + } } else { log::warn!("[SimpleLocal] unsubscribe key {} but not subscribed", key); } @@ -374,28 +384,25 @@ impl SimpleLocalStorage { #[cfg(test)] mod tests { - use std::sync::Arc; - use bluesea_router::RouteRule; - use parking_lot::Mutex; - use utils::awaker::{Awaker, MockAwaker}; use crate::{ behavior::simple_local::LocalStorageAction, - msg::{SimpleLocalEvent, SimpleRemoteEvent}, + msg::{KeyValueSdkEventError, SimpleLocalEvent, SimpleRemoteEvent}, }; use super::SimpleLocalStorage; #[test] fn set_should_mark_after_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = SimpleLocalStorage::new(10000); storage.set(0, 1, vec![1], None); - assert_eq!(awake_notify.pop_awake_count(), 1); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Set(0, 1, vec![1], 0, None), RouteRule::ToKey(1)))); + assert_eq!( + storage.pop_action(), + Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Set(0, 1, vec![1], 0, None), RouteRule::ToKey(1))) + ); assert_eq!(storage.pop_action(), None); storage.on_event(2, SimpleLocalEvent::SetAck(0, 1, 0, true)); @@ -409,7 +416,7 @@ mod tests { // fn should_renegerate_set_event_if_ack_failed() { // let timer = Arc::new(utils::MockTimer::default()); // let awake_notify = Arc::new(MockAwaker::default()); - // let mut storage = LocalStorage::new(awake_notify, 10000); + // let mut storage = LocalStorage::new(10000); // storage.set(1, vec![1], None); // assert_eq!(storage.pop_action(), Some(LocalStorageAction(RemoteEvent::Set(0, 1, vec![1], 0, None), RouteRule::ToKey(1)))); @@ -429,8 +436,7 @@ mod tests { #[test] fn set_should_generate_new_version() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); + let mut storage = SimpleLocalStorage::new(10000); storage.set(0, 1, vec![1], None); assert!(storage.pop_action().is_some()); @@ -439,7 +445,7 @@ mod tests { storage.set(1000, 1, vec![2], None); assert_eq!( storage.pop_action(), - Some(LocalStorageAction(SimpleRemoteEvent::Set(1, 1, vec![2], 65536001, None), RouteRule::ToKey(1))) + Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Set(1, 1, vec![2], 65536001, None), RouteRule::ToKey(1))) ); assert_eq!(storage.pop_action(), None); @@ -452,8 +458,7 @@ mod tests { #[test] fn set_should_retry_after_tick_and_not_received_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); + let mut storage = SimpleLocalStorage::new(10000); storage.set(0, 1, vec![1], None); assert!(storage.pop_action().is_some()); @@ -461,14 +466,16 @@ mod tests { //because dont received ack, should resend event storage.tick(0); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Set(1, 1, vec![1], 0, None), RouteRule::ToKey(1)))); + assert_eq!( + storage.pop_action(), + Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Set(1, 1, vec![1], 0, None), RouteRule::ToKey(1))) + ); assert_eq!(storage.pop_action(), None); } #[test] fn set_acked_should_resend_each_sync_each_ms() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); + let mut storage = SimpleLocalStorage::new(10000); storage.set(0, 1, vec![1], None); assert!(storage.pop_action().is_some()); @@ -482,13 +489,15 @@ mod tests { //should resend if timer greater than sync_each_ms storage.tick(10001); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Set(1, 1, vec![1], 0, None), RouteRule::ToKey(1)))); + assert_eq!( + storage.pop_action(), + Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Set(1, 1, vec![1], 0, None), RouteRule::ToKey(1))) + ); } #[test] fn del_should_mark_after_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = SimpleLocalStorage::new(10000); storage.set(0, 1, vec![1], None); assert!(storage.pop_action().is_some()); @@ -496,8 +505,7 @@ mod tests { storage.on_event(2, SimpleLocalEvent::SetAck(0, 1, 0, true)); storage.del(1); - assert_eq!(awake_notify.pop_awake_count(), 2); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Del(1, 1, 0), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Del(1, 1, 0), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //after received ack should not resend event @@ -508,8 +516,7 @@ mod tests { #[test] fn del_should_mark_after_ack_older() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = SimpleLocalStorage::new(10000); storage.set(0, 1, vec![1], None); assert!(storage.pop_action().is_some()); @@ -522,7 +529,7 @@ mod tests { storage.on_event(2, SimpleLocalEvent::SetAck(0, 1, 0, true)); storage.del(1); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Del(2, 1, 65536001), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Del(2, 1, 65536001), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //after received ack should not resend event @@ -533,8 +540,7 @@ mod tests { #[test] fn del_should_retry_after_tick_and_not_received_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = SimpleLocalStorage::new(10000); storage.set(0, 1, vec![1], None); assert!(storage.pop_action().is_some()); @@ -542,21 +548,19 @@ mod tests { storage.on_event(2, SimpleLocalEvent::SetAck(0, 1, 0, true)); storage.del(1); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Del(1, 1, 0), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Del(1, 1, 0), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.tick(0); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Del(2, 1, 0), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Del(2, 1, 0), RouteRule::ToKey(1)))); } #[test] fn sub_should_mark_after_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = SimpleLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _| {})); - assert_eq!(awake_notify.pop_awake_count(), 1); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); + storage.subscribe(1, None, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.on_event(2, SimpleLocalEvent::SubAck(0, 1)); @@ -567,19 +571,9 @@ mod tests { #[test] fn sub_event_test() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); - let received_events = Arc::new(Mutex::new(Vec::new())); - - let received_events_clone = received_events.clone(); - storage.subscribe( - 1, - None, - Box::new(move |key, value, version, source| { - received_events_clone.lock().push((key, value, version, source)); - }), - ); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); + let mut storage = SimpleLocalStorage::new(10000); + storage.subscribe(1, None, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.on_event(2, SimpleLocalEvent::SubAck(0, 1)); @@ -589,31 +583,34 @@ mod tests { // fake incoming event storage.on_event(2, SimpleLocalEvent::OnKeySet(0, 1, vec![1], 0, 1000)); - storage.on_event(2, SimpleLocalEvent::OnKeyDel(0, 1, 0, 1000)); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::OnKeySetAck(0), RouteRule::ToNode(2)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::LocalOnChanged(10, 11111, 1, Some(vec![1]), 0, 1000))); + + storage.on_event(2, SimpleLocalEvent::OnKeyDel(1, 1, 0, 1000)); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::OnKeyDelAck(1), RouteRule::ToNode(2)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::LocalOnChanged(10, 11111, 1, None, 0, 1000))); - assert_eq!(*received_events.lock(), vec![(1, Some(vec![1]), 0, 1000), (1, None, 0, 1000),]); + assert_eq!(storage.pop_action(), None); } #[test] fn sub_should_retry_after_tick_and_not_received_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); + let mut storage = SimpleLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _| {})); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); + storage.subscribe(1, None, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.tick(0); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Sub(1, 1, None), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(1, 1, None), RouteRule::ToKey(1)))); } #[test] fn sub_acked_should_resend_each_sync_each_ms() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); + let mut storage = SimpleLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _| {})); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); + storage.subscribe(1, None, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); storage.on_event(2, SimpleLocalEvent::SubAck(0, 1)); @@ -623,24 +620,22 @@ mod tests { //should resend if timer greater than sync_each_ms storage.tick(10001); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Sub(1, 1, None), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Sub(1, 1, None), RouteRule::ToKey(1)))); } #[test] fn unsub_should_mark_after_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify.clone(), 10000); + let mut storage = SimpleLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _| {})); + storage.subscribe(1, None, 11111, 10); assert!(storage.pop_action().is_some()); assert!(storage.pop_action().is_none()); storage.on_event(2, SimpleLocalEvent::SubAck(0, 1)); //sending unsub - storage.unsubscribe(1); - assert_eq!(awake_notify.pop_awake_count(), 2); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1)))); + storage.unsubscribe(1, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //after received ack should not resend event @@ -651,70 +646,52 @@ mod tests { #[test] fn unsub_should_retry_after_tick_if_not_received_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); + let mut storage = SimpleLocalStorage::new(10000); - storage.subscribe(1, None, Box::new(|_, _, _, _| {})); + storage.subscribe(1, None, 11111, 10); assert!(storage.pop_action().is_some()); assert!(storage.pop_action().is_none()); storage.on_event(2, SimpleLocalEvent::SubAck(0, 1)); //sending unsub - storage.unsubscribe(1); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1)))); + storage.unsubscribe(1, 11111, 10); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //if not received ack should resend event each tick storage.tick(0); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Unsub(2, 1), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Unsub(2, 1), RouteRule::ToKey(1)))); } #[test] fn get_should_callback_correct_value() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); - - let got_value = Arc::new(Mutex::new(None)); - let got_value_clone = got_value.clone(); - storage.get( - 0, - 1, - Box::new(move |result| { - *got_value_clone.lock() = Some(result); - }), - 1000, - ); + let mut storage = SimpleLocalStorage::new(10000); + storage.get(0, 1, 11111, 10, 1000); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Get(0, 1), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Get(0, 1), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //fake received result storage.on_event(2, SimpleLocalEvent::GetAck(0, 1, Some((vec![1], 0, 1000)))); - assert_eq!(*got_value.lock(), Some(Ok(Some((vec![1], 0, 1000))))); + + assert_eq!(storage.pop_action(), Some(LocalStorageAction::LocalOnGet(10, 11111, 1, Ok(Some((vec![1], 0, 1000)))))); + assert_eq!(storage.pop_action(), None); } #[test] fn get_should_timeout_after_no_ack() { - let awake_notify = Arc::new(MockAwaker::default()); - let mut storage = SimpleLocalStorage::new(awake_notify, 10000); - - let got_value = Arc::new(Mutex::new(None)); - let got_value_clone = got_value.clone(); - storage.get( - 0, - 1, - Box::new(move |result| { - *got_value_clone.lock() = Some(result); - }), - 1000, - ); + let mut storage = SimpleLocalStorage::new(10000); + + storage.get(0, 1, 11111, 10, 1000); - assert_eq!(storage.pop_action(), Some(LocalStorageAction(SimpleRemoteEvent::Get(0, 1), RouteRule::ToKey(1)))); + assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(SimpleRemoteEvent::Get(0, 1), RouteRule::ToKey(1)))); assert_eq!(storage.pop_action(), None); //after timeout should callback error storage.tick(1001); - assert_eq!(*got_value.lock(), Some(Err(super::SimpleKeyValueGetError::Timeout))); + + assert_eq!(storage.pop_action(), Some(LocalStorageAction::LocalOnGet(10, 11111, 1, Err(KeyValueSdkEventError::Timeout)))); + assert_eq!(storage.pop_action(), None); } } diff --git a/packages/services/key_value/src/lib.rs b/packages/services/key_value/src/lib.rs index 8868aeaf..f8149b1f 100644 --- a/packages/services/key_value/src/lib.rs +++ b/packages/services/key_value/src/lib.rs @@ -11,10 +11,19 @@ mod handler; mod msg; mod storage; +use std::sync::Arc; + pub use behavior::KeyValueBehavior; pub use behavior::KeyValueSdk; use bluesea_identity::NodeId; pub use msg::{KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueMsg, KeyValueSdkEvent}; +use utils::awaker::Awaker; + +pub trait ExternalControl: Send + Sync { + fn set_awaker(&self, awaker: Arc); + fn on_event(&self, event: KeyValueSdkEvent); + fn pop_action(&self) -> Option; +} #[cfg(test)] mod tests { diff --git a/packages/services/key_value/src/msg.rs b/packages/services/key_value/src/msg.rs index 5d13b06c..a0870a24 100644 --- a/packages/services/key_value/src/msg.rs +++ b/packages/services/key_value/src/msg.rs @@ -11,10 +11,29 @@ pub enum KeyValueBehaviorEvent { #[derive(Debug, PartialEq, Eq)] pub enum KeyValueHandlerEvent {} +#[derive(Debug, PartialEq, Eq)] +pub enum KeyValueSdkEventError { + NetworkError, + Timeout, + InternalError, +} + #[derive(Debug, PartialEq, Eq)] pub enum KeyValueSdkEvent { - Local(KeyValueSdkMsg), - FromNode(NodeId, KeyValueSdkMsg), + Get(u64, KeyId, u64), + GetH(u64, KeyId, u64), + Set(KeyId, ValueType, Option), + SetH(KeyId, SubKeyId, ValueType, Option), + Del(KeyId), + DelH(KeyId, SubKeyId), + Sub(u64, KeyId, Option), + SubH(u64, KeyId, Option), + Unsub(u64, KeyId), + UnsubH(u64, KeyId), + OnGet(u64, KeyId, Result, KeyValueSdkEventError>), + OnGetH(u64, KeyId, Result>, KeyValueSdkEventError>), + OnKeyChanged(u64, KeyId, Option, KeyVersion, KeySource), + OnKeyHChanged(u64, KeyId, SubKeyId, Option, KeyVersion, KeySource), } #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -80,11 +99,3 @@ pub enum KeyValueMsg { HashmapRemote(HashmapRemoteEvent), HashmapLocal(HashmapLocalEvent), } - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub enum KeyValueSdkMsg { - Set(KeyId, ValueType), - Del(KeyId), - Sub(KeyId), - Unsub(KeyId), -} diff --git a/packages/services/manual_discovery/examples/manual_node.rs b/packages/services/manual_discovery/examples/discovery_manual_node.rs similarity index 99% rename from packages/services/manual_discovery/examples/manual_node.rs rename to packages/services/manual_discovery/examples/discovery_manual_node.rs index 2e961a5f..3079ff90 100644 --- a/packages/services/manual_discovery/examples/manual_node.rs +++ b/packages/services/manual_discovery/examples/discovery_manual_node.rs @@ -22,8 +22,7 @@ enum NodeHandleEvent { } #[derive(convert_enum::From, convert_enum::TryInto)] -enum NodeSdkEvent { -} +enum NodeSdkEvent {} #[derive(convert_enum::From, convert_enum::TryInto, Serialize, Deserialize)] enum NodeMsg { diff --git a/packages/services/pub_sub/examples/benchmark.rs b/packages/services/pub_sub/examples/benchmark.rs index 94a2c67c..42540282 100644 --- a/packages/services/pub_sub/examples/benchmark.rs +++ b/packages/services/pub_sub/examples/benchmark.rs @@ -8,7 +8,7 @@ use layers_spread_router_sync::{LayersSpreadRouterSyncBehavior, LayersSpreadRout use manual_discovery::{ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent, ManualMsg}; use network::convert_enum; use network::plane::{NetworkPlane, NetworkPlaneConfig}; -use pub_sub::{ChannelSourceHashmapReal, PubsubRemoteEvent, PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent}; +use pub_sub::{PubsubRemoteEvent, PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent}; use utils::{SystemTimer, Timer}; #[derive(convert_enum::From, convert_enum::TryInto)] @@ -55,8 +55,8 @@ async fn run_node(node_id: NodeId, neighbours: Vec) -> (PubsubSdk, Nod }); let router_sync_behaviour = LayersSpreadRouterSyncBehavior::new(router.clone()); - let (kv_behaviour, kv_sdk) = KeyValueBehavior::new(node_id, timer.clone(), 3000); - let (pubsub_behavior, pubsub_sdk) = PubsubServiceBehaviour::new(node_id, Box::new(ChannelSourceHashmapReal::new(kv_sdk, node_id)), timer.clone()); + let kv_behaviour = KeyValueBehavior::new(node_id, 3000, None); + let (pubsub_behavior, pubsub_sdk) = PubsubServiceBehaviour::new(node_id, timer.clone()); let mut plane = NetworkPlane::::new(NetworkPlaneConfig { node_id, diff --git a/packages/services/pub_sub/examples/benchmark_local.rs b/packages/services/pub_sub/examples/benchmark_local.rs index 8f2e5704..f9ac8a38 100644 --- a/packages/services/pub_sub/examples/benchmark_local.rs +++ b/packages/services/pub_sub/examples/benchmark_local.rs @@ -8,7 +8,7 @@ use layers_spread_router_sync::{LayersSpreadRouterSyncBehavior, LayersSpreadRout use manual_discovery::{ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent, ManualMsg}; use network::convert_enum; use network::plane::{NetworkPlane, NetworkPlaneConfig}; -use pub_sub::{ChannelSourceHashmapReal, PubsubRemoteEvent, PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent}; +use pub_sub::{PubsubRemoteEvent, PubsubSdk, PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent}; use utils::{SystemTimer, Timer}; #[derive(convert_enum::From, convert_enum::TryInto)] @@ -55,8 +55,8 @@ async fn run_node(node_id: NodeId, neighbours: Vec) -> (PubsubSdk, Nod }); let router_sync_behaviour = LayersSpreadRouterSyncBehavior::new(router.clone()); - let (kv_behaviour, kv_sdk) = KeyValueBehavior::new(node_id, timer.clone(), 3000); - let (pubsub_behavior, pubsub_sdk) = PubsubServiceBehaviour::new(node_id, Box::new(ChannelSourceHashmapReal::new(kv_sdk, node_id)), timer.clone()); + let kv_behaviour = KeyValueBehavior::new(node_id, 3000, None); + let (pubsub_behavior, pubsub_sdk) = PubsubServiceBehaviour::new(node_id, timer.clone()); let mut plane = NetworkPlane::::new(NetworkPlaneConfig { node_id, diff --git a/packages/services/pub_sub/src/behaviour.rs b/packages/services/pub_sub/src/behaviour.rs index f6b06161..48b7ccd1 100644 --- a/packages/services/pub_sub/src/behaviour.rs +++ b/packages/services/pub_sub/src/behaviour.rs @@ -1,14 +1,13 @@ -use std::{collections::VecDeque, sync::Arc}; +use std::{collections::VecDeque, marker::PhantomData, sync::Arc}; -use async_std::{channel::Sender, task::JoinHandle}; use bluesea_identity::{ConnId, NodeId}; use bluesea_router::RouteRule; +use key_value::{KeyValueSdkEvent, KEY_VALUE_SERVICE_ID}; use network::{ behaviour::{BehaviorContext, ConnectionHandler, NetworkBehavior, NetworkBehaviorAction}, msg::{MsgHeader, TransportMsg}, transport::{ConnectionRejectReason, ConnectionSender, OutgoingConnectionError, TransportOutgoingLocalUuid}, }; -use parking_lot::Mutex; use utils::Timer; use crate::{ @@ -18,31 +17,27 @@ use crate::{ PubsubSdk, PUBSUB_SERVICE_ID, }; -use self::channel_source::{ChannelSourceHashmap, SourceMapEvent}; +const KEY_VALUE_TIMEOUT_MS: u64 = 30000; +const KEY_VALUE_SUB_UUID: u64 = 0; -pub(crate) mod channel_source; - -pub struct PubsubServiceBehaviour { +pub struct PubsubServiceBehaviour { + _tmp: PhantomData, node_id: NodeId, - channel_source_map_tx: Option>, - channel_source_map: Box, relay: PubsubRelay, - kv_rx_task: Option>, - kv_rx_queue: Arc>>, outputs: VecDeque>, } -impl PubsubServiceBehaviour { - pub fn new(node_id: NodeId, channel_source_map: Box, timer: Arc) -> (Self, PubsubSdk) { +impl PubsubServiceBehaviour +where + SE: From + TryInto, +{ + pub fn new(node_id: NodeId, timer: Arc) -> (Self, PubsubSdk) { let (relay, sdk) = PubsubRelay::new(node_id, timer); ( Self { + _tmp: Default::default(), node_id, - channel_source_map_tx: None, - channel_source_map, relay, - kv_rx_task: None, - kv_rx_queue: Arc::new(Mutex::new(VecDeque::new())), outputs: VecDeque::new(), }, sdk, @@ -76,11 +71,17 @@ impl PubsubServiceBehaviour { match action { LocalRelayAction::Publish(channel) => { log::info!("[PubSubServiceBehaviour {}] on added local channel {} => set Hashmap field", self.node_id, channel); - self.channel_source_map.add(channel as u64); + self.outputs.push_back(NetworkBehaviorAction::ToSdkService( + KEY_VALUE_SERVICE_ID, + KeyValueSdkEvent::SetH(channel as u64, self.node_id as u64, vec![], Some(KEY_VALUE_TIMEOUT_MS)).into(), + )); } LocalRelayAction::Unpublish(channel) => { log::info!("[PubSubServiceBehaviour {}] on removed local channel {} => del Hashmap field", self.node_id, channel); - self.channel_source_map.remove(channel as u64); + self.outputs.push_back(NetworkBehaviorAction::ToSdkService( + KEY_VALUE_SERVICE_ID, + KeyValueSdkEvent::DelH(channel as u64, self.node_id as u64).into(), + )); } } } @@ -89,22 +90,28 @@ impl PubsubServiceBehaviour { match action { SourceBindingAction::Subscribe(channel) => { log::info!("[PubSubServiceBehaviour {}] will sub hashmap {}", self.node_id, channel); - let tx = self.channel_source_map_tx.as_ref().expect("Should has channel_source_map_tx").clone(); - self.channel_source_map.subscribe(channel as u64, tx); + self.outputs.push_back(NetworkBehaviorAction::ToSdkService( + KEY_VALUE_SERVICE_ID, + KeyValueSdkEvent::SubH(KEY_VALUE_SUB_UUID, channel as u64, Some(KEY_VALUE_TIMEOUT_MS)).into(), + )); } SourceBindingAction::Unsubscribe(channel) => { log::info!("[PubSubServiceBehaviour {}] will unsub hashmap {}", self.node_id, channel); - self.channel_source_map.unsubscribe(channel as u64); + self.outputs.push_back(NetworkBehaviorAction::ToSdkService( + KEY_VALUE_SERVICE_ID, + KeyValueSdkEvent::UnsubH(KEY_VALUE_SUB_UUID, channel as u64).into(), + )); } } } } } -impl NetworkBehavior for PubsubServiceBehaviour +impl NetworkBehavior for PubsubServiceBehaviour where BE: From + TryInto + Send + Sync + 'static, HE: From + TryInto + Send + Sync + 'static, + SE: From + TryInto, { fn service_id(&self) -> u8 { PUBSUB_SERVICE_ID @@ -112,24 +119,8 @@ where fn on_started(&mut self, ctx: &BehaviorContext, _now_ms: u64) { log::info!("[PubSubServiceBehaviour {}] on_started", self.node_id); + //TODO avoid using awaker in relay, refer sameway with key-value self.relay.set_awaker(ctx.awaker.clone()); - let (tx, rx) = async_std::channel::unbounded(); - self.channel_source_map_tx = Some(tx); - let node_id = self.node_id; - let ctx = ctx.clone(); - let queue = self.kv_rx_queue.clone(); - self.kv_rx_task = Some(async_std::task::spawn(async move { - while let Ok((key, _sub_key, value, _version, source)) = rx.recv().await { - if value.is_some() { - log::debug!("[PubSubServiceBehaviour {}] channel {} add source {}", node_id, key, source); - queue.lock().push_back(PubsubServiceBehaviourEvent::OnHashmapSet(key, source)) - } else { - log::debug!("[PubSubServiceBehaviour {}] channel {} remove source {}", node_id, key, source); - queue.lock().push_back(PubsubServiceBehaviourEvent::OnHashmapDel(key, source)) - } - ctx.awaker.notify(); - } - })); } fn on_tick(&mut self, ctx: &BehaviorContext, now_ms: u64, _interval_ms: u64) { @@ -138,32 +129,6 @@ where } fn on_awake(&mut self, ctx: &BehaviorContext, _now_ms: u64) { - loop { - let event = self.kv_rx_queue.lock().pop_front(); - if let Some(event) = event { - if let Ok(event) = event.try_into() { - match event { - PubsubServiceBehaviourEvent::Awake => { - self.pop_all_events(ctx); - } - PubsubServiceBehaviourEvent::OnHashmapSet(channel, source) => { - log::info!("[PubSubServiceBehaviour {}] on channel {} added source {}", self.node_id, channel, source); - self.relay.on_source_added(channel as u32, source); - self.pop_all_events(ctx); - } - PubsubServiceBehaviourEvent::OnHashmapDel(channel, source) => { - log::info!("[PubSubServiceBehaviour {}] on channel {} removed source {}", self.node_id, channel, source); - self.relay.on_source_removed(channel as u32, source); - self.pop_all_events(ctx); - } - } - } else { - log::warn!("[PubSubServiceBehaviour {}] invalid event", self.node_id); - } - } else { - break; - } - } self.pop_all_events(ctx); } @@ -225,16 +190,175 @@ where fn on_stopped(&mut self, _ctx: &BehaviorContext, _now_ms: u64) { log::info!("[PubSubServiceBehaviour {}] on_stopped", self.node_id); - if let Some(task) = self.kv_rx_task.take() { - async_std::task::spawn(async move { - task.cancel().await; - }); - } } - fn on_sdk_msg(&mut self, _ctx: &BehaviorContext, _now_ms: u64, _from_service: u8, _event: SE) {} + fn on_sdk_msg(&mut self, ctx: &BehaviorContext, _now_ms: u64, from_service: u8, event: SE) { + if from_service != KEY_VALUE_SERVICE_ID { + return; + } + + if let Ok(event) = event.try_into() { + match event { + KeyValueSdkEvent::OnKeyHChanged(_uuid, key, _sub_key, value, _version, source) => { + if value.is_some() { + self.relay.on_source_added(key as u32, source); + } else { + self.relay.on_source_removed(key as u32, source); + } + self.pop_all_events(ctx); + } + _ => {} + } + } + } fn pop_action(&mut self) -> Option> { self.outputs.pop_front() } } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use bluesea_identity::ConnId; + use bluesea_router::RouteRule; + use key_value::{KeyValueSdkEvent, KEY_VALUE_SERVICE_ID}; + use network::{ + behaviour::{BehaviorContext, NetworkBehavior, NetworkBehaviorAction}, + msg::{MsgHeader, TransportMsg}, + }; + use utils::{awaker::MockAwaker, MockTimer, Timer}; + + use crate::{ + behaviour::{KEY_VALUE_SUB_UUID, KEY_VALUE_TIMEOUT_MS}, + handler::CONTROL_META_TYPE, + ChannelIdentify, PubsubRemoteEvent, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent, PUBSUB_SERVICE_ID, + }; + + type BE = PubsubServiceBehaviourEvent; + type HE = PubsubServiceHandlerEvent; + type SE = KeyValueSdkEvent; + + #[test] + fn publish_unpublish_should_set_del_key() { + let local_node_id = 1; + let channel = 1000; + let timer = Arc::new(MockTimer::default()); + let (mut behaviour, sdk) = super::PubsubServiceBehaviour::::new(local_node_id, timer.clone()); + + let ctx = BehaviorContext { + service_id: PUBSUB_SERVICE_ID, + node_id: local_node_id, + awaker: Arc::new(MockAwaker::default()), + }; + + behaviour.on_started(&ctx, 0); + + let publisher = sdk.create_publisher(channel); + assert_eq!(ctx.awaker.pop_awake_count(), 1); + + behaviour.on_awake(&ctx, timer.now_ms()); + assert_eq!( + behaviour.pop_action(), + Some(NetworkBehaviorAction::ToSdkService( + KEY_VALUE_SERVICE_ID, + KeyValueSdkEvent::SetH(channel as u64, local_node_id as u64, vec![], Some(KEY_VALUE_TIMEOUT_MS)).into() + )) + ); + + drop(publisher); + + assert_eq!(ctx.awaker.pop_awake_count(), 1); + + behaviour.on_awake(&ctx, timer.now_ms()); + assert_eq!( + behaviour.pop_action(), + Some(NetworkBehaviorAction::ToSdkService( + KEY_VALUE_SERVICE_ID, + KeyValueSdkEvent::DelH(channel as u64, local_node_id as u64).into() + )) + ); + } + + #[test] + fn sub_unsub_direct_should_send_net() { + let local_node_id = 1; + let source_node_id = 10; + let channel_uuid = 1000; + let channel = ChannelIdentify::new(channel_uuid, source_node_id); + let timer = Arc::new(MockTimer::default()); + let (mut behaviour, sdk) = super::PubsubServiceBehaviour::::new(local_node_id, timer.clone()); + + let ctx = BehaviorContext { + service_id: PUBSUB_SERVICE_ID, + node_id: local_node_id, + awaker: Arc::new(MockAwaker::default()), + }; + + behaviour.on_started(&ctx, 0); + + let consumer = sdk.create_consumer_single(channel, None); + assert_eq!(ctx.awaker.pop_awake_count(), 1); + + behaviour.on_awake(&ctx, timer.now_ms()); + let mut expected_header = MsgHeader::build_reliable(PUBSUB_SERVICE_ID, RouteRule::Direct, 0); + expected_header.meta = CONTROL_META_TYPE; + let expected_msg = TransportMsg::from_payload_bincode(expected_header, &PubsubRemoteEvent::Sub(channel)); + assert_eq!(behaviour.pop_action(), Some(NetworkBehaviorAction::ToNetNode(source_node_id, expected_msg))); + + // after handle ack should not resend + behaviour + .relay + .on_event(timer.now_ms(), source_node_id, ConnId::from_in(0, 0), PubsubRemoteEvent::SubAck(channel, true)); + + drop(consumer); + + behaviour.on_awake(&ctx, timer.now_ms()); + let mut expected_header = MsgHeader::build_reliable(PUBSUB_SERVICE_ID, RouteRule::Direct, 0); + expected_header.meta = CONTROL_META_TYPE; + let expected_msg = TransportMsg::from_payload_bincode(expected_header, &PubsubRemoteEvent::Unsub(channel)); + assert_eq!(behaviour.pop_action(), Some(NetworkBehaviorAction::ToNetConn(ConnId::from_in(0, 0), expected_msg))); + } + + #[test] + fn sub_unsub_should_sub_unsub_key() { + let local_node_id = 1; + let channel = 1000; + let timer = Arc::new(MockTimer::default()); + let (mut behaviour, sdk) = super::PubsubServiceBehaviour::::new(local_node_id, timer.clone()); + + let ctx = BehaviorContext { + service_id: PUBSUB_SERVICE_ID, + node_id: local_node_id, + awaker: Arc::new(MockAwaker::default()), + }; + + behaviour.on_started(&ctx, 0); + + let consumer = sdk.create_consumer(channel, None); + assert_eq!(ctx.awaker.pop_awake_count(), 1); + + behaviour.on_awake(&ctx, timer.now_ms()); + assert_eq!( + behaviour.pop_action(), + Some(NetworkBehaviorAction::ToSdkService( + KEY_VALUE_SERVICE_ID, + KeyValueSdkEvent::SubH(KEY_VALUE_SUB_UUID, channel as u64, Some(KEY_VALUE_TIMEOUT_MS)).into() + )) + ); + + drop(consumer); + + assert_eq!(ctx.awaker.pop_awake_count(), 1); + + behaviour.on_awake(&ctx, timer.now_ms()); + assert_eq!( + behaviour.pop_action(), + Some(NetworkBehaviorAction::ToSdkService( + KEY_VALUE_SERVICE_ID, + KeyValueSdkEvent::UnsubH(KEY_VALUE_SUB_UUID, channel as u64).into() + )) + ); + } +} diff --git a/packages/services/pub_sub/src/behaviour/channel_source.rs b/packages/services/pub_sub/src/behaviour/channel_source.rs deleted file mode 100644 index 2cee071b..00000000 --- a/packages/services/pub_sub/src/behaviour/channel_source.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, -}; - -use async_std::channel::Sender; -use bluesea_identity::NodeId; -use key_value::{KeyId, KeySource, KeyValueSdk, KeyVersion, SubKeyId, ValueType}; -use parking_lot::Mutex; - -pub type SourceMapEvent = (KeyId, SubKeyId, Option, KeyVersion, KeySource); - -pub trait ChannelSourceHashmap: Send + Sync { - fn add(&self, key: u64); - fn remove(&self, key: u64); - fn subscribe(&self, key: u64, tx: Sender); - fn unsubscribe(&self, key: u64); -} - -#[derive(Debug, PartialEq, Eq)] -pub enum ChannelSourceHashmapMockOutput { - Add(u64), - Remove(u64), - Subscribe(u64), - Unsubscribe(u64), -} - -pub struct ChannelSourceHashmapMock { - events: Arc>>, - hashmap: Arc>>>, -} - -impl ChannelSourceHashmapMock { - pub fn new() -> (Self, Arc>>) { - let events = Arc::new(Mutex::new(VecDeque::new())); - ( - Self { - events: events.clone(), - hashmap: Default::default(), - }, - events, - ) - } -} - -impl ChannelSourceHashmap for ChannelSourceHashmapMock { - fn subscribe(&self, key: u64, tx: Sender) { - self.events.lock().push_back(ChannelSourceHashmapMockOutput::Subscribe(key)); - self.hashmap.lock().insert(key, tx); - } - - fn unsubscribe(&self, key: u64) { - self.events.lock().push_back(ChannelSourceHashmapMockOutput::Unsubscribe(key)); - self.hashmap.lock().remove(&key); - } - - fn add(&self, key: u64) { - self.events.lock().push_back(ChannelSourceHashmapMockOutput::Add(key)); - } - - fn remove(&self, key: u64) { - self.events.lock().push_back(ChannelSourceHashmapMockOutput::Remove(key)); - } -} - -const HSUB_UUID: u64 = 8989933434898989; - -pub struct ChannelSourceHashmapReal { - node_id: NodeId, - sdk: KeyValueSdk, -} - -impl ChannelSourceHashmapReal { - pub fn new(sdk: KeyValueSdk, node_id: NodeId) -> Self { - Self { node_id, sdk } - } -} - -impl ChannelSourceHashmap for ChannelSourceHashmapReal { - fn add(&self, key: u64) { - self.sdk.hset(key, self.node_id as u64, vec![], Some(30000)); - } - - fn remove(&self, key: u64) { - self.sdk.hdel(key, self.node_id as u64); - } - - fn subscribe(&self, key: u64, tx: Sender) { - self.sdk.hsubscribe_raw(key, HSUB_UUID, Some(30000), tx); - } - - fn unsubscribe(&self, key: u64) { - self.sdk.hunsubscribe_raw(key, HSUB_UUID); - } -} diff --git a/packages/services/pub_sub/src/lib.rs b/packages/services/pub_sub/src/lib.rs index 195456a6..df9f6f4a 100644 --- a/packages/services/pub_sub/src/lib.rs +++ b/packages/services/pub_sub/src/lib.rs @@ -8,7 +8,7 @@ mod msg; mod relay; mod sdk; -pub use behaviour::{channel_source::ChannelSourceHashmapMock, channel_source::ChannelSourceHashmapReal, PubsubServiceBehaviour}; +pub use behaviour::PubsubServiceBehaviour; pub use msg::{PubsubRemoteEvent, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent}; pub use relay::{feedback::Feedback, feedback::FeedbackType, feedback::NumberInfo, ChannelIdentify, ChannelUuid, LocalPubId, LocalSubId}; pub use sdk::{consumer::Consumer, consumer_raw::ConsumerRaw, consumer_single::ConsumerSingle, publisher::Publisher, publisher_raw::PublisherRaw, PubsubSdk}; @@ -19,7 +19,7 @@ mod tests { use async_std::task::JoinHandle; use bluesea_identity::{NodeAddr, NodeAddrBuilder, NodeId, Protocol}; use bytes::Bytes; - use key_value::{KeyValueBehavior, KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueMsg, KeyValueSdkEvent}; + use key_value::{KeyValueBehavior, KeyValueBehaviorEvent, KeyValueHandlerEvent, KeyValueMsg, KeyValueSdk, KeyValueSdkEvent}; use layers_spread_router::SharedRouter; use layers_spread_router_sync::{LayersSpreadRouterSyncBehavior, LayersSpreadRouterSyncBehaviorEvent, LayersSpreadRouterSyncHandlerEvent, LayersSpreadRouterSyncMsg}; use manual_discovery::{ManualBehavior, ManualBehaviorConf, ManualBehaviorEvent, ManualHandlerEvent, ManualMsg}; @@ -31,11 +31,8 @@ mod tests { use transport_vnet::VnetEarth; use utils::{option_handle::OptionUtils, SystemTimer}; + use crate::msg::{PubsubRemoteEvent, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent}; use crate::relay::feedback::{FeedbackType, NumberInfo}; - use crate::{ - msg::{PubsubRemoteEvent, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent}, - ChannelSourceHashmapReal, - }; use crate::{PubsubSdk, PubsubServiceBehaviour}; #[derive(convert_enum::From, convert_enum::TryInto)] @@ -64,7 +61,7 @@ mod tests { #[derive(convert_enum::From, convert_enum::TryInto)] enum ImplSdkEvent { - KeyValue(KeyValueSdkEvent) + KeyValue(KeyValueSdkEvent), } async fn run_node(vnet: Arc, node_id: NodeId, neighbours: Vec) -> (PubsubSdk, NodeAddr, JoinHandle<()>) { @@ -83,8 +80,9 @@ mod tests { }); let router_sync_behaviour = LayersSpreadRouterSyncBehavior::new(router.clone()); - let (kv_behaviour, kv_sdk) = KeyValueBehavior::new(node_id, timer.clone(), 3000); - let (pubsub_behavior, pubsub_sdk) = PubsubServiceBehaviour::new(node_id, Box::new(ChannelSourceHashmapReal::new(kv_sdk, node_id)), timer.clone()); + let kv_sdk = KeyValueSdk::new(); + let kv_behaviour = KeyValueBehavior::new(node_id, 3000, Some(Box::new(kv_sdk.clone()))); + let (pubsub_behavior, pubsub_sdk) = PubsubServiceBehaviour::new(node_id, timer.clone()); let mut plane = NetworkPlane::::new(NetworkPlaneConfig { node_id, diff --git a/packages/services/pub_sub/src/msg.rs b/packages/services/pub_sub/src/msg.rs index 814d570f..2862be5e 100644 --- a/packages/services/pub_sub/src/msg.rs +++ b/packages/services/pub_sub/src/msg.rs @@ -1,12 +1,10 @@ use crate::relay::ChannelIdentify; -use bluesea_identity::NodeId; use serde::{Deserialize, Serialize}; -pub enum PubsubServiceBehaviourEvent { - Awake, - OnHashmapSet(u64, NodeId), - OnHashmapDel(u64, NodeId), -} +#[derive(Debug, PartialEq, Eq)] +pub enum PubsubServiceBehaviourEvent {} + +#[derive(Debug, PartialEq, Eq)] pub enum PubsubServiceHandlerEvent {} #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] @@ -17,14 +15,5 @@ pub enum PubsubRemoteEvent { UnsubAck(ChannelIdentify, bool), //did it removed, incase of false, it means it already unsubscribed } -pub enum PubsubSdkEvent { - Local(PubsubSdkMsg), - FromNode(NodeId, PubsubSdkMsg), -} - -pub enum PubsubSdkMsg { - Sub(ChannelIdentify), - Unsub(ChannelIdentify), - SubAck(ChannelIdentify, bool), - UnsubAck(ChannelIdentify, bool), -} +#[derive(Debug, PartialEq, Eq)] +pub enum PubsubSdkEvent {}