Skip to content

Commit

Permalink
Add sned_transactions_to_address method to workers_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Jan 17, 2025
1 parent e6ed940 commit d5dfa51
Showing 1 changed file with 55 additions and 3 deletions.
58 changes: 55 additions & 3 deletions tpu-client-next/src/workers_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ impl WorkerInfo {
Ok(())
}

async fn send_transactions(
&self,
txs_batch: TransactionBatch,
) -> Result<(), WorkersCacheError> {
self.sender
.send(txs_batch)
.await
.map_err(|_| WorkersCacheError::ReceiverDropped)?;
Ok(())
}

/// Closes the worker by dropping the sender and awaiting the worker's
/// statistics.
async fn shutdown(self) -> Result<(), WorkersCacheError> {
Expand Down Expand Up @@ -118,9 +129,7 @@ impl WorkersCache {
None
}

/// Sends a batch of transactions to the worker for a given peer. If the
/// worker for the peer is disconnected or fails, it is removed from the
/// cache.
/// Try sending a batch of transactions to the worker for a given peer.
pub(crate) fn try_send_transactions_to_address(
&mut self,
peer: &SocketAddr,
Expand Down Expand Up @@ -149,6 +158,49 @@ impl WorkersCache {
send_res
}

/// Sends a batch of transactions to the worker for a given peer.
///
/// If the worker for the peer is disconnected or fails, it
/// is removed from the cache.
#[allow(
dead_code,
reason = "This method will be used in the upcoming changes to implement optional backpressure on the sender."
)]
pub(crate) async fn send_transactions_to_address(
&mut self,
peer: &SocketAddr,
txs_batch: TransactionBatch,
) -> Result<(), WorkersCacheError> {
let Self {
workers, cancel, ..
} = self;

let body = async move {
let current_worker = workers.get(peer).expect(
"Failed to fetch worker for peer {peer}.\n\
Peer existence must be checked before this call using `contains` method.",
);
let send_res = current_worker.send_transactions(txs_batch).await;
if let Err(WorkersCacheError::ReceiverDropped) = send_res {
// Remove the worker from the cache, if the peer has disconnected.
if let Some(current_worker) = workers.pop(peer) {
// To avoid obscuring the error from send, ignore a possible
// `TaskJoinFailure`.
let close_result = current_worker.shutdown().await;
if let Err(error) = close_result {
error!("Error while closing worker: {error}.");
}
}
}

send_res
};
cancel
.run_until_cancelled(body)
.await
.unwrap_or_else(|| Err(WorkersCacheError::ShutdownError))
}

/// Closes and removes all workers in the cache. This is typically done when
/// shutting down the system.
pub(crate) async fn shutdown(&mut self) {
Expand Down

0 comments on commit d5dfa51

Please sign in to comment.