|
1 | | -use async_trait::async_trait; |
2 | 1 | use futures_util::Stream; |
3 | 2 | use ipnet::{IpNet, Ipv4Net, Ipv6Net}; |
4 | 3 | use log::*; |
@@ -178,144 +177,161 @@ impl Default for QueryLimits { |
178 | 177 | } |
179 | 178 | } |
180 | 179 |
|
181 | | -#[async_trait] |
182 | 180 | pub trait Store: Clone + Send + Sync + 'static { |
183 | | - async fn update_route( |
| 181 | + fn update_route( |
184 | 182 | &self, |
185 | 183 | path_id: PathId, |
186 | 184 | net: IpNet, |
187 | 185 | table: TableSelector, |
188 | 186 | attrs: RouteAttrs, |
189 | | - ); |
| 187 | + ) -> impl std::future::Future<Output = ()> + std::marker::Send; |
190 | 188 |
|
191 | | - async fn withdraw_route(&self, path_id: PathId, net: IpNet, table: TableSelector); |
| 189 | + fn withdraw_route( |
| 190 | + &self, |
| 191 | + path_id: PathId, |
| 192 | + net: IpNet, |
| 193 | + table: TableSelector, |
| 194 | + ) -> impl std::future::Future<Output = ()> + std::marker::Send; |
192 | 195 |
|
193 | 196 | fn get_routes(&self, query: Query) -> Pin<Box<dyn Stream<Item = QueryResult> + Send>>; |
194 | 197 |
|
195 | 198 | fn get_routers(&self) -> HashMap<SocketAddr, Client>; |
196 | 199 |
|
197 | 200 | fn get_routing_instances(&self) -> HashMap<SocketAddr, HashSet<RouteDistinguisher>>; |
198 | 201 |
|
199 | | - async fn client_up( |
| 202 | + fn client_up( |
200 | 203 | &self, |
201 | 204 | client_addr: SocketAddr, |
202 | 205 | route_state: RouteState, |
203 | 206 | client_data: Client, |
204 | | - ); |
| 207 | + ) -> impl std::future::Future<Output = ()> + std::marker::Send; |
205 | 208 |
|
206 | | - async fn client_down(&self, client_addr: SocketAddr); |
| 209 | + fn client_down( |
| 210 | + &self, |
| 211 | + client_addr: SocketAddr, |
| 212 | + ) -> impl std::future::Future<Output = ()> + std::marker::Send; |
207 | 213 |
|
208 | | - async fn session_up(&self, session: SessionId, session_data: Session); |
| 214 | + fn session_up( |
| 215 | + &self, |
| 216 | + session: SessionId, |
| 217 | + session_data: Session, |
| 218 | + ) -> impl std::future::Future<Output = ()> + std::marker::Send; |
209 | 219 |
|
210 | | - async fn session_down(&self, session: SessionId, new_state: Option<Session>); |
| 220 | + fn session_down( |
| 221 | + &self, |
| 222 | + session: SessionId, |
| 223 | + new_state: Option<Session>, |
| 224 | + ) -> impl std::future::Future<Output = ()> + std::marker::Send; |
211 | 225 |
|
212 | | - async fn insert_bgp_update( |
| 226 | + fn insert_bgp_update( |
213 | 227 | &self, |
214 | 228 | session: TableSelector, |
215 | 229 | update: zettabgp::prelude::BgpUpdateMessage, |
216 | | - ) { |
217 | | - use zettabgp::prelude::*; |
218 | | - let mut attrs: RouteAttrs = Default::default(); |
219 | | - let mut nexthop = None; |
220 | | - let mut update_nets = vec![]; |
221 | | - let mut withdraw_nets = vec![]; |
222 | | - for attr in update.attrs { |
223 | | - match attr { |
224 | | - BgpAttrItem::MPUpdates(updates) => { |
225 | | - let nexthop = match updates.nexthop { |
226 | | - BgpAddr::V4(v4) => Some(IpAddr::from(v4)), |
227 | | - BgpAddr::V6(v6) => Some(IpAddr::from(v6)), |
228 | | - _ => None, |
229 | | - }; |
230 | | - for net in bgp_addrs_to_nets(&updates.addrs) { |
231 | | - update_nets.push((net, nexthop)); |
| 230 | + ) -> impl std::future::Future<Output = ()> + std::marker::Send { |
| 231 | + async move { |
| 232 | + use zettabgp::prelude::*; |
| 233 | + let mut attrs: RouteAttrs = Default::default(); |
| 234 | + let mut nexthop = None; |
| 235 | + let mut update_nets = vec![]; |
| 236 | + let mut withdraw_nets = vec![]; |
| 237 | + for attr in update.attrs { |
| 238 | + match attr { |
| 239 | + BgpAttrItem::MPUpdates(updates) => { |
| 240 | + let nexthop = match updates.nexthop { |
| 241 | + BgpAddr::V4(v4) => Some(IpAddr::from(v4)), |
| 242 | + BgpAddr::V6(v6) => Some(IpAddr::from(v6)), |
| 243 | + _ => None, |
| 244 | + }; |
| 245 | + for net in bgp_addrs_to_nets(&updates.addrs) { |
| 246 | + update_nets.push((net, nexthop)); |
| 247 | + } |
232 | 248 | } |
233 | | - } |
234 | | - BgpAttrItem::MPWithdraws(withdraws) => { |
235 | | - for net in bgp_addrs_to_nets(&withdraws.addrs) { |
236 | | - withdraw_nets.push(net); |
| 249 | + BgpAttrItem::MPWithdraws(withdraws) => { |
| 250 | + for net in bgp_addrs_to_nets(&withdraws.addrs) { |
| 251 | + withdraw_nets.push(net); |
| 252 | + } |
237 | 253 | } |
238 | | - } |
239 | | - BgpAttrItem::NextHop(BgpNextHop { value }) => { |
240 | | - nexthop = Some(value); |
241 | | - } |
242 | | - BgpAttrItem::CommunityList(BgpCommunityList { value }) => { |
243 | | - let mut communities = vec![]; |
244 | | - for community in value.into_iter() { |
245 | | - communities.push(( |
246 | | - (community.value >> 16) as u16, |
247 | | - (community.value & 0xffff) as u16, |
248 | | - )); |
| 254 | + BgpAttrItem::NextHop(BgpNextHop { value }) => { |
| 255 | + nexthop = Some(value); |
249 | 256 | } |
250 | | - attrs.communities = Some(communities); |
251 | | - } |
252 | | - BgpAttrItem::MED(BgpMED { value }) => { |
253 | | - attrs.med = Some(value); |
254 | | - } |
255 | | - BgpAttrItem::LocalPref(BgpLocalpref { value }) => { |
256 | | - attrs.local_pref = Some(value); |
257 | | - } |
258 | | - BgpAttrItem::Origin(BgpOrigin { value }) => { |
259 | | - attrs.origin = Some(match value { |
260 | | - BgpAttrOrigin::Igp => RouteOrigin::Igp, |
261 | | - BgpAttrOrigin::Egp => RouteOrigin::Egp, |
262 | | - BgpAttrOrigin::Incomplete => RouteOrigin::Incomplete, |
263 | | - }) |
264 | | - } |
265 | | - BgpAttrItem::ASPath(BgpASpath { value }) => { |
266 | | - let mut as_path = vec![]; |
267 | | - for asn in value { |
268 | | - as_path.push(asn.value); |
| 257 | + BgpAttrItem::CommunityList(BgpCommunityList { value }) => { |
| 258 | + let mut communities = vec![]; |
| 259 | + for community in value.into_iter() { |
| 260 | + communities.push(( |
| 261 | + (community.value >> 16) as u16, |
| 262 | + (community.value & 0xffff) as u16, |
| 263 | + )); |
| 264 | + } |
| 265 | + attrs.communities = Some(communities); |
269 | 266 | } |
270 | | - attrs.as_path = Some(as_path); |
271 | | - } |
272 | | - BgpAttrItem::LargeCommunityList(BgpLargeCommunityList { value }) => { |
273 | | - let mut communities = vec![]; |
274 | | - for community in value.into_iter() { |
275 | | - communities.push((community.ga, community.ldp1, community.ldp2)); |
| 267 | + BgpAttrItem::MED(BgpMED { value }) => { |
| 268 | + attrs.med = Some(value); |
| 269 | + } |
| 270 | + BgpAttrItem::LocalPref(BgpLocalpref { value }) => { |
| 271 | + attrs.local_pref = Some(value); |
| 272 | + } |
| 273 | + BgpAttrItem::Origin(BgpOrigin { value }) => { |
| 274 | + attrs.origin = Some(match value { |
| 275 | + BgpAttrOrigin::Igp => RouteOrigin::Igp, |
| 276 | + BgpAttrOrigin::Egp => RouteOrigin::Egp, |
| 277 | + BgpAttrOrigin::Incomplete => RouteOrigin::Incomplete, |
| 278 | + }) |
276 | 279 | } |
277 | | - attrs.large_communities = Some(communities); |
| 280 | + BgpAttrItem::ASPath(BgpASpath { value }) => { |
| 281 | + let mut as_path = vec![]; |
| 282 | + for asn in value { |
| 283 | + as_path.push(asn.value); |
| 284 | + } |
| 285 | + attrs.as_path = Some(as_path); |
| 286 | + } |
| 287 | + BgpAttrItem::LargeCommunityList(BgpLargeCommunityList { value }) => { |
| 288 | + let mut communities = vec![]; |
| 289 | + for community in value.into_iter() { |
| 290 | + communities.push((community.ga, community.ldp1, community.ldp2)); |
| 291 | + } |
| 292 | + attrs.large_communities = Some(communities); |
| 293 | + } |
| 294 | + _ => {} |
278 | 295 | } |
279 | | - _ => {} |
280 | 296 | } |
281 | | - } |
282 | | - for net in bgp_addrs_to_nets(&update.updates).into_iter() { |
283 | | - update_nets.push((net, nexthop)); |
284 | | - } |
285 | | - for net in bgp_addrs_to_nets(&update.withdraws).into_iter() { |
286 | | - withdraw_nets.push(net); |
287 | | - } |
| 297 | + for net in bgp_addrs_to_nets(&update.updates).into_iter() { |
| 298 | + update_nets.push((net, nexthop)); |
| 299 | + } |
| 300 | + for net in bgp_addrs_to_nets(&update.withdraws).into_iter() { |
| 301 | + withdraw_nets.push(net); |
| 302 | + } |
288 | 303 |
|
289 | | - for ((mut rd, path, prefix), nexthop) in update_nets { |
290 | | - if rd.is_default() { |
291 | | - rd = session.route_distinguisher |
| 304 | + for ((mut rd, path, prefix), nexthop) in update_nets { |
| 305 | + if rd.is_default() { |
| 306 | + rd = session.route_distinguisher |
| 307 | + } |
| 308 | + let mut attrs = attrs.clone(); |
| 309 | + attrs.nexthop = nexthop; |
| 310 | + self.update_route( |
| 311 | + path, |
| 312 | + prefix, |
| 313 | + TableSelector { |
| 314 | + route_distinguisher: rd, |
| 315 | + ..session.clone() |
| 316 | + }, |
| 317 | + attrs, |
| 318 | + ) |
| 319 | + .await; |
292 | 320 | } |
293 | | - let mut attrs = attrs.clone(); |
294 | | - attrs.nexthop = nexthop; |
295 | | - self.update_route( |
296 | | - path, |
297 | | - prefix, |
298 | | - TableSelector { |
299 | | - route_distinguisher: rd, |
300 | | - ..session.clone() |
301 | | - }, |
302 | | - attrs, |
303 | | - ) |
304 | | - .await; |
305 | | - } |
306 | | - for (mut rd, path, prefix) in withdraw_nets { |
307 | | - if rd.is_default() { |
308 | | - rd = session.route_distinguisher |
| 321 | + for (mut rd, path, prefix) in withdraw_nets { |
| 322 | + if rd.is_default() { |
| 323 | + rd = session.route_distinguisher |
| 324 | + } |
| 325 | + self.withdraw_route( |
| 326 | + path, |
| 327 | + prefix, |
| 328 | + TableSelector { |
| 329 | + route_distinguisher: rd, |
| 330 | + ..session.clone() |
| 331 | + }, |
| 332 | + ) |
| 333 | + .await; |
309 | 334 | } |
310 | | - self.withdraw_route( |
311 | | - path, |
312 | | - prefix, |
313 | | - TableSelector { |
314 | | - route_distinguisher: rd, |
315 | | - ..session.clone() |
316 | | - }, |
317 | | - ) |
318 | | - .await; |
319 | 335 | } |
320 | 336 | } |
321 | 337 | } |
|
0 commit comments