Skip to content

Commit dd65419

Browse files
handle subscribe+recv errors
1 parent 3c57bc0 commit dd65419

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

cluster-endpoints/examples/grpc_using_streams.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,9 @@ async fn create_geyser_stream(grpc_addr: String, x_token: Option<String>) -> imp
190190

191191
async fn create_geyser_stream2(label: String, grpc_addr: String, x_token: Option<String>) -> impl Stream<Item = SubscribeUpdate> {
192192
stream! {
193-
let mut throttle_barrier;
193+
let mut throttle_barrier = Instant::now();
194194
'main_loop: loop {
195+
sleep_until(throttle_barrier).await;
195196
throttle_barrier = Instant::now().add(Duration::from_millis(1000));
196197

197198
// throws e.g. InvalidUri(InvalidUri(InvalidAuthority))
@@ -202,8 +203,7 @@ async fn create_geyser_stream2(label: String, grpc_addr: String, x_token: Option
202203

203204
if let Err(client_connect_error) = connect_result {
204205
// TODO identify non-recoverable errors and cancel stream
205-
warn!("Connect failed - retrying: {:?}", client_connect_error);
206-
sleep_until(throttle_barrier).await;
206+
warn!("Connect failed on {} - retrying: {:?}", label, client_connect_error);
207207
continue 'main_loop;
208208
}
209209

@@ -220,7 +220,7 @@ async fn create_geyser_stream2(label: String, grpc_addr: String, x_token: Option
220220
},
221221
);
222222

223-
let stream = client
223+
let subscribe_result = client
224224
.subscribe_once(
225225
HashMap::new(),
226226
Default::default(),
@@ -231,17 +231,26 @@ async fn create_geyser_stream2(label: String, grpc_addr: String, x_token: Option
231231
Some(CommitmentLevel::Confirmed),
232232
Default::default(),
233233
None,
234-
).await.unwrap();
234+
).await;
235+
236+
if let Err(subscribe_error) = subscribe_result {
237+
// TODO identify non-recoverable errors and cancel stream
238+
warn!("Subscribe failed on {} - retrying: {:?}", label, subscribe_error);
239+
continue 'main_loop;
240+
}
241+
242+
let stream = subscribe_result.unwrap();
235243

236244
for await update_message in stream {
237245
match update_message {
238246
Ok(update_message) => {
239-
info ! (">message on {}", label);
247+
info!(">message on {}", label);
240248
yield update_message;
241249
}
242-
Err(status) => {
243-
error ! (">error while receiving from stream {}: {:?}", label, status);
244-
// note: the for loop will terminate after this
250+
Err(tonic_status) => {
251+
// TODO identify non-recoverable errors and cancel stream
252+
warn!("Receive error on label - retrying: {:?}", label, tonic_status);
253+
continue 'main_loop;
245254
}
246255
}
247256
} // -- production loop

0 commit comments

Comments
 (0)