Skip to content

Commit

Permalink
Merge pull request #2 from flow-rs/feature-wasm
Browse files Browse the repository at this point in the history
Feature wasm
  • Loading branch information
FWuermse committed Aug 15, 2023
2 parents 7905db6 + f569f58 commit 577acd1
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ serde_json = "1.0.100"
thiserror = "1.0.44"
anyhow = "1.0"
crossbeam-channel = "0.5.8"
web-time = "0.2.0"

[dev-dependencies]
wasm-bindgen-test = "0.3.37"
14 changes: 9 additions & 5 deletions flowrs_derive/src/connectable.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use core::panic;
use proc_macro::TokenStream;
use syn::{
Arm, DataStruct, DeriveInput, Field, Generics, Type, WherePredicate,
};
use syn::{Arm, DataStruct, DeriveInput, Field, Generics, Type, WherePredicate};

pub fn impl_connectable_trait(ast: DeriveInput) -> TokenStream {
let struct_ident = ast.clone().ident;
Expand Down Expand Up @@ -44,8 +42,14 @@ pub fn impl_connectable_trait(ast: DeriveInput) -> TokenStream {
arm_ast
})
.collect::<Vec<Arm>>();
let (_, ty_generics, _) = ast.generics.split_for_impl();
let generic_bounds = get_generic_bounds(ast.clone().generics);
let (_, ty_generics, where_clause) = ast.generics.split_for_impl();
let mut generic_bounds = get_generic_bounds(ast.clone().generics);
if let Some(existing_bounds) = where_clause {
let pairs = existing_bounds.predicates.pairs();
for pair in pairs {
generic_bounds.push(pair.into_tuple().0.clone());
}
}
quote::quote! {
impl #ty_generics flowrs::connection::RuntimeConnectable for #struct_ident #ty_generics
where
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ pub use self::flow::flow as flow_impl;
pub use self::flow::version;

pub use self::sched::scheduler;

pub use flowrs_derive::RuntimeConnectable;
47 changes: 47 additions & 0 deletions src/nodes/connection.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use serde::{Serialize, Serializer, Deserialize, Deserializer, de::IgnoredAny};

use crate::node::{ChangeObserver, Node, ReceiveError, SendError};
use std::{
any::Any,
Expand Down Expand Up @@ -54,12 +56,46 @@ impl<I> Edge<I> {

pub type Input<I> = Edge<I>;

impl<T> Serialize for Edge<T> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer {
serializer.serialize_unit()
}
}

impl<'de, T> Deserialize<'de> for Edge<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de> {
deserializer.deserialize_any(IgnoredAny).unwrap();
Ok(Self::new())
}
}

#[derive(Clone)]
pub struct Output<T> {
edge: Arc<Mutex<Option<Edge<T>>>>,
change_notifier: Option<Sender<bool>>,
}

impl<T> Serialize for Output<T> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer {
serializer.serialize_unit()
}
}

impl<'de, T> Deserialize<'de> for Output<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de> {
deserializer.deserialize_any(IgnoredAny).unwrap();
Ok(Self::new(None))
}
}

impl<O> Output<O> {
pub fn new(change_observer: Option<&ChangeObserver>) -> Self {
let change_notifier = change_observer.map(|observer| observer.notifier.clone());
Expand All @@ -69,6 +105,17 @@ impl<O> Output<O> {
}
}

pub fn set_sender(mut self, edge: Edge<O>) -> Self {
self.edge = Arc::new(Mutex::new(Some(edge)));
self
}

pub fn set_observer(mut self, change_observer: &ChangeObserver) -> Self {
let change_notifier = change_observer.notifier.clone();
self.change_notifier = Some(change_notifier);
self
}

pub fn send(&mut self, elem: O) -> Result<(), SendError> {
let _res = self
.edge
Expand Down
19 changes: 18 additions & 1 deletion src/nodes/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ pub enum UpdateError {
message: String,
},

#[error("SendError error. Message: {message:?}")]
SendError {
message: String,
},

#[error("RecvError error. Message: {message:?}")]
RecvError {
message: String,
Expand All @@ -126,4 +131,16 @@ pub enum UpdateError {

#[error(transparent)]
Other(#[from] anyhow::Error)
}
}

impl From<SendError> for UpdateError {
fn from(value: SendError) -> Self {
UpdateError::SendError { message: value.to_string() }
}
}

impl From<ReceiveError> for UpdateError {
fn from(value: ReceiveError) -> Self {
UpdateError::SendError { message: value.to_string() }
}
}
2 changes: 1 addition & 1 deletion src/sched/round_robin.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Instant;
use web_time::Instant;

use crate::scheduler::{Scheduler, SchedulingInfo};

Expand Down
2 changes: 1 addition & 1 deletion src/sched/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use web_time::Duration;

#[derive(Default)]
pub struct SchedulingInfo {
Expand Down
2 changes: 1 addition & 1 deletion tests/nodes/test_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use flowrs::{
node::{ChangeObserver, Node, UpdateError},
};

use flowrs_derive::RuntimeConnectable;
use flowrs::RuntimeConnectable;

#[derive(Clone)]
enum AddNodeState<I1, I2> {
Expand Down
2 changes: 1 addition & 1 deletion tests/sched/test_execution.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Error;
use flowrs::connection::{Input, Output};
use flowrs::node::{ChangeObserver, InitError, Node, UpdateError};
use flowrs_derive::RuntimeConnectable;
use flowrs::RuntimeConnectable;

use std::fs::File;

Expand Down
17 changes: 8 additions & 9 deletions tests/sched/test_proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use flowrs::{
connection::{Input, Output},
node::{ChangeObserver, Node},
};
use flowrs_derive::RuntimeConnectable;
use flowrs::RuntimeConnectable;

#[derive(RuntimeConnectable)]
pub struct PassthroughNode {
Expand Down Expand Up @@ -58,7 +58,7 @@ mod test_execution {
connection::{Input, Output, RuntimeConnectable},
node::{ChangeObserver, Node},
};
use flowrs_derive::RuntimeConnectable;
use flowrs::RuntimeConnectable;

use crate::sched::test_proc_macro::{MultiNode, PassthroughNode};

Expand Down Expand Up @@ -207,7 +207,6 @@ mod test_execution {
#[test]
fn should_return_multiple_input_output_at_runtime() {
let change_observer: ChangeObserver = ChangeObserver::new();

let nested_generic_node: MultiNode<[u8; 20]> = MultiNode::new(&change_observer);
let input1: Result<_, Rc<dyn Any>> =
nested_generic_node.input_at(0).downcast::<Input<i32>>();
Expand All @@ -231,25 +230,25 @@ mod test_execution {
fn should_fail_on_mult_input_out_of_bounds() {
let change_observer: ChangeObserver = ChangeObserver::new();

let passthrough_node: MultiNode<u8> = MultiNode::new(&change_observer);
passthrough_node.input_at(2);
let multi_node: MultiNode<Vec<u8>> = MultiNode::new(&change_observer);
multi_node.input_at(2);
}

#[test]
#[should_panic(expected = "Index 4 out of bounds for MultiNode with output len 3.")]
fn should_fail_on_mult_output_out_of_bounds() {
let change_observer: ChangeObserver = ChangeObserver::new();

let passthrough_node: MultiNode<u8> = MultiNode::new(&change_observer);
passthrough_node.output_at(4);
let multi_node: MultiNode<Vec<u8>> = MultiNode::new(&change_observer);
multi_node.output_at(4);
}

#[test]
#[should_panic(expected = "Index 3 out of bounds for MultiNode with output len 3.")]
fn should_fail_on_mult_output_out_of_bounds_at_three() {
let change_observer: ChangeObserver = ChangeObserver::new();

let passthrough_node: MultiNode<u8> = MultiNode::new(&change_observer);
passthrough_node.output_at(3);
let multi_node: MultiNode<Vec<u8>> = MultiNode::new(&change_observer);
multi_node.output_at(3);
}
}

0 comments on commit 577acd1

Please sign in to comment.