@@ -18,6 +18,7 @@ use zettabgp::bmp::BmpMessage;
1818
1919fn table_selector_for_peer (
2020 client_addr : SocketAddr ,
21+ listener_name : String ,
2122 peer : & BmpMessagePeerHeader ,
2223) -> Option < TableSelector > {
2324 let route_state = match ( peer. peertype , peer. flags . view_bits :: < Msb0 > ( ) [ 1 ] ) {
@@ -37,7 +38,8 @@ fn table_selector_for_peer(
3738 route_distinguisher,
3839 route_state,
3940 session_id : SessionId {
40- from_client : client_addr,
41+ from_client : client_addr. ip ( ) ,
42+ listener : listener_name,
4143 peer_address : peer. peeraddress ,
4244 } ,
4345 } )
@@ -46,9 +48,10 @@ fn table_selector_for_peer(
4648async fn process_route_monitoring (
4749 store : & impl Store ,
4850 client_addr : SocketAddr ,
51+ listener_name : String ,
4952 rm : BmpMessageRouteMonitoring ,
5053) {
51- let session = match table_selector_for_peer ( client_addr, & rm. peer ) {
54+ let session = match table_selector_for_peer ( client_addr, listener_name , & rm. peer ) {
5255 Some ( session) => session,
5356 None => {
5457 trace ! (
@@ -65,6 +68,7 @@ async fn process_route_monitoring(
6568
6669pub fn run_peer (
6770 client_addr : SocketAddr ,
71+ listener_name : String ,
6872 peer : BmpMessagePeerHeader ,
6973 store : & impl Store ,
7074) -> mpsc:: Sender < Result < BmpMessageRouteMonitoring , BmpMessagePeerDown > > {
@@ -73,7 +77,7 @@ pub fn run_peer(
7377
7478 tokio:: task:: spawn ( async move {
7579 trace ! ( "{} {:?}" , client_addr, peer) ;
76- if let Some ( session_id) = table_selector_for_peer ( client_addr, & peer)
80+ if let Some ( session_id) = table_selector_for_peer ( client_addr, listener_name . clone ( ) , & peer)
7781 . and_then ( |store| store. session_id ( ) . cloned ( ) )
7882 {
7983 store. session_up ( session_id, Session { } ) . await ;
@@ -82,7 +86,7 @@ pub fn run_peer(
8286 loop {
8387 match rx. recv ( ) . await {
8488 Some ( Ok ( rm) ) => {
85- process_route_monitoring ( & store, client_addr, rm) . await ;
89+ process_route_monitoring ( & store, client_addr, listener_name . clone ( ) , rm) . await ;
8690 }
8791 Some ( Err ( down_msg) ) => {
8892 trace ! ( "{} {:?}" , client_addr, down_msg) ;
@@ -94,7 +98,7 @@ pub fn run_peer(
9498 }
9599 }
96100 }
97- if let Some ( session_id) = table_selector_for_peer ( client_addr, & peer)
101+ if let Some ( session_id) = table_selector_for_peer ( client_addr, listener_name . clone ( ) , & peer)
98102 . and_then ( |store| store. session_id ( ) . cloned ( ) )
99103 {
100104 store. session_down ( session_id, None ) . await ;
@@ -107,6 +111,7 @@ pub async fn run_client(
107111 cfg : PeerConfig ,
108112 io : TcpStream ,
109113 client_addr : SocketAddr ,
114+ listener_name : String ,
110115 store : & impl Store ,
111116) -> anyhow:: Result < BmpMessageTermination > {
112117 let read = LengthDelimitedCodec :: builder ( )
@@ -151,15 +156,21 @@ pub async fn run_client(
151156 > = HashMap :: new ( ) ;
152157 channels. insert (
153158 first_peer_up. peer . peeraddress ,
154- run_peer ( client_addr, first_peer_up. peer , store) ,
159+ run_peer (
160+ client_addr,
161+ listener_name. clone ( ) ,
162+ first_peer_up. peer ,
163+ store,
164+ ) ,
155165 ) ;
156166 let client_name = cfg
157167 . name_override
158168 . or ( init_msg. sys_name )
159169 . unwrap_or ( client_addr. ip ( ) . to_string ( ) ) ;
160170 store
161171 . client_up (
162- client_addr,
172+ client_addr. ip ( ) ,
173+ listener_name. clone ( ) ,
163174 RouteState :: Selected ,
164175 Client {
165176 client_name,
@@ -178,12 +189,15 @@ pub async fn run_client(
178189 BmpMessage :: RouteMonitoring ( rm) => {
179190 let channel = channels. entry ( rm. peer . peeraddress ) . or_insert_with ( || {
180191 warn ! ( "the bmp device {} sent a message for a nonexisting peer, we'll initialize the table now: {:?}" , & client_addr, & rm) ;
181- run_peer ( client_addr, rm. peer . clone ( ) , store)
192+ run_peer ( client_addr, listener_name . clone ( ) , rm. peer . clone ( ) , store)
182193 } ) ;
183194 channel. send ( Ok ( rm) ) . await . unwrap ( ) ;
184195 }
185196 BmpMessage :: PeerUpNotification ( n) => {
186- channels. insert ( n. peer . peeraddress , run_peer ( client_addr, n. peer , store) ) ;
197+ channels. insert (
198+ n. peer . peeraddress ,
199+ run_peer ( client_addr, listener_name. clone ( ) , n. peer , store) ,
200+ ) ;
187201 }
188202 BmpMessage :: PeerDownNotification ( n) => match channels. remove ( & n. peer . peeraddress ) {
189203 Some ( channel) => channel. send ( Err ( n) ) . await . unwrap ( ) ,
@@ -209,6 +223,7 @@ pub struct BmpCollectorConfig {
209223}
210224
211225pub async fn run (
226+ name : String ,
212227 cfg : BmpCollectorConfig ,
213228 store : impl Store ,
214229 mut shutdown : tokio:: sync:: watch:: Receiver < bool > ,
@@ -222,11 +237,12 @@ pub async fn run(
222237 info!( "connected {:?}" , client_addr) ;
223238
224239 let store = store. clone( ) ;
240+ let name = name. clone( ) ;
225241 let mut shutdown = shutdown. clone( ) ;
226242 if let Some ( peer_cfg) = cfg. peers. get( & client_addr. ip( ) ) . or( cfg. default_peer_config. as_ref( ) ) . cloned( ) {
227243 running_tasks. push( tokio:: spawn( async move {
228244 tokio:: select! {
229- res = run_client( peer_cfg, io, client_addr, & store) => {
245+ res = run_client( peer_cfg, io, client_addr, name . clone ( ) , & store) => {
230246 match res {
231247 Err ( e) => warn!( "disconnected {} {}" , client_addr, e) ,
232248 Ok ( notification) => info!( "disconnected {} {:?}" , client_addr, notification) ,
@@ -235,7 +251,7 @@ pub async fn run(
235251 _ = shutdown. changed( ) => {
236252 }
237253 } ;
238- store. client_down( client_addr) . await ;
254+ store. client_down( client_addr. ip ( ) , name . clone ( ) ) . await ;
239255 } ) ) ;
240256 } else {
241257 info!( "unexpected connection from {}" , client_addr) ;
0 commit comments