|
2 | 2 | // Melnikov, which is licensed under the MIT license. |
3 | 3 |
|
4 | 4 | use bytes::{Buf, BytesMut}; |
5 | | -use futures_util::Stream; |
6 | 5 | use futures_util::StreamExt; |
| 6 | +use futures_util::TryStream; |
| 7 | +use futures_util::TryStreamExt; |
7 | 8 | use log::*; |
8 | 9 | use std::sync::Arc; |
9 | 10 | use tokio::io::AsyncWriteExt; |
@@ -51,7 +52,11 @@ impl BgpDumper { |
51 | 52 | .params |
52 | 53 | .prepare_message_buf(&mut buf, BgpMessageType::Open, messagelen)?; |
53 | 54 | self.write.lock().await.write_all(&buf[0..blen]).await?; |
54 | | - let (msgtype, buf) = self.next_message().await?; |
| 55 | + let (msgtype, buf) = self |
| 56 | + .messages() |
| 57 | + .try_next() |
| 58 | + .await? |
| 59 | + .ok_or(BgpError::static_str("Expected open message"))?; |
55 | 60 | if msgtype != BgpMessageType::Open { |
56 | 61 | return Err(BgpError::static_str("Invalid state to start_active")); |
57 | 62 | } |
@@ -84,51 +89,61 @@ impl BgpDumper { |
84 | 89 | }); |
85 | 90 | tx |
86 | 91 | } |
87 | | - async fn next_message(&mut self) -> Result<(BgpMessageType, BytesMut), BgpError> { |
88 | | - let mut buf = self |
89 | | - .read |
90 | | - .next() |
91 | | - .await |
92 | | - .ok_or(BgpError::static_str("unexpected end of stream"))??; |
93 | | - let msg = self.params.decode_message_head(&buf)?; |
94 | | - buf.advance(19); |
95 | | - buf.truncate(msg.1); |
96 | | - Ok((msg.0, buf)) |
| 92 | + fn messages( |
| 93 | + &mut self, |
| 94 | + ) -> impl TryStream<Ok = (BgpMessageType, BytesMut), Error = BgpError> + Send + Unpin + use<'_> |
| 95 | + { |
| 96 | + (&mut self.read).map(|buf| { |
| 97 | + let mut buf = buf?; |
| 98 | + let msg = self.params.decode_message_head(&buf)?; |
| 99 | + buf.advance(19); |
| 100 | + buf.truncate(msg.1); |
| 101 | + Ok((msg.0, buf)) |
| 102 | + }) |
97 | 103 | } |
98 | 104 | pub fn lifecycle( |
99 | | - mut self, |
100 | | - ) -> impl Stream<Item = Result<BgpUpdateMessage, Result<BgpNotificationMessage, BgpError>>> + Send |
101 | | - { |
| 105 | + &mut self, |
| 106 | + ) -> impl TryStream<Ok = BgpUpdateMessage, Error = Result<BgpNotificationMessage, BgpError>> |
| 107 | + + Unpin |
| 108 | + + Send |
| 109 | + + use<'_> { |
102 | 110 | self.stop_keepalives = Some(self.start_keepalives()); |
| 111 | + let params = self.params.clone(); |
| 112 | + Box::pin( |
| 113 | + self.messages() |
| 114 | + .map_err(Err) |
| 115 | + .try_filter_map(move |(msgtype, buf)| { |
| 116 | + let params = params.clone(); |
| 117 | + async move { |
| 118 | + if msgtype == BgpMessageType::Keepalive { |
| 119 | + return Ok(None); |
| 120 | + } |
| 121 | + match msgtype { |
| 122 | + BgpMessageType::Open => { |
| 123 | + Err(Err(BgpError::static_str("Incorrect open message"))) |
| 124 | + } |
| 125 | + BgpMessageType::Keepalive => Ok(None), |
| 126 | + BgpMessageType::Notification => { |
| 127 | + let mut msgnotification = BgpNotificationMessage::new(); |
| 128 | + msgnotification |
| 129 | + .decode_from(¶ms, &buf[..]) |
| 130 | + .map_err(Err)?; |
| 131 | + Err(Ok(msgnotification)) |
| 132 | + } |
| 133 | + BgpMessageType::Update => { |
| 134 | + let mut msgupdate = BgpUpdateMessage::new(); |
| 135 | + if let Err(e) = msgupdate.decode_from(¶ms, &buf[..]) { |
| 136 | + warn!("BGP update decode error: {:?}", e); |
| 137 | + warn!("{:x?}", &buf[..]); |
| 138 | + return Ok(None); |
| 139 | + } |
103 | 140 |
|
104 | | - async_stream::try_stream! { |
105 | | - loop { |
106 | | - let (msgtype, buf) = self.next_message().await.map_err(Err)?; |
107 | | - if msgtype == BgpMessageType::Keepalive { |
108 | | - continue; |
109 | | - } |
110 | | - match msgtype { |
111 | | - BgpMessageType::Open => { |
112 | | - Err(Err(BgpError::static_str("Incorrect open message")))?; |
113 | | - } |
114 | | - BgpMessageType::Keepalive => {} |
115 | | - BgpMessageType::Notification => { |
116 | | - let mut msgnotification = BgpNotificationMessage::new(); |
117 | | - msgnotification.decode_from(&self.params, &buf[..]).map_err(Err)?; |
118 | | - Err(Ok(msgnotification))?; |
119 | | - } |
120 | | - BgpMessageType::Update => { |
121 | | - let mut msgupdate = BgpUpdateMessage::new(); |
122 | | - if let Err(e) = msgupdate.decode_from(&self.params, &buf[..]) { |
123 | | - warn!("BGP update decode error: {:?}", e); |
124 | | - warn!("{:x?}", &buf[..]); |
125 | | - continue; |
| 141 | + Ok(Some(msgupdate)) |
| 142 | + } |
126 | 143 | } |
127 | | - yield msgupdate; |
128 | 144 | } |
129 | | - } |
130 | | - } |
131 | | - } |
| 145 | + }), |
| 146 | + ) |
132 | 147 | } |
133 | 148 | } |
134 | 149 | impl Drop for BgpDumper { |
|
0 commit comments