diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index a7bbfd2..13ab8fd 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -47,7 +47,7 @@ where peer_ssb_id: String, active_request: Option, ) -> Result { - trace!(target: "ebt-handler", "Received MUXRPC input: {:?}", op); + trace!(target: "muxrpc-ebt-handler", "Received MUXRPC input: {:?}", op); // An outbound EBT replicate request was made before the handler was // called; add it to the map of active requests. @@ -88,8 +88,7 @@ where } // Handle an incoming MUXRPC response. RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { - self.recv_rpc_response(api, ch_broker, *req_no, res, peer_ssb_id) - .await + self.recv_rpc_response(ch_broker, *req_no, res).await } // Handle an incoming MUXRPC 'cancel stream' response. RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => { @@ -119,21 +118,6 @@ where } _ => Ok(false), }, - /* - RpcInput::Message(msg) => { - if let Some(kv_event) = msg.downcast_ref::() { - match kv_event { - // Notification from the key-value store indicating that - // a new message has just been appended to the feed - // identified by `id`. - StoreKvEvent::IdChanged(id) => { - return self.recv_storageevent_idchanged(api, id).await - } - } - } - Ok(false) - } - */ _ => Ok(false), } } @@ -203,56 +187,39 @@ where Ok(false) } - /// Process an incoming MUXRPC response. The response is expected to - /// contain a vector clock or an SSB message. + /// Process an incoming MUXRPC response. + /// The response is expected to contain an SSB message. async fn recv_rpc_response( &mut self, - _api: &mut ApiCaller, ch_broker: &mut ChBrokerSend, req_no: ReqNo, res: &[u8], - peer_ssb_id: String, ) -> Result { + trace!(target: "ebt-handler", "Received RPC response: {}", req_no); + // Only handle the response if the associated request number is known // to us, either because we sent or received the initiating replicate // request. if self.active_requests.contains_key(&req_no) { - // The response may be a vector clock (aka. notes) or an SSB message. - // - // Since there is no explicit way to determine which was received, - // we first attempt deserialization of a vector clock and move on - // to attempting message deserialization if that fails. + // First try to deserialize the response into a message value. + // If that fails, try to deserialize into a message KVT and then + // convert that into a message value. Return an error if that fails. + // This approach allows us to handle the unlikely event that + // messages are sent as KVTs and not simply values. // - // TODO: Is matching on clock here redundant? - // We are already matching on `OtherRequest` in the handler. - if let Ok(clock) = serde_json::from_slice(res) { - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)), - )) - .await?; - } else { - // First try to deserialize the response into a message value. - // If that fails, try to deserialize into a message KVT and then - // convert that into a message value. Return an error if that fails. - // This approach allows us to handle the unlikely event that - // messages are sent as KVTs and not simply values. - // - // Validation of the message signature and fields is also performed - // as part of the call to `from_slice`. - let msg = match Message::from_slice(res) { - Ok(msg) => msg, - Err(_) => MessageKvt::from_slice(res)?.into_message()?, - }; - - ch_broker - .send(BrokerEvent::new( - Destination::Broadcast, - BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)), - )) - .await?; - } + // Validation of the message signature and fields is also performed + // as part of the call to `from_slice`. + let msg = match Message::from_slice(res) { + Ok(msg) => msg, + Err(_) => MessageKvt::from_slice(res)?.into_message()?, + }; + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)), + )) + .await?; } Ok(false) @@ -263,6 +230,7 @@ where async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: ReqNo) -> Result { api.rpc().send_stream_eof(-req_no).await?; self.active_requests.remove(&req_no); + Ok(true) } @@ -277,22 +245,6 @@ where } /* - /// Extract blob references from post-type messages. - fn extract_blob_refs(&mut self, msg: &Message) -> Vec { - let mut refs = Vec::new(); - - let msg = serde_json::from_value(msg.content().clone()); - - if let Ok(dto::content::TypedMessage::Post { text, .. }) = msg { - for cap in BLOB_REGEX.captures_iter(&text) { - let key = cap.get(0).unwrap().as_str().to_owned(); - refs.push(key); - } - } - - refs - } - /// Process an incoming MUXRPC response. The response is expected to /// contain an SSB message. async fn recv_rpc_response( @@ -365,28 +317,4 @@ where } } */ - - /* - /// Respond to a key-value store state change for the given peer. - /// This is triggered when a new message is appended to the local feed. - /// Remove the peer from the list of active streams, send the requested - /// messages from the local feed to the peer and then reinsert the public - /// key of the peer to the list of active streams. - async fn recv_storageevent_idchanged( - &mut self, - api: &mut ApiCaller, - id: &str, - ) -> Result { - // Attempt to remove the peer from the list of active streams. - if let Some(mut req) = self.reqs.remove(id) { - // Send local messages to the peer. - self.send_history(api, &mut req).await?; - // Reinsert the peer into the list of active streams. - self.reqs.insert(id.to_string(), req); - Ok(true) - } else { - Ok(false) - } - } - */ } diff --git a/solar/src/actors/muxrpc/history_stream.rs b/solar/src/actors/muxrpc/history_stream.rs index 5fcae43..406f5d7 100644 --- a/solar/src/actors/muxrpc/history_stream.rs +++ b/solar/src/actors/muxrpc/history_stream.rs @@ -85,11 +85,11 @@ where self.recv_error_response(api, *req_no, err).await } // Handle a broker message. - RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(id))) => { + RpcInput::Message(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) => { // Notification from the key-value store indicating that // a new message has just been appended to the feed - // identified by `id`. - return self.recv_storageevent_idchanged(api, id).await; + // identified by `ssb_id`. + return self.recv_storageevent_idchanged(api, ssb_id).await; } // Handle a timer event. RpcInput::Timer => self.on_timer(api).await, @@ -315,14 +315,14 @@ where async fn recv_storageevent_idchanged( &mut self, api: &mut ApiCaller, - id: &str, + ssb_id: &str, ) -> Result { // Attempt to remove the peer from the list of active streams. - if let Some(mut req) = self.reqs.remove(id) { + if let Some(mut req) = self.reqs.remove(ssb_id) { // Send local messages to the peer. self.send_history(api, &mut req).await?; // Reinsert the peer into the list of active streams. - self.reqs.insert(id.to_string(), req); + self.reqs.insert(ssb_id.to_string(), req); Ok(true) } else { Ok(false) diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index 7172a8a..6432f6a 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -34,7 +34,8 @@ use crate::{ broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER}, config::PEERS_TO_REPLICATE, node::{BLOB_STORE, KV_STORE}, - Result, + storage::kv::StoreKvEvent, + Error, Result, }; /// EBT replication events. @@ -71,7 +72,7 @@ impl Display for SessionRole { #[derive(Debug)] pub struct EbtManager { /// Active EBT peer sessions. - active_sessions: HashSet, + active_sessions: HashMap, /// Duration to wait before switching feed request to a different peer. _feed_wait_timeout: u64, /// The state of the replication loop. @@ -89,12 +90,20 @@ pub struct EbtManager { _requested_feeds: HashSet, /// Duration to wait for a connected peer to initiate an EBT session. session_wait_timeout: u64, + /// The latest vector clock sent for each session, identified by the + /// request number. + // + // TODO: Do we want to remove each entry when the session concludes? + sent_clocks: HashMap, + /// The sequence number of the latest message sent to each peer + /// for each requested feed. + sent_messages: HashMap>, } impl Default for EbtManager { fn default() -> Self { EbtManager { - active_sessions: HashSet::new(), + active_sessions: HashMap::new(), _feed_wait_timeout: 3, _is_replication_loop_active: false, local_clock: HashMap::new(), @@ -102,6 +111,8 @@ impl Default for EbtManager { peer_clocks: HashMap::new(), _requested_feeds: HashSet::new(), session_wait_timeout: 5, + sent_clocks: HashMap::new(), + sent_messages: HashMap::new(), } } } @@ -137,21 +148,52 @@ impl EbtManager { Ok(()) } - /// Retrieve the vector clock for the given SSB ID. - fn _get_clock(self, peer_id: &SsbId) -> Option { - if peer_id == &self.local_id { - Some(self.local_clock) - } else { - self.peer_clocks.get(peer_id).cloned() + /// Retrieve either the local vector clock or the stored vector clock + /// for the peer represented by the given SSB ID. + fn get_clock(&self, ssb_id: Option<&SsbId>) -> Option { + match ssb_id { + Some(id) => self.peer_clocks.get(id).cloned(), + None => Some(self.local_clock.to_owned()), } } /// Set or update the vector clock for the given SSB ID. - fn set_clock(&mut self, peer_id: &SsbId, clock: VectorClock) { - if peer_id == &self.local_id { + fn set_clock(&mut self, ssb_id: &SsbId, clock: VectorClock) { + if ssb_id == &self.local_id { self.local_clock = clock } else { - self.peer_clocks.insert(peer_id.to_owned(), clock); + self.peer_clocks.insert(ssb_id.to_owned(), clock); + } + } + + /// Retrieve the stored vector clock for the first peer, check for the + /// second peer in the vector clock and return the value of the receive + /// flag. + fn _is_receiving(&self, peer_ssb_id: SsbId, ssb_id: SsbId) -> Result { + // Retrieve the vector clock for the first peer. + if let Some(clock) = self.get_clock(Some(&peer_ssb_id)) { + // Check if the second peer is represented in the vector clock. + if let Some(encoded_seq_no) = clock.get(&ssb_id) { + // Check if the receive flag is true. + if let (_replicate_flag, Some(true), _seq) = clock::decode(*encoded_seq_no)? { + return Ok(true); + } + } + } + + Ok(false) + } + + /// Get the sequence number of the latest message sent to the given + /// peer SSB ID for the feed represented by the given SSB ID. + fn _get_latest_sent_seq(self, peer_ssb_id: &SsbId, ssb_id: &SsbId) -> Option { + // Get the state of the messages sent to `peer_ssb_id`. + if let Some(sent_state) = self.sent_messages.get(peer_ssb_id) { + // Get the sequence number of the latest message sent for feed + // `ssb_id`. + sent_state.get(ssb_id).copied() + } else { + None } } @@ -165,8 +207,8 @@ impl EbtManager { self.local_clock.insert(peer_id.to_owned(), encoded_value); } else { // No messages are stored in the local database for this feed. - // Set replicate flag to `true`, receive to `false` and `seq` to 0. - let encoded_value: EncodedClockValue = clock::encode(true, Some(false), Some(0))?; + // Set replicate flag to `true`, receive to `true` and `seq` to 0. + let encoded_value: EncodedClockValue = clock::encode(true, Some(true), Some(0))?; self.local_clock.insert(peer_id.to_owned(), encoded_value); } @@ -174,10 +216,11 @@ impl EbtManager { } /// Register a new EBT session for the given peer. - fn register_session(&mut self, peer_ssb_id: &SsbId) { - self.active_sessions.insert(peer_ssb_id.to_owned()); + fn register_session(&mut self, peer_ssb_id: &SsbId, req_no: ReqNo, session_role: SessionRole) { + self.active_sessions + .insert(peer_ssb_id.to_owned(), (req_no, session_role)); - trace!(target: "ebt-session", "Registered new EBT session for {}", peer_ssb_id); + trace!(target: "ebt-session", "Registered new EBT session {} for {}", req_no, peer_ssb_id); } /// Remove the given peer from the list of active session. @@ -196,30 +239,63 @@ impl EbtManager { self._requested_feeds.insert(peer_id.to_owned()); } + /// Decode the encoded sequence number from a vector clock and push + /// the latest desired messages to the given vector of messages. + /// + /// This method will only push messages to the vector if the replicate + /// flag is set to `true`. + async fn retrieve_latest_messages( + encoded_seq_no: i64, + feed_id: &SsbId, + messages: &mut Vec, + ) -> Result<()> { + if encoded_seq_no != -1 { + if let (_replicate_flag, Some(true), Some(seq)) = clock::decode(encoded_seq_no)? { + if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(feed_id)? { + for n in (seq + 1)..=last_seq { + if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(feed_id, n)? { + messages.push(msg_kvt.value) + } + } + } + } + } + + Ok(()) + } + /// Decode a peer's vector clock and retrieve all requested messages. + /// + /// If an SSB ID is supplied, retrieve only the lastest requested + /// messages authored by that ID. + /// + /// If no SSB ID is supplied, retrieve the latest requested messages + /// for all authors listed in the vector clock. async fn retrieve_requested_messages( - // TODO: Do we need these two parameters? - _req_no: &ReqNo, - _peer_ssb_id: &SsbId, + peer_ssb_id: Option<&SsbId>, clock: VectorClock, ) -> Result> { let mut messages_to_be_sent = Vec::new(); - // Iterate over all key-value pairs in the vector clock. - for (feed_id, encoded_seq_no) in clock.iter() { - if *encoded_seq_no != -1 { - // Decode the encoded vector clock sequence number. - // TODO: Match properly on the values of replicate_flag and receive_flag. - let (_replicate_flag, _receive_flag, sequence) = clock::decode(*encoded_seq_no)?; - if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(feed_id)? { - if let Some(seq) = sequence { - for n in seq..(last_seq + 1) { - if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(feed_id, n)? { - messages_to_be_sent.push(msg_kvt.value) - } - } - } - } + // We only want to retrieve messages authored by `peer_ssb_id`. + if let Some(feed_id) = peer_ssb_id { + if let Some(encoded_seq_no) = clock.get(feed_id) { + EbtManager::retrieve_latest_messages( + *encoded_seq_no, + feed_id, + &mut messages_to_be_sent, + ) + .await?; + } + } else { + // We want to retrieve messages for all feeds in the vector clock. + for (feed_id, encoded_seq_no) in clock.iter() { + EbtManager::retrieve_latest_messages( + *encoded_seq_no, + feed_id, + &mut messages_to_be_sent, + ) + .await?; } } @@ -247,7 +323,7 @@ impl EbtManager { // Only proceed with session initiation if there // is no currently active session with the given peer. - if !self.active_sessions.contains(&peer_ssb_id) { + if !self.active_sessions.contains_key(&peer_ssb_id) { trace!( target: "ebt", "Requesting an EBT session with {:?}", @@ -272,7 +348,7 @@ impl EbtManager { ) -> Result<()> { trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); - self.register_session(&peer_ssb_id); + self.register_session(&peer_ssb_id, req_no, session_role.to_owned()); let local_clock = self.local_clock.to_owned(); match session_role { @@ -296,6 +372,10 @@ impl EbtManager { Ok(()) } + fn handle_send_clock(&mut self, req_no: ReqNo, clock: VectorClock) -> Option { + self.sent_clocks.insert(req_no, clock) + } + async fn handle_received_clock( &mut self, req_no: ReqNo, @@ -304,15 +384,18 @@ impl EbtManager { ) -> Result<()> { trace!(target: "ebt-replication", "Received vector clock: {:?}", clock); + // Update the stored vector clock for the remote peer. + self.set_clock(&peer_ssb_id, clock.to_owned()); + // Create channel to send messages to broker. let mut ch_broker = BROKER.lock().await.create_sender(); // If a clock is received without a prior EBT replicate // request having been received from the associated peer, it is // assumed that the clock was sent in response to a locally-sent - // EBT replicate request. Ie. The session was requested by the + // EBT replicate request. Ie. the session was requested by the // local peer. - if !self.active_sessions.contains(&peer_ssb_id) { + if !self.active_sessions.contains_key(&peer_ssb_id) { ch_broker .send(BrokerEvent::new( Destination::Broadcast, @@ -325,9 +408,23 @@ impl EbtManager { .await?; } - self.set_clock(&peer_ssb_id, clock.to_owned()); + // If we have not previously sent a clock, send one now. + // + // This indicates that the local peer is acting as the session + // requester. + if self.sent_clocks.get(&req_no).is_none() { + let local_clock = self.local_clock.to_owned(); + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + )) + .await?; + } - let msgs = EbtManager::retrieve_requested_messages(&req_no, &peer_ssb_id, clock).await?; + // We want messages for all feeds in the clock, therefore the + // `peer_ssb_id` parameter is set to `None`. + let msgs = EbtManager::retrieve_requested_messages(None, clock).await?; for msg in msgs { ch_broker .send(BrokerEvent::new( @@ -340,6 +437,31 @@ impl EbtManager { Ok(()) } + async fn handle_send_message(&mut self, peer_ssb_id: SsbId, msg: Value) -> Result<()> { + // Update the hashmap of sent messages. + // + // For each peer, keep a list of feed ID's and the sequence of the + // latest sent message for each. This is useful to consult when a new + // message is appended to the local store and may need to be sent to + // peers with whom we have an active EBT session. + + let msg_author = msg["author"] + .as_str() + .ok_or(Error::OptionIsNone)? + .to_string(); + let msg_sequence = msg["sequence"].as_u64().ok_or(Error::OptionIsNone)?; + + if let Some(feeds) = self.sent_messages.get_mut(&peer_ssb_id) { + feeds.insert(msg_author, msg_sequence); + } else { + let mut feeds = HashMap::new(); + feeds.insert(msg_author, msg_sequence); + self.sent_messages.insert(peer_ssb_id, feeds); + } + + Ok(()) + } + async fn handle_received_message(&mut self, msg: Message) -> Result<()> { trace!(target: "ebt-replication", "Received message: {:?}", msg); @@ -387,6 +509,42 @@ impl EbtManager { Ok(()) } + /// Look up the latest sequence number for the updated feed, encode it as + /// the single entry of a vector clock and send that to any active session + /// peers. + async fn handle_local_store_updated(&self, ssb_id: SsbId) -> Result<()> { + // Iterate over all active EBT sessions. + for (_peer_ssb_id, (req_no, _session_role)) in self.active_sessions.iter() { + // Look up the latest sequence for the given ID. + if let Some(seq) = KV_STORE.read().await.get_latest_seq(&ssb_id)? { + // Encode the replicate flag, receive flag and sequence. + let encoded_value: EncodedClockValue = clock::encode(true, Some(true), Some(seq))?; + + // Update the entry for `ssb_id` in the local vector clock. + if let Some(mut local_clock) = self.get_clock(None) { + local_clock.insert(ssb_id.to_owned(), encoded_value); + } + + // Create a vector clock with a single entry. + let mut updated_clock = HashMap::new(); + updated_clock.insert(ssb_id.to_owned(), encoded_value); + + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Send the single-entry vector clock to the active session. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SendClock(*req_no, updated_clock)), + )) + .await?; + } + } + + Ok(()) + } + async fn handle_session_concluded(&mut self, peer_ssb_id: SsbId) { trace!(target: "ebt-replication", "Session concluded with: {}", peer_ssb_id); self.remove_session(&peer_ssb_id); @@ -476,9 +634,9 @@ impl EbtManager { error!("Error while handling 'session initiated' event: {}", err) } } - EbtEvent::SendClock(_, clock) => { + EbtEvent::SendClock(req_no, clock) => { trace!(target: "ebt-replication", "Sending vector clock: {:?}", clock); - // TODO: Update sent clocks. + let _ = self.handle_send_clock(req_no, clock); } EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock) => { if let Err(err) = self.handle_received_clock(req_no, peer_ssb_id, clock).await { @@ -490,9 +648,11 @@ impl EbtManager { error!("Error while handling 'received message' event: {}", err) } } - EbtEvent::SendMessage(_req_no, _peer_ssb_id, _msg) => { - trace!(target: "ebt-replication", "Sending message..."); - // TODO: Update sent messages. + EbtEvent::SendMessage(_req_no, peer_ssb_id, msg) => { + trace!(target: "ebt-replication", "Sending message: {:?}...", msg); + if let Err(err) = self.handle_send_message(peer_ssb_id, msg).await { + error!("Error while handling 'send message' event: {}", err) + } } EbtEvent::SessionConcluded(connection_data) => { self.handle_session_concluded(connection_data).await; @@ -508,6 +668,14 @@ impl EbtManager { } } } + } else if let Some(BrokerMessage::StoreKv(StoreKvEvent(ssb_id))) = msg { + debug!("Received KV store event from broker"); + + // Respond to a key-value store state change for the given peer. + // This is triggered when a new message is appended to the local feed. + if let Err(err) = self.handle_local_store_updated(ssb_id).await { + error!("Error while handling 'local store updated' event: {}", err) + } } } } diff --git a/solar/src/storage/kv.rs b/solar/src/storage/kv.rs index e92249c..f208f33 100644 --- a/solar/src/storage/kv.rs +++ b/solar/src/storage/kv.rs @@ -25,6 +25,8 @@ const PREFIX_PEER: u8 = 4u8; /// The feed belonging to the given SSB ID has changed /// (ie. a new message has been appended to the feed). +/// +/// The JSON value of the appended message is included. #[derive(Debug, Clone)] pub struct StoreKvEvent(pub String);