Skip to content

Commit a1b353b

Browse files
map data from stream and yield
1 parent 5369567 commit a1b353b

File tree

2 files changed

+63
-10
lines changed

2 files changed

+63
-10
lines changed

cluster-endpoints/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ yellowstone-grpc-proto = { workspace = true }
4141
itertools = {workspace = true}
4242
prometheus = { workspace = true }
4343
lazy_static = { workspace = true }
44+
async-stream = "0.3.5"
4445

4546
[dev-dependencies]
4647
tracing-subscriber = { workspace = true, features = ["std", "env-filter"] }

cluster-endpoints/examples/grpc_using_streams.rs

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@ use std::collections::{HashMap, HashSet};
22
use std::fmt::{Display, Formatter};
33
use std::ops::Deref;
44
use std::path::PathBuf;
5+
use std::pin::pin;
56
use std::sync::Arc;
67
use std::thread;
78
use anyhow::{bail, Context};
8-
use futures::{Stream, StreamExt};
9+
use async_stream::stream;
10+
use futures::{pin_mut, Stream, StreamExt};
911
use itertools::{ExactlyOneError, Itertools};
1012

1113
use log::{debug, error, info, warn};
1214
use serde::Serializer;
15+
use serde_json::de::Read;
1316
use solana_client::nonblocking::rpc_client::RpcClient;
1417
use solana_sdk::clock::Slot;
1518
use solana_sdk::commitment_config::CommitmentConfig;
16-
use tokio::select;
19+
use tokio::{select};
1720
use tokio::sync::broadcast::{Receiver, Sender};
1821
use tokio::sync::broadcast::error::TryRecvError;
1922
use tokio::sync::RwLock;
@@ -71,8 +74,10 @@ async fn create_multiplex(
7174

7275
let jh = tokio::spawn(async move {
7376

74-
let mut green = create_geyser_stream(grpc_addr_mainnet_triton.clone(), None).await;
75-
let mut blue = create_geyser_stream(grpc_addr_mainnet_ams81.clone(), None).await;
77+
let mut green = create_geyser_stream2(grpc_addr_mainnet_triton.clone(), None).await;
78+
let mut blue = create_geyser_stream2(grpc_addr_mainnet_ams81.clone(), None).await;
79+
pin_mut!(green);
80+
pin_mut!(blue);
7681

7782
let mut current_slot = 0 as Slot;
7883

@@ -83,8 +88,6 @@ async fn create_multiplex(
8388
message = green.next() => {
8489
match message {
8590
Some(message) => {
86-
// TODO tonic errors - pull up into create_geyser_stream
87-
let message = message.expect("TODO what to do with tonic errors?");
8891
map_filter_block_message(current_slot, message, commitment_config)
8992
}
9093
None => {
@@ -96,8 +99,6 @@ async fn create_multiplex(
9699
message = blue.next() => {
97100
match message {
98101
Some(message) => {
99-
// TODO tonic errors
100-
let message = message.expect("TODO what to do with tonic errors?");
101102
map_filter_block_message(current_slot, message, commitment_config)
102103
}
103104
None => {
@@ -128,6 +129,7 @@ async fn create_multiplex(
128129
return jh;
129130
}
130131

132+
#[derive(Debug)]
131133
enum BlockCmd {
132134
ForwardBlock(ProducedBlock),
133135
DiscardBlockBehindTip(Slot),
@@ -166,7 +168,7 @@ async fn create_geyser_stream(grpc_addr: String, x_token: Option<String>) -> imp
166168
},
167169
);
168170

169-
let mut stream = client
171+
let stream = client
170172
.subscribe_once(
171173
HashMap::new(),
172174
Default::default(),
@@ -182,4 +184,54 @@ async fn create_geyser_stream(grpc_addr: String, x_token: Option<String>) -> imp
182184

183185
// TODO pull tonic error handling inside this method
184186
return stream;
185-
}
187+
}
188+
189+
async fn create_geyser_stream2(grpc_addr: String, x_token: Option<String>) -> impl Stream<Item = SubscribeUpdate> {
190+
191+
192+
let mut client = GeyserGrpcClient::connect(grpc_addr, x_token, None).unwrap();
193+
194+
let mut blocks_subs = HashMap::new();
195+
blocks_subs.insert(
196+
"client".to_string(),
197+
SubscribeRequestFilterBlocks {
198+
account_include: Default::default(),
199+
include_transactions: Some(true),
200+
include_accounts: Some(false),
201+
include_entries: Some(false),
202+
},
203+
);
204+
205+
let stream = client
206+
.subscribe_once(
207+
HashMap::new(),
208+
Default::default(),
209+
HashMap::new(),
210+
Default::default(),
211+
blocks_subs,
212+
Default::default(),
213+
Some(CommitmentLevel::Confirmed),
214+
Default::default(),
215+
None,
216+
).await.unwrap();
217+
218+
stream! {
219+
220+
for await update_message in stream {
221+
222+
match update_message {
223+
Ok(update_message) => {
224+
yield update_message;
225+
}
226+
Err(status) => {
227+
error!(">stream: {:?}", status);
228+
}
229+
}
230+
231+
}
232+
233+
}
234+
235+
236+
237+
}

0 commit comments

Comments
 (0)