Skip to content

Commit 09037ba

Browse files
wrap grpc client connect in loop
1 parent 8c42803 commit 09037ba

File tree

1 file changed

+54
-46
lines changed

1 file changed

+54
-46
lines changed

cluster-endpoints/examples/grpc_using_streams.rs

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::{HashMap, HashSet};
22
use std::fmt::{Display, Formatter};
3-
use std::ops::Deref;
3+
use std::ops::{Add, Deref};
44
use std::path::PathBuf;
55
use std::pin::pin;
66
use std::sync::Arc;
@@ -21,7 +21,7 @@ use tokio::sync::broadcast::{Receiver, Sender};
2121
use tokio::sync::broadcast::error::TryRecvError;
2222
use tokio::sync::RwLock;
2323
use tokio::task::JoinHandle;
24-
use tokio::time::{sleep, Duration, timeout};
24+
use tokio::time::{sleep, Duration, timeout, Instant, sleep_until};
2525
use yellowstone_grpc_client::GeyserGrpcClient;
2626
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta};
2727
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
@@ -189,55 +189,63 @@ async fn create_geyser_stream(grpc_addr: String, x_token: Option<String>) -> imp
189189
}
190190

191191
async fn create_geyser_stream2(label: String, grpc_addr: String, x_token: Option<String>) -> impl Stream<Item = SubscribeUpdate> {
192-
193-
// throws e.g. InvalidUri(InvalidUri(InvalidAuthority))
194-
let mut client = GeyserGrpcClient::connect(grpc_addr, x_token, None).unwrap();
195-
196-
let mut blocks_subs = HashMap::new();
197-
blocks_subs.insert(
198-
"client".to_string(),
199-
SubscribeRequestFilterBlocks {
200-
account_include: Default::default(),
201-
include_transactions: Some(true),
202-
include_accounts: Some(false),
203-
include_entries: Some(false),
204-
},
205-
);
206-
207-
let stream = client
208-
.subscribe_once(
209-
HashMap::new(),
210-
Default::default(),
211-
HashMap::new(),
212-
Default::default(),
213-
blocks_subs,
214-
Default::default(),
215-
Some(CommitmentLevel::Confirmed),
216-
Default::default(),
217-
None,
218-
).await.unwrap();
219-
220192
stream! {
193+
let mut throttle_barrier;
194+
'main_loop: loop {
195+
throttle_barrier = Instant::now().add(Duration::from_millis(500));
221196

222-
for await update_message in stream {
197+
// throws e.g. InvalidUri(InvalidUri(InvalidAuthority))
198+
// GeyserGrpcClientError
199+
let client_result = GeyserGrpcClient::connect(grpc_addr.clone(), x_token.clone(), None);
223200

224-
match update_message {
225-
Ok(update_message) => {
226-
info!(">message on {}", label);
227-
yield update_message;
228-
}
229-
Err(status) => {
230-
error!(">error while receiving from stream {}: {:?}", label, status);
231-
// note: the for loop will terminate after this
232-
}
201+
if let Err(client_connect_error) = client_result {
202+
// TODO identify non-recoverable errors and cancel stream
203+
warn!("Connect failed and will be retried: {:?}", client_connect_error);
204+
sleep_until(throttle_barrier).await;
205+
continue 'main_loop;
233206
}
234207

235-
}
236-
237-
warn!("stream consumer loop terminated for {}", label);
238-
239-
}
240-
208+
let mut client = client_result.unwrap();
209+
210+
let mut blocks_subs = HashMap::new();
211+
blocks_subs.insert(
212+
"client".to_string(),
213+
SubscribeRequestFilterBlocks {
214+
account_include: Default::default(),
215+
include_transactions: Some(true),
216+
include_accounts: Some(false),
217+
include_entries: Some(false),
218+
},
219+
);
220+
221+
let stream = client
222+
.subscribe_once(
223+
HashMap::new(),
224+
Default::default(),
225+
HashMap::new(),
226+
Default::default(),
227+
blocks_subs,
228+
Default::default(),
229+
Some(CommitmentLevel::Confirmed),
230+
Default::default(),
231+
None,
232+
).await.unwrap();
233+
234+
for await update_message in stream {
235+
match update_message {
236+
Ok(update_message) => {
237+
info ! (">message on {}", label);
238+
yield update_message;
239+
}
240+
Err(status) => {
241+
error ! (">error while receiving from stream {}: {:?}", label, status);
242+
// note: the for loop will terminate after this
243+
}
244+
}
245+
} // -- production loop
241246

247+
warn!("stream consumer loop terminated for {}", label);
248+
} // -- main loop
249+
} // -- stream!
242250

243251
}

0 commit comments

Comments
 (0)