Skip to content

Commit

Permalink
chore: derive serde for missing nodes
Browse files Browse the repository at this point in the history
This is mainly for loading/storing state.
  • Loading branch information
FWuermse committed Aug 17, 2023
1 parent 5dc7143 commit e07de73
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 52 deletions.
23 changes: 12 additions & 11 deletions src/nodes/io/file.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{io::prelude::*, fs::remove_file};
use std::io::prelude::*;
use std::fs::File;
use flowrs::{node::{Node, UpdateError, ChangeObserver}, connection::{Input, Output, connect}};
use flowrs::{node::{Node, UpdateError, ChangeObserver}, connection::{Input, Output}};

use flowrs::RuntimeConnectable;
use serde::{Serialize, Deserialize};

#[derive(Clone)]
#[derive(Clone, Deserialize, Serialize)]
pub struct FileWriterConfig {
file: String
}
Expand All @@ -14,13 +15,13 @@ pub struct FileReaderConfig {
file: String
}

#[derive(RuntimeConnectable)]
#[derive(RuntimeConnectable, Deserialize, Serialize)]
pub struct BinaryFileWriterNode {

//#[input]
#[input]
pub data_input: Input<Vec<u8>>,

//#[input]
#[input]
pub config_input: Input<FileWriterConfig>,

current_config: Option<FileWriterConfig>
Expand Down Expand Up @@ -55,13 +56,13 @@ impl Node for BinaryFileWriterNode {
}
}

//#[derive(RuntimeConnectable)]
#[derive(RuntimeConnectable, Deserialize, Serialize)]
pub struct BinaryFileReaderNode {

//#[output]
#[output]
pub data_output: Output<Vec<u8>>,

//#[input]
#[input]
pub config_input: Input<FileReaderConfig>,
}

Expand Down Expand Up @@ -101,7 +102,7 @@ fn test_file_read_and_write() {

let data_input: flowrs::connection::Edge<Vec<u8>> = flowrs::connection::Edge::new();

connect(reader.data_output.clone(), data_input.clone());
flowrs::connection::connect(reader.data_output.clone(), data_input.clone());

let data : Vec<u8> = "123".as_bytes().to_vec();

Expand All @@ -120,7 +121,7 @@ fn test_file_read_and_write() {
assert!(false);
}

let _ = remove_file(file);
let _ = std::fs::remove_file(file);

//println!("{:?}", odd_res_nums);
//println!("{:?}", even_res_nums);
Expand Down
23 changes: 15 additions & 8 deletions src/nodes/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use std::{time::Instant, thread, sync::{Condvar, Mutex, Arc}, marker::PhantomData};
use core::time::Duration;

#[derive(Clone)]
#[derive(Clone, Deserialize, Serialize)]
pub struct TimerNodeConfig {
pub duration: Duration
}
Expand Down Expand Up @@ -38,9 +38,10 @@ impl UpdateController for WaitTimerUpdateController {
}
}

#[derive(Clone)]
#[derive(Clone, Deserialize, Serialize)]
pub struct WaitTimer<U> {
own_thread: bool,
#[serde(skip)]
cond_var: Arc<(Mutex<bool>, Condvar)>,
_marker: PhantomData<U>
}
Expand Down Expand Up @@ -100,9 +101,15 @@ impl<U> WaitTimer<U> {
}
}

#[derive(Clone)]
fn now() -> Instant {
Instant::now()
}

#[derive(Clone, Deserialize, Serialize)]
pub struct PollTimer<U> {
every: Duration,
#[serde(skip)]
#[serde(default = "now")]
last_tick: Instant,
_marker: PhantomData<U>
}
Expand Down Expand Up @@ -135,7 +142,7 @@ impl<U> TimerStrategy<U> for PollTimer<U> {
}

#[derive(RuntimeConnectable, Deserialize, Serialize)]
pub struct TimerNode<T, U>
pub struct TimerNode<T, U> where T: TimerStrategy<U>
{
timer: T,

Expand All @@ -151,8 +158,8 @@ pub struct TimerNode<T, U>
token_object: Option<U>
}

impl<T, U> TimerNode<T, U>
where T : TimerStrategy<U>, U: Clone {
impl<'a, T, U> TimerNode<T, U>
where T : Deserialize<'a> + Serialize + TimerStrategy<U>, U: Clone {
pub fn new(timer: T, token_object : Option<U>, change_observer: Option<&ChangeObserver>) -> Self {
Self {
config_input: Input::new(),
Expand All @@ -164,8 +171,8 @@ impl<T, U> TimerNode<T, U>
}
}

impl<T, U> Node for TimerNode<T, U>
where T : TimerStrategy<U> + Send, U: Clone + Send + Copy + 'static {
impl<'a, T, U> Node for TimerNode<T, U>
where T : Deserialize<'a> + Serialize + TimerStrategy<U> + Send, U: Clone + Send + Copy + 'static {

fn on_update(&mut self) -> Result<(), UpdateError> {

Expand Down
2 changes: 1 addition & 1 deletion src/nodes/transform/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ where T: Serialize + Send {
}
}

#[derive(RuntimeConnectable)]
#[derive(RuntimeConnectable, Deserialize, Serialize)]
pub struct FromJsonStringNode<T> {
#[output]
pub output: Output<T>,
Expand Down
135 changes: 103 additions & 32 deletions tests/nodes/test_timer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use flowrs::{connection::Input, node::{Node, UpdateError}};
use flowrs::RuntimeConnectable;
use flowrs::{
connection::Input,
node::{Node, UpdateError},
};
use std::sync::mpsc::Sender;

#[derive(RuntimeConnectable)]
Expand All @@ -14,16 +17,14 @@ impl ReportNode {
pub fn new(sender: Sender<bool>) -> Self {
Self {
sender: sender,
input: Input::new()
input: Input::new(),
}
}
}

impl Node for ReportNode {

fn on_update(&mut self) -> Result<(), UpdateError> {

if let Ok(input) = self.input.next() {
if let Ok(input) = self.input.next() {
let _res = self.sender.send(input);
}
Ok(())
Expand All @@ -32,30 +33,51 @@ impl Node for ReportNode {

#[cfg(test)]
mod nodes {
use std::{thread, time::Duration, sync::mpsc::channel};
use std::{sync::mpsc::channel, thread, time::Duration, task::Poll};

use flowrs::{
connection::connect,
node::ChangeObserver, exec::{node_updater::{NodeUpdater, MultiThreadedNodeUpdater, SingleThreadedNodeUpdater}, execution::{StandardExecutor, Executor}}, version::Version, flow_impl::Flow, sched::round_robin::RoundRobinScheduler,
exec::{
execution::{Executor, StandardExecutor},
node_updater::{MultiThreadedNodeUpdater, NodeUpdater, SingleThreadedNodeUpdater},
},
flow_impl::Flow,
node::ChangeObserver,
sched::round_robin::RoundRobinScheduler,
version::Version,
};

use flowrs_std::{value::ValueNode, timer::{TimerNodeConfig, TimerNode, WaitTimer, PollTimer, TimerStrategy}, debug::DebugNode};

use crate::nodes::test_timer::ReportNode;
use flowrs_std::{
debug::DebugNode,
timer::{PollTimer, TimerNode, TimerNodeConfig, TimerStrategy, WaitTimer},
value::ValueNode,
};
use serde::{Deserialize, Serialize};

fn timer_test_with<T: TimerStrategy<bool> + Send + 'static, U: NodeUpdater + Drop + Send + 'static>(node_updater: U, timer: T) where T: Clone {
use crate::nodes::test_timer::ReportNode;

fn timer_test_with<
T: TimerStrategy<bool> + Send + 'static,
U: NodeUpdater + Drop + Send + 'static,
>(
node_updater: U,
timer: T,
) where
T: Clone + Deserialize<'static> + Serialize,
{
let sleep_seconds = 5;
let timer_interval_seconds = 1;

let change_observer: ChangeObserver = ChangeObserver::new();
let change_observer: ChangeObserver = ChangeObserver::new();
let (sender, receiver) = channel::<bool>();

let node_1 = ValueNode::new(
TimerNodeConfig {duration: core::time::Duration::from_secs(timer_interval_seconds) },
Some(&change_observer)

let node_1 = ValueNode::new(
TimerNodeConfig {
duration: core::time::Duration::from_secs(timer_interval_seconds),
},
Some(&change_observer),
);

let node_2: TimerNode<T, bool> = TimerNode::new(timer, Some(true), Some(&change_observer));

let node_3 = DebugNode::<bool>::new(Some(&change_observer));
Expand All @@ -66,31 +88,33 @@ mod nodes {
connect(node_2.token_output.clone(), node_3.input.clone());
connect(node_3.output.clone(), node_4.input.clone());

let mut flow:Flow = Flow::new_empty("flow_1", Version::new(1,0,0));
let mut flow: Flow = Flow::new_empty("flow_1", Version::new(1, 0, 0));
flow.add_node(node_1);
flow.add_node(node_2);
flow.add_node(node_3);
flow.add_node(node_4);

let (controller_sender, controller_receiver) = channel();
let thread_handle = thread::spawn( move || {

let thread_handle = thread::spawn(move || {
let mut executor = StandardExecutor::new(change_observer);

controller_sender.send(executor.controller()).expect("Controller sender cannot send.");

executor.run(flow, RoundRobinScheduler::new(), node_updater).expect("Run failed.");
controller_sender
.send(executor.controller())
.expect("Controller sender cannot send.");

executor
.run(flow, RoundRobinScheduler::new(), node_updater)
.expect("Run failed.");
});

let controller = controller_receiver.recv().unwrap();


thread::sleep(Duration::from_secs(sleep_seconds));

//println!(" ----> {:?} CANCEL", std::thread::current().id());

controller.lock().unwrap().cancel();

thread_handle.join().unwrap();

let num_iters = receiver.iter().count();
Expand All @@ -99,24 +123,71 @@ mod nodes {

//println!("{} {}", num_iters, asserted_num_iters.abs_diff(num_iters as u64));
assert!(asserted_num_iters.abs_diff(num_iters as u64) <= 1);

}

#[test]
fn test() {

timer_test_with(MultiThreadedNodeUpdater::new(4), WaitTimer::new(true));

timer_test_with(MultiThreadedNodeUpdater::new(4), WaitTimer::new(false));

timer_test_with(SingleThreadedNodeUpdater::new(Some(100)), WaitTimer::new(true));

timer_test_with(
SingleThreadedNodeUpdater::new(Some(100)),
WaitTimer::new(true),
);

timer_test_with(SingleThreadedNodeUpdater::new(Some(100)), PollTimer::new());

// This combination cannot work, since the single execution thread is blocked by the timer.
// This combination cannot work, since the single execution thread is blocked by the timer.
// timer_test_with(SingleThreadedNodeUpdater::new(), WaitTimer::new(false));

// This combination cannot work, since with multiple workers, a the execution unit sleeps without written outputs.
// timer_test_with(MultiThreadedNodeUpdater::new(4), TimeSliceTimer::new());
}

#[test]
fn should_deserialize_serialize() {
let change_observer: ChangeObserver = ChangeObserver::new();
let timer = WaitTimer::new(true);
let node = TimerNode::new(timer, Some(true), Some(&change_observer));

let expected = r#"{"timer":{"own_thread":true,"_marker":null},"config_input":null,"token_input":null,"token_output":null,"token_object":true}"#;
let actual = serde_json::to_string(&node).unwrap();

assert_eq!(expected, actual);

let res = serde_json::from_str::<TimerNode<WaitTimer<bool>, bool>>(expected);
let expected;
match res {
Ok(val) => expected = val,
Err(e) => panic!("{}", e),
}
let actual = node.config_input.clone();

assert_eq!(
serde_json::to_string(&expected.config_input.clone()).unwrap(),
serde_json::to_string(&actual).unwrap()
);

let timer = PollTimer::new();
let node = TimerNode::new(timer, Some(true), Some(&change_observer));

let expected = r#"{"timer":{"every":{"secs":0,"nanos":0},"_marker":null},"config_input":null,"token_input":null,"token_output":null,"token_object":true}"#;
let actual = serde_json::to_string(&node).unwrap();

assert_eq!(expected, actual);

let res = serde_json::from_str::<TimerNode<PollTimer<bool>, bool>>(expected);
let expected;
match res {
Ok(val) => expected = val,
Err(e) => panic!("{}", e),
}
let actual = node.config_input.clone();

assert_eq!(
serde_json::to_string(&expected.config_input.clone()).unwrap(),
serde_json::to_string(&actual).unwrap()
);
}
}

0 comments on commit e07de73

Please sign in to comment.