Skip to content

Commit

Permalink
refactor to use cross-service sdk in pub-sub (#43)
Browse files Browse the repository at this point in the history
* refactor to use cross-service sdk in pub-sub

* fixing test in key-value service

* fixing test for pubsub

* fixing warn and build example

* added more test in key-value sdk
  • Loading branch information
giangndm committed Nov 13, 2023
1 parent 0448da6 commit dc2cc50
Show file tree
Hide file tree
Showing 19 changed files with 889 additions and 698 deletions.
3 changes: 2 additions & 1 deletion examples/examples/manual_node.rs
Expand Up @@ -74,7 +74,8 @@ async fn main() {
});

let spreads_layer_router = LayersSpreadRouterSyncBehavior::new(router.clone());
let (key_value, key_value_sdk) = key_value::KeyValueBehavior::new(args.node_id, timer.clone(), 10000);
let key_value_sdk = key_value::KeyValueSdk::new();
let key_value = key_value::KeyValueBehavior::new(args.node_id, 10000, Some(Box::new(key_value_sdk.clone())));

if let Some(addr) = args.redis_addr {
let mut redis_server = RedisServer::new(addr, key_value_sdk);
Expand Down
7 changes: 3 additions & 4 deletions packages/core/utils/src/hashmap.rs
Expand Up @@ -165,9 +165,8 @@ mod tests {
let mut map = HashMap::new();
map.insert("key1", "value1");
map.insert("key2", "value2");
let mut iter = map.iter();
assert_eq!(iter.next(), Some((&"key1", &"value1")));
assert_eq!(iter.next(), Some((&"key2", &"value2")));
assert_eq!(iter.next(), None);
let mut slots = map.iter().collect::<Vec<_>>();
slots.sort_by_key(|a| a.0);
assert_eq!(slots, vec![(&"key1", &"value1"), (&"key2", &"value2")]);
}
}
2 changes: 1 addition & 1 deletion packages/network/src/plane.rs
Expand Up @@ -189,7 +189,7 @@ where
fn pop_actions(&mut self, now_ms: u64) {
while let Some(action) = self.internal.pop_action() {
match action {
PlaneInternalAction::SpawnConnection(spwd_conn) => {
PlaneInternalAction::SpawnConnection(spwd_conn) => {
let SpawnedConnection { outgoing, sender, receiver, handlers } = spwd_conn;
let internal_tx = self.internal_tx.clone();

Expand Down
2 changes: 1 addition & 1 deletion packages/network/src/plane/bus_impl.rs
Expand Up @@ -456,5 +456,5 @@ mod tests {
let conn = Arc::new(sender);
let _rx = bus.add_conn(conn).expect("Should have rx");
assert!(bus.to_net_conn(ConnId::from_in(1, 1), TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_some());
}
}
}
2 changes: 1 addition & 1 deletion packages/network/src/plane/internal.rs
Expand Up @@ -273,7 +273,7 @@ mod tests {
use crate::{
behaviour::MockNetworkBehavior,
msg::TransportMsg,
transport::{ConnectionReceiver, ConnectionRejectReason, MockConnectionAcceptor, MockConnectionReceiver, MockConnectionSender, OutgoingConnectionError},
transport::{ConnectionRejectReason, MockConnectionAcceptor, MockConnectionReceiver, MockConnectionSender, OutgoingConnectionError},
};

type BE = ();
Expand Down
4 changes: 2 additions & 2 deletions packages/runner/src/lib.rs
Expand Up @@ -10,8 +10,8 @@ pub use network::{
convert_enum,
};
pub use pub_sub::{
ChannelIdentify, ChannelSourceHashmapReal, ChannelUuid, Consumer, ConsumerRaw, ConsumerSingle, Feedback, FeedbackType, LocalPubId, LocalSubId, NumberInfo, Publisher, PublisherRaw, PubsubSdk,
PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent,
ChannelIdentify, ChannelUuid, Consumer, ConsumerRaw, ConsumerSingle, Feedback, FeedbackType, LocalPubId, LocalSubId, NumberInfo, Publisher, PublisherRaw, PubsubSdk, PubsubServiceBehaviour,
PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent,
};
pub use transport_udp::UdpTransport;
pub use utils::{
Expand Down
278 changes: 167 additions & 111 deletions packages/services/key_value/src/behavior.rs

Large diffs are not rendered by default.

0 comments on commit dc2cc50

Please sign in to comment.