From b0ce07c539961f3e1a4df637e78fc9f873dd3a76 Mon Sep 17 00:00:00 2001 From: Luong Minh Date: Fri, 3 Nov 2023 12:35:00 +0700 Subject: [PATCH] update testing for bus impl (#37) squash --- Cargo.toml | 3 +- packages/core/router/Cargo.toml | 4 + packages/core/router/src/lib.rs | 19 +- packages/network/Cargo.toml | 4 +- packages/network/src/plane.rs | 3 + packages/network/src/plane/bus.rs | 9 + packages/network/src/plane/bus_impl.rs | 247 +++++++++++++++++++++- packages/network/src/plane/single_conn.rs | 2 +- packages/network/src/transport.rs | 3 + 9 files changed, 280 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b515eb9e..887a864c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,4 +51,5 @@ rand = "0.8" parking_lot = "0.12" env_logger = "0.10" allocation-counter = { version = "*" } -clap = { version = "4.3.0", features = ["derive", "env"] } \ No newline at end of file +clap = { version = "4.3.0", features = ["derive", "env"] } +mockall = "0.11.4" \ No newline at end of file diff --git a/packages/core/router/Cargo.toml b/packages/core/router/Cargo.toml index a8384b23..8a8d0b33 100644 --- a/packages/core/router/Cargo.toml +++ b/packages/core/router/Cargo.toml @@ -3,7 +3,11 @@ name = "bluesea-router" version = "0.1.0" edition = "2021" +[features] +mock = ["mockall"] + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] bluesea-identity = { workspace = true } +mockall = { workspace = true, optional = true } diff --git a/packages/core/router/src/lib.rs b/packages/core/router/src/lib.rs index ab065ecf..3c68e92f 100644 --- a/packages/core/router/src/lib.rs +++ b/packages/core/router/src/lib.rs @@ -5,6 +5,9 @@ use bluesea_identity::{ConnId, NodeId}; pub use force_local::ForceLocalRouter; pub use force_node::ForceNodeRouter; +#[cfg(any(test, feature = "mock"))] +use mockall::automock; + /// ServiceMeta is using for determine which node will be routed, example node with lowest price or lowest latency, which for future use pub type ServiceMeta = u32; @@ -16,9 +19,13 @@ pub enum RouteRule { ToKey(NodeId), } +/// Determine the destination of an action/message pub enum RouteAction { + /// Reject the message Reject, + /// Will be processed locally Local, + /// Will be forward to the given node Next(ConnId, NodeId), } @@ -36,15 +43,17 @@ impl RouteAction { } } +#[cfg_attr(any(test, feature = "mock"), automock)] pub trait RouterTable: Send + Sync { - /// This is used for finding path for sending data out to node + /// Determine the next action for the given destination node fn path_to_node(&self, dest: NodeId) -> RouteAction; - /// This is used for finding path for sending data out to key + /// Determine the next action for the given key fn path_to_key(&self, key: NodeId) -> RouteAction; - /// This is used for finding path for sending data out to service + /// Determine the next action for the given service fn path_to_service(&self, service_id: u8) -> RouteAction; - /// This is used only for determine next action for incomming messages - fn action_for_incomming(&self, route: &RouteRule, service_id: u8) -> RouteAction { + /// Determine next action for incoming messages + /// given the route rule and service id + fn derive_action(&self, route: &RouteRule, service_id: u8) -> RouteAction { match route { RouteRule::Direct => RouteAction::Local, RouteRule::ToNode(dest) => self.path_to_node(*dest), diff --git a/packages/network/Cargo.toml b/packages/network/Cargo.toml index 06d692d5..6a54fbe3 100644 --- a/packages/network/Cargo.toml +++ b/packages/network/Cargo.toml @@ -21,4 +21,6 @@ bytes = "1.4.0" bincode = "1.3.3" [dev-dependencies] -env_logger = { workspace = true } \ No newline at end of file +env_logger = { workspace = true } +mockall = { workspace = true } +bluesea-router = { workspace = true, features = ["mock"]} \ No newline at end of file diff --git a/packages/network/src/plane.rs b/packages/network/src/plane.rs index d1be6529..eed70233 100644 --- a/packages/network/src/plane.rs +++ b/packages/network/src/plane.rs @@ -54,8 +54,11 @@ impl Awaker for HandlerAwa } } +#[derive(Debug, Eq, PartialEq)] pub enum NetworkPlaneInternalEvent { + /// Trigger on_awake() hook for the behavior of the given service. AwakeBehaviour { service_id: u8 }, + /// An Event sent from the Handler layer to the Behaviour layer ToBehaviourFromHandler { service_id: u8, node_id: NodeId, conn_id: ConnId, event: BE }, ToBehaviourLocalMsg { service_id: u8, msg: TransportMsg }, IncomingDisconnected(NodeId, ConnId), diff --git a/packages/network/src/plane/bus.rs b/packages/network/src/plane/bus.rs index 263d3529..0f5dfd75 100644 --- a/packages/network/src/plane/bus.rs +++ b/packages/network/src/plane/bus.rs @@ -2,6 +2,7 @@ use bluesea_identity::{ConnId, NodeId}; use crate::msg::TransportMsg; +#[derive(Debug, PartialEq, Eq)] pub(crate) enum HandleEvent { Awake, FromBehavior(HE), @@ -15,11 +16,19 @@ pub enum HandlerRoute { } pub(crate) trait PlaneBus: Send + Sync { + /// Trigger the internal awake behaviour event for the given service. + /// This will call the on_awaker() method of the behaviour. fn awake_behaviour(&self, service: u8) -> Option<()>; + /// Sends an Awake Handler event to the given service of a connection. fn awake_handler(&self, service: u8, conn: ConnId) -> Option<()>; + /// Forward the given event from the handler to the behaviour layer. fn to_behaviour_from_handler(&self, service_id: u8, node_id: NodeId, conn_id: ConnId, event: BE) -> Option<()>; + /// Sends an Event to the Handler of the given service by connection Id or node Id. fn to_handler(&self, service_id: u8, route: HandlerRoute, event: HandleEvent) -> Option<()>; + /// Sends a Message to the network layer. fn to_net(&self, msg: TransportMsg) -> Option<()>; + /// Sends a Message to the network layer, specify the destination node fn to_net_node(&self, node: NodeId, msg: TransportMsg) -> Option<()>; + /// Sends a Message to the network layer, specify the destination connection fn to_net_conn(&self, conn_id: ConnId, msg: TransportMsg) -> Option<()>; } diff --git a/packages/network/src/plane/bus_impl.rs b/packages/network/src/plane/bus_impl.rs index af9f05cd..277035b8 100644 --- a/packages/network/src/plane/bus_impl.rs +++ b/packages/network/src/plane/bus_impl.rs @@ -12,10 +12,15 @@ use utils::error_handle::ErrorUtils; use super::bus::{HandleEvent, PlaneBus}; pub(crate) struct PlaneBusImpl { + /// Current NodeId node_id: NodeId, + /// Network plane internal event sender plane_tx: Sender>, + /// NodeId -> (ConnId -> (Sender, ConnectionSender)) nodes: RwLock)>, Arc)>>>, + /// ConnId -> (Sender, ConnectionSender) conns: RwLock)>, Arc)>>, + /// Router table router: Arc, } @@ -34,15 +39,17 @@ where } } + /// Add a connection to plane bus. + /// Return a receiver for the connection. pub(crate) fn add_conn(&self, net_sender: Arc) -> Option)>> { let mut conns = self.conns.write(); let mut nodes = self.nodes.write(); - if let std::collections::hash_map::Entry::Vacant(e) = conns.entry(net_sender.conn_id()) { + if let std::collections::hash_map::Entry::Vacant(conn_entry) = conns.entry(net_sender.conn_id()) { log::info!("[PlaneBusImpl {}] add_con {} {}", self.node_id, net_sender.remote_node_id(), net_sender.conn_id()); let (tx, rx) = unbounded(); - let entry = nodes.entry(net_sender.remote_node_id()).or_insert_with(HashMap::new); - e.insert((tx.clone(), net_sender.clone())); - entry.insert(net_sender.conn_id(), (tx.clone(), net_sender.clone())); + let node_conns = nodes.entry(net_sender.remote_node_id()).or_insert_with(HashMap::new); + conn_entry.insert((tx.clone(), net_sender.clone())); + node_conns.insert(net_sender.conn_id(), (tx.clone(), net_sender.clone())); Some(rx) } else { log::warn!("[PlaneBusImpl {}] add_conn duplicate {}", self.node_id, net_sender.conn_id()); @@ -50,6 +57,7 @@ where } } + /// Remove a connection from plane bus. pub(crate) fn remove_conn(&self, node: NodeId, conn: ConnId) -> Option<()> { let mut conns = self.conns.write(); let mut nodes = self.nodes.write(); @@ -68,6 +76,7 @@ where } } + /// Close a connection. pub(crate) fn close_conn(&self, conn: ConnId) { let conns = self.conns.read(); if let Some((_s, c_s)) = conns.get(&conn) { @@ -78,6 +87,7 @@ where } } + /// Close every connections to node. pub(crate) fn close_node(&self, node: NodeId) { let nodes = self.nodes.read(); if let Some(conns) = nodes.get(&node) { @@ -151,7 +161,7 @@ where fn to_net(&self, msg: TransportMsg) -> Option<()> { log::debug!("[PlaneBusImpl {}] send_to_net service: {} route: {:?}", self.node_id, msg.header.service_id, msg.header.route); - match self.router.action_for_incomming(&msg.header.route, msg.header.service_id) { + match self.router.derive_action(&msg.header.route, msg.header.service_id) { RouteAction::Reject => { log::warn!("[PlaneBusImpl {}] send_to_net reject {} {:?}", self.node_id, msg.header.service_id, msg.header.route); None @@ -188,7 +198,7 @@ where fn to_net_node(&self, node: NodeId, msg: TransportMsg) -> Option<()> { log::debug!("[PlaneBusImpl {}] send_to_net_node service: {} route: ToNode({})", self.node_id, msg.header.service_id, node); - match self.router.action_for_incomming(&RouteRule::ToNode(node), msg.header.service_id) { + match self.router.derive_action(&RouteRule::ToNode(node), msg.header.service_id) { RouteAction::Reject => { log::warn!("[PlaneBusImpl {}] send_to_net reject {} ToNode({})", self.node_id, msg.header.service_id, node); None @@ -223,3 +233,228 @@ where Some(()) } } + +#[cfg(test)] +mod tests { + use crate::{ + msg::TransportMsg, + plane::{ + bus::{HandleEvent, HandlerRoute, PlaneBus}, + bus_impl::PlaneBusImpl, + NetworkPlaneInternalEvent, + }, + transport::{ConnectionSender, MockConnectionSender}, + }; + use async_std::channel::{unbounded, TryRecvError}; + use bluesea_identity::{ConnId, NodeAddr, NodeId}; + use bluesea_router::{MockRouterTable, RouteAction, RouteRule}; + use std::sync::Arc; + + type HE = (); + type BE = (); + + fn create_mock_connection(conn: ConnId, node: NodeId, addr: NodeAddr) -> MockConnectionSender { + let mut sender = MockConnectionSender::new(); + sender.expect_conn_id().return_const(conn); + sender.expect_remote_node_id().return_const(node); + sender.expect_remote_addr().return_const(addr); + sender.expect_send().return_const(()); + + sender + } + + #[test] + fn should_add_remove_close_connection() { + let local_node_id = 1; + let (plane_tx, _plane_rx) = unbounded(); + let router = Arc::new(MockRouterTable::new()); + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + let mut sender = create_mock_connection(ConnId::from_in(1, 1), 2u32, NodeAddr::empty()); + sender.expect_close().times(1).return_const(()); + let conn = Arc::new(sender); + let conn_c = conn.clone(); + let rx = bus.add_conn(conn.clone()); + assert!(rx.is_some()); + + // duplicate conn + assert!(bus.add_conn(conn).is_none()); + + assert_eq!(bus.conns.read().len(), 1); + assert_eq!(bus.nodes.read().len(), 1); + assert_eq!(bus.nodes.read().get(&conn_c.remote_node_id()).unwrap().len(), 1); + assert_eq!(bus.nodes.read().get(&conn_c.remote_node_id()).unwrap().get(&conn_c.conn_id()).unwrap().1.conn_id(), conn_c.conn_id()); + assert_eq!(bus.conns.read().get(&conn_c.conn_id()).unwrap().1.conn_id(), conn_c.conn_id()); + + // Close conn + bus.close_conn(conn_c.conn_id()); + + // Remove Conn + bus.remove_conn(2u32, ConnId::from_in(1, 1)); + assert_eq!(bus.conns.read().len(), 0); + assert_eq!(bus.nodes.read().len(), 0); + + // remove non-exist conn + assert!(bus.remove_conn(2u32, ConnId::from_in(1, 1)).is_none()); + } + + #[test] + fn should_close_node() { + let local_node_id = 1; + let (plane_tx, _plane_rx) = unbounded(); + let router = Arc::new(MockRouterTable::new()); + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + let mut data = create_mock_connection(ConnId::from_in(1, 1), 2u32, NodeAddr::empty()); + data.expect_close().times(1).return_const(()); + + let conn = Arc::new(data); + let rx = bus.add_conn(conn.clone()); + assert!(rx.is_some()); + + // duplicate conn + assert!(bus.add_conn(conn).is_none()); + + assert_eq!(bus.conns.read().len(), 1); + assert_eq!(bus.nodes.read().len(), 1); + + // Close node + bus.close_node(2u32); + } + + #[async_std::test] + async fn should_send_event_to_handler() { + let local_node_id = 1; + let (plane_tx, _plane_rx) = unbounded(); + let router = Arc::new(MockRouterTable::new()); + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + let data = create_mock_connection(ConnId::from_in(1, 1), 2u32, NodeAddr::empty()); + + let conn = Arc::new(data); + let rx = bus.add_conn(conn.clone()).expect("Should have rx"); + + // duplicate conn + assert!(bus.add_conn(conn).is_none()); + assert_eq!(bus.conns.read().len(), 1); + assert_eq!(bus.nodes.read().len(), 1); + // Send event to handler through conn + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + assert!(bus.to_handler(1, HandlerRoute::Conn(ConnId::from_in(1, 1)), HandleEvent::Awake).is_some()); + assert_eq!(rx.try_recv(), Ok((1, HandleEvent::Awake))); + + // Send event to handler through node + assert!(bus.to_handler(1, HandlerRoute::NodeFirst(2u32), HandleEvent::Awake).is_some()); + assert_eq!(rx.try_recv(), Ok((1, HandleEvent::Awake))); + } + + #[test] + fn should_correctly_reject_to_network() { + let local_node_id = 1; + let (plane_tx, _plane_rx) = unbounded(); + let mut mock_router = MockRouterTable::new(); + mock_router.expect_derive_action().returning(|_, _| RouteAction::Reject); + let router = Arc::new(mock_router); + + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + + assert!(bus.to_net(TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_none()); + } + + #[async_std::test] + async fn to_net_should_process_local() { + let local_node_id = 1; + let (plane_tx, plane_rx) = unbounded(); + let mut mock_router = MockRouterTable::new(); + mock_router.expect_derive_action().returning(|_, _| RouteAction::Local); + let router = Arc::new(mock_router); + + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + + assert_eq!(plane_rx.try_recv(), Err(TryRecvError::Empty)); + + assert!(bus.to_net(TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_some()); + + assert_eq!( + plane_rx.try_recv(), + Ok(NetworkPlaneInternalEvent::ToBehaviourLocalMsg { + service_id: 1, + msg: TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8]), + }) + ); + } + + #[async_std::test] + async fn to_net_should_forward() { + let local_node_id = 1; + let (plane_tx, plane_rx) = unbounded(); + let mut mock_router = MockRouterTable::new(); + mock_router.expect_derive_action().returning(|_, _| RouteAction::Next(ConnId::from_in(1, 1), 2u32)); + let router = Arc::new(mock_router); + + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + + let sender = create_mock_connection(ConnId::from_in(1, 1), 2u32, NodeAddr::empty()); + + let conn = Arc::new(sender); + let _rx = bus.add_conn(conn).expect("Should have rx"); + assert_eq!(plane_rx.try_recv(), Err(TryRecvError::Empty)); + + assert!(bus.to_net(TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_some()); + } + + #[async_std::test] + async fn to_net_node_should_process_local() { + let local_node_id = 1; + let (plane_tx, plane_rx) = unbounded(); + let mut mock_router = MockRouterTable::new(); + mock_router.expect_derive_action().returning(|_, _| RouteAction::Local); + let router = Arc::new(mock_router); + + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + + assert_eq!(plane_rx.try_recv(), Err(TryRecvError::Empty)); + + assert!(bus.to_net_node(2u32, TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_some()); + + assert_eq!( + plane_rx.try_recv(), + Ok(NetworkPlaneInternalEvent::ToBehaviourLocalMsg { + service_id: 1, + msg: TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8]), + }) + ); + } + + #[async_std::test] + async fn to_net_node_should_forward() { + let local_node_id = 1; + let (plane_tx, plane_rx) = unbounded(); + let mut mock_router = MockRouterTable::new(); + mock_router.expect_derive_action().returning(|_, _| RouteAction::Next(ConnId::from_in(1, 1), 2u32)); + let router = Arc::new(mock_router); + + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + + let sender = create_mock_connection(ConnId::from_in(1, 1), 2u32, NodeAddr::empty()); + + let conn = Arc::new(sender); + let _rx = bus.add_conn(conn).expect("Should have rx"); + assert_eq!(plane_rx.try_recv(), Err(TryRecvError::Empty)); + + assert!(bus.to_net_node(2u32, TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_some()); + } + + #[async_std::test] + async fn to_net_conn_should_forward() { + let local_node_id = 1; + let (plane_tx, _plane_rx) = unbounded(); + let mock_router = MockRouterTable::new(); + let router = Arc::new(mock_router); + + let bus = PlaneBusImpl::::new(local_node_id, router, plane_tx); + + let sender = create_mock_connection(ConnId::from_in(1, 1), 2u32, NodeAddr::empty()); + + let conn = Arc::new(sender); + let _rx = bus.add_conn(conn).expect("Should have rx"); + assert!(bus.to_net_conn(ConnId::from_in(1, 1), TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_some()); +} +} diff --git a/packages/network/src/plane/single_conn.rs b/packages/network/src/plane/single_conn.rs index 21235957..53d1bba6 100644 --- a/packages/network/src/plane/single_conn.rs +++ b/packages/network/src/plane/single_conn.rs @@ -54,7 +54,7 @@ where } e = self.receiver.poll().fuse() => match e { Ok(event) => match event { - ConnectionEvent::Msg(msg) => match self.router.action_for_incomming(&msg.header.route, msg.header.service_id) { + ConnectionEvent::Msg(msg) => match self.router.derive_action(&msg.header.route, msg.header.service_id) { RouteAction::Reject => { Ok(()) } diff --git a/packages/network/src/transport.rs b/packages/network/src/transport.rs index 7fa5b45e..34fd76f2 100644 --- a/packages/network/src/transport.rs +++ b/packages/network/src/transport.rs @@ -5,6 +5,8 @@ use std::sync::Arc; use thiserror::Error; use utils::error_handle::ErrorUtils; +#[cfg(test)] +use mockall::automock; pub type TransportOutgoingLocalUuid = u64; pub enum TransportEvent { @@ -60,6 +62,7 @@ pub trait ConnectionAcceptor: Send + Sync { fn reject(&self, err: ConnectionRejectReason); } +#[cfg_attr(any(test, feature = "mock"), automock)] pub trait ConnectionSender: Send + Sync { fn remote_node_id(&self) -> NodeId; fn conn_id(&self) -> ConnId;