diff --git a/bindings/ceylon/src/agent/agent.rs b/bindings/ceylon/src/agent/agent.rs index 202e27ca..3d9ad2c8 100644 --- a/bindings/ceylon/src/agent/agent.rs +++ b/bindings/ceylon/src/agent/agent.rs @@ -26,13 +26,13 @@ pub struct AgentCore { _workspace_id: Option, _processor: Arc>>, _on_message: Arc>>, - rx_0: Arc>>>, - tx_0: tokio::sync::mpsc::Sender>, + rx_0: Arc>>, + tx_0: tokio::sync::mpsc::Sender, } impl AgentCore { pub fn new(name: String, is_leader: bool, on_message: Arc, processor: Arc) -> Self { - let (tx_0, rx_0) = tokio::sync::mpsc::channel::>(100); + let (tx_0, rx_0) = tokio::sync::mpsc::channel::(100); let id = uuid::Uuid::new_v4().to_string(); Self { _name: name, @@ -67,14 +67,14 @@ impl AgentCore { } pub async fn broadcast(&self, message: Vec) { - self.tx_0.send(message).await.unwrap(); + self.tx_0.send(Message::data(self._name.clone(), self._id.clone(), message)).await.unwrap(); } } impl AgentCore { pub(crate) async fn start(&self, topic: String, url: String, inputs: HashMap) { let agent_name = self._name.clone(); - let (tx_0, rx_0) = tokio::sync::mpsc::channel::>(100); + let (tx_0, rx_0) = tokio::sync::mpsc::channel::(100); let (mut node_0, mut rx_o_0) = create_node(agent_name.clone(), true, rx_0); let on_message = self._on_message.clone(); diff --git a/bindings/ceylon/src/agent/workspace.rs b/bindings/ceylon/src/agent/workspace.rs index 73f34eb1..6d433a6b 100644 --- a/bindings/ceylon/src/agent/workspace.rs +++ b/bindings/ceylon/src/agent/workspace.rs @@ -39,7 +39,7 @@ impl Workspace { pub async fn run(&self, inputs: HashMap) { debug!("Workspace {} running", self.id); - let rt = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let mut tasks = vec![]; let _inputs = inputs.clone(); for agent in self._agents.iter() { diff --git a/bindings/ceylon/src/ceylon.udl b/bindings/ceylon/src/ceylon.udl index 03b59467..17f0d73b 100644 --- a/bindings/ceylon/src/ceylon.udl +++ b/bindings/ceylon/src/ceylon.udl @@ -23,7 +23,8 @@ dictionary Message{ bytes data; string message; u64 time; - string from; + string originator; + string originator_id; MessageType type; }; diff --git a/bindings/ceylon/tests/ceylon_agent_test.py b/bindings/ceylon/tests/ceylon_agent_test.py index f7e2c9bd..d86321cc 100644 --- a/bindings/ceylon/tests/ceylon_agent_test.py +++ b/bindings/ceylon/tests/ceylon_agent_test.py @@ -14,7 +14,7 @@ def __init__(self, name, is_leader): async def on_message(self, agent_id, message): if message.type == MessageType.MESSAGE: dt = bytes(message.data) - print(dt.decode("utf-8")) + print(self.id(), self.name(), dt.decode("utf-8"), message.originator_id, message.originator) # message = json.loads(str(message, "utf-8")) # if message and message.get("type") == "Message": # print(self.name(), message["from"]) diff --git a/libs/sangedama/src/node/node.rs b/libs/sangedama/src/node/node.rs index 420bed8a..deb79c83 100644 --- a/libs/sangedama/src/node/node.rs +++ b/libs/sangedama/src/node/node.rs @@ -51,29 +51,31 @@ pub struct Message { pub data: Vec, pub message: String, pub time: u64, - pub from: String, + pub originator: String, + pub originator_id: String, pub r#type: MessageType, } impl Message { - fn new(from: String, message: String, data: Vec, message_type: MessageType) -> Self { + fn new(originator: String, originator_id: String, message: String, data: Vec, message_type: MessageType) -> Self { Self { data, time: SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_millis() as u64, - from, + originator, + originator_id, r#type: message_type, message, } } - fn event(from: String, event: EventType) -> Self { - Self::new(from, event.as_str().to_string(), vec![], MessageType::Event) + fn event(originator: String, event: EventType) -> Self { + Self::new(originator, "SELF".to_string(), event.as_str().to_string(), vec![], MessageType::Event) } - fn data(from: String, data: Vec) -> Self { - Self::new(from, "".to_string(), data, MessageType::Message) + pub fn data(from: String, originator_id: String, data: Vec) -> Self { + Self::new(from, originator_id, "DATA-MESSAGE".to_string(), data, MessageType::Message) } fn to_json(&self) -> String { @@ -112,8 +114,9 @@ pub struct Node { is_leader: bool, subscribed_topics: Vec, - in_rx: mpsc::Receiver>, + in_rx: mpsc::Receiver, out_tx: mpsc::Sender, + id: String, } impl Node { @@ -138,7 +141,7 @@ impl Node { } } - pub fn broadcast(&mut self, message: Vec) -> Result, PublishError> { + pub fn broadcast(&mut self, message: Message) -> Result, PublishError> { let mut message_ids = vec![]; for topic in self.subscribed_topics.clone() { let topic = gossipsub::IdentTopic::new(topic); @@ -147,7 +150,7 @@ impl Node { .swarm .behaviour_mut() .gossipsub - .publish(topic, message.clone()) + .publish(topic, message.to_json().as_bytes()) { Ok(id) => { message_ids.push(id); @@ -169,7 +172,7 @@ impl Node { select! { message = self.in_rx.recv() => match message { Some(message) => { - debug!("{:?} Received To Broadcast: {:?}", self.name, String::from_utf8_lossy(&message)); + debug!("{:?} Received To Broadcast", self.name); match self.broadcast(message){ Ok(message_ids) => { debug!("{:?} Broadcasted message: {:?}", self.name, message_ids); @@ -188,16 +191,16 @@ impl Node { event = self.swarm.select_next_some() => match event { SwarmEvent::NewListenAddr { address, .. } => { debug!("{:?} Listening on {:?}", self.name, address); - self.pass_message_to_node(Message::event( self.swarm.local_peer_id().to_string(),EventType::OnListen,)).await + self.pass_message_to_node(Message::event(self.swarm.local_peer_id().to_string(),EventType::OnListen,)).await }, SwarmEvent::ConnectionEstablished { peer_id, .. } => { debug!("{:?} Connected to {:?}", self.name, peer_id); - self.pass_message_to_node(Message::event( self.swarm.local_peer_id().to_string(),EventType::OnConnectionEstablished,)).await + self.pass_message_to_node(Message::event(self.swarm.local_peer_id().to_string(),EventType::OnConnectionEstablished,)).await }, SwarmEvent::ConnectionClosed { peer_id, .. } => { debug!("{:?} Disconnected from {:?}", self.name, peer_id); - self.pass_message_to_node(Message::event( self.swarm.local_peer_id().to_string(),EventType::OnConnectionClosed ,)).await + self.pass_message_to_node(Message::event(self.swarm.local_peer_id().to_string(),EventType::OnConnectionClosed ,)).await }, SwarmEvent::Behaviour(Event::Gossipsub(event)) => { @@ -206,8 +209,7 @@ impl Node { match event { gossipsub::Event::Message { propagation_source, message_id, message } => { debug!("{:?} Received message '{:?}' from {:?} on {:?}", self.name, String::from_utf8_lossy(&message.data), propagation_source, message_id); - - let msg = Message::data(self.name.clone(), message.data.clone()); + let msg = serde_json::from_slice(message.data.as_slice()).unwrap(); self.pass_message_to_node(msg).await }, @@ -254,7 +256,7 @@ impl Node { pub fn create_node( name: String, is_leader: bool, - in_rx: mpsc::Receiver>, + in_rx: mpsc::Receiver, ) -> (Node, mpsc::Receiver) { let swarm = SwarmBuilder::with_new_identity() .with_tokio() @@ -303,6 +305,7 @@ pub fn create_node( ( Node { name, + id: swarm.local_peer_id().to_string(), swarm, is_leader, subscribed_topics: Vec::new(), @@ -319,7 +322,7 @@ mod tests { use log::{debug, info, trace, warn}; use serde_json::json; - use crate::node::node::create_node; + use crate::node::node::{create_node, Message}; #[test] fn test_ping() { @@ -329,11 +332,14 @@ mod tests { let url = format!("/ip4/0.0.0.0/tcp/{}", port_id); - let (tx_0, mut rx_0) = tokio::sync::mpsc::channel::>(100); - let (tx_1, mut rx_1) = tokio::sync::mpsc::channel::>(100); + let (tx_0, mut rx_0) = tokio::sync::mpsc::channel::(100); + let (tx_1, mut rx_1) = tokio::sync::mpsc::channel::(100); let (mut node_0, mut rx_o_0) = create_node("node_0".to_string(), true, rx_0); let (mut node_1, mut rx_o_1) = create_node("node_1".to_string(), false, rx_1); + + let node_0_id = node_0.id.clone(); + let node_1_id = node_1.id.clone(); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -343,14 +349,13 @@ mod tests { runtime.spawn(async move { while let Some(message_data) = rx_o_0.recv().await { debug!("Node_0 Received: {:?}", message_data); - tx_0.send( - json!({ - "data": format!("Hi from Node_1: {}", message_data.message).as_str(), + let msg = Message::data("node_0".to_string(),node_0_id.clone(), json!({ + "data": format!("Hi from Node_0: {}", message_data.message).as_str(), }) - .to_string() - .as_bytes() - .to_vec(), - ) + .to_string() + .as_bytes() + .to_vec()); + tx_0.send(msg) .await .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(3000)).await; @@ -359,15 +364,14 @@ mod tests { runtime.spawn(async move { while let Some(message_data) = rx_o_1.recv().await { - debug!("Node_0 Received: {:?}", message_data); - tx_1.send( - json!({ + debug!("Node_1 Received: {:?}", message_data); + let msg = Message::data("node_1".to_string(), node_1_id.clone(), json!({ "data": format!("Hi from Node_1: {}", message_data.message).as_str(), }) - .to_string() - .as_bytes() - .to_vec(), - ) + .to_string() + .as_bytes() + .to_vec()); + tx_1.send(msg) .await .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(3000)).await;