Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor to use cross-service sdk in pub-sub #43

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/examples/manual_node.rs
Expand Up @@ -74,7 +74,8 @@ async fn main() {
});

let spreads_layer_router = LayersSpreadRouterSyncBehavior::new(router.clone());
let (key_value, key_value_sdk) = key_value::KeyValueBehavior::new(args.node_id, timer.clone(), 10000);
let key_value_sdk = key_value::KeyValueSdk::new();
let key_value = key_value::KeyValueBehavior::new(args.node_id, 10000, Some(Box::new(key_value_sdk.clone())));

if let Some(addr) = args.redis_addr {
let mut redis_server = RedisServer::new(addr, key_value_sdk);
Expand Down
7 changes: 3 additions & 4 deletions packages/core/utils/src/hashmap.rs
Expand Up @@ -165,9 +165,8 @@ mod tests {
let mut map = HashMap::new();
map.insert("key1", "value1");
map.insert("key2", "value2");
let mut iter = map.iter();
assert_eq!(iter.next(), Some((&"key1", &"value1")));
assert_eq!(iter.next(), Some((&"key2", &"value2")));
assert_eq!(iter.next(), None);
let mut slots = map.iter().collect::<Vec<_>>();
slots.sort_by_key(|a| a.0);
assert_eq!(slots, vec![(&"key1", &"value1"), (&"key2", &"value2")]);
}
}
2 changes: 1 addition & 1 deletion packages/network/src/plane.rs
Expand Up @@ -189,7 +189,7 @@
fn pop_actions(&mut self, now_ms: u64) {
while let Some(action) = self.internal.pop_action() {
match action {
PlaneInternalAction::SpawnConnection(spwd_conn) => {
PlaneInternalAction::SpawnConnection(spwd_conn) => {
let SpawnedConnection { outgoing, sender, receiver, handlers } = spwd_conn;
let internal_tx = self.internal_tx.clone();

Expand Down Expand Up @@ -293,7 +293,7 @@
NetworkBehaviorAction::CloseNode(node) => {
self.bus.close_node(node);
}
NetworkBehaviorAction::ToSdkService(_, _) => {}

Check warning on line 296 in packages/network/src/plane.rs

View check run for this annotation

Codecov / codecov/patch

packages/network/src/plane.rs#L296

Added line #L296 was not covered by tests
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/network/src/plane/bus_impl.rs
Expand Up @@ -456,5 +456,5 @@ mod tests {
let conn = Arc::new(sender);
let _rx = bus.add_conn(conn).expect("Should have rx");
assert!(bus.to_net_conn(ConnId::from_in(1, 1), TransportMsg::build_unreliable(1, RouteRule::ToService(2), 1, &[1u8])).is_some());
}
}
}
2 changes: 1 addition & 1 deletion packages/network/src/plane/internal.rs
Expand Up @@ -10,7 +10,7 @@

use super::NetworkPlaneInternalEvent;

#[derive(Debug, Eq, PartialEq)]

Check warning on line 13 in packages/network/src/plane/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/network/src/plane/internal.rs#L13

Added line #L13 was not covered by tests
pub enum PlaneInternalError {
InvalidServiceId(u8),
}
Expand Down Expand Up @@ -257,7 +257,7 @@
if let Some((to_behaviour, to_context)) = &mut self.behaviors[to as usize] {
to_behaviour.on_sdk_msg(to_context, now_ms, from, msg);
} else {
debug_assert!(false, "service not found {}", to);

Check warning on line 260 in packages/network/src/plane/internal.rs

View check run for this annotation

Codecov / codecov/patch

packages/network/src/plane/internal.rs#L260

Added line #L260 was not covered by tests
}
}
}
Expand All @@ -273,7 +273,7 @@
use crate::{
behaviour::MockNetworkBehavior,
msg::TransportMsg,
transport::{ConnectionReceiver, ConnectionRejectReason, MockConnectionAcceptor, MockConnectionReceiver, MockConnectionSender, OutgoingConnectionError},
transport::{ConnectionRejectReason, MockConnectionAcceptor, MockConnectionReceiver, MockConnectionSender, OutgoingConnectionError},
};

type BE = ();
Expand Down
4 changes: 2 additions & 2 deletions packages/runner/src/lib.rs
Expand Up @@ -10,8 +10,8 @@ pub use network::{
convert_enum,
};
pub use pub_sub::{
ChannelIdentify, ChannelSourceHashmapReal, ChannelUuid, Consumer, ConsumerRaw, ConsumerSingle, Feedback, FeedbackType, LocalPubId, LocalSubId, NumberInfo, Publisher, PublisherRaw, PubsubSdk,
PubsubServiceBehaviour, PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent,
ChannelIdentify, ChannelUuid, Consumer, ConsumerRaw, ConsumerSingle, Feedback, FeedbackType, LocalPubId, LocalSubId, NumberInfo, Publisher, PublisherRaw, PubsubSdk, PubsubServiceBehaviour,
PubsubServiceBehaviourEvent, PubsubServiceHandlerEvent,
};
pub use transport_udp::UdpTransport;
pub use utils::{
Expand Down
278 changes: 167 additions & 111 deletions packages/services/key_value/src/behavior.rs

Large diffs are not rendered by default.