diff --git a/rumqttd/src/link/local.rs b/rumqttd/src/link/local.rs index 44b08b5ea..b97b81261 100644 --- a/rumqttd/src/link/local.rs +++ b/rumqttd/src/link/local.rs @@ -338,6 +338,23 @@ impl LinkRx { } } + pub async fn recv_async(&mut self) -> Result, LinkError> { + // Read from cache first + // One router_rx trigger signifies a bunch of notifications. So we + // should always check cache first + match self.cache.pop_front() { + Some(v) => Ok(Some(v)), + None => { + // If cache is empty, check for router trigger and get fresh notifications + self.router_rx.recv_async().await?; + // Collect 'all' the data in the buffer after a notification. + // Notification means fresh data which isn't previously collected + mem::swap(&mut *self.send_buffer.lock(), &mut self.cache); + Ok(self.cache.pop_front()) + } + } + } + pub fn recv_deadline(&mut self, deadline: Instant) -> Result, LinkError> { // Read from cache first // One router_rx trigger signifies a bunch of notifications. So we