Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eric/token auth #6

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,739 changes: 883 additions & 856 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ members = [
"jito_protos",
"searcher_client",
]
resolver = "2"
16 changes: 6 additions & 10 deletions backrun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,19 @@ edition = "2021"

[dependencies]
bincode = "1.3.3"
chrono = "0.4.24"
clap = { version = "4", features = ["derive", "env"] }
crossbeam-channel = "0.5.7"
env_logger = "0.10"
futures = "0.3.28"
futures-util = "0.3.28"
histogram = "0.6.9"
jito-protos = { path = "../jito_protos" }
log = "0.4.17"
prost-types = "0.11"
rand = "0.8.5"
jito-searcher-client = { path = "../searcher_client" }
solana-client = "=1.16.16"
solana-metrics = "=1.16.16"
solana-sdk = "=1.16.16"
solana-transaction-status = "=1.16.16"
solana-client = "=1.17.20"
solana-metrics = "=1.17.20"
solana-sdk = "=1.17.20"
solana-transaction-status = "=1.17.20"
spl-memo = "3.0.1"
thiserror = "1.0.40"
tokio = "~1.14.1"
tonic = { version = "0.8.3", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tokio = "1"
tonic = { version = "0.10", features = ["tls", "tls-roots", "tls-webpki-roots"] }
29 changes: 20 additions & 9 deletions backrun/README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
# Backrun Example

## Example:
```bash

git submodule update --init --recursive
See the [cli README](../cli/README.md) for setup instructions.

cargo b --release && \
RUST_LOG=info ./target/release/jito-backrun-example \
--block-engine-addr <BLOCK_ENGINE_ADDR> \
## Usage
```bash
cargo run --bin jito-backrun-example -- \
--block-engine-url <BLOCK_ENGINE_URL> \
--payer-keypair <PAYER_KEYPAIR> \
--auth-keypair <AUTH_KEYPAIR> \
--pubsub-url ws://{RPC_URL}:8900 \
--rpc-url http://{RPC_URL}:8899 \
--tip-program-id {TIP_PROGRAM_ID} \
--backrun-accounts {account}
--tip-program-id <TIP_PROGRAM_ID> \
--backrun-accounts <BACKRUN_ACCOUNTS>
```

## Example
Backrun transactions that write-lock the [Pyth SOL/USDC account](https://solscan.io/account/H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG):
```bash
RUST_LOG=INFO cargo run --bin jito-backrun-example -- \
--block-engine-url https://frankfurt.mainnet.block-engine.jito.wtf \
--payer-keypair keypair.json \
--auth-keypair keypair.json \
--pubsub-url ws://api.mainnet-beta.solana.com:8900 \
--rpc-url https://api.mainnet-beta.solana.com:8899 \
--tip-program-id T1pyyaTNZsKv2WcRAB8oVnk93mLJw2XzjtVYqCsaHqt \
--backrun-accounts H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG
```
13 changes: 7 additions & 6 deletions backrun/src/event_loops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,13 @@ pub async fn block_subscribe_loop(
}
}

// attempts to maintain connection to searcher service and stream pending transaction notifications over a channel
/// attempts to maintain connection to searcher service and stream pending transaction notifications over a channel
pub async fn pending_tx_loop(
block_engine_addr: String,
block_engine_url: String,
auth_keypair: Arc<Keypair>,
pending_tx_sender: Sender<PendingTxNotification>,
backrun_pubkeys: Vec<Pubkey>,
regions: Vec<String>,
) {
let mut num_searcher_connection_errors: usize = 0;
let mut num_pending_tx_sub_errors: usize = 0;
Expand All @@ -160,11 +161,11 @@ pub async fn pending_tx_loop(
loop {
sleep(Duration::from_secs(1)).await;

match get_searcher_client(&block_engine_addr, &auth_keypair).await {
match get_searcher_client(block_engine_url.clone(), &auth_keypair).await {
Ok(mut searcher_client) => {
match searcher_client
.subscribe_mempool(MempoolSubscription {
regions: vec![],
regions: regions.clone(),
msg: Some(mempool_subscription::Msg::WlaV0Sub(
WriteLockedAccountSubscriptionV0 {
accounts: backrun_pubkeys.iter().map(|pk| pk.to_string()).collect(),
Expand Down Expand Up @@ -226,7 +227,7 @@ pub async fn pending_tx_loop(
}

pub async fn bundle_results_loop(
block_engine_addr: String,
block_engine_url: String,
auth_keypair: Arc<Keypair>,
bundle_results_sender: Sender<BundleResult>,
) {
Expand All @@ -235,7 +236,7 @@ pub async fn bundle_results_loop(

loop {
sleep(Duration::from_millis(1000)).await;
match get_searcher_client(&block_engine_addr, &auth_keypair).await {
match get_searcher_client(block_engine_url.clone(), &auth_keypair).await {
Ok(mut c) => match c
.subscribe_bundle_results(SubscribeBundleResultsRequest {})
.await
Expand Down
88 changes: 48 additions & 40 deletions backrun/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod event_loops;

use std::{
collections::{hash_map::Entry, HashMap, HashSet},
path::Path,
path::PathBuf,
result,
str::FromStr,
sync::Arc,
Expand All @@ -21,8 +21,8 @@ use jito_protos::{
},
};
use jito_searcher_client::{
get_searcher_client, send_bundle_no_wait, token_authenticator::ClientInterceptor,
BlockEngineConnectionError,
get_searcher_client, send_bundle_no_wait,
token_authenticator::{BlockEngineConnectionError, ClientInterceptor},
};
use log::*;
use rand::{rngs::ThreadRng, thread_rng, Rng};
Expand Down Expand Up @@ -58,37 +58,49 @@ use crate::event_loops::{
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
/// Address for auth service
#[clap(long, env)]
block_engine_addr: String,
/// URL of the block engine.
/// See: https://jito-labs.gitbook.io/mev/searcher-resources/block-engine#connection-details
#[arg(long, env)]
block_engine_url: String,

/// Accounts to backrun
/// Account pubkeys to backrun
#[clap(long, env)]
backrun_accounts: Vec<String>,
backrun_accounts: Vec<Pubkey>,

/// Path to keypair file used to sign and pay for transactions
#[clap(long, env)]
payer_keypair: String,
payer_keypair: PathBuf,

/// Path to keypair file used to authenticate with the backend
/// Path to keypair file used to authenticate with the Jito Block Engine
/// See: https://jito-labs.gitbook.io/mev/searcher-resources/getting-started#block-engine-api-key
#[clap(long, env)]
auth_keypair: String,
auth_keypair: PathBuf,

/// Pubsub URL. Note that this RPC server must have --rpc-pubsub-enable-block-subscription enabled
/// RPC Websocket URL.
/// See: https://solana.com/docs/rpc/websocket
/// Note that this RPC server must have --rpc-pubsub-enable-block-subscription enabled
#[clap(long, env)]
pubsub_url: String,

/// RPC URL to get block hashes from
/// RPC HTTP URL.
#[clap(long, env)]
rpc_url: String,

/// Memo program message
/// Message to pass into the memo program as part of a bundle.
#[clap(long, env, default_value_t = String::from("jito backrun"))]
message: String,

/// Tip program public key
/// See: https://jito-foundation.gitbook.io/mev/mev-payment-and-distribution/on-chain-addresses
#[clap(long, env)]
tip_program_id: String,
tip_program_id: Pubkey,

/// Comma-separated list of regions to request cross-region data from.
/// If no region specified, then default to the currently connected block engine's region.
/// Details: https://jito-labs.gitbook.io/mev/searcher-services/recommendations#cross-region
/// Available regions: https://jito-labs.gitbook.io/mev/searcher-resources/block-engine#connection-details
#[arg(long, env, value_delimiter = ',')]
regions: Vec<String>,

/// Subscribe and print bundle results.
#[clap(long, env)]
Expand Down Expand Up @@ -169,7 +181,7 @@ async fn send_bundles(
searcher_client: &mut SearcherServiceClient<InterceptedService<Channel, ClientInterceptor>>,
bundles: &[BundledTransactions],
) -> Result<Vec<result::Result<Response<SendBundleResponse>, Status>>> {
let mut futs = vec![];
let mut futs = Vec::with_capacity(bundles.len());
for b in bundles {
let mut searcher_client = searcher_client.clone();
let txs = b
Expand All @@ -183,7 +195,7 @@ async fn send_bundles(
futs.push(task);
}

let responses = futures::future::join_all(futs).await;
let responses = futures_util::future::join_all(futs).await;
let send_bundle_responses = responses.into_iter().map(|r| r.unwrap()).collect();
Ok(send_bundle_responses)
}
Expand All @@ -208,6 +220,7 @@ async fn maintenance_tick(
rpc_client: &RpcClient,
leader_schedule: &mut HashMap<Pubkey, HashSet<Slot>>,
blockhash: &mut Hash,
regions: Vec<String>,
) -> Result<()> {
*blockhash = rpc_client
.get_latest_blockhash_with_commitment(CommitmentConfig {
Expand All @@ -234,13 +247,14 @@ async fn maintenance_tick(
}

let next_scheduled_leader = searcher_client
.get_next_scheduled_leader(NextScheduledLeaderRequest {})
.get_next_scheduled_leader(NextScheduledLeaderRequest { regions })
.await?
.into_inner();
info!(
"next_scheduled_leader: {} in {} slots",
"next_scheduled_leader: {} in {} slots from {}",
next_scheduled_leader.next_leader_identity,
next_scheduled_leader.next_leader_slot - next_scheduled_leader.current_slot
next_scheduled_leader.next_leader_slot - next_scheduled_leader.current_slot,
next_scheduled_leader.next_leader_region
);

Ok(())
Expand All @@ -266,7 +280,7 @@ fn print_block_stats(
i64
),
(
"sent_rt_pp_min",
"sent_rt_pp_max",
stats.send_rt_per_packet.maximum().unwrap_or_default(),
i64
),
Expand Down Expand Up @@ -482,10 +496,11 @@ fn print_block_stats(

#[allow(clippy::too_many_arguments)]
async fn run_searcher_loop(
block_engine_addr: String,
block_engine_url: String,
auth_keypair: Arc<Keypair>,
keypair: &Keypair,
rpc_url: String,
regions: Vec<String>,
message: String,
tip_program_pubkey: Pubkey,
mut slot_receiver: Receiver<Slot>,
Expand All @@ -497,7 +512,7 @@ async fn run_searcher_loop(
let mut block_stats: HashMap<Slot, BlockStats> = HashMap::new();
let mut block_signatures: HashMap<Slot, HashSet<Signature>> = HashMap::new();

let mut searcher_client = get_searcher_client(&block_engine_addr, &auth_keypair).await?;
let mut searcher_client = get_searcher_client(block_engine_url.clone(), &auth_keypair).await?;

let mut rng = thread_rng();

Expand All @@ -519,7 +534,7 @@ async fn run_searcher_loop(
loop {
tokio::select! {
_ = tick.tick() => {
maintenance_tick(&mut searcher_client, &rpc_client, &mut leader_schedule, &mut blockhash).await?;
maintenance_tick(&mut searcher_client, &rpc_client, &mut leader_schedule, &mut blockhash, regions.clone()).await?;
}
maybe_bundle_result = bundle_results_receiver.recv() => {
let bundle_result: BundleResult = maybe_bundle_result.ok_or(BackrunError::Shutdown)?;
Expand Down Expand Up @@ -575,20 +590,11 @@ fn main() -> Result<()> {
.init();
let args: Args = Args::parse();

let payer_keypair =
Arc::new(read_keypair_file(Path::new(&args.payer_keypair)).expect("parse kp file"));
let auth_keypair =
Arc::new(read_keypair_file(Path::new(&args.auth_keypair)).expect("parse kp file"));
let payer_keypair = Arc::new(read_keypair_file(&args.payer_keypair).expect("parse kp file"));
let auth_keypair = Arc::new(read_keypair_file(&args.auth_keypair).expect("parse kp file"));

set_host_id(auth_keypair.pubkey().to_string());

let backrun_pubkeys: Vec<Pubkey> = args
.backrun_accounts
.iter()
.map(|a| Pubkey::from_str(a).unwrap())
.collect();
let tip_program_pubkey = Pubkey::from_str(&args.tip_program_id).unwrap();

let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
runtime.block_on(async move {
let (slot_sender, slot_receiver) = channel(100);
Expand All @@ -599,27 +605,29 @@ fn main() -> Result<()> {
tokio::spawn(slot_subscribe_loop(args.pubsub_url.clone(), slot_sender));
tokio::spawn(block_subscribe_loop(args.pubsub_url.clone(), block_sender));
tokio::spawn(pending_tx_loop(
args.block_engine_addr.clone(),
args.block_engine_url.clone(),
auth_keypair.clone(),
pending_tx_sender,
backrun_pubkeys,
args.backrun_accounts,
args.regions.clone(),
));

if args.subscribe_bundle_results {
tokio::spawn(bundle_results_loop(
args.block_engine_addr.clone(),
args.block_engine_url.clone(),
auth_keypair.clone(),
bundle_results_sender,
));
}

let result = run_searcher_loop(
args.block_engine_addr,
args.block_engine_url,
auth_keypair,
&payer_keypair,
args.rpc_url,
args.regions,
args.message,
tip_program_pubkey,
args.tip_program_id,
slot_receiver,
block_receiver,
bundle_results_receiver,
Expand Down
15 changes: 7 additions & 8 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@ chrono = "0.4.24"
clap = { version = "4", features = ["derive", "env"] }
crossbeam-channel = "0.5.7"
env_logger = "0.10"
futures = "0.3.28"
futures-util = "0.3.28"
histogram = "0.6.9"
jito-protos = { path = "../jito_protos" }
log = "0.4.17"
prost-types = "0.11.9"
prost-types = "0.12"
rand = "0.8.5"
jito-searcher-client = { path = "../searcher_client" }
solana-client = "=1.16.16"
solana-metrics = "=1.16.16"
solana-sdk = "=1.16.16"
solana-transaction-status = "=1.16.16"
solana-client = "=1.17.20"
solana-metrics = "=1.17.20"
solana-sdk = "=1.17.20"
solana-transaction-status = "=1.17.20"
spl-memo = "3.0.1"
thiserror = "1.0.40"
tokio = { version = "~1.14.1", features = ["rt-multi-thread"] }
tonic = { version = "0.8.3", features = ["tls", "tls-roots", "tls-webpki-roots"] }
tokio = { version = "1", features = ["rt-multi-thread"] }
tonic = { version = "0.10", features = ["tls", "tls-roots", "tls-webpki-roots"] }
Loading