@@ -39,7 +39,9 @@ pub async fn main() {
39
39
40
40
41
41
// mango validator (mainnet)
42
- let grpc_addr_mainnet_triton = "http://202.8.9.108:10000" . to_string ( ) ;
42
+ // let grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string();
43
+ // via toxiproxy
44
+ let grpc_addr_mainnet_triton = "http://127.0.0.1:10001" . to_string ( ) ;
43
45
// ams81 (mainnet)
44
46
let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000" . to_string ( ) ;
45
47
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
@@ -74,8 +76,8 @@ async fn create_multiplex(
74
76
75
77
let jh = tokio:: spawn ( async move {
76
78
77
- let mut green = create_geyser_stream2 ( grpc_addr_mainnet_triton. clone ( ) , None ) . await ;
78
- let mut blue = create_geyser_stream2 ( grpc_addr_mainnet_ams81. clone ( ) , None ) . await ;
79
+ let mut green = create_geyser_stream2 ( "green-toxiproxy" . to_string ( ) , grpc_addr_mainnet_triton. clone ( ) , None ) . await ;
80
+ let mut blue = create_geyser_stream2 ( "blue" . to_string ( ) , grpc_addr_mainnet_ams81. clone ( ) , None ) . await ;
79
81
pin_mut ! ( green) ;
80
82
pin_mut ! ( blue) ;
81
83
@@ -186,9 +188,9 @@ async fn create_geyser_stream(grpc_addr: String, x_token: Option<String>) -> imp
186
188
return stream;
187
189
}
188
190
189
- async fn create_geyser_stream2 ( grpc_addr : String , x_token : Option < String > ) -> impl Stream < Item = SubscribeUpdate > {
190
-
191
+ async fn create_geyser_stream2 ( label : String , grpc_addr : String , x_token : Option < String > ) -> impl Stream < Item = SubscribeUpdate > {
191
192
193
+ // throws e.g. InvalidUri(InvalidUri(InvalidAuthority))
192
194
let mut client = GeyserGrpcClient :: connect ( grpc_addr, x_token, None ) . unwrap ( ) ;
193
195
194
196
let mut blocks_subs = HashMap :: new ( ) ;
@@ -221,15 +223,19 @@ async fn create_geyser_stream2(grpc_addr: String, x_token: Option<String>) -> im
221
223
222
224
match update_message {
223
225
Ok ( update_message) => {
226
+ info!( ">message on {}" , label) ;
224
227
yield update_message;
225
228
}
226
229
Err ( status) => {
227
- error!( ">stream: {:?}" , status) ;
230
+ error!( ">error while receiving from stream {}: {:?}" , label, status) ;
231
+ // note: the for loop will terminate after this
228
232
}
229
233
}
230
234
231
235
}
232
236
237
+ warn!( "stream consumer loop terminated for {}" , label) ;
238
+
233
239
}
234
240
235
241
0 commit comments