Skip to content

Commit 4833b57

Browse files
191220029A-Mavericks
authored andcommitted
Node
node
1 parent ccf29b3 commit 4833b57

File tree

14 files changed

+519
-12
lines changed

14 files changed

+519
-12
lines changed

.github/workflows/base.yml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
with:
2727
node-version: 16
2828

29-
test:
29+
check_fmt:
3030
name: Fmt Check
3131
runs-on: ubuntu-latest
3232
steps:
@@ -41,6 +41,21 @@ jobs:
4141
command: fmt
4242
args: --check
4343

44+
test:
45+
name: Unit test & Doc test
46+
runs-on: ubuntu-latest
47+
steps:
48+
- uses: actions/checkout@v2
49+
- uses: actions-rs/toolchain@v1
50+
with:
51+
profile: minimal
52+
toolchain: stable
53+
override: true
54+
- uses: actions-rs/cargo@v1
55+
with:
56+
command: test
57+
args: --all
58+
4459
# test:
4560
# name: Test Suite
4661
# runs-on: ubuntu-latest

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ members = ["."]
1616
tokio = { version = "1.28", features = ["rt", "sync", "rt-multi-thread"] }
1717
log = "0.4"
1818
env_logger = "0.10.1"
19+
async-trait = "0.1.83"
1920

2021
[dev-dependencies]
2122
simplelog = "0.12"

src/connection/out_channel.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,9 @@ impl OutChannels {
4545
}
4646

4747
/// # Output Channel
48-
/// Wrapper of senders of `tokio::sync::mpsc` and `tokio::sync::broadcast`. **Dagrs** will
48+
/// Wrapper of senderrs of `tokio::sync::mpsc` and `tokio::sync::broadcast`. **Dagrs** will
4949
/// decide the inner type of channel when building the graph.
50-
/// ## Implements
51-
/// - `blocking_send`: sends the message, blocked if no capacity left in the channel. Returns `Ok()`
52-
/// if message sent; returns `Err(SendErr)` if error occurs.
53-
/// - `send`: sends the message, waiting until there is capacity asynchronously. Returns `Ok()`
54-
/// if message sent; returns `Err(SendErr)` if error occurs.
50+
/// Learn more about [Tokio Channels](https://tokio.rs/tokio/tutorial/channels).
5551
enum OutChannel {
5652
/// Sender of a `tokio::sync::mpsc` channel.
5753
Mpsc(mpsc::Sender<Content>),

src/graph/mod.rs

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/graph/node.rs

Lines changed: 0 additions & 2 deletions
This file was deleted.

src/lib.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,15 @@
1-
mod connection;
2-
pub mod graph;
1+
pub mod connection;
2+
pub mod node;
3+
pub mod utils;
4+
5+
pub use connection::{
6+
in_channel::{InChannels, RecvErr},
7+
information_packet::Content,
8+
out_channel::{OutChannels, SendErr},
9+
};
10+
pub use node::{
11+
action::{Action, EmptyAction},
12+
default_node::DefaultNode,
13+
node::*,
14+
};
15+
pub use utils::{env::EnvVar, output::Output};

src/node/action.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use std::sync::Arc;
2+
3+
use async_trait::async_trait;
4+
5+
use crate::{
6+
connection::{in_channel::InChannels, out_channel::OutChannels},
7+
utils::{env::EnvVar, output::Output},
8+
};
9+
10+
/// Node specific behavior
11+
///
12+
/// [`Action`] stores the specific execution logic of a task.
13+
///
14+
/// # Example
15+
/// An implementation of [`Action`]: `HelloAction`, having private
16+
/// fields `statement` and `repeat`.
17+
///
18+
/// ```rust
19+
/// use std::sync::Arc;
20+
/// use dagrs::{Action, EnvVar, Output, InChannels, OutChannels};
21+
/// use async_trait::async_trait;
22+
///
23+
/// struct HelloAction{
24+
/// statement: String,
25+
/// repeat: usize,
26+
/// }
27+
///
28+
/// #[async_trait]
29+
/// impl Action for HelloAction{
30+
/// async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output{
31+
/// for i in 0..self.repeat {
32+
/// println!("{}",self.statement);
33+
/// }
34+
/// Output::empty()
35+
/// }
36+
/// }
37+
///
38+
/// let hello=HelloAction {
39+
/// statement: "hello world!".to_string(),
40+
/// repeat: 10
41+
/// };
42+
///
43+
/// ```
44+
#[async_trait]
45+
pub trait Action: Send + Sync {
46+
async fn run(
47+
&self,
48+
in_channels: &mut InChannels,
49+
out_channels: &OutChannels,
50+
env: Arc<EnvVar>,
51+
) -> Output;
52+
}
53+
54+
/// An empty implementaion of [`Action`].
55+
///
56+
/// Used as a placeholder when creating a `Node` without `Action`.
57+
pub struct EmptyAction;
58+
#[async_trait]
59+
impl Action for EmptyAction {
60+
async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output {
61+
Output::Out(None)
62+
}
63+
}

src/node/default_node.rs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use std::sync::Arc;
2+
3+
use crate::{
4+
connection::{in_channel::InChannels, out_channel::OutChannels},
5+
utils::{env::EnvVar, output::Output},
6+
};
7+
8+
use super::{
9+
action::{Action, EmptyAction},
10+
node::{Node, NodeId, NodeName, NodeTable},
11+
};
12+
13+
/// # Default node type
14+
///
15+
/// [`DefaultNode`] is a default implementation of the [`Node`] trait. Users can use this node
16+
/// type to build tasks to meet most needs.
17+
///
18+
/// ## Create a `DefaultNode`:
19+
/// - use the method `new`. Required attributes: node's name; [`NodeTable`](for id allocation).
20+
///
21+
/// ```rust
22+
/// use dagrs::{NodeName, NodeTable, DefaultNode};
23+
///
24+
/// let node_name = "Node X";
25+
/// let mut node_table = NodeTable::new();
26+
/// let mut node = DefaultNode::new(
27+
/// NodeName::from(node_name),
28+
/// &mut node_table,
29+
/// );
30+
/// ```
31+
///
32+
/// - use the method `with_action`. Required attributes: node's name; [`NodeTable`](for id allocation);
33+
/// execution logic [`Action`].
34+
///
35+
/// ```rust
36+
/// use dagrs::{NodeName, NodeTable, DefaultNode, EmptyAction};
37+
///
38+
/// let node_name = "Node X";
39+
/// let mut node_table = NodeTable::new();
40+
/// let mut node = DefaultNode::with_action(
41+
/// NodeName::from(node_name),
42+
/// Box::new(EmptyAction),
43+
/// &mut node_table,
44+
/// );
45+
/// ```
46+
pub struct DefaultNode {
47+
id: NodeId,
48+
name: NodeName,
49+
action: Box<dyn Action>,
50+
in_channels: InChannels,
51+
out_channels: OutChannels,
52+
}
53+
54+
impl Node for DefaultNode {
55+
fn id(&self) -> NodeId {
56+
self.id.clone()
57+
}
58+
59+
fn name(&self) -> NodeName {
60+
self.name.clone()
61+
}
62+
63+
fn input_channels(&mut self) -> &mut InChannels {
64+
&mut self.in_channels
65+
}
66+
67+
fn output_channels(&mut self) -> &mut OutChannels {
68+
&mut self.out_channels
69+
}
70+
71+
fn run(&mut self, env: Arc<EnvVar>) -> Output {
72+
tokio::runtime::Runtime::new().unwrap().block_on(async {
73+
self.action
74+
.run(&mut self.in_channels, &self.out_channels, env)
75+
.await
76+
})
77+
}
78+
}
79+
80+
impl DefaultNode {
81+
pub fn new(name: NodeName, node_table: &mut NodeTable) -> Self {
82+
Self {
83+
id: node_table.alloc_id_for(&name),
84+
name,
85+
action: Box::new(EmptyAction),
86+
in_channels: InChannels::default(),
87+
out_channels: OutChannels::default(),
88+
}
89+
}
90+
91+
pub fn with_action(
92+
name: NodeName,
93+
action: Box<dyn Action>,
94+
node_table: &mut NodeTable,
95+
) -> Self {
96+
Self {
97+
id: node_table.alloc_id_for(&name),
98+
name,
99+
action,
100+
in_channels: InChannels::default(),
101+
out_channels: OutChannels::default(),
102+
}
103+
}
104+
}
105+
106+
#[cfg(test)]
107+
mod test_default_node {
108+
109+
use std::sync::Arc;
110+
111+
use crate::{Content, EnvVar, InChannels, Node, NodeName, NodeTable, OutChannels, Output};
112+
113+
use super::{Action, DefaultNode};
114+
115+
use async_trait::async_trait;
116+
117+
/// An implementation of [`Action`] that returns [`Output::Out`] containing a String "Hello world".
118+
#[derive(Default)]
119+
pub struct HelloAction;
120+
#[async_trait]
121+
impl Action for HelloAction {
122+
async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output {
123+
Output::Out(Some(Content::new("Hello world".to_string())))
124+
}
125+
}
126+
127+
impl HelloAction {
128+
pub fn new() -> Box<Self> {
129+
Box::new(Self::default())
130+
}
131+
}
132+
133+
/// Test for create a default node.
134+
///
135+
/// Step 1: create a [`DefaultNode`] with [`HelloAction`].
136+
///
137+
/// Step 2: run the node and verify its output.
138+
#[test]
139+
fn create_default_node() {
140+
let node_name = "Test Node";
141+
142+
let mut node_table = NodeTable::new();
143+
let mut node = DefaultNode::with_action(
144+
NodeName::from(node_name),
145+
HelloAction::new(),
146+
&mut node_table,
147+
);
148+
149+
// Check if node table has key-value pair (node.name, node.id)
150+
assert_eq!(node_table.get(node_name).unwrap(), &node.id());
151+
152+
let env = Arc::new(EnvVar::new(node_table));
153+
let out = node.run(env).get_out().unwrap();
154+
let out: &String = out.get().unwrap();
155+
assert_eq!(out, "Hello world");
156+
}
157+
}

src/node/id_allocate.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use std::sync::atomic::AtomicUsize;
2+
3+
use super::node::NodeId;
4+
5+
/// IDAllocator for Node.
6+
struct IDAllocator {
7+
id: AtomicUsize,
8+
}
9+
10+
impl IDAllocator {
11+
fn alloc(&self) -> NodeId {
12+
let origin = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
13+
if origin > self.id.load(std::sync::atomic::Ordering::Relaxed) {
14+
panic!("Too many tasks.")
15+
} else {
16+
NodeId(origin)
17+
}
18+
}
19+
}
20+
21+
/// The global task uniquely identifies an instance of the allocator.
22+
static ID_ALLOCATOR: IDAllocator = IDAllocator {
23+
id: AtomicUsize::new(1),
24+
};
25+
26+
/// Assign node's id.
27+
pub(crate) fn alloc_id() -> NodeId {
28+
ID_ALLOCATOR.alloc()
29+
}

src/node/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
pub mod action;
2+
pub mod default_node;
3+
pub mod id_allocate;
4+
pub mod node;

0 commit comments

Comments
 (0)