forked from 8xFF/atm0s-sdn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
relay.rs
154 lines (129 loc) · 5.16 KB
/
relay.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
use std::{fmt::Display, sync::Arc};
use bluesea_identity::{ConnId, NodeId};
use bytes::Bytes;
use network::{msg::TransportMsg, transport::ConnectionSender};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use utils::{awaker::Awaker, Timer};
use crate::{msg::PubsubRemoteEvent, PubsubSdk};
use self::{
feedback::FeedbackConsumerId,
local::{LocalRelay, LocalRelayAction},
logic::{PubsubRelayLogic, PubsubRelayLogicOutput},
remote::RemoteRelay,
source_binding::{SourceBinding, SourceBindingAction},
};
pub(crate) mod feedback;
pub(crate) mod local;
pub(crate) mod logic;
pub(crate) mod remote;
pub(crate) mod source_binding;
pub type ChannelUuid = u32;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ChannelIdentify(ChannelUuid, NodeId);
pub type LocalPubId = u64;
pub type LocalSubId = u64;
impl Display for ChannelIdentify {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.0, self.1)
}
}
impl ChannelIdentify {
pub fn new(uuid: ChannelUuid, source: NodeId) -> Self {
Self(uuid, source)
}
pub fn uuid(&self) -> ChannelUuid {
self.0
}
pub fn source(&self) -> NodeId {
self.1
}
}
pub struct PubsubRelay {
logic: Arc<RwLock<PubsubRelayLogic>>,
remote: Arc<RwLock<RemoteRelay>>,
local: Arc<RwLock<LocalRelay>>,
source_binding: Arc<RwLock<SourceBinding>>,
}
impl Clone for PubsubRelay {
fn clone(&self) -> Self {
Self {
logic: self.logic.clone(),
remote: self.remote.clone(),
local: self.local.clone(),
source_binding: self.source_binding.clone(),
}
}
}
impl PubsubRelay {
pub fn new(node_id: NodeId, timer: Arc<dyn Timer>) -> (Self, PubsubSdk) {
let s = Self {
logic: Arc::new(RwLock::new(PubsubRelayLogic::new(node_id))),
remote: Arc::new(RwLock::new(RemoteRelay::new())),
local: Arc::new(RwLock::new(LocalRelay::new())),
source_binding: Arc::new(RwLock::new(SourceBinding::new())),
};
let sdk = PubsubSdk::new(node_id, s.logic.clone(), s.remote.clone(), s.local.clone(), s.source_binding.clone(), timer);
(s, sdk)
}
pub fn set_awaker(&self, awaker: Arc<dyn Awaker>) {
self.logic.write().set_awaker(awaker.clone());
self.local.write().set_awaker(awaker.clone());
self.source_binding.write().set_awaker(awaker);
}
pub fn on_connection_opened(&self, conn_id: ConnId, sender: Arc<dyn ConnectionSender>) {
self.remote.write().on_connection_opened(conn_id, sender);
}
pub fn on_connection_closed(&self, conn_id: ConnId) {
self.remote.write().on_connection_closed(conn_id);
}
pub fn tick(&self, now_ms: u64) {
let local_fbs = self.logic.write().tick(now_ms);
for fb in local_fbs {
self.local.read().feedback(fb.channel.uuid(), fb);
}
}
pub fn on_source_added(&self, channel: ChannelUuid, source: NodeId) {
if let Some(subs) = self.source_binding.write().on_source_added(channel, source) {
log::debug!("[PubsubRelay] channel {} added source {} => auto sub for local subs {:?}", channel, source, subs);
for sub in subs {
self.logic.write().on_local_sub(ChannelIdentify::new(channel, source), sub);
}
}
}
pub fn on_source_removed(&self, channel: ChannelUuid, source: NodeId) {
if let Some(subs) = self.source_binding.write().on_source_removed(channel, source) {
log::debug!("[PubsubRelay] channel {} removed source {} => auto unsub for local subs {:?}", channel, source, subs);
for sub in subs {
self.logic.write().on_local_unsub(ChannelIdentify::new(channel, source), sub);
}
}
}
pub fn on_event(&self, now_ms: u64, from: NodeId, conn: ConnId, event: PubsubRemoteEvent) {
self.logic.write().on_event(now_ms, from, conn, event);
}
pub fn on_feedback(&self, now_ms: u64, channel: ChannelIdentify, _from: NodeId, conn: ConnId, fb: feedback::Feedback) {
if let Some(local_fb) = self.logic.write().on_feedback(now_ms, channel, FeedbackConsumerId::Remote(conn), fb) {
self.local.read().feedback(channel.uuid(), local_fb);
}
}
pub fn relay(&self, channel: ChannelIdentify, msg: TransportMsg) {
if let Some((remotes, locals)) = self.logic.read().relay(channel) {
self.remote.read().relay(remotes, &msg);
if !locals.is_empty() {
self.local.read().relay(channel.source(), channel.uuid(), locals, Bytes::from(msg.payload().to_vec()));
} else {
log::trace!("No local subscriber for channel {}", channel);
}
}
}
pub fn pop_logic_action(&mut self) -> Option<(NodeId, Option<ConnId>, PubsubRelayLogicOutput)> {
self.logic.write().pop_action()
}
pub fn pop_local_action(&mut self) -> Option<LocalRelayAction> {
self.local.write().pop_action()
}
pub fn pop_source_binding_action(&mut self) -> Option<SourceBindingAction> {
self.source_binding.write().pop_action()
}
}