Skip to content

Commit da812d7

Browse files
refactor create_slot_task
1 parent 1bcf7d2 commit da812d7

File tree

1 file changed

+64
-63
lines changed

1 file changed

+64
-63
lines changed

cluster-endpoints/src/grpc_subscription.rs

Lines changed: 64 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -324,71 +324,12 @@ pub fn create_grpc_subscription(
324324
let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);
325325
let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);
326326

327-
let mut slots = HashMap::new();
328-
slots.insert(
329-
"client".to_string(),
330-
SubscribeRequestFilterSlots {
331-
filter_by_commitment: Some(true),
332-
},
327+
let slot_task: AnyhowJoinHandle = create_slot_task(
328+
grpc_addr.clone(),
329+
grpc_x_token.clone(),
330+
slot_sx,
333331
);
334332

335-
let slot_task: AnyhowJoinHandle = {
336-
let grpc_x_token = grpc_x_token.clone();
337-
let grpc_addr = grpc_addr.clone();
338-
tokio::spawn(async move {
339-
// connect to grpc
340-
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token.clone(), None)?;
341-
342-
let version = client.get_version().await?.version;
343-
if version != expected_grpc_version {
344-
log::warn!(
345-
"Expected grpc version {:?}, got {:?}, continue",
346-
expected_grpc_version,
347-
version
348-
);
349-
}
350-
let mut stream = client
351-
.subscribe_once(
352-
slots,
353-
Default::default(),
354-
HashMap::new(),
355-
Default::default(),
356-
HashMap::new(),
357-
Default::default(),
358-
Some(CommitmentLevel::Processed),
359-
Default::default(),
360-
None,
361-
)
362-
.await?;
363-
364-
while let Some(message) = stream.next().await {
365-
let message = message?;
366-
367-
let Some(update) = message.update_oneof else {
368-
continue;
369-
};
370-
371-
match update {
372-
UpdateOneof::Slot(slot) => {
373-
slot_sx
374-
.send(SlotNotification {
375-
estimated_processed_slot: slot.slot,
376-
processed_slot: slot.slot,
377-
})
378-
.context("Error sending slot notification")?;
379-
}
380-
UpdateOneof::Ping(_) => {
381-
log::trace!("GRPC Ping");
382-
}
383-
k => {
384-
bail!("Unexpected update: {k:?}");
385-
}
386-
};
387-
}
388-
bail!("geyser slot stream ended");
389-
})
390-
};
391-
392333
let block_confirmed_task: AnyhowJoinHandle = create_block_processing_task(
393334
grpc_addr.clone(),
394335
grpc_x_token.clone(),
@@ -420,3 +361,63 @@ pub fn create_grpc_subscription(
420361
];
421362
Ok((streamers, endpoint_tasks))
422363
}
364+
365+
366+
// note: cannot add confirmation level as parameter because the data produced is declared as "processed"
367+
pub fn create_slot_task(
368+
grpc_addr: String,
369+
grpc_x_token: Option<String>,
370+
slot_sx: Sender<SlotNotification>,
371+
) -> AnyhowJoinHandle {
372+
let mut slots = HashMap::new();
373+
slots.insert(
374+
"client".to_string(),
375+
SubscribeRequestFilterSlots {
376+
filter_by_commitment: Some(true),
377+
},
378+
);
379+
380+
tokio::spawn(async move {
381+
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token.clone(), None)?;
382+
383+
let mut stream = client
384+
.subscribe_once(
385+
slots,
386+
Default::default(),
387+
HashMap::new(),
388+
Default::default(),
389+
HashMap::new(),
390+
Default::default(),
391+
Some(CommitmentLevel::Processed),
392+
Default::default(),
393+
None,
394+
)
395+
.await?;
396+
397+
while let Some(message) = stream.next().await {
398+
let message = message?;
399+
400+
let Some(update) = message.update_oneof else {
401+
continue;
402+
};
403+
404+
match update {
405+
UpdateOneof::Slot(slot) => {
406+
slot_sx
407+
.send(SlotNotification {
408+
estimated_processed_slot: slot.slot,
409+
processed_slot: slot.slot,
410+
})
411+
.context("Error sending slot notification")?;
412+
}
413+
UpdateOneof::Ping(_) => {
414+
log::trace!("GRPC Ping");
415+
}
416+
k => {
417+
bail!("Unexpected update: {k:?}");
418+
}
419+
};
420+
}
421+
bail!("geyser slot stream ended");
422+
})
423+
}

0 commit comments

Comments
 (0)