diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index b934fc6e..be779b85 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -37,7 +37,6 @@ jobs:
       - name: Checkout code
         uses: actions/checkout@v4
         with:
-          ref: "stable"
           fetch-depth: 0
           submodules: true
@@ -108,7 +107,6 @@ jobs:
       - name: Checkout code
         uses: actions/checkout@v4
         with:
-          ref: "stable"
           fetch-depth: 0
           submodules: true
@@ -168,7 +166,6 @@ jobs:
       - name: Checkout code
         uses: actions/checkout@v4
         with:
-          ref: "stable"
           fetch-depth: 0
           submodules: true
@@ -221,7 +218,6 @@ jobs:
       - name: Checkout code
         uses: actions/checkout@v4
         with:
-          ref: "stable"
           fetch-depth: 0
           submodules: true
diff --git a/Cargo.lock b/Cargo.lock
index 6dd9c8ae..819bedb1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1575,7 +1575,7 @@ dependencies = [
 [[package]]
 name = "cb-bench-pbs"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "alloy",
  "cb-common",
@@ -1592,7 +1592,7 @@ dependencies = [
 [[package]]
 name = "cb-cli"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "cb-common",
  "clap",
@@ -1604,7 +1604,7 @@ dependencies = [
 [[package]]
 name = "cb-common"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "aes 0.8.4",
  "alloy",
@@ -1626,6 +1626,7 @@ dependencies = [
  "futures",
  "headers-accept",
  "jsonwebtoken",
+ "lazy_static",
  "mediatype 0.20.0",
  "pbkdf2 0.12.2",
  "rand 0.9.2",
@@ -1652,7 +1653,7 @@ dependencies = [
 [[package]]
 name = "cb-metrics"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "axum 0.8.4",
  "cb-common",
@@ -1665,7 +1666,7 @@ dependencies = [
 [[package]]
 name = "cb-pbs"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "alloy",
  "async-trait",
@@ -1676,6 +1677,7 @@ dependencies = [
  "ethereum_ssz",
  "eyre",
  "futures",
+ "headers",
  "lazy_static",
  "parking_lot",
  "prometheus",
@@ -1692,7 +1694,7 @@ dependencies = [
 [[package]]
 name = "cb-signer"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "alloy",
  "axum 0.8.4",
@@ -1721,7 +1723,7 @@ dependencies = [
 [[package]]
 name = "cb-tests"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "alloy",
  "axum 0.8.4",
@@ -1881,7 +1883,7 @@ dependencies = [
 [[package]]
 name = "commit-boost"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "cb-cli",
  "cb-common",
@@ -2169,7 +2171,7 @@ dependencies = [
 [[package]]
 name = "da_commit"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "alloy",
  "color-eyre",
@@ -6097,7 +6099,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
 [[package]]
 name = "status_api"
-version = "0.9.0"
+version = "0.9.2"
 dependencies = [
  "async-trait",
  "axum 0.8.4",
diff --git a/Cargo.toml b/Cargo.toml
index a4920cca..8946f425 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,7 +5,7 @@ resolver = "2"
 [workspace.package]
 edition = "2024"
 rust-version = "1.89"
-version = "0.9.0"
+version = "0.9.2"
 [workspace.dependencies]
 aes = "0.8"
@@ -33,7 +33,7 @@ cb-metrics = { path = "crates/metrics" }
 cb-pbs = { path = "crates/pbs" }
 cb-signer = { path = "crates/signer" }
 cipher = "0.4"
-clap = { version = "4.5.4", features = ["derive", "env"] }
+clap = { version = "4.5.48", features = ["derive", "env"] }
 color-eyre = "0.6.3"
 ctr = "0.9.2"
 derive_more = { version = "2.0.1", features = ["deref", "display", "from", "into"] }
diff --git a/bin/cli.rs b/bin/cli.rs
index d3fa736c..234dc9bd 100644
--- a/bin/cli.rs
+++ b/bin/cli.rs
@@ -1,8 +1,20 @@
 use clap::Parser;
+/// Version string with a leading 'v'
+const VERSION: &str = concat!("v", env!("CARGO_PKG_VERSION"));
+
+/// Subcommands and global arguments for the module
+#[derive(Parser, Debug)]
+#[command(name = "Commit-Boost CLI", version = VERSION, about, long_about = None)]
+struct Cli {}
+
 /// Main entry point of the Commit-Boost CLI
 #[tokio::main]
 async fn main() -> eyre::Result<()> {
+    // Parse the CLI arguments (currently only used for version info, more can be
+    // added later)
+    let _cli = Cli::parse();
+
     color_eyre::install()?;
     // set default backtrace unless provided
diff --git a/bin/pbs.rs b/bin/pbs.rs
index 69945fe8..0b7c3f72 100644
--- a/bin/pbs.rs
+++ b/bin/pbs.rs
@@ -7,8 +7,20 @@ use clap::Parser;
 use eyre::Result;
 use tracing::{error, info};
+/// Version string with a leading 'v'
+const VERSION: &str = concat!("v", env!("CARGO_PKG_VERSION"));
+
+/// Subcommands and global arguments for the module
+#[derive(Parser, Debug)]
+#[command(name = "Commit-Boost PBS Service", version = VERSION, about, long_about = None)]
+struct Cli {}
+
 #[tokio::main]
 async fn main() -> Result<()> {
+    // Parse the CLI arguments (currently only used for version info, more can be
+    // added later)
+    let _cli = Cli::parse();
+
     color_eyre::install()?;
     let _guard = initialize_tracing_log(PBS_MODULE_NAME, LogsSettings::from_env_config()?);
diff --git a/bin/signer.rs b/bin/signer.rs
index 2d9a60ad..01f3c970 100644
--- a/bin/signer.rs
+++ b/bin/signer.rs
@@ -7,8 +7,20 @@ use clap::Parser;
 use eyre::Result;
 use tracing::{error, info};
+/// Version string with a leading 'v'
+const VERSION: &str = concat!("v", env!("CARGO_PKG_VERSION"));
+
+/// Subcommands and global arguments for the module
+#[derive(Parser, Debug)]
+#[command(name = "Commit-Boost Signer Service", version = VERSION, about, long_about = None)]
+struct Cli {}
+
 #[tokio::main]
 async fn main() -> Result<()> {
+    // Parse the CLI arguments (currently only used for version info, more can be
+    // added later)
+    let _cli = Cli::parse();
+
     color_eyre::install()?;
     let _guard = initialize_tracing_log(SIGNER_MODULE_NAME, LogsSettings::from_env_config()?);
diff --git a/config.example.toml b/config.example.toml
index 3714a257..87f619db 100644
--- a/config.example.toml
+++ b/config.example.toml
@@ -89,6 +89,8 @@ headers = { X-MyCustomHeader = "MyCustomValue" }
 # OPTIONAL
 get_params = { param1 = "value1", param2 = "value2" }
 # Whether to enable timing games, as tuned by `target_first_request_ms` and `frequency_get_header_ms`.
+# NOTE: if neither `target_first_request_ms` nor `frequency_get_header_ms` is set, this flag has no effect.
+#
 # These values should be carefully chosen for each relay, as each relay has different latency and timing games setups.
 # They should only be used by advanced users, and if misconfigured can result in unforeseen effects, e.g. fetching a lower header value,
 # or getting a temporary IP ban.
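+# Example timing-games tuning (the values below are illustrative only, not a
+# recommendation; choose them per relay after testing):
+# enable_timing_games = true
+# target_first_request_ms = 200
+# frequency_get_header_ms = 125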
@@ -150,7 +152,7 @@ validator_pubkeys = [
 # OPTIONAL
 loader = "./tests/data/mux_keys.example.json"
 # loader = { url = "http://localhost:8000/keys" }
-# loader = { registry = "lido", node_operator_id = 8, enable_refreshing = false }
+# loader = { registry = "lido", node_operator_id = 8, lido_module_id = 1, enable_refreshing = false }
 # loader = { registry = "ssv", node_operator_id = 8, enable_refreshing = false }
 late_in_slot_time_ms = 1500
 timeout_get_header_ms = 900
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml
index feeb283c..070efc7b 100644
--- a/crates/common/Cargo.toml
+++ b/crates/common/Cargo.toml
@@ -27,6 +27,7 @@ eyre.workspace = true
 futures.workspace = true
 headers-accept.workspace = true
 jsonwebtoken.workspace = true
+lazy_static.workspace = true
 lh_eth2.workspace = true
 lh_eth2_keystore.workspace = true
 lh_types.workspace = true
diff --git a/crates/common/src/abi/LidoCSModuleNORegistry.json b/crates/common/src/abi/LidoCSModuleNORegistry.json
new file mode 100644
index 00000000..a0b98aab
--- /dev/null
+++ b/crates/common/src/abi/LidoCSModuleNORegistry.json
@@ -0,0 +1,37 @@
+[
+  {
+    "constant": true,
+    "inputs": [
+      { "name": "nodeOperatorId", "type": "uint256" }
+    ],
+    "name": "getNodeOperatorSummary",
+    "outputs": [
+      { "name": "targetLimitMode", "type": "uint256" },
+      { "name": "targetValidatorsCount", "type": "uint256" },
+      { "name": "stuckValidatorsCount", "type": "uint256" },
+      { "name": "refundedValidatorsCount", "type": "uint256" },
+      { "name": "stuckPenaltyEndTimestamp", "type": "uint256" },
+      { "name": "totalExitedValidators", "type": "uint256" },
+      { "name": "totalDepositedValidators", "type": "uint256" },
+      { "name": "depositableValidatorsCount", "type": "uint256" }
+    ],
+    "payable": false,
+    "stateMutability": "view",
+    "type": "function"
+  },
+  {
+    "constant": true,
+    "inputs": [
+      { "name": "nodeOperatorId", "type": "uint256" },
+      { "name": "startIndex", "type": "uint256" },
+      { "name": "keysCount", "type": "uint256" }
+    ],
+    "name": "getSigningKeys",
+    "outputs": [
+      { "name": "", "type": "bytes" }
+    ],
+    "payable": false,
+    "stateMutability": "view",
+    "type": "function"
+  }
+]
diff --git a/crates/common/src/config/mux.rs b/crates/common/src/config/mux.rs
index 419a097b..27950d1c 100644
--- a/crates/common/src/config/mux.rs
+++ b/crates/common/src/config/mux.rs
@@ -7,10 +7,9 @@ use std::{
 };
 use alloy::{
-    primitives::{Address, U256, address},
+    primitives::{Address, Bytes, U256},
     providers::ProviderBuilder,
     rpc::{client::RpcClient, types::beacon::constants::BLS_PUBLIC_KEY_BYTES_LEN},
-    sol,
     transports::http::Http,
 };
 use eyre::{Context, bail, ensure};
@@ -22,7 +21,7 @@ use super::{MUX_PATH_ENV, PbsConfig, RelayConfig, load_optional_env_var};
 use crate::{
     config::{remove_duplicate_keys, safe_read_http_response},
-    interop::ssv::utils::fetch_ssv_pubkeys_from_url,
+    interop::{lido::utils::*, ssv::utils::*},
     pbs::RelayClient,
     types::{BlsPublicKey, Chain},
     utils::default_bool,
@@ -193,6 +192,8 @@ pub enum MuxKeysLoader {
     Registry {
         registry: NORegistry,
         node_operator_id: u64,
+        #[serde(default)]
+        lido_module_id: Option<u8>,
         #[serde(default = "default_bool::<false>")]
         enable_refreshing: bool,
     },
@@ -239,30 +240,33 @@ impl MuxKeysLoader {
                 .wrap_err("failed to fetch mux keys from HTTP endpoint")
             }
-            Self::Registry { registry, node_operator_id, enable_refreshing: _ } => match registry {
-                NORegistry::Lido => {
-                    let Some(rpc_url) = rpc_url else {
-                        bail!("Lido registry requires RPC URL to be set in the PBS config");
-                    };
-
-                    fetch_lido_registry_keys(
-                        rpc_url,
-                        chain,
-                        U256::from(*node_operator_id),
-                        http_timeout,
-                    )
-                    .await
-                }
-                NORegistry::SSV => {
-                    fetch_ssv_pubkeys(
-                        ssv_api_url,
-                        chain,
-                        U256::from(*node_operator_id),
-                        http_timeout,
-                    )
-                    .await
+            Self::Registry { registry, node_operator_id, lido_module_id, enable_refreshing: _ } => {
+                match registry {
+                    NORegistry::Lido => {
+                        let Some(rpc_url) = rpc_url else {
+                            bail!("Lido registry requires RPC URL to be set in the PBS config");
+                        };
+
+                        fetch_lido_registry_keys(
+                            rpc_url,
+                            chain,
+                            U256::from(*node_operator_id),
+                            lido_module_id.unwrap_or(1),
+                            http_timeout,
+                        )
+                        .await
+                    }
+                    NORegistry::SSV => {
+                        fetch_ssv_pubkeys(
+                            ssv_api_url,
+                            chain,
+                            U256::from(*node_operator_id),
+                            http_timeout,
+                        )
+                        .await
+                    }
                 }
-            },
+            }
         }?;
     // Remove duplicates
@@ -285,63 +289,28 @@ fn get_mux_path(mux_id: &str) -> String {
     format!("/{mux_id}-mux_keys.json")
 }
-sol! {
-    #[allow(missing_docs)]
-    #[sol(rpc)]
-    LidoRegistry,
-    "src/abi/LidoNORegistry.json"
-}
-
-// Fetching Lido Curated Module
-fn lido_registry_address(chain: Chain) -> eyre::Result<Address> {
-    match chain {
-        Chain::Mainnet => Ok(address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5")),
-        Chain::Holesky => Ok(address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC")),
-        Chain::Hoodi => Ok(address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5")),
-        Chain::Sepolia => Ok(address!("33d6E15047E8644F8DDf5CD05d202dfE587DA6E3")),
-        _ => bail!("Lido registry not supported for chain: {chain:?}"),
-    }
-}
-
-async fn fetch_lido_registry_keys(
-    rpc_url: Url,
-    chain: Chain,
-    node_operator_id: U256,
-    http_timeout: Duration,
-) -> eyre::Result<Vec<BlsPublicKey>> {
-    debug!(?chain, %node_operator_id, "loading operator keys from Lido registry");
-
-    // Create an RPC provider with HTTP timeout support
-    let client = Client::builder().timeout(http_timeout).build()?;
-    let http = Http::with_client(client, rpc_url);
-    let is_local = http.guess_local();
-    let rpc_client = RpcClient::new(http, is_local);
-    let provider = ProviderBuilder::new().connect_client(rpc_client);
-
-    let registry_address = lido_registry_address(chain)?;
-    let registry = LidoRegistry::new(registry_address, provider);
-
-    let total_keys = registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?;
-
+async fn collect_registry_keys<F, Fut>(
+    total_keys: u64,
+    mut fetch_batch: F,
+) -> eyre::Result<Vec<BlsPublicKey>>
+where
+    F: FnMut(u64, u64) -> Fut,
+    Fut: std::future::Future<Output = eyre::Result<Bytes>>,
+{
     if total_keys == 0 {
         return Ok(Vec::new());
     }
-    debug!("fetching {total_keys} total keys");
     const CALL_BATCH_SIZE: u64 = 250u64;
     let mut keys = vec![];
-    let mut offset = 0;
+    let mut offset: u64 = 0;
     while offset < total_keys {
         let limit = CALL_BATCH_SIZE.min(total_keys - offset);
-        let pubkeys = registry
-            .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit))
-            .call()
-            .await?
-            .pubkeys;
+        let pubkeys = fetch_batch(offset, limit).await?;
         ensure!(
             pubkeys.len() % BLS_PUBLIC_KEY_BYTES_LEN == 0,
@@ -368,6 +337,59 @@ async fn fetch_lido_registry_keys(
     Ok(keys)
 }
+async fn fetch_lido_csm_registry_keys(
+    registry_address: Address,
+    rpc_client: RpcClient,
+    node_operator_id: U256,
+) -> eyre::Result<Vec<BlsPublicKey>> {
+    let provider = ProviderBuilder::new().connect_client(rpc_client);
+    let registry = get_lido_csm_registry(registry_address, provider);
+    let total_keys = fetch_lido_csm_keys_total(&registry, node_operator_id).await?;
+
+    collect_registry_keys(total_keys, |offset, limit| {
+        fetch_lido_csm_keys_batch(&registry, node_operator_id, offset, limit)
+    })
+    .await
+}
+
+async fn fetch_lido_module_registry_keys(
+    registry_address: Address,
+    rpc_client: RpcClient,
+    node_operator_id: U256,
+) -> eyre::Result<Vec<BlsPublicKey>> {
+    let provider = ProviderBuilder::new().connect_client(rpc_client);
+    let registry = get_lido_module_registry(registry_address, provider);
+    let total_keys: u64 = fetch_lido_module_keys_total(&registry, node_operator_id).await?;
+
+    collect_registry_keys(total_keys, |offset, limit| {
+        fetch_lido_module_keys_batch(&registry, node_operator_id, offset, limit)
+    })
+    .await
+}
+
+async fn fetch_lido_registry_keys(
+    rpc_url: Url,
+    chain: Chain,
+    node_operator_id: U256,
+    lido_module_id: u8,
+    http_timeout: Duration,
+) -> eyre::Result<Vec<BlsPublicKey>> {
+    debug!(?chain, %node_operator_id, ?lido_module_id, "loading operator keys from Lido registry");
+
+    // Create an RPC provider with HTTP timeout support
+    let client = Client::builder().timeout(http_timeout).build()?;
+    let http = Http::with_client(client, rpc_url);
+    let is_local = http.guess_local();
+    let rpc_client = RpcClient::new(http, is_local);
+    let registry_address = lido_registry_address(chain, lido_module_id)?;
+
+    if is_csm_module(chain, lido_module_id) {
+        fetch_lido_csm_registry_keys(registry_address, rpc_client, node_operator_id).await
+    } else {
+        fetch_lido_module_registry_keys(registry_address, rpc_client, node_operator_id).await
+    }
+}
+
 async fn fetch_ssv_pubkeys(
     mut api_url: Url,
     chain: Chain,
@@ -421,46 +443,3 @@ async fn fetch_ssv_pubkeys(
     Ok(pubkeys)
 }
-
-#[cfg(test)]
-mod tests {
-    use alloy::{primitives::U256, providers::ProviderBuilder};
-    use url::Url;
-
-    use super::*;
-
-    #[tokio::test]
-    async fn test_lido_registry_address() -> eyre::Result<()> {
-        let url = Url::parse("https://ethereum-rpc.publicnode.com")?;
-        let provider = ProviderBuilder::new().connect_http(url);
-
-        let registry =
-            LidoRegistry::new(address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), provider);
-
-        const LIMIT: usize = 3;
-        let node_operator_id = U256::from(1);
-
-        let total_keys: u64 =
-            registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?;
-
-        assert!(total_keys > LIMIT as u64);
-
-        let pubkeys = registry
-            .getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT))
-            .call()
-            .await?
-            .pubkeys;
-
-        let mut vec = vec![];
-        for chunk in pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) {
-            vec.push(
-                BlsPublicKey::deserialize(chunk)
-                    .map_err(|_| eyre::eyre!("invalid BLS public key"))?,
-            );
-        }
-
-        assert_eq!(vec.len(), LIMIT);
-
-        Ok(())
-    }
-}
diff --git a/crates/common/src/interop/lido/mod.rs b/crates/common/src/interop/lido/mod.rs
new file mode 100644
index 00000000..b4ab6a6a
--- /dev/null
+++ b/crates/common/src/interop/lido/mod.rs
@@ -0,0 +1,2 @@
+pub mod types;
+pub mod utils;
diff --git a/crates/common/src/interop/lido/types.rs b/crates/common/src/interop/lido/types.rs
new file mode 100644
index 00000000..48aad122
--- /dev/null
+++ b/crates/common/src/interop/lido/types.rs
@@ -0,0 +1,15 @@
+use alloy::sol;
+
+sol! {
+    #[allow(missing_docs)]
+    #[sol(rpc)]
+    LidoRegistry,
+    "src/abi/LidoNORegistry.json"
+}
+
+sol! {
+    #[allow(missing_docs)]
+    #[sol(rpc)]
+    LidoCSMRegistry,
+    "src/abi/LidoCSModuleNORegistry.json"
+}
diff --git a/crates/common/src/interop/lido/utils.rs b/crates/common/src/interop/lido/utils.rs
new file mode 100644
index 00000000..02ff7c42
--- /dev/null
+++ b/crates/common/src/interop/lido/utils.rs
@@ -0,0 +1,271 @@
+use std::collections::HashMap;
+
+use alloy::primitives::{Address, Bytes, U256, address};
+use eyre::Context;
+use lazy_static::lazy_static;
+
+use crate::{
+    interop::lido::types::{
+        LidoCSMRegistry::{self, getNodeOperatorSummaryReturn},
+        LidoRegistry,
+    },
+    types::{Chain, HoleskyLidoModule, HoodiLidoModule, MainnetLidoModule},
+};
+
+lazy_static! {
+    static ref LIDO_REGISTRY_ADDRESSES_BY_MODULE: HashMap<Chain, HashMap<u8, Address>> = {
+        let mut map: HashMap<Chain, HashMap<u8, Address>> = HashMap::new();
+
+        // --- Mainnet ---
+        let mut mainnet = HashMap::new();
+        mainnet.insert(
+            MainnetLidoModule::Curated as u8,
+            address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"),
+        );
+        mainnet.insert(
+            MainnetLidoModule::SimpleDVT as u8,
+            address!("aE7B191A31f627b4eB1d4DaC64eaB9976995b433"),
+        );
+        mainnet.insert(
+            MainnetLidoModule::CommunityStaking as u8,
+            address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"),
+        );
+        map.insert(Chain::Mainnet, mainnet);
+
+        // --- Holesky ---
+        let mut holesky = HashMap::new();
+        holesky.insert(
+            HoleskyLidoModule::Curated as u8,
+            address!("595F64Ddc3856a3b5Ff4f4CC1d1fb4B46cFd2bAC"),
+        );
+        holesky.insert(
+            HoleskyLidoModule::SimpleDVT as u8,
+            address!("11a93807078f8BB880c1BD0ee4C387537de4b4b6"),
+        );
+        holesky.insert(
+            HoleskyLidoModule::Sandbox as u8,
+            address!("D6C2ce3BB8bea2832496Ac8b5144819719f343AC"),
+        );
+        holesky.insert(
+            HoleskyLidoModule::CommunityStaking as u8,
+            address!("4562c3e63c2e586cD1651B958C22F88135aCAd4f"),
+        );
+        map.insert(Chain::Holesky, holesky);
+
+        // --- Hoodi ---
+        let mut hoodi = HashMap::new();
+        hoodi.insert(
+            HoodiLidoModule::Curated as u8,
+            address!("5cDbE1590c083b5A2A64427fAA63A7cfDB91FbB5"),
+        );
+        hoodi.insert(
+            HoodiLidoModule::SimpleDVT as u8,
+            address!("0B5236BECA68004DB89434462DfC3BB074d2c830"),
+        );
+        hoodi.insert(
+            HoodiLidoModule::Sandbox as u8,
+            address!("682E94d2630846a503BDeE8b6810DF71C9806891"),
+        );
+        hoodi.insert(
+            HoodiLidoModule::CommunityStaking as u8,
+            address!("79CEf36D84743222f37765204Bec41E92a93E59d"),
+        );
+        map.insert(Chain::Hoodi, hoodi);
+
+        // --- Sepolia ---
+        let mut sepolia = HashMap::new();
+        sepolia.insert(1, address!("33d6E15047E8644F8DDf5CD05d202dfE587DA6E3"));
+        map.insert(Chain::Sepolia, sepolia);
+
+        map
+    };
+}
+
+// Fetch the appropriate registry address
+pub fn lido_registry_address(chain: Chain, lido_module_id: u8) -> eyre::Result<Address> {
+    LIDO_REGISTRY_ADDRESSES_BY_MODULE
+        .get(&chain)
+        .ok_or_else(|| eyre::eyre!("Lido registry not supported for chain: {chain:?}"))?
+        .get(&lido_module_id)
+        .copied()
+        .ok_or_else(|| {
+            eyre::eyre!("Lido module id {:?} not found for chain: {chain:?}", lido_module_id)
+        })
+}
+
+pub fn is_csm_module(chain: Chain, module_id: u8) -> bool {
+    match chain {
+        Chain::Mainnet => module_id == MainnetLidoModule::CommunityStaking as u8,
+        Chain::Holesky => module_id == HoleskyLidoModule::CommunityStaking as u8,
+        Chain::Hoodi => module_id == HoodiLidoModule::CommunityStaking as u8,
+        _ => false,
+    }
+}
+
+pub fn get_lido_csm_registry<P>(
+    registry_address: Address,
+    provider: P,
+) -> LidoCSMRegistry::LidoCSMRegistryInstance<P>
+where
+    P: Clone + Send + Sync + 'static + alloy::providers::Provider,
+{
+    LidoCSMRegistry::new(registry_address, provider)
+}
+
+pub fn get_lido_module_registry<P>(
+    registry_address: Address,
+    provider: P,
+) -> LidoRegistry::LidoRegistryInstance<P>
+where
+    P: Clone + Send + Sync + 'static + alloy::providers::Provider,
+{
+    LidoRegistry::new(registry_address, provider)
+}
+
+pub async fn fetch_lido_csm_keys_total<P>(
+    registry: &LidoCSMRegistry::LidoCSMRegistryInstance<P>,
+    node_operator_id: U256,
+) -> eyre::Result<u64>
+where
+    P: Clone + Send + Sync + 'static + alloy::providers::Provider,
+{
+    let summary: getNodeOperatorSummaryReturn =
+        registry.getNodeOperatorSummary(node_operator_id).call().await?;
+
+    let total_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount;
+
+    let total_u64 = u64::try_from(total_u256)
+        .wrap_err_with(|| format!("total keys ({total_u256}) does not fit into u64"))?;
+
+    Ok(total_u64)
+}
+
+pub async fn fetch_lido_module_keys_total<P>(
+    registry: &LidoRegistry::LidoRegistryInstance<P>,
+    node_operator_id: U256,
+) -> eyre::Result<u64>
+where
+    P: Clone + Send + Sync + 'static + alloy::providers::Provider,
+{
+    let total_keys: u64 =
+        registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?;
+
+    Ok(total_keys)
+}
+
+pub async fn fetch_lido_csm_keys_batch<P>(
+    registry: &LidoCSMRegistry::LidoCSMRegistryInstance<P>,
+    node_operator_id: U256,
+    offset: u64,
+    limit: u64,
+) -> eyre::Result<Bytes>
+where
+    P: Clone + Send + Sync + 'static + alloy::providers::Provider,
+{
+    let pubkeys = registry
+        .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit))
+        .call()
+        .await?;
+
+    Ok(pubkeys)
+}
+
+pub async fn fetch_lido_module_keys_batch<P>(
+    registry: &LidoRegistry::LidoRegistryInstance<P>,
+    node_operator_id: U256,
+    offset: u64,
+    limit: u64,
+) -> eyre::Result<Bytes>
+where
+    P: Clone + Send + Sync + 'static + alloy::providers::Provider,
+{
+    let pubkeys = registry
+        .getSigningKeys(node_operator_id, U256::from(offset), U256::from(limit))
+        .call()
+        .await?
+        .pubkeys;
+
+    Ok(pubkeys)
+}
+
+#[cfg(test)]
+mod tests {
+    use alloy::{
+        primitives::{U256, address},
+        providers::ProviderBuilder,
+        rpc::types::beacon::constants::BLS_PUBLIC_KEY_BYTES_LEN,
+    };
+    use url::Url;
+
+    use super::*;
+    use crate::{interop::lido::types::LidoRegistry, types::BlsPublicKey};
+
+    #[tokio::test]
+    async fn test_lido_registry_address() -> eyre::Result<()> {
+        let url = Url::parse("https://ethereum-rpc.publicnode.com")?;
+        let provider = ProviderBuilder::new().connect_http(url);
+
+        let registry =
+            LidoRegistry::new(address!("55032650b14df07b85bF18A3a3eC8E0Af2e028d5"), provider);
+
+        const LIMIT: usize = 3;
+        let node_operator_id = U256::from(1);
+
+        let total_keys: u64 =
+            registry.getTotalSigningKeyCount(node_operator_id).call().await?.try_into()?;
+
+        assert!(total_keys > LIMIT as u64);
+
+        let pubkeys = registry
+            .getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT))
+            .call()
+            .await?
+            .pubkeys;
+
+        let mut vec = vec![];
+        for chunk in pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) {
+            vec.push(
+                BlsPublicKey::deserialize(chunk)
+                    .map_err(|_| eyre::eyre!("invalid BLS public key"))?,
+            );
+        }
+
+        assert_eq!(vec.len(), LIMIT);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_lido_csm_registry_address() -> eyre::Result<()> {
+        let url = Url::parse("https://ethereum-rpc.publicnode.com")?;
+        let provider = ProviderBuilder::new().connect_http(url);
+
+        let registry =
+            LidoCSMRegistry::new(address!("dA7dE2ECdDfccC6c3AF10108Db212ACBBf9EA83F"), provider);
+
+        const LIMIT: usize = 3;
+        let node_operator_id = U256::from(1);
+
+        let summary = registry.getNodeOperatorSummary(node_operator_id).call().await?;
+
+        let total_keys_u256 = summary.totalDepositedValidators + summary.depositableValidatorsCount;
+        let total_keys: u64 = total_keys_u256.try_into()?;
+
+        assert!(total_keys > LIMIT as u64, "expected more than {LIMIT} keys, got {total_keys}");
+
+        let pubkeys =
+            registry.getSigningKeys(node_operator_id, U256::ZERO, U256::from(LIMIT)).call().await?;
+
+        let mut vec = Vec::new();
+        for chunk in pubkeys.chunks(BLS_PUBLIC_KEY_BYTES_LEN) {
+            vec.push(
+                BlsPublicKey::deserialize(chunk)
+                    .map_err(|_| eyre::eyre!("invalid BLS public key"))?,
+            );
+        }
+
+        assert_eq!(vec.len(), LIMIT, "expected {LIMIT} keys, got {}", vec.len());
+
+        Ok(())
+    }
+}
diff --git a/crates/common/src/interop/mod.rs b/crates/common/src/interop/mod.rs
index 42502f6f..4d0230a9 100644
--- a/crates/common/src/interop/mod.rs
+++ b/crates/common/src/interop/mod.rs
@@ -1 +1,2 @@
+pub mod lido;
 pub mod ssv;
diff --git a/crates/common/src/pbs/error.rs b/crates/common/src/pbs/error.rs
index 77d942cd..58066c4f 100644
--- a/crates/common/src/pbs/error.rs
+++ b/crates/common/src/pbs/error.rs
@@ -14,6 +14,9 @@ pub enum PbsError {
     #[error("json decode error: {err:?}, raw: {raw}")]
     JsonDecode { err: serde_json::Error, raw: String },
+    #[error("error with request: {0}")]
+    GeneralRequest(String),
+
     #[error("{0}")]
     ReadResponse(#[from] ResponseReadError),
diff --git a/crates/common/src/pbs/mod.rs b/crates/common/src/pbs/mod.rs
index af2c07b4..a1152b58 100644
--- a/crates/common/src/pbs/mod.rs
+++ b/crates/common/src/pbs/mod.rs
@@ -6,5 +6,6 @@ mod types;
 pub use builder::*;
 pub use constants::*;
+pub use lh_types::ForkVersionDecode;
 pub use relay::*;
 pub use types::*;
diff --git a/crates/common/src/pbs/types/mod.rs b/crates/common/src/pbs/types/mod.rs
index 8ad87c08..a10cfe2a 100644
--- a/crates/common/src/pbs/types/mod.rs
+++ b/crates/common/src/pbs/types/mod.rs
@@ -54,6 +54,17 @@ pub struct GetHeaderParams {
     pub pubkey: BlsPublicKey,
 }
+/// Which encoding types the original requester accepts in the response.
+/// As the builder spec adds more encoding types, this struct can be expanded.
+#[derive(Clone)]
+pub struct AcceptTypes {
+    /// Whether SSZ encoding is accepted
+    pub ssz: bool,
+
+    /// Whether JSON encoding is accepted
+    pub json: bool,
+}
+
 pub trait GetHeaderInfo {
     fn block_hash(&self) -> B256;
     fn value(&self) -> &U256;
diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs
index 077b4ccd..89934471 100644
--- a/crates/common/src/types.rs
+++ b/crates/common/src/types.rs
@@ -29,7 +29,7 @@ pub struct JwtClaims {
     pub module: String,
 }
-#[derive(Clone, Copy, PartialEq, Eq)]
+#[derive(Clone, Copy, PartialEq, Eq, Hash)]
 pub enum Chain {
     Mainnet,
     Holesky,
@@ -44,6 +44,26 @@ pub enum Chain {
     },
 }
+pub enum MainnetLidoModule {
+    Curated = 1,
+    SimpleDVT = 2,
+    CommunityStaking = 3,
+}
+
+pub enum HoleskyLidoModule {
+    Curated = 1,
+    SimpleDVT = 2,
+    Sandbox = 3,
+    CommunityStaking = 4,
+}
+
+pub enum HoodiLidoModule {
+    Curated = 1,
+    SimpleDVT = 2,
+    Sandbox = 3,
+    CommunityStaking = 4,
+}
+
 pub type ForkVersion = [u8; 4];
 impl std::fmt::Display for Chain {
@@ -233,7 +253,8 @@ impl KnownChain {
     pub fn fulu_fork_slot(&self) -> u64 {
         match self {
-            KnownChain::Mainnet | KnownChain::Helder => u64::MAX,
+            KnownChain::Mainnet => 13164544,
+            KnownChain::Helder => u64::MAX,
             KnownChain::Holesky => 5283840,
             KnownChain::Sepolia => 8724480,
             KnownChain::Hoodi => 1622016,
diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs
index 291932d8..ddc93e1b 100644
--- a/crates/common/src/utils.rs
+++ b/crates/common/src/utils.rs
@@ -1,6 +1,7 @@
 #[cfg(feature = "testing-flags")]
 use std::cell::Cell;
 use std::{
+    collections::HashSet,
     fmt::Display,
     net::Ipv4Addr,
     str::FromStr,
@@ -45,9 +46,9 @@ use crate::{
     types::{BlsPublicKey, Chain, Jwt, JwtClaims, ModuleId},
 };
-const APPLICATION_JSON: &str = "application/json";
-const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
-const WILDCARD: &str = "*/*";
+pub const APPLICATION_JSON: &str = "application/json";
+pub const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
+pub const WILDCARD: &str = "*/*";
 const MILLIS_PER_SECOND: u64 = 1_000;
 pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version";
@@ -433,36 +434,34 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result<HeaderValue> {
-pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result<EncodingType> {
-    let accept = Accept::from_str(
-        req_headers.get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or(APPLICATION_JSON),
-    )
-    .map_err(|e| eyre::eyre!("invalid accept header: {e}"))?;
-
-    if accept.media_types().count() == 0 {
-        // No valid media types found, default to JSON
-        return Ok(EncodingType::Json);
-    }
-
-    // Get the SSZ and JSON media types if present
-    let mut ssz_type = false;
-    let mut json_type = false;
+pub fn get_accept_types(req_headers: &HeaderMap) -> eyre::Result<HashSet<EncodingType>> {
+    let mut accepted_types = HashSet::new();
     let mut unsupported_type = false;
-    accept.media_types().for_each(|mt| match mt.essence().to_string().as_str() {
-        APPLICATION_OCTET_STREAM => ssz_type = true,
-        APPLICATION_JSON | WILDCARD => json_type = true,
-        _ => unsupported_type = true,
-    });
-
-    // If SSZ is present, prioritize it
-    if ssz_type {
-        return Ok(EncodingType::Ssz);
+    for header in req_headers.get_all(ACCEPT).iter() {
+        let accept = Accept::from_str(header.to_str()?)
+            .map_err(|e| eyre::eyre!("invalid accept header: {e}"))?;
+        for mt in accept.media_types() {
+            match mt.essence().to_string().as_str() {
+                APPLICATION_OCTET_STREAM => {
+                    accepted_types.insert(EncodingType::Ssz);
+                }
+                APPLICATION_JSON | WILDCARD => {
+                    accepted_types.insert(EncodingType::Json);
+                }
+                _ => unsupported_type = true,
+            };
+        }
     }
-    // If there aren't any unsupported types, use JSON
-    if !unsupported_type {
-        return Ok(EncodingType::Json);
+
+    if accepted_types.is_empty() {
+        if unsupported_type {
+            return Err(eyre::eyre!("unsupported accept type"));
+        }
+
+        // No accept header so just return the same type as the content type
+        accepted_types.insert(get_content_type(req_headers));
     }
-    Err(eyre::eyre!("unsupported accept type"))
+
+    Ok(accepted_types)
 }
 /// Parse CONTENT TYPE header to get the encoding type of the body, defaulting
@@ -490,7 +489,7 @@ pub fn get_consensus_version_header(req_headers: &HeaderMap) -> Option<ForkName>
 /// Enum for types that can be used to encode incoming request bodies or
 /// outgoing response bodies
-#[derive(Debug, Clone, Copy, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum EncodingType {
     /// Body is UTF-8 encoded as JSON
     Json,
     /// Body is SSZ encoded
     Ssz,
 }
-impl std::fmt::Display for EncodingType {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+impl EncodingType {
+    /// Get the content type string for the encoding type
+    pub fn content_type(&self) -> &str {
         match self {
-            EncodingType::Json => write!(f, "application/json"),
-            EncodingType::Ssz => write!(f, "application/octet-stream"),
+            EncodingType::Json => APPLICATION_JSON,
+            EncodingType::Ssz => APPLICATION_OCTET_STREAM,
         }
     }
 }
+impl std::fmt::Display for EncodingType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.content_type())
+    }
+}
+
 impl FromStr for EncodingType {
     type Err = String;
     fn from_str(value: &str) -> Result<Self, Self::Err> {
-        match value {
-            "application/json" | "" => Ok(EncodingType::Json),
-            "application/octet-stream" => Ok(EncodingType::Ssz),
+        match value.to_ascii_lowercase().as_str() {
+            APPLICATION_JSON | "" => Ok(EncodingType::Json),
+            APPLICATION_OCTET_STREAM => Ok(EncodingType::Ssz),
             _ => Err(format!("unsupported encoding type: {value}")),
         }
     }
@@ -636,8 +642,18 @@ pub fn bls_pubkey_from_hex_unchecked(hex: &str) -> BlsPublicKey {
 #[cfg(test)]
 mod test {
+    use axum::http::{HeaderMap, HeaderValue};
+    use reqwest::header::ACCEPT;
+
     use super::{create_jwt, decode_jwt, validate_jwt};
-    use crate::types::{Jwt, ModuleId};
+    use crate::{
+        types::{Jwt, ModuleId},
+        utils::{
+            APPLICATION_JSON, APPLICATION_OCTET_STREAM, EncodingType, WILDCARD, get_accept_types,
+        },
+    };
+
+    const APPLICATION_TEXT: &str = "application/text";
     #[test]
     fn test_jwt_validation() {
@@ -660,4 +676,100 @@
         assert!(response.is_err());
         assert_eq!(response.unwrap_err().to_string(), "InvalidSignature");
     }
+
+    /// Make sure a missing Accept header is interpreted as JSON
+    #[test]
+    fn test_missing_accept_header() {
+        let headers = HeaderMap::new();
+        let result = get_accept_types(&headers).unwrap();
+        assert_eq!(result.len(), 1);
+        assert!(result.contains(&EncodingType::Json));
+    }
+
+    /// Test accepting JSON
+    #[test]
+    fn test_accept_header_json() {
+        let mut headers = HeaderMap::new();
+        headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_JSON).unwrap());
+        let result = get_accept_types(&headers).unwrap();
+        assert_eq!(result.len(), 1);
+        assert!(result.contains(&EncodingType::Json));
+    }
+
+    /// Test accepting SSZ
+    #[test]
+    fn test_accept_header_ssz() {
+        let mut headers = HeaderMap::new();
+        headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_OCTET_STREAM).unwrap());
+        let result = get_accept_types(&headers).unwrap();
+        assert_eq!(result.len(), 1);
+        assert!(result.contains(&EncodingType::Ssz));
+    }
+
+    /// Test accepting wildcards
+    #[test]
+    fn test_accept_header_wildcard() {
+        let mut headers = HeaderMap::new();
+        headers.append(ACCEPT, HeaderValue::from_str(WILDCARD).unwrap());
+        let result = get_accept_types(&headers).unwrap();
+        assert_eq!(result.len(), 1);
+        assert!(result.contains(&EncodingType::Json));
+    }
+
+    /// Test accepting one header with multiple values
+    #[test]
+    fn test_accept_header_multiple_values() {
+        let header_string = format!("{APPLICATION_JSON}, {APPLICATION_OCTET_STREAM}");
+        let mut headers = HeaderMap::new();
+        headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap());
+        let result = get_accept_types(&headers).unwrap();
+        assert_eq!(result.len(), 2);
+        assert!(result.contains(&EncodingType::Json));
+        assert!(result.contains(&EncodingType::Ssz));
+    }
+
+    /// Test accepting multiple headers
+    #[test]
+    fn test_multiple_accept_headers() {
+        let mut headers = HeaderMap::new();
+        headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_JSON).unwrap());
+        headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_OCTET_STREAM).unwrap());
+        let result = get_accept_types(&headers).unwrap();
+        assert_eq!(result.len(), 2);
+        assert!(result.contains(&EncodingType::Json));
+        assert!(result.contains(&EncodingType::Ssz));
+    }
+
+    /// Test accepting one header with multiple values, including a type that
+    /// can't be used
+    #[test]
+    fn test_accept_header_multiple_values_including_unknown() {
+        let header_string =
+            format!("{APPLICATION_JSON}, {APPLICATION_OCTET_STREAM}, {APPLICATION_TEXT}");
+        let mut headers = HeaderMap::new();
+        headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap());
+        let result = get_accept_types(&headers).unwrap();
+        assert_eq!(result.len(), 2);
+        assert!(result.contains(&EncodingType::Json));
+        assert!(result.contains(&EncodingType::Ssz));
+    }
+
+    /// Test rejecting an unknown accept type
+    #[test]
+    fn test_invalid_accept_header_type() {
+        let mut headers = HeaderMap::new();
+        headers.append(ACCEPT, HeaderValue::from_str(APPLICATION_TEXT).unwrap());
+        let result = get_accept_types(&headers);
+        assert!(result.is_err());
+    }
+
+    /// Test rejecting an Accept header value that fails to parse
+    #[test]
+    fn test_accept_header_invalid_parse() {
+        let header_string = format!("{APPLICATION_JSON}, a?;ef)");
+        let mut headers = HeaderMap::new();
+        headers.append(ACCEPT, HeaderValue::from_str(&header_string).unwrap());
+        let result = get_accept_types(&headers);
+        assert!(result.is_err());
+    }
 }
diff --git a/crates/pbs/Cargo.toml b/crates/pbs/Cargo.toml
index 1c5c2f1f..b0c1585e 100644
--- a/crates/pbs/Cargo.toml
+++ b/crates/pbs/Cargo.toml
@@ -15,6 +15,7 @@ cb-metrics.workspace = true
 ethereum_ssz.workspace = true
 eyre.workspace = true
 futures.workspace = true
+headers.workspace = true
 lazy_static.workspace = true
 parking_lot.workspace = true
 prometheus.workspace = true
diff --git a/crates/pbs/src/error.rs b/crates/pbs/src/error.rs
index 6c1c5c68..1214fd6a 100644
--- a/crates/pbs/src/error.rs
+++ b/crates/pbs/src/error.rs
@@ -1,4 +1,5 @@
 use axum::{http::StatusCode, response::IntoResponse};
+use cb_common::utils::BodyDeserializeError;
 #[derive(Debug)]
 /// Errors that the PbsService returns to client
@@ -7,6 +8,7 @@ pub enum PbsClientError {
     NoPayload,
     Internal,
     DecodeError(String),
+    RelayError(String),
 }
 impl PbsClientError {
@@ -16,10 +18,17 @@
             PbsClientError::NoPayload => StatusCode::BAD_GATEWAY,
             PbsClientError::Internal => StatusCode::INTERNAL_SERVER_ERROR,
             PbsClientError::DecodeError(_) => StatusCode::BAD_REQUEST,
+            PbsClientError::RelayError(_) => StatusCode::FAILED_DEPENDENCY,
         }
     }
 }
+impl From<BodyDeserializeError> for PbsClientError {
+    fn from(e: BodyDeserializeError) -> Self {
+        PbsClientError::DecodeError(format!("failed to deserialize body: {e}"))
+    }
+}
+
 impl IntoResponse for PbsClientError {
     fn into_response(self) -> axum::response::Response {
         let msg = match &self {
@@ -27,6 +36,7 @@
             PbsClientError::NoPayload => "no payload from relays".to_string(),
             PbsClientError::Internal => "internal server error".to_string(),
             PbsClientError::DecodeError(e) => format!("error decoding request: {e}"),
+            PbsClientError::RelayError(e) => format!("error processing relay response: {e}"),
         };
         (self.status_code(), msg).into_response()
diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs
index 86743703..d495166a 100644
--- a/crates/pbs/src/mev_boost/get_header.rs
+++ b/crates/pbs/src/mev_boost/get_header.rs
@@ -12,20 +12,24 @@ use axum::http::{HeaderMap, HeaderValue};
 use cb_common::{
     constants::APPLICATION_BUILDER_DOMAIN,
     pbs::{
-        EMPTY_TX_ROOT_HASH, ExecutionPayloadHeaderRef, GetHeaderInfo, GetHeaderParams,
-        GetHeaderResponse, HEADER_START_TIME_UNIX_MS, HEADER_TIMEOUT_MS, RelayClient,
+        EMPTY_TX_ROOT_HASH, ExecutionPayloadHeaderRef, ForkName, ForkVersionDecode, GetHeaderInfo,
+        GetHeaderParams, GetHeaderResponse, HEADER_START_TIME_UNIX_MS, HEADER_TIMEOUT_MS,
+        RelayClient, SignedBuilderBid,
         error::{PbsError, ValidationError},
     },
     signature::verify_signed_message,
     types::{BlsPublicKey, BlsPublicKeyBytes, BlsSignature, Chain},
     utils::{
-        get_user_agent_with_version, ms_into_slot, read_chunked_body_with_max,
-        timestamp_of_slot_start_sec, utcnow_ms,
+        EncodingType, get_consensus_version_header, get_user_agent_with_version, ms_into_slot,
+        read_chunked_body_with_max, timestamp_of_slot_start_sec, utcnow_ms,
     },
 };
 use futures::future::join_all;
 use parking_lot::RwLock;
-use reqwest::{StatusCode, header::USER_AGENT};
+use reqwest::{
+    StatusCode,
+    header::{ACCEPT, CONTENT_TYPE, USER_AGENT},
+};
 use tokio::time::sleep;
 use tracing::{Instrument, debug, error, warn};
 use tree_hash::TreeHash;
@@ -41,6 +45,40 @@
     utils::check_gas_limit,
 };
+/// Info about an incoming get_header request.
+/// Sent from get_header to each send_timed_get_header call.
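+/// (Illustrative: since the module accepts both encodings, each outgoing relay
+/// request carries `Accept: application/octet-stream,application/json`, built
+/// from `EncodingType::content_type()` in get_header below.)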
+#[derive(Clone)]
+struct RequestInfo {
+    /// The blockchain parameters of the get_header request (what slot it's for,
+    /// which pubkey is requesting it, etc.)
+    params: GetHeaderParams,
+
+    /// Common baseline of headers to send with each request
+    headers: Arc<HeaderMap>,
+
+    /// The chain the request is for
+    chain: Chain,
+
+    /// Context for validating the header returned by the relay
+    validation: ValidationContext,
+}
+
+// Context for validating the header
+#[derive(Clone)]
+struct ValidationContext {
+    // Whether to skip signature verification
+    skip_sigverify: bool,
+
+    // Minimum acceptable bid, in wei
+    min_bid_wei: U256,
+
+    // Whether extra validation of the parent block is enabled
+    extra_validation_enabled: bool,
+
+    // The parent block, if fetched
+    parent_block: Arc<RwLock<Option<Block>>>,
+}
+
 /// Implements https://ethereum.github.io/builder-specs/#/Builder/getHeader
 /// Returns 200 if at least one relay returns 200, else 204
 pub async fn get_header(
@@ -97,22 +135,33 @@
     let mut send_headers = HeaderMap::new();
     send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);
+    // Create the Accept headers for requests since the module handles both SSZ and
+    // JSON
+    let accept_types =
+        [EncodingType::Ssz.content_type(), EncodingType::Json.content_type()].join(",");
+    send_headers.insert(ACCEPT, HeaderValue::from_str(&accept_types).unwrap());
+
+    // Send requests to all relays concurrently
+    let slot = params.slot as i64;
+    let request_info = Arc::new(RequestInfo {
+        params,
+        headers: Arc::new(send_headers),
+        chain: state.config.chain,
+        validation: ValidationContext {
+            skip_sigverify: state.pbs_config().skip_sigverify,
+            min_bid_wei: state.pbs_config().min_bid_wei,
+            extra_validation_enabled: state.extra_validation_enabled(),
+            parent_block,
+        },
+    });
     let mut handles = Vec::with_capacity(relays.len());
     for relay in relays.iter() {
         handles.push(
             send_timed_get_header(
-                params.clone(),
+                request_info.clone(),
                 relay.clone(),
-                state.config.chain,
-                send_headers.clone(),
                 ms_into_slot,
                 max_timeout_ms,
-                ValidationContext {
-                    skip_sigverify: state.pbs_config().skip_sigverify,
-                    min_bid_wei: state.pbs_config().min_bid_wei,
-                    extra_validation_enabled: state.extra_validation_enabled(),
-                    parent_block: parent_block.clone(),
-                },
             )
             .in_current_span(),
         );
@@ -125,7 +174,7 @@
         match res {
             Ok(Some(res)) => {
-                RELAY_LAST_SLOT.with_label_values(&[relay_id]).set(params.slot as i64);
+                RELAY_LAST_SLOT.with_label_values(&[relay_id]).set(slot);
                 let value_gwei = (res.data.message.value() / U256::from(1_000_000_000))
                     .try_into()
                     .unwrap_or_default();
@@ -170,15 +219,13 @@ async fn fetch_parent_block(
 }
 async fn send_timed_get_header(
-    params: GetHeaderParams,
+    request_info: Arc<RequestInfo>,
     relay: RelayClient,
-    chain: Chain,
-    headers: HeaderMap,
     ms_into_slot: u64,
     mut timeout_left_ms: u64,
-    validation: ValidationContext,
 ) -> Result<Option<GetHeaderResponse>, PbsError> {
-    let url = relay.get_header_url(params.slot, &params.parent_hash, &params.pubkey)?;
+    let params = &request_info.params;
+    let url = Arc::new(relay.get_header_url(params.slot, &params.parent_hash, &params.pubkey)?);
     if relay.config.enable_timing_games {
         if let Some(target_ms) = relay.config.target_first_request_ms {
@@ -209,18 +256,12 @@
     );
     loop {
-        let params = params.clone();
         handles.push(tokio::spawn(
             send_one_get_header(
-                params,
+                request_info.clone(),
                 relay.clone(),
-                chain,
-                RequestContext {
-                    timeout_ms: timeout_left_ms,
-                    url: url.clone(),
-                    headers: headers.clone(),
-                },
-                validation.clone(),
+                url.clone(),
+                timeout_left_ms,
             )
             .in_current_span(),
         ));
@@ -276,54 +317,124 @@
     }
     // if no timing games or no repeated send, just send one request
-    send_one_get_header(
-        params,
-        relay,
-        chain,
-        RequestContext { timeout_ms: timeout_left_ms, url, headers },
-        validation,
-    )
-    .await
-    .map(|(_, maybe_header)| maybe_header)
+    send_one_get_header(request_info, relay, url, timeout_left_ms)
+        .await
+        .map(|(_, maybe_header)| maybe_header)
 }
-struct RequestContext {
-    url: Url,
-    timeout_ms: u64,
-    headers: HeaderMap,
-}
+/// Handles requesting a header from a relay, decoding, and validation.
+/// Used by send_timed_get_header to handle each individual request.
+async fn send_one_get_header(
+    request_info: Arc<RequestInfo>,
+    relay: RelayClient,
+    url: Arc<Url>,
+    timeout_left_ms: u64,
+) -> Result<(u64, Option<GetHeaderResponse>), PbsError> {
+    // Send the header request
+    let (start_request_time, get_header_response) = send_get_header_impl(
+        &relay,
+        url,
+        timeout_left_ms,
+        (*request_info.headers).clone(), /* Create a copy of the HeaderMap because the impl will
+                                          * modify it */
+    )
+    .await?;
+    let get_header_response = match get_header_response {
+        None => {
+            // Break if there's no header
+            return Ok((start_request_time, None));
+        }
+        Some(res) => res,
+    };
+
+    // Extract the basic header data needed for validation
+    let header_data = match &get_header_response.data.message.header() {
+        ExecutionPayloadHeaderRef::Bellatrix(_) |
+        ExecutionPayloadHeaderRef::Capella(_) |
+        ExecutionPayloadHeaderRef::Deneb(_) |
+        ExecutionPayloadHeaderRef::Gloas(_) => {
+            Err(PbsError::Validation(ValidationError::UnsupportedFork))
+        }
+        ExecutionPayloadHeaderRef::Electra(res) => Ok(HeaderData {
+            block_hash: res.block_hash.0,
+            parent_hash: res.parent_hash.0,
+            tx_root: res.transactions_root,
+            value: *get_header_response.value(),
+            timestamp: res.timestamp,
+        }),
+        ExecutionPayloadHeaderRef::Fulu(res) => Ok(HeaderData {
+            block_hash: res.block_hash.0,
+            parent_hash: res.parent_hash.0,
+            tx_root: res.transactions_root,
+            value: *get_header_response.value(),
+            timestamp: res.timestamp,
+        }),
+    }?;
+
+    // Validate the header
+    let chain = request_info.chain;
+    let params = &request_info.params;
+    let validation = &request_info.validation;
+    validate_header_data(
+        &header_data,
+        chain,
+        params.parent_hash,
+        validation.min_bid_wei,
+        params.slot,
+    )?;
+
+    // Validate the relay signature
+    if !validation.skip_sigverify {
+        validate_signature(
+            chain,
+            relay.pubkey(),
+            get_header_response.data.message.pubkey(),
+            &get_header_response.data.message,
+            &get_header_response.data.signature,
+        )?;
+    }
+
+    // Validate the parent block if enabled
+    if validation.extra_validation_enabled {
+        let parent_block = validation.parent_block.read();
+        if let Some(parent_block) = parent_block.as_ref() {
+            extra_validation(parent_block, &get_header_response)?;
+        } else {
+            warn!(
+                relay_id = relay.id.as_ref(),
+                "parent block not found, skipping extra validation"
+            );
+        }
+    }
+
+    Ok((start_request_time, Some(get_header_response)))
 }
-#[derive(Clone)]
-struct ValidationContext {
-    skip_sigverify: bool,
-    min_bid_wei: U256,
-    extra_validation_enabled: bool,
-    parent_block: Arc<RwLock<Option<Block>>>,
-}
-async fn send_one_get_header(
-    params: GetHeaderParams,
-    relay: RelayClient,
-    chain: Chain,
-    mut req_config: RequestContext,
-    validation: ValidationContext,
+/// Sends a get_header request to a relay, returning the time the request was
+/// started and the response (if any).
+/// Used by send_one_get_header to perform the actual request submission.
+async fn send_get_header_impl(
+    relay: &RelayClient,
+    url: Arc<Url>,
+    timeout_left_ms: u64,
+    mut headers: HeaderMap,
 ) -> Result<(u64, Option<GetHeaderResponse>), PbsError> {
     // the timestamp in the header is the consensus block time which is fixed,
     // use the beginning of the request as proxy to make sure we use only the
     // last one received
+    let start_request = Instant::now();
     let start_request_time = utcnow_ms();
-    req_config.headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(start_request_time));
+    headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(start_request_time));
     // The timeout header indicating how long a relay has to respond, so they can
     // minimize timing games without losing the bid
-    req_config.headers.insert(HEADER_TIMEOUT_MS, HeaderValue::from(req_config.timeout_ms));
+    headers.insert(HEADER_TIMEOUT_MS, HeaderValue::from(timeout_left_ms));
-    let start_request = Instant::now();
     let res = match relay
         .client
-        .get(req_config.url)
-        .timeout(Duration::from_millis(req_config.timeout_ms))
-        .headers(req_config.headers)
+        .get(url.as_ref().clone())
+        .timeout(Duration::from_millis(timeout_left_ms))
+        .headers(headers)
         .send()
         .await
     {
@@ -336,129 +447,114 @@
         }
     };
+    // Log the response code and latency
+    let code = res.status();
     let request_latency = start_request.elapsed();
     RELAY_LATENCY
         .with_label_values(&[GET_HEADER_ENDPOINT_TAG, &relay.id])
         .observe(request_latency.as_secs_f64());
-
-    let code = res.status();
     RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc();
-    let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER_RESPONSE).await?;
-    let header_size_bytes = response_bytes.len();
-    if !code.is_success() {
-        return Err(PbsError::RelayResponse {
-            error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
-            code: code.as_u16(),
-        });
-    };
-    if code == StatusCode::NO_CONTENT {
-        debug!(
-            relay_id = relay.id.as_ref(),
-            ?code,
-            latency = ?request_latency,
-            response = ?response_bytes,
-            "no header from relay"
-        );
-        return Ok((start_request_time, None));
+    // According to the spec, OK is the only allowed success code so this can break
+    // early
+    if code != StatusCode::OK {
+        if code == StatusCode::NO_CONTENT {
+            let response_bytes =
+                read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER_RESPONSE).await?;
+            debug!(
+                relay_id = relay.id.as_ref(),
+                ?code,
+                latency = ?request_latency,
+                response = ?response_bytes,
+                "no header from relay"
+            );
+            return Ok((start_request_time, None));
+        } else {
+            return Err(PbsError::RelayResponse {
+                error_msg: format!("unexpected status code from relay: {code}"),
+                code: code.as_u16(),
+            });
+        }
     }
-    let get_header_response = match serde_json::from_slice::<GetHeaderResponse>(&response_bytes) {
-        Ok(parsed) => parsed,
-        Err(err) => {
-            return Err(PbsError::JsonDecode {
-                err,
-                raw: String::from_utf8_lossy(&response_bytes).into_owned(),
-            });
+    // Get the content type
+    let content_type = match res.headers().get(CONTENT_TYPE) {
+        None => {
+            // Assume a missing content type means JSON; shouldn't happen in practice with
+            // any respectable HTTP server but just in case
+            EncodingType::Json
+        }
+        Some(header_value) => match header_value.to_str().map_err(|e| PbsError::RelayResponse {
+            error_msg: format!("cannot decode content-type header: {e}"),
+            code: code.as_u16(),
+        })? {
+            header_str if header_str.eq_ignore_ascii_case(&EncodingType::Ssz.to_string()) => {
+                EncodingType::Ssz
+            }
+            header_str if header_str.eq_ignore_ascii_case(&EncodingType::Json.to_string()) => {
+                EncodingType::Json
+            }
+            header_str => {
+                return Err(PbsError::RelayResponse {
+                    error_msg: format!("unsupported content type: {header_str}"),
+                    code: code.as_u16(),
+                })
+            }
+        },
+    };
+
+    // Decode the body
+    let fork = get_consensus_version_header(res.headers());
+    let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER_RESPONSE).await?;
+    let get_header_response = match content_type {
+        EncodingType::Json => decode_json_payload(&response_bytes)?,
+        EncodingType::Ssz => {
+            let fork = fork.ok_or(PbsError::RelayResponse {
+                error_msg: "relay did not provide consensus version header for ssz payload"
+                    .to_string(),
+                code: code.as_u16(),
+            })?;
+            decode_ssz_payload(&response_bytes, fork)?
         }
     };
+    // Log and return
     debug!(
         relay_id = relay.id.as_ref(),
-        header_size_bytes,
+        header_size_bytes = response_bytes.len(),
         latency = ?request_latency,
         version =? get_header_response.version,
         value_eth = format_ether(*get_header_response.value()),
         block_hash = %get_header_response.block_hash(),
+        content_type = ?content_type,
         "received new header"
     );
+    Ok((start_request_time, Some(get_header_response)))
+}
-    match &get_header_response.data.message.header() {
-        ExecutionPayloadHeaderRef::Bellatrix(_) |
-        ExecutionPayloadHeaderRef::Capella(_) |
-        ExecutionPayloadHeaderRef::Deneb(_) |
-        ExecutionPayloadHeaderRef::Gloas(_) => {
-            return Err(PbsError::Validation(ValidationError::UnsupportedFork))
-        }
-        ExecutionPayloadHeaderRef::Electra(res) => {
-            let header_data = HeaderData {
-                block_hash: res.block_hash.0,
-                parent_hash: res.parent_hash.0,
-                tx_root: res.transactions_root,
-                value: *get_header_response.value(),
-                timestamp: res.timestamp,
-            };
-
-            validate_header_data(
-                &header_data,
-                chain,
-                params.parent_hash,
-                validation.min_bid_wei,
-                params.slot,
-            )?;
-
-            if !validation.skip_sigverify {
-                validate_signature(
-                    chain,
-                    relay.pubkey(),
-                    get_header_response.data.message.pubkey(),
-                    &get_header_response.data.message,
-                    &get_header_response.data.signature,
-                )?;
-            }
-        }
-        ExecutionPayloadHeaderRef::Fulu(res) => {
-            let header_data = HeaderData {
-                block_hash: res.block_hash.0,
-                parent_hash: res.parent_hash.0,
-                tx_root: res.transactions_root,
-                value: *get_header_response.value(),
-                timestamp: res.timestamp,
-            };
-
-            validate_header_data(
-                &header_data,
-                chain,
-                params.parent_hash,
-                validation.min_bid_wei,
-                params.slot,
-            )?;
-
-            if !validation.skip_sigverify {
-                validate_signature(
-                    chain,
-                    relay.pubkey(),
-                    get_header_response.data.message.pubkey(),
-                    &get_header_response.data.message,
-                    &get_header_response.data.signature,
-                )?;
-            }
-        }
-    }
-
-    if validation.extra_validation_enabled {
-        let parent_block = validation.parent_block.read();
-        if let Some(parent_block) = parent_block.as_ref() {
-            extra_validation(parent_block, &get_header_response)?;
-        } else {
-            warn!(
-                relay_id = relay.id.as_ref(),
-                "parent block not found, skipping extra validation"
-            );
-        }
-    }
-
-    Ok((start_request_time, Some(get_header_response)))
+/// Decode a JSON-encoded get_header response
+fn decode_json_payload(response_bytes: &[u8]) -> Result<GetHeaderResponse, PbsError> {
+    match serde_json::from_slice::<GetHeaderResponse>(response_bytes) {
+        Ok(parsed) => Ok(parsed),
+        Err(err) => Err(PbsError::JsonDecode {
+            err,
+            raw: String::from_utf8_lossy(response_bytes).into_owned(),
+        }),
+    }
 }
+/// Decode an SSZ-encoded get_header response
+fn decode_ssz_payload(
+    response_bytes: &[u8],
+    fork: ForkName,
+) -> Result<GetHeaderResponse, PbsError> {
+    let data = SignedBuilderBid::from_ssz_bytes_by_fork(response_bytes, fork).map_err(|e| {
+        PbsError::RelayResponse {
+            error_msg: format!("error decoding relay payload: {e:?}"),
+            code: 200,
+        }
+    })?;
+    Ok(GetHeaderResponse { version: fork, data, metadata: Default::default() })
+}
 struct HeaderData {
diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs
index 2b10dcaa..89d18fca 100644
--- a/crates/pbs/src/mev_boost/submit_block.rs
+++ b/crates/pbs/src/mev_boost/submit_block.rs
@@ -1,5 +1,4 @@
 use std::{
-    str::FromStr,
     sync::Arc,
     time::{Duration, Instant},
 };
@@ -8,26 +7,46 @@ use alloy::{eips::eip7594::CELLS_PER_EXT_BLOB, primitives::B256};
 use axum::http::{HeaderMap, HeaderValue};
 use cb_common::{
     pbs::{
-        BlindedBeaconBlock, BlobsBundle, BuilderApiVersion, ForkName, HEADER_CONSENSUS_VERSION,
-        HEADER_START_TIME_UNIX_MS, KzgCommitments, RelayClient, SignedBlindedBeaconBlock,
-        SubmitBlindedBlockResponse,
+        BlindedBeaconBlock, BlobsBundle, BuilderApiVersion, ForkName, ForkVersionDecode,
+        HEADER_START_TIME_UNIX_MS, KzgCommitments, PayloadAndBlobs, RelayClient,
+        SignedBlindedBeaconBlock, SubmitBlindedBlockResponse,
         error::{PbsError, ValidationError},
     },
-    utils::{get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms},
+    utils::{
+        CONSENSUS_VERSION_HEADER, EncodingType, get_consensus_version_header,
+        get_user_agent_with_version, read_chunked_body_with_max, utcnow_ms,
+    },
 };
 use futures::{FutureExt, future::select_ok};
-use reqwest::header::USER_AGENT;
+use reqwest::{
+    StatusCode,
+    header::{ACCEPT, CONTENT_TYPE, USER_AGENT},
+};
+use ssz::Encode;
 use tracing::{debug, warn};
 use url::Url;
 use crate::{
-    constants::{
-        MAX_SIZE_SUBMIT_BLOCK_RESPONSE, SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR,
-    },
+    TIMEOUT_ERROR_CODE_STR,
+    constants::{MAX_SIZE_SUBMIT_BLOCK_RESPONSE, SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG},
     metrics::{RELAY_LATENCY, RELAY_STATUS_CODE},
     state::{BuilderApiState, PbsState},
 };
+/// Info about a proposal submission request.
+/// Sent from submit_block to the submit_block_with_timeout function.
+#[derive(Clone)]
+struct ProposalInfo {
+    /// The signed blinded block to submit
+    signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
+
+    /// Common baseline of headers to send with each request
+    headers: Arc<HeaderMap>,
+
+    /// The version of the submit_block route being used
+    api_version: BuilderApiVersion,
+}
+
 /// Implements https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlock and
 /// https://ethereum.github.io/builder-specs/#/Builder/submitBlindedBlockV2. Use `api_version` to
 /// distinguish between the two.
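 /// (Illustrative, assuming the standard builder-spec routes: V1 posts to
 /// /eth/v1/builder/blinded_blocks and expects 200 OK with the unblinded payload,
 /// while V2 posts to /eth/v2/builder/blinded_blocks and expects 202 Accepted
 /// with an empty body, as handled in send_submit_block_impl below.)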
@@ -39,36 +58,30 @@ pub async fn submit_block( ) -> eyre::Result> { debug!(?req_headers, "received headers"); - let fork_name = req_headers - .get(HEADER_CONSENSUS_VERSION) - .and_then(|h| { - let str = h.to_str().ok()?; - ForkName::from_str(str).ok() - }) - .unwrap_or_else(|| { - let slot = signed_blinded_block.slot().as_u64(); - state.config.chain.fork_by_slot(slot) - }); - - // safe because ForkName is visible ASCII chars - let consensus_version = HeaderValue::from_str(&fork_name.to_string()).unwrap(); - // prepare headers let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms())); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - send_headers.insert(HEADER_CONSENSUS_VERSION, consensus_version); + // Create the Accept headers for requests since the module handles both SSZ and + // JSON + let accept_types = + [EncodingType::Ssz.content_type(), EncodingType::Json.content_type()].join(","); + send_headers.insert(ACCEPT, HeaderValue::from_str(&accept_types).unwrap()); + + // Send requests to all relays concurrently + let proposal_info = Arc::new(ProposalInfo { + signed_blinded_block, + headers: Arc::new(send_headers), + api_version, + }); let mut handles = Vec::with_capacity(state.all_relays().len()); - for relay in state.all_relays().iter().cloned() { + for relay in state.all_relays().iter() { handles.push( tokio::spawn(submit_block_with_timeout( - signed_blinded_block.clone(), - relay, - send_headers.clone(), + proposal_info.clone(), + relay.clone(), state.pbs_config().timeout_get_payload_ms, - api_version, - fork_name, )) .map(|join_result| match join_result { Ok(res) => res, @@ -87,14 +100,11 @@ pub async fn submit_block( /// Submit blinded block to relay, retry connection errors until the /// given timeout has passed async fn submit_block_with_timeout( - signed_blinded_block: Arc, + proposal_info: Arc, relay: RelayClient, - headers: HeaderMap, timeout_ms: u64, - api_version: BuilderApiVersion, - fork_name: ForkName, ) -> Result, PbsError> { - let mut url = relay.submit_block_url(api_version)?; + let mut url = Arc::new(relay.submit_block_url(proposal_info.api_version)?); let mut remaining_timeout_ms = timeout_ms; let mut retry = 0; let mut backoff = Duration::from_millis(250); @@ -102,14 +112,11 @@ async fn submit_block_with_timeout( loop { let start_request = Instant::now(); match send_submit_block( + proposal_info.clone(), url.clone(), - &signed_blinded_block, &relay, - headers.clone(), remaining_timeout_ms, retry, - &api_version, - fork_name, ) .await { @@ -127,12 +134,14 @@ async fn submit_block_with_timeout( } } - Err(err) if err.is_not_found() && matches!(api_version, BuilderApiVersion::V2) => { + Err(err) + if err.is_not_found() && proposal_info.api_version == BuilderApiVersion::V2 => + { warn!( relay_id = relay.id.as_ref(), "relay does not support v2 endpoint, retrying with v1" ); - url = relay.submit_block_url(BuilderApiVersion::V1)?; + url = Arc::new(relay.submit_block_url(BuilderApiVersion::V1)?); } Err(err) => return Err(err), @@ -146,22 +155,91 @@ async fn submit_block_with_timeout( // back #[allow(clippy::too_many_arguments)] async fn send_submit_block( - url: Url, - signed_blinded_block: &SignedBlindedBeaconBlock, + proposal_info: Arc, + url: Arc, relay: &RelayClient, - headers: HeaderMap, timeout_ms: u64, retry: u32, - api_version: &BuilderApiVersion, - fork_name: ForkName, +) -> Result, PbsError> { + // Send the request + let block_response = send_submit_block_impl( + relay, + url, 
@@ -87,14 +100,11 @@ pub async fn submit_block(

 /// Submit blinded block to relay, retry connection errors until the
 /// given timeout has passed
 async fn submit_block_with_timeout(
-    signed_blinded_block: Arc<SignedBlindedBeaconBlock>,
+    proposal_info: Arc<ProposalInfo>,
     relay: RelayClient,
-    headers: HeaderMap,
     timeout_ms: u64,
-    api_version: BuilderApiVersion,
-    fork_name: ForkName,
 ) -> Result<Option<SubmitBlindedBlockResponse>, PbsError> {
-    let mut url = relay.submit_block_url(api_version)?;
+    let mut url = Arc::new(relay.submit_block_url(proposal_info.api_version)?);
     let mut remaining_timeout_ms = timeout_ms;
     let mut retry = 0;
     let mut backoff = Duration::from_millis(250);
@@ -102,14 +112,11 @@ async fn submit_block_with_timeout(
     loop {
         let start_request = Instant::now();
         match send_submit_block(
+            proposal_info.clone(),
             url.clone(),
-            &signed_blinded_block,
             &relay,
-            headers.clone(),
             remaining_timeout_ms,
             retry,
-            &api_version,
-            fork_name,
         )
         .await
         {
@@ -127,12 +134,14 @@ async fn submit_block_with_timeout(
                 }
             }

-            Err(err) if err.is_not_found() && matches!(api_version, BuilderApiVersion::V2) => {
+            Err(err)
+                if err.is_not_found() && proposal_info.api_version == BuilderApiVersion::V2 =>
+            {
                 warn!(
                     relay_id = relay.id.as_ref(),
                     "relay does not support v2 endpoint, retrying with v1"
                 );
-                url = relay.submit_block_url(BuilderApiVersion::V1)?;
+                url = Arc::new(relay.submit_block_url(BuilderApiVersion::V1)?);
             }

             Err(err) => return Err(err),
@@ -146,22 +155,91 @@ async fn submit_block_with_timeout(
 // back
 #[allow(clippy::too_many_arguments)]
 async fn send_submit_block(
-    url: Url,
-    signed_blinded_block: &SignedBlindedBeaconBlock,
+    proposal_info: Arc<ProposalInfo>,
+    url: Arc<Url>,
     relay: &RelayClient,
-    headers: HeaderMap,
     timeout_ms: u64,
     retry: u32,
-    api_version: &BuilderApiVersion,
-    fork_name: ForkName,
+) -> Result<Option<SubmitBlindedBlockResponse>, PbsError> {
+    // Send the request
+    let block_response = send_submit_block_impl(
+        relay,
+        url,
+        timeout_ms,
+        (*proposal_info.headers).clone(),
+        &proposal_info.signed_blinded_block,
+        retry,
+        proposal_info.api_version,
+    )
+    .await?;
+    let block_response = match block_response {
+        None => {
+            // No response body to validate (the v2 submission was accepted)
+            return Ok(None);
+        }
+        Some(res) => res,
+    };
+
+    // Extract the info needed for validation
+    let got_block_hash = block_response.data.execution_payload.block_hash().0;
+
+    // the request has a different type, so it can't be deserialized in the wrong
+    // version; the response has a "version" field
+    match &proposal_info.signed_blinded_block.message() {
+        BlindedBeaconBlock::Electra(blinded_block) => {
+            let expected_block_hash =
+                blinded_block.body.execution_payload.execution_payload_header.block_hash.0;
+            let expected_commitments = &blinded_block.body.blob_kzg_commitments;
+
+            validate_unblinded_block(
+                expected_block_hash,
+                got_block_hash,
+                expected_commitments,
+                &block_response.data.blobs_bundle,
+                block_response.version,
+            )
+        }
+
+        BlindedBeaconBlock::Fulu(blinded_block) => {
+            let expected_block_hash =
+                blinded_block.body.execution_payload.execution_payload_header.block_hash.0;
+            let expected_commitments = &blinded_block.body.blob_kzg_commitments;
+
+            validate_unblinded_block(
+                expected_block_hash,
+                got_block_hash,
+                expected_commitments,
+                &block_response.data.blobs_bundle,
+                block_response.version,
+            )
+        }
+
+        _ => return Err(PbsError::Validation(ValidationError::UnsupportedFork)),
+    }?;
+
+    Ok(Some(block_response))
+}
+
+async fn send_submit_block_impl(
+    relay: &RelayClient,
+    url: Arc<Url>,
+    timeout_ms: u64,
+    headers: HeaderMap,
+    signed_blinded_block: &SignedBlindedBeaconBlock,
+    retry: u32,
+    api_version: BuilderApiVersion,
+) -> Result<Option<SubmitBlindedBlockResponse>, PbsError> {
     let start_request = Instant::now();
-    let res = match relay
+
+    // Try SSZ first
+    let mut res = match relay
         .client
-        .post(url)
+        .post(url.as_ref().clone())
         .timeout(Duration::from_millis(timeout_ms))
-        .headers(headers)
-        .json(&signed_blinded_block)
+        .headers(headers.clone())
+        .body(signed_blinded_block.as_ssz_bytes())
+        .header(CONTENT_TYPE, EncodingType::Ssz.to_string())
+        .header(CONSENSUS_VERSION_HEADER, signed_blinded_block.fork_name_unchecked().to_string())
         .send()
         .await
     {
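The next hunk adds the encoding fallback: the block is first POSTed as SSZ, and any 4xx reply triggers a single JSON retry. Stripped of metrics and the crate's types, the shape is roughly the following sketch, assuming reqwest and pre-serialized bodies (the helper name and endpoint are hypothetical):

```rust
use reqwest::{header::CONTENT_TYPE, Client, Response, Result};

/// POST ssz_bytes first; if the server rejects it with any 4xx (the spec says
/// 406/415, but be permissive), retry once with the JSON encoding.
async fn post_with_fallback(
    client: &Client,
    url: &str,
    ssz_bytes: Vec<u8>,
    json_bytes: Vec<u8>,
) -> Result<Response> {
    let res = client
        .post(url)
        .header(CONTENT_TYPE, "application/octet-stream")
        .body(ssz_bytes)
        .send()
        .await?;

    if res.status().is_client_error() {
        // The relay presumably doesn't speak SSZ; resubmit as JSON
        return client
            .post(url)
            .header(CONTENT_TYPE, "application/json")
            .body(json_bytes)
            .send()
            .await;
    }
    Ok(res)
}
```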
@@ -177,51 +255,136 @@ async fn send_submit_block(
             return Err(err.into());
         }
     };
+
+    // If we got a client error, retry with JSON - the spec says that this should be
+    // a 406 or 415, but we're a little more permissive here
+    if res.status().is_client_error() {
+        warn!(
+            relay_id = relay.id.as_ref(),
+            "relay does not support SSZ, resubmitting block with JSON content-type"
+        );
+        res = match relay
+            .client
+            .post(url.as_ref().clone())
+            .timeout(Duration::from_millis(timeout_ms))
+            .headers(headers)
+            .body(serde_json::to_vec(&signed_blinded_block).unwrap())
+            .header(CONTENT_TYPE, EncodingType::Json.to_string())
+            .send()
+            .await
+        {
+            Ok(res) => res,
+            Err(err) => {
+                RELAY_STATUS_CODE
+                    .with_label_values(&[
+                        TIMEOUT_ERROR_CODE_STR,
+                        SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG,
+                        &relay.id,
+                    ])
+                    .inc();
+                return Err(err.into());
+            }
+        };
+    }
+
+    // Log the response code and latency
+    let code = res.status();
     let request_latency = start_request.elapsed();
     RELAY_LATENCY
         .with_label_values(&[SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
         .observe(request_latency.as_secs_f64());
-
-    let code = res.status();
     RELAY_STATUS_CODE
         .with_label_values(&[code.as_str(), SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id])
         .inc();

-    let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK_RESPONSE).await?;
-    if !code.is_success() {
+    // If this was API v2 and succeeded then we can just return here
+    if api_version != BuilderApiVersion::V1 {
+        match code {
+            StatusCode::ACCEPTED => {
+                debug!(
+                    relay_id = relay.id.as_ref(),
+                    retry,
+                    latency = ?request_latency,
+                    "received 202 Accepted for v2 submit_block"
+                );
+                return Ok(None);
+            }
+            StatusCode::OK => {
+                warn!(
+                    relay_id = relay.id.as_ref(),
+                    "relay sent OK response for v2 submit_block, expected 202 Accepted"
+                );
+                return Ok(None);
+            }
+            _ => {
+                return Err(PbsError::RelayResponse {
+                    error_msg: format!(
+                        "relay {} sent unexpected code {code} for v2 submit_block",
+                        relay.id.as_ref()
+                    ),
+                    code: code.as_u16(),
+                });
+            }
+        }
+    }
+
+    // If the code is not OK, return early
+    if code != StatusCode::OK {
+        let response_bytes =
+            read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK_RESPONSE).await?;
         let err = PbsError::RelayResponse {
             error_msg: String::from_utf8_lossy(&response_bytes).into_owned(),
             code: code.as_u16(),
         };

         // we requested the payload from all relays, but some may have not received it
-        warn!(relay_id = relay.id.as_ref(), retry, %err, "failed to get payload (this might be ok if other relays have it)");
+        warn!(relay_id = relay.id.as_ref(), %err, "failed to get payload (this might be ok if other relays have it)");
         return Err(err);
-    };
-
-    if api_version != &BuilderApiVersion::V1 {
-        // v2 response is going to be empty, so just break here
-        debug!(
-            relay_id = relay.id.as_ref(),
-            retry,
-            latency = ?request_latency,
-            "successful request"
-        );
-
-        return Ok(None);
     }

-    let block_response = match serde_json::from_slice::<SubmitBlindedBlockResponse>(&response_bytes)
-    {
-        Ok(parsed) => parsed,
-        Err(err) => {
-            return Err(PbsError::JsonDecode {
-                err,
-                raw: String::from_utf8_lossy(&response_bytes).into_owned(),
-            });
+    // We're on v1 so decode the payload normally - get the content type
+    let content_type = match res.headers().get(CONTENT_TYPE) {
+        None => {
+            // Assume a missing content type means JSON; shouldn't happen in practice with
+            // any respectable HTTP server but just in case
+            EncodingType::Json
+        }
+        Some(header_value) => match header_value.to_str().map_err(|e| PbsError::RelayResponse {
+            error_msg: format!("cannot decode content-type header: {e}"),
+            code: code.as_u16(),
+        })? {
+            header_str if header_str.eq_ignore_ascii_case(&EncodingType::Ssz.to_string()) => {
+                EncodingType::Ssz
+            }
+            header_str if header_str.eq_ignore_ascii_case(&EncodingType::Json.to_string()) => {
+                EncodingType::Json
+            }
+            header_str => {
+                return Err(PbsError::RelayResponse {
+                    error_msg: format!("unsupported content type: {header_str}"),
+                    code: code.as_u16(),
+                })
+            }
+        },
+    };
+
+    // Decode the body
+    let fork = get_consensus_version_header(res.headers());
+    let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_SUBMIT_BLOCK_RESPONSE).await?;
+    let block_response = match content_type {
+        EncodingType::Json => decode_json_payload(&response_bytes)?,
+        EncodingType::Ssz => {
+            let fork = fork.ok_or(PbsError::RelayResponse {
+                error_msg: "relay did not provide consensus version header for ssz payload"
                    .to_string(),
+                code: code.as_u16(),
+            })?;
+            decode_ssz_payload(&response_bytes, fork)?
+        }
+    };
+
+    // Log and return
     debug!(
         relay_id = relay.id.as_ref(),
         retry,
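The Content-Type inspection above boils down to a small case-insensitive matcher. A standalone sketch — with a hypothetical `Encoding` enum, and `application/octet-stream` for SSZ as used elsewhere in this diff:

```rust
#[derive(Debug, PartialEq)]
enum Encoding {
    Json,
    Ssz,
}

/// Map a Content-Type header value onto an encoding, ignoring ASCII case.
/// None lets the caller pick the policy (error out, or default to JSON as
/// the relay client above does for a missing header).
fn parse_content_type(value: &str) -> Option<Encoding> {
    if value.eq_ignore_ascii_case("application/json") {
        Some(Encoding::Json)
    } else if value.eq_ignore_ascii_case("application/octet-stream") {
        Some(Encoding::Ssz)
    } else {
        None
    }
}

fn main() {
    assert_eq!(parse_content_type("Application/JSON"), Some(Encoding::Json));
    assert_eq!(parse_content_type("application/octet-stream"), Some(Encoding::Ssz));
    assert_eq!(parse_content_type("text/plain"), None);
}
```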
@@ -229,44 +392,32 @@ async fn send_submit_block(
         version =% block_response.version,
         "received unblinded block"
     );
+    Ok(Some(block_response))
+}
-
-    let got_block_hash = block_response.data.execution_payload.block_hash().0;
-
-    // request has different type so cant be deserialized in the wrong version,
-    // response has a "version" field
-    match &signed_blinded_block.message() {
-        BlindedBeaconBlock::Electra(blinded_block) => {
-            let expected_block_hash =
-                blinded_block.body.execution_payload.execution_payload_header.block_hash.0;
-            let expected_commitments = &blinded_block.body.blob_kzg_commitments;
-
-            validate_unblinded_block(
-                expected_block_hash,
-                got_block_hash,
-                expected_commitments,
-                &block_response.data.blobs_bundle,
-                fork_name,
-            )
-        }
-
-        BlindedBeaconBlock::Fulu(blinded_block) => {
-            let expected_block_hash =
-                blinded_block.body.execution_payload.execution_payload_header.block_hash.0;
-            let expected_commitments = &blinded_block.body.blob_kzg_commitments;
+/// Decode a JSON-encoded submit_block response
+fn decode_json_payload(response_bytes: &[u8]) -> Result<SubmitBlindedBlockResponse, PbsError> {
+    match serde_json::from_slice::<SubmitBlindedBlockResponse>(response_bytes) {
+        Ok(parsed) => Ok(parsed),
+        Err(err) => Err(PbsError::JsonDecode {
+            err,
+            raw: String::from_utf8_lossy(response_bytes).into_owned(),
+        }),
+    }
+}
-            validate_unblinded_block(
-                expected_block_hash,
-                got_block_hash,
-                expected_commitments,
-                &block_response.data.blobs_bundle,
-                fork_name,
-            )
+/// Decode an SSZ-encoded submit_block response
+fn decode_ssz_payload(
+    response_bytes: &[u8],
+    fork: ForkName,
+) -> Result<SubmitBlindedBlockResponse, PbsError> {
+    let data = PayloadAndBlobs::from_ssz_bytes_by_fork(response_bytes, fork).map_err(|e| {
+        PbsError::RelayResponse {
+            error_msg: format!("error decoding relay payload: {e:?}"),
+            code: 200,
         }
-
-        _ => return Err(PbsError::Validation(ValidationError::UnsupportedFork)),
-    }?;
-
-    Ok(Some(block_response))
+    })?;
+    Ok(SubmitBlindedBlockResponse { version: fork, data, metadata: Default::default() })
 }

 fn validate_unblinded_block(
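The two decode helpers above differ for a structural reason: a JSON response is self-describing through its "version" field, while an SSZ body is raw bytes, so the fork must arrive out-of-band in the Eth-Consensus-Version response header. A hedged sketch of that dispatch, with stand-in types instead of the crate's:

```rust
use serde::Deserialize;

#[derive(Debug, Clone, Copy)]
enum Fork {
    Electra,
    Fulu,
}

// Only the version matters for this sketch; serde ignores the other fields
#[derive(Deserialize)]
struct VersionedPayload {
    version: String,
}

fn decode_response(
    body: &[u8],
    is_ssz: bool,
    header_fork: Option<Fork>,
) -> Result<String, String> {
    if is_ssz {
        let fork = header_fork.ok_or("ssz body but no consensus version header")?;
        // a real decoder would call something like from_ssz_bytes_by_fork(body, fork)
        Ok(format!("ssz payload of {} bytes for {fork:?}", body.len()))
    } else {
        let parsed: VersionedPayload = serde_json::from_slice(body).map_err(|e| e.to_string())?;
        Ok(format!("json payload for fork {}", parsed.version))
    }
}

fn main() {
    assert!(decode_response(br#"{"version":"electra"}"#, false, None).is_ok());
    assert!(decode_response(&[0u8; 8], true, None).is_err());
    assert!(decode_response(&[0u8; 8], true, Some(Fork::Fulu)).is_ok());
}
```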
diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs
index ca8d2d7c..a11d77e2 100644
--- a/crates/pbs/src/routes/get_header.rs
+++ b/crates/pbs/src/routes/get_header.rs
@@ -7,7 +7,7 @@ use axum::{
 use cb_common::{
     pbs::{GetHeaderInfo, GetHeaderParams},
     utils::{
-        CONSENSUS_VERSION_HEADER, EncodingType, get_accept_type, get_user_agent, ms_into_slot,
+        CONSENSUS_VERSION_HEADER, EncodingType, get_accept_types, get_user_agent, ms_into_slot,
     },
 };
 use reqwest::{StatusCode, header::CONTENT_TYPE};
@@ -35,14 +35,12 @@ pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(
     let ua = get_user_agent(&req_headers);
     let ms_into_slot = ms_into_slot(params.slot, state.config.chain);

-    let accept_type = get_accept_type(&req_headers).map_err(|e| {
+    let accept_types = get_accept_types(&req_headers).map_err(|e| {
         error!(%e, "error parsing accept header");
         PbsClientError::DecodeError(format!("error parsing accept header: {e}"))
-    });
-    if let Err(e) = accept_type {
-        return Ok((StatusCode::BAD_REQUEST, e).into_response());
-    }
-    let accept_type = accept_type.unwrap();
+    })?;
+    let accepts_ssz = accept_types.contains(&EncodingType::Ssz);
+    let accepts_json = accept_types.contains(&EncodingType::Json);

     info!(ua, ms_into_slot, "new request");

@@ -52,30 +50,47 @@ pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(
             info!(value_eth = format_ether(*max_bid.data.message.value()), block_hash =% max_bid.block_hash(), "received header");

             BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc();
-            let response = match accept_type {
-                EncodingType::Ssz => {
-                    let mut res = max_bid.data.as_ssz_bytes().into_response();
-                    let Ok(consensus_version_header) =
-                        HeaderValue::from_str(&max_bid.version.to_string())
-                    else {
-                        info!("sending response as JSON");
-                        return Ok((StatusCode::OK, axum::Json(max_bid)).into_response());
-                    };
-                    let Ok(content_type_header) =
-                        HeaderValue::from_str(&format!("{}", EncodingType::Ssz))
-                    else {
-                        info!("sending response as JSON");
-                        return Ok((StatusCode::OK, axum::Json(max_bid)).into_response());
-                    };
-                    res.headers_mut()
-                        .insert(CONSENSUS_VERSION_HEADER, consensus_version_header);
-                    res.headers_mut().insert(CONTENT_TYPE, content_type_header);
-                    info!("sending response as SSZ");
-                    res
-                }
-                EncodingType::Json => (StatusCode::OK, axum::Json(max_bid)).into_response(),
-            };
-            Ok(response)
+
+            // Handle SSZ
+            if accepts_ssz {
+                let mut res = max_bid.data.as_ssz_bytes().into_response();
+                let consensus_version_header = match HeaderValue::from_str(
+                    &max_bid.version.to_string(),
+                ) {
+                    Ok(consensus_version_header) => {
+                        Ok::<HeaderValue, PbsClientError>(consensus_version_header)
+                    }
+                    Err(e) => {
+                        if accepts_json {
+                            info!("sending response as JSON");
+                            return Ok((StatusCode::OK, axum::Json(max_bid)).into_response());
+                        } else {
+                            return Err(PbsClientError::RelayError(format!(
+                                "error decoding consensus version from relay payload: {e}"
+                            )));
+                        }
+                    }
+                }?;
+
+                // This won't actually fail since the string is a const
+                let content_type_header =
+                    HeaderValue::from_str(EncodingType::Ssz.content_type()).unwrap();
+
+                res.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header);
+                res.headers_mut().insert(CONTENT_TYPE, content_type_header);
+                info!("sending response as SSZ");
+                return Ok(res);
+            }
+
+            // Handle JSON
+            if accepts_json {
+                Ok((StatusCode::OK, axum::Json(max_bid)).into_response())
+            } else {
+                // This shouldn't ever happen but the compiler needs it
+                Err(PbsClientError::DecodeError(
+                    "no viable accept types in request".to_string(),
+                ))
+            }
         } else {
             // spec: return 204 if request is valid but no bid available
             info!("no header available for slot");
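This handler and the submit_block route in the next file assemble the same kind of raw-bytes reply, so a self-contained sketch of the axum pattern may be useful — the header constant and payload here are hypothetical stand-ins, not the crate's types:

```rust
use axum::{
    http::{header::CONTENT_TYPE, HeaderValue},
    response::{IntoResponse, Response},
};

// Hypothetical stand-in for the crate's CONSENSUS_VERSION_HEADER constant
const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version";

/// Wrap raw SSZ bytes in a response and attach the two headers the builder
/// API expects: the content type and the fork the bytes were encoded for.
fn ssz_response(ssz_bytes: Vec<u8>, fork: &str) -> Response {
    let mut res = ssz_bytes.into_response();
    res.headers_mut()
        .insert(CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
    // Fork names are plain ASCII ("electra", "fulu"), so this only fails on
    // malformed input; a production handler falls back to JSON instead
    if let Ok(value) = HeaderValue::from_str(fork) {
        res.headers_mut().insert(CONSENSUS_VERSION_HEADER, value);
    }
    res
}
```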
diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs
index 1134b462..78829641 100644
--- a/crates/pbs/src/routes/submit_block.rs
+++ b/crates/pbs/src/routes/submit_block.rs
@@ -1,7 +1,6 @@
 use std::sync::Arc;

 use axum::{
-    Json,
     extract::State,
     http::{HeaderMap, HeaderValue},
     response::IntoResponse,
@@ -9,8 +8,8 @@ use axum::{
 use cb_common::{
     pbs::{BuilderApiVersion, GetPayloadInfo},
     utils::{
-        CONSENSUS_VERSION_HEADER, EncodingType, RawRequest, deserialize_body, get_accept_type,
-        get_user_agent, timestamp_of_slot_start_millis, utcnow_ms
+        CONSENSUS_VERSION_HEADER, EncodingType, RawRequest, deserialize_body, get_accept_types,
+        get_user_agent, timestamp_of_slot_start_millis, utcnow_ms,
     },
 };
 use reqwest::{StatusCode, header::CONTENT_TYPE};
@@ -47,11 +46,8 @@ async fn handle_submit_block_impl<S: BuilderApiState, A: BuilderApi<S>>(
     raw_request: RawRequest,
     api_version: BuilderApiVersion,
 ) -> Result<Response, PbsClientError> {
-    let signed_blinded_block = Arc::new(
-        deserialize_body(&req_headers, raw_request.body_bytes).await.map_err(|e| {
-            error!(%e, "failed to deserialize signed blinded block");
-            PbsClientError::DecodeError(format!("failed to deserialize body: {e}"))
-        })?);
+    let signed_blinded_block =
+        Arc::new(deserialize_body(&req_headers, raw_request.body_bytes).await?);

     tracing::Span::current().record("slot", signed_blinded_block.slot().as_u64() as i64);
     tracing::Span::current()
         .record("block_hash", tracing::field::debug(signed_blinded_block.block_hash()));
@@ -66,14 +62,12 @@ async fn handle_submit_block_impl<S: BuilderApiState, A: BuilderApi<S>>(
     let block_hash = signed_blinded_block.block_hash();
     let slot_start_ms = timestamp_of_slot_start_millis(slot.into(), state.config.chain);
     let ua = get_user_agent(&req_headers);
-    let response_type = get_accept_type(&req_headers).map_err(|e| {
+    let accept_types = get_accept_types(&req_headers).map_err(|e| {
         error!(%e, "error parsing accept header");
         PbsClientError::DecodeError(format!("error parsing accept header: {e}"))
-    });
-    if let Err(e) = response_type {
-        return Ok((StatusCode::BAD_REQUEST, e.into_response()));
-    }
-    let response_type = response_type.unwrap();
+    })?;
+    let accepts_ssz = accept_types.contains(&EncodingType::Ssz);
+    let accepts_json = accept_types.contains(&EncodingType::Json);

     info!(ua, ms_into_slot = now.saturating_sub(slot_start_ms), "new request");

@@ -86,41 +80,32 @@ async fn handle_submit_block_impl<S: BuilderApiState, A: BuilderApi<S>>(
             BEACON_NODE_STATUS
                 .with_label_values(&["200", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG])
                 .inc();
-            let response = match response_type {
-                EncodingType::Json => {
-                    info!("sending response as JSON");
-                    Json(payload_and_blobs).into_response()
-                }
-                EncodingType::Ssz => {
-                    let mut response = payload_and_blobs.data.as_ssz_bytes().into_response();
-                    let Ok(consensus_version_header) =
-                        HeaderValue::from_str(&payload_and_blobs.version.to_string())
-                    else {
-                        info!("sending response as JSON");
-                        return Ok((
-                            StatusCode::OK,
-                            axum::Json(payload_and_blobs).into_response(),
-                        ));
-                    };
-                    let Ok(content_type_header) =
-                        HeaderValue::from_str(&EncodingType::Ssz.to_string())
-                    else {
-                        info!("sending response as JSON");
-                        return Ok((
-                            StatusCode::OK,
-                            axum::Json(payload_and_blobs).into_response(),
-                        ));
-                    };
-                    response
-                        .headers_mut()
-                        .insert(CONSENSUS_VERSION_HEADER, consensus_version_header);
-                    response.headers_mut().insert(CONTENT_TYPE, content_type_header);
-                    info!("sending response as SSZ");
-                    response
-                }
-            };
-
-            Ok((StatusCode::OK, response))
+
+            // Try SSZ
+            if accepts_ssz {
+                let mut response = payload_and_blobs.data.as_ssz_bytes().into_response();
+
+                // This won't actually fail since the string is a const
+                let content_type_header =
+                    HeaderValue::from_str(EncodingType::Ssz.content_type()).unwrap();
+                response.headers_mut().insert(CONTENT_TYPE, content_type_header);
+                response.headers_mut().insert(
+                    CONSENSUS_VERSION_HEADER,
+                    HeaderValue::from_str(&payload_and_blobs.version.to_string()).unwrap(),
+                );
+                info!("sending response as SSZ");
+                return Ok(response);
+            }
+
+            // Handle JSON
+            if accepts_json {
+                Ok((StatusCode::OK, axum::Json(payload_and_blobs)).into_response())
+            } else {
+                // This shouldn't ever happen but the compiler needs it
+                Err(PbsClientError::DecodeError(
+                    "no viable accept types in request".to_string(),
+                ))
+            }
         }
         None => {
             info!("received unblinded block (v2)");
@@ -130,7 +115,7 @@ async fn handle_submit_block_impl<S: BuilderApiState, A: BuilderApi<S>>(
             BEACON_NODE_STATUS
                 .with_label_values(&["202", SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG])
                 .inc();
-            Ok((StatusCode::ACCEPTED, "".into_response()))
+            Ok((StatusCode::ACCEPTED, "").into_response())
         }
     },
diff --git a/examples/configs/pbs_mux.toml b/examples/configs/pbs_mux.toml
index 3ea9f355..fcf4ea8c 100644
--- a/examples/configs/pbs_mux.toml
+++ b/examples/configs/pbs_mux.toml
@@ -33,7 +33,7 @@ target_first_request_ms = 200

 [[mux]]
 id = "lido-mux"
-loader = { registry = "lido", node_operator_id = 8 }
+loader = { registry = "lido", node_operator_id = 8, lido_module_id = 1 }

 [[mux.relays]]
 id = "relay-3"
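On the config change above: `lido_module_id` is an optional key (the refresh test later in this diff constructs the loader with `lido_module_id: None`). A hedged serde sketch of how such a table deserializes — the field names are from this diff, but the struct shape is assumed, not cb_common's real `MuxKeysLoader`:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct RegistryLoader {
    registry: String,
    node_operator_id: u64,
    // Optional: an absent TOML key becomes None
    lido_module_id: Option<u64>,
}

fn main() {
    let with_module: RegistryLoader =
        toml::from_str("registry = \"lido\"\nnode_operator_id = 8\nlido_module_id = 1").unwrap();
    assert_eq!(with_module.registry, "lido");
    assert_eq!(with_module.lido_module_id, Some(1));

    let without: RegistryLoader =
        toml::from_str("registry = \"lido\"\nnode_operator_id = 8").unwrap();
    assert_eq!(without.lido_module_id, None);
}
```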
diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs
index 5611a1eb..15c6cbbc 100644
--- a/tests/src/mock_relay.rs
+++ b/tests/src/mock_relay.rs
@@ -1,4 +1,5 @@
 use std::{
+    collections::HashSet,
     net::SocketAddr,
     sync::{
         Arc, RwLock,
@@ -26,7 +27,8 @@ use cb_common::{
     types::{BlsSecretKey, Chain},
     utils::{
         CONSENSUS_VERSION_HEADER, EncodingType, RawRequest, TestRandomSeed, deserialize_body,
-        get_accept_type, get_consensus_version_header, timestamp_of_slot_start_sec,
+        get_accept_types, get_consensus_version_header, get_content_type,
+        timestamp_of_slot_start_sec,
     },
 };
 use cb_pbs::MAX_SIZE_SUBMIT_BLOCK_RESPONSE;
@@ -50,6 +52,7 @@ pub async fn start_mock_relay_service(state: Arc<MockRelayState>, port: u16) ->
 pub struct MockRelayState {
     pub chain: Chain,
     pub signer: BlsSecretKey,
+    pub supported_content_types: Arc<HashSet<EncodingType>>,
     large_body: bool,
     received_get_header: Arc<AtomicU64>,
     received_get_status: Arc<AtomicU64>,
@@ -90,6 +93,9 @@ impl MockRelayState {
             received_register_validator: Default::default(),
             received_submit_block: Default::default(),
             response_override: RwLock::new(None),
+            supported_content_types: Arc::new(
+                [EncodingType::Json, EncodingType::Ssz].iter().cloned().collect(),
+            ),
         }
     }

@@ -118,15 +124,28 @@ async fn handle_get_header(
     headers: HeaderMap,
 ) -> Response {
     state.received_get_header.fetch_add(1, Ordering::Relaxed);
-    let accept_type = get_accept_type(&headers)
+    let accept_types = get_accept_types(&headers)
         .map_err(|e| (StatusCode::BAD_REQUEST, format!("error parsing accept header: {e}")));
-    if let Err(e) = accept_type {
+    if let Err(e) = accept_types {
         return e.into_response();
     }
-    let accept_header = accept_type.unwrap();
+    let accept_types = accept_types.unwrap();
     let consensus_version_header =
         get_consensus_version_header(&headers).unwrap_or(ForkName::Electra);

+    let content_type = if state.supported_content_types.contains(&EncodingType::Ssz) &&
+        accept_types.contains(&EncodingType::Ssz)
+    {
+        EncodingType::Ssz
+    } else if state.supported_content_types.contains(&EncodingType::Json) &&
+        accept_types.contains(&EncodingType::Json)
+    {
+        EncodingType::Json
+    } else {
+        return (StatusCode::NOT_ACCEPTABLE, "No acceptable content type found".to_string())
+            .into_response();
+    };
+
     let data = match consensus_version_header {
         // Add Fusaka and other forks here when necessary
         ForkName::Electra => {
@@ -150,16 +169,16 @@ async fn handle_get_header(
             let object_root = message.tree_hash_root();
             let signature = sign_builder_root(state.chain, &state.signer, object_root);
             let response = SignedBuilderBid { message, signature };
-            match accept_header {
-                EncodingType::Json => {
-                    let versioned_response = GetHeaderResponse {
-                        version: ForkName::Electra,
-                        data: response,
-                        metadata: Default::default(),
-                    };
-                    serde_json::to_vec(&versioned_response).unwrap()
-                }
-                EncodingType::Ssz => response.as_ssz_bytes(),
+            if content_type == EncodingType::Ssz {
+                response.as_ssz_bytes()
+            } else {
+                // Return JSON for everything else; this is fine for the mock
+                let versioned_response = GetHeaderResponse {
+                    version: ForkName::Electra,
+                    data: response,
+                    metadata: Default::default(),
+                };
+                serde_json::to_vec(&versioned_response).unwrap()
+            }
         }
         _ => {
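The mock now picks its response encoding by intersecting what it supports with what the request accepts, preferring SSZ. That selection is a pure function, so a standalone version (with a hypothetical `Encoding` enum) is easy to reuse and test:

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Encoding {
    Json,
    Ssz,
}

/// SSZ wins when both sides support it, JSON is the fallback, and None is
/// what the handler turns into a 406 Not Acceptable.
fn negotiate(supported: &HashSet<Encoding>, accepted: &HashSet<Encoding>) -> Option<Encoding> {
    [Encoding::Ssz, Encoding::Json]
        .into_iter()
        .find(|e| supported.contains(e) && accepted.contains(e))
}

fn main() {
    let supported: HashSet<_> = [Encoding::Json].into_iter().collect();
    let accepted: HashSet<_> = [Encoding::Ssz, Encoding::Json].into_iter().collect();
    assert_eq!(negotiate(&supported, &accepted), Some(Encoding::Json));
    assert_eq!(negotiate(&supported, &HashSet::new()), None);
}
```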
@@ -174,7 +193,7 @@ async fn handle_get_header(
     let mut response = (StatusCode::OK, data).into_response();
     let consensus_version_header =
         HeaderValue::from_str(&consensus_version_header.to_string()).unwrap();
-    let content_type_header = HeaderValue::from_str(&accept_header.to_string()).unwrap();
+    let content_type_header = HeaderValue::from_str(&content_type.to_string()).unwrap();
     response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header);
     response.headers_mut().insert(CONTENT_TYPE, content_type_header);
     response
@@ -205,15 +224,32 @@ async fn handle_submit_block_v1(
     raw_request: RawRequest,
 ) -> Response {
     state.received_submit_block.fetch_add(1, Ordering::Relaxed);
-    let accept_header = get_accept_type(&headers);
-    if let Err(e) = accept_header {
-        error!(%e, "error parsing accept header");
-        return (StatusCode::BAD_REQUEST, format!("error parsing accept header: {e}"))
-            .into_response();
+    let accept_types = get_accept_types(&headers)
+        .map_err(|e| (StatusCode::BAD_REQUEST, format!("error parsing accept header: {e}")));
+    if let Err(e) = accept_types {
+        return e.into_response();
     }
-    let accept_header = accept_header.unwrap();
-    let consensus_version_header =
-        get_consensus_version_header(&headers).unwrap_or(ForkName::Electra);
+    let accept_types = accept_types.unwrap();
+    let consensus_version_header = get_consensus_version_header(&headers);
+    let response_content_type = if state.supported_content_types.contains(&EncodingType::Ssz) &&
+        accept_types.contains(&EncodingType::Ssz)
+    {
+        EncodingType::Ssz
+    } else if state.supported_content_types.contains(&EncodingType::Json) &&
+        accept_types.contains(&EncodingType::Json)
+    {
+        EncodingType::Json
+    } else {
+        return (StatusCode::NOT_ACCEPTABLE, "No acceptable content type found".to_string())
+            .into_response();
+    };
+
+    // Error out if the request content type is not supported
+    let content_type = get_content_type(&headers);
+    if !state.supported_content_types.contains(&content_type) {
+        return (StatusCode::UNSUPPORTED_MEDIA_TYPE, "Unsupported content type".to_string())
+            .into_response();
+    };

     let data = if state.large_body() {
         vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK_RESPONSE]
@@ -239,40 +275,46 @@ async fn handle_submit_block_v1(

         let response = PayloadAndBlobs { execution_payload: execution_payload.into(), blobs_bundle };

-        match accept_header {
-            EncodingType::Json => {
-                // Response is versioned for JSON
-                let response = SubmitBlindedBlockResponse {
-                    version: ForkName::Electra,
-                    metadata: Default::default(),
-                    data: response,
-                };
-                serde_json::to_vec(&response).unwrap()
-            }
-            EncodingType::Ssz => match consensus_version_header {
-                // Response isn't versioned for SSZ
-                ForkName::Electra => response.as_ssz_bytes(),
-                _ => {
-                    return (
-                        StatusCode::BAD_REQUEST,
-                        format!("Unsupported fork {consensus_version_header}"),
-                    )
-                        .into_response();
-                }
-            },
+        if response_content_type == EncodingType::Ssz {
+            response.as_ssz_bytes()
+        } else {
+            // Return JSON for everything else; this is fine for the mock
+            let response = SubmitBlindedBlockResponse {
+                version: ForkName::Electra,
+                metadata: Default::default(),
+                data: response,
+            };
+            serde_json::to_vec(&response).unwrap()
+        }
     };

     let mut response = (StatusCode::OK, data).into_response();
-    let consensus_version_header =
-        HeaderValue::from_str(&consensus_version_header.to_string()).unwrap();
-    let content_type_header = HeaderValue::from_str(&accept_header.to_string()).unwrap();
-    response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header);
+    if response_content_type == EncodingType::Ssz {
+        let consensus_version_header = match consensus_version_header {
+            Some(header) => header,
+            None => {
+                return (StatusCode::BAD_REQUEST, "Missing consensus version header".to_string())
+                    .into_response()
+            }
+        };
+        let consensus_version_header =
+            HeaderValue::from_str(&consensus_version_header.to_string()).unwrap();
+        response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header);
+    }
+    let content_type_header = HeaderValue::from_str(&response_content_type.to_string()).unwrap();
     response.headers_mut().insert(CONTENT_TYPE, content_type_header);
     response
 }

-async fn handle_submit_block_v2(State(state): State<Arc<MockRelayState>>) -> Response {
+async fn handle_submit_block_v2(
+    headers: HeaderMap,
+    State(state): State<Arc<MockRelayState>>,
+) -> Response {
     state.received_submit_block.fetch_add(1, Ordering::Relaxed);
+    let content_type = get_content_type(&headers);
+    if !state.supported_content_types.contains(&content_type) {
+        return (StatusCode::UNSUPPORTED_MEDIA_TYPE, "Unsupported content type".to_string())
+            .into_response();
+    };
     (StatusCode::ACCEPTED, "").into_response()
 }
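For the 415 paths above, the request-side check is independent of the response negotiation: a body in an unsupported encoding is rejected with 415 Unsupported Media Type, while 406 Not Acceptable is reserved for an unsatisfiable Accept header. A hedged axum sketch of just that check, assuming a JSON-only server:

```rust
use axum::{
    http::{header::CONTENT_TYPE, HeaderMap, StatusCode},
    response::{IntoResponse, Response},
};

/// Reject request bodies whose Content-Type the server does not support.
async fn accept_only_json(headers: HeaderMap) -> Response {
    let is_json = headers
        .get(CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .map(|value| value.eq_ignore_ascii_case("application/json"))
        .unwrap_or(false);

    if !is_json {
        return (StatusCode::UNSUPPORTED_MEDIA_TYPE, "Unsupported content type").into_response();
    }
    (StatusCode::ACCEPTED, "").into_response()
}
```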
diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs
index 80aed0c2..092b97a5 100644
--- a/tests/src/mock_validator.rs
+++ b/tests/src/mock_validator.rs
@@ -1,3 +1,5 @@
+use std::collections::HashSet;
+
 use alloy::{primitives::B256, rpc::types::beacon::relay::ValidatorRegistration};
 use cb_common::{
     pbs::{BuilderApiVersion, RelayClient, SignedBlindedBeaconBlock},
@@ -27,7 +29,7 @@ impl MockValidator {
     pub async fn do_get_header(
         &self,
         pubkey: Option<BlsPublicKeyBytes>,
-        accept: Option<EncodingType>,
+        accept: HashSet<EncodingType>,
         fork_name: ForkName,
     ) -> eyre::Result<Response> {
         let default_pubkey = bls_pubkey_from_hex(
             "…",
         )?;
         let url =
             self.comm_boost.get_header_url(0, &B256::ZERO, &pubkey.unwrap_or(default_pubkey))?;
-        let res = self
+        let accept = match accept.len() {
+            0 => None,
+            1 => Some(accept.into_iter().next().unwrap().to_string()),
+            _ => {
+                let accept_strings: Vec<String> =
+                    accept.into_iter().map(|e| e.to_string()).collect();
+                Some(accept_strings.join(", "))
+            }
+        };
+        let mut res = self
             .comm_boost
             .client
             .get(url)
-            .header(ACCEPT, &accept.unwrap_or(EncodingType::Json).to_string())
-            .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string())
-            .send()
-            .await?;
+            .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string());
+        if let Some(accept_header) = accept {
+            res = res.header(ACCEPT, accept_header);
+        }
+        let res = res.send().await?;

         Ok(res)
     }
@@ -67,7 +79,7 @@ impl MockValidator {
     pub async fn do_submit_block_v1(
         &self,
         signed_blinded_block_opt: Option<SignedBlindedBeaconBlock>,
-        accept: EncodingType,
+        accept: HashSet<EncodingType>,
         content_type: EncodingType,
         fork_name: ForkName,
     ) -> eyre::Result<Response> {
@@ -84,7 +96,7 @@ impl MockValidator {
     pub async fn do_submit_block_v2(
         &self,
         signed_blinded_block_opt: Option<SignedBlindedBeaconBlock>,
-        accept: EncodingType,
+        accept: HashSet<EncodingType>,
         content_type: EncodingType,
         fork_name: ForkName,
     ) -> eyre::Result<Response> {
@@ -101,7 +113,7 @@ impl MockValidator {
     async fn do_submit_block_impl(
         &self,
         signed_blinded_block_opt: Option<SignedBlindedBeaconBlock>,
-        accept: EncodingType,
+        accept: HashSet<EncodingType>,
         content_type: EncodingType,
         fork_name: ForkName,
         api_version: BuilderApiVersion,
@@ -115,16 +127,27 @@ impl MockValidator {
             EncodingType::Ssz => signed_blinded_block.as_ssz_bytes(),
         };

-        Ok(self
+        let accept = match accept.len() {
+            0 => None,
+            1 => Some(accept.into_iter().next().unwrap().to_string()),
+            _ => {
+                let accept_strings: Vec<String> =
+                    accept.into_iter().map(|e| e.to_string()).collect();
+                Some(accept_strings.join(", "))
+            }
+        };
+        let mut res = self
             .comm_boost
             .client
             .post(url)
             .body(body)
             .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string())
-            .header(CONTENT_TYPE, &content_type.to_string())
-            .header(ACCEPT, &accept.to_string())
-            .send()
-            .await?)
+            .header(CONTENT_TYPE, &content_type.to_string());
+        if let Some(accept_header) = accept {
+            res = res.header(ACCEPT, accept_header);
+        }
+        let res = res.send().await?;
+        Ok(res)
     }
 }
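One subtlety in the Accept-building helpers above: HashSet iteration order is unspecified, so with several encodings the header's element order varies run to run. That is harmless for these tests, but since some servers treat order as preference, a sorted join makes it deterministic — a sketch with a hypothetical `Encoding` enum:

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
enum Encoding {
    Ssz, // sorts first via the derived Ord, so SSZ is listed first
    Json,
}

impl Encoding {
    fn content_type(self) -> &'static str {
        match self {
            Encoding::Ssz => "application/octet-stream",
            Encoding::Json => "application/json",
        }
    }
}

/// Build an Accept header value with a stable order, or None when empty.
fn accept_header(types: &HashSet<Encoding>) -> Option<String> {
    if types.is_empty() {
        return None;
    }
    let mut sorted: Vec<_> = types.iter().copied().collect();
    sorted.sort();
    Some(sorted.iter().map(|t| t.content_type()).collect::<Vec<_>>().join(", "))
}

fn main() {
    let set: HashSet<_> = [Encoding::Json, Encoding::Ssz].into_iter().collect();
    assert_eq!(
        accept_header(&set).as_deref(),
        Some("application/octet-stream, application/json")
    );
}
```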
diff --git a/tests/tests/pbs_get_header.rs b/tests/tests/pbs_get_header.rs
index eebb0113..45a2dd4a 100644
--- a/tests/tests/pbs_get_header.rs
+++ b/tests/tests/pbs_get_header.rs
@@ -1,4 +1,4 @@
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashSet, sync::Arc, time::Duration};

 use alloy::primitives::{B256, U256};
 use cb_common::{
@@ -15,42 +15,137 @@ use cb_tests::{
     utils::{generate_mock_relay, get_pbs_static_config, setup_test_env, to_pbs_config},
 };
 use eyre::Result;
-use lh_types::ForkVersionDecode;
+use lh_types::{ForkVersionDecode, beacon_response::EmptyMetadata};
 use reqwest::StatusCode;
 use tracing::info;
 use tree_hash::TreeHash;

+/// Test requesting JSON when the relay supports JSON
 #[tokio::test]
 async fn test_get_header() -> Result<()> {
+    test_get_header_impl(
+        3200,
+        HashSet::from([EncodingType::Json]),
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        1,
+    )
+    .await
+}
+
+/// Test requesting SSZ when the relay supports SSZ
+#[tokio::test]
+async fn test_get_header_ssz() -> Result<()> {
+    test_get_header_impl(
+        3210,
+        HashSet::from([EncodingType::Ssz]),
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        1,
+    )
+    .await
+}
+
+/// Test requesting SSZ when the relay only supports JSON, which should be
+/// handled because PBS supports both types internally and re-maps them on the
+/// fly
+#[tokio::test]
+async fn test_get_header_ssz_into_json() -> Result<()> {
+    test_get_header_impl(
+        3220,
+        HashSet::from([EncodingType::Ssz]),
+        HashSet::from([EncodingType::Json]),
+        1,
+    )
+    .await
+}
+
+/// Test requesting multiple types when the relay supports SSZ, which should
+/// return SSZ
+#[tokio::test]
+async fn test_get_header_multitype_ssz() -> Result<()> {
+    test_get_header_impl(
+        3230,
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        HashSet::from([EncodingType::Ssz]),
+        1,
+    )
+    .await
+}
+
+/// Test requesting multiple types when the relay supports JSON, which should
+/// still work
+#[tokio::test]
+async fn test_get_header_multitype_json() -> Result<()> {
+    test_get_header_impl(
+        3240,
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        HashSet::from([EncodingType::Json]),
+        1,
+    )
+    .await
+}
+
+/// Core implementation for get_header tests
+async fn test_get_header_impl(
+    pbs_port: u16,
+    accept_types: HashSet<EncodingType>,
+    relay_types: HashSet<EncodingType>,
+    expected_try_count: u64,
+) -> Result<()> {
+    // Setup test environment
     setup_test_env();
     let signer = random_secret();
     let pubkey = signer.public_key();
-
     let chain = Chain::Holesky;
-    let pbs_port = 3200;
     let relay_port = pbs_port + 1;

     // Run a mock relay
-    let mock_state = Arc::new(MockRelayState::new(chain, signer));
+    let mut mock_state = MockRelayState::new(chain, signer);
+    mock_state.supported_content_types = Arc::new(relay_types);
+    let mock_state = Arc::new(mock_state);
     let mock_relay = generate_mock_relay(relay_port, pubkey)?;
     tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port));

     // Run the PBS service
-    let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), vec![mock_relay.clone()]);
+    let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), vec![mock_relay]);
     let state = PbsState::new(config);
     tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));

     // leave some time to start servers
     tokio::time::sleep(Duration::from_millis(100)).await;

+    // Send the get_header request
     let mock_validator = MockValidator::new(pbs_port)?;
     info!("Sending get header");
-    let res = mock_validator.do_get_header(None, None, ForkName::Electra).await?;
+    let res = mock_validator.do_get_header(None, accept_types.clone(), ForkName::Electra).await?;
     assert_eq!(res.status(), StatusCode::OK);

-    let res = serde_json::from_slice::<GetHeaderResponse>(&res.bytes().await?)?;
-
-    assert_eq!(mock_state.received_get_header(), 1);
+    // Get the content type
+    let content_type = match res
+        .headers()
+        .get(reqwest::header::CONTENT_TYPE)
+        .and_then(|ct| ct.to_str().ok())
+        .unwrap()
+    {
+        ct if ct == EncodingType::Ssz.to_string() => EncodingType::Ssz,
+        ct if ct == EncodingType::Json.to_string() => EncodingType::Json,
+        _ => panic!("unexpected content type"),
+    };
+    assert!(accept_types.contains(&content_type));
+
+    // Get the data
+    let res = match content_type {
+        EncodingType::Json => serde_json::from_slice::<GetHeaderResponse>(&res.bytes().await?)?,
+        EncodingType::Ssz => {
+            let fork =
+                get_consensus_version_header(res.headers()).expect("missing fork version header");
+            assert_eq!(fork, ForkName::Electra);
+            let data = SignedBuilderBid::from_ssz_bytes_by_fork(&res.bytes().await?, fork).unwrap();
+            GetHeaderResponse { version: fork, data, metadata: EmptyMetadata::default() }
+        }
+    };
+
+    // Validate the data
+    assert_eq!(mock_state.received_get_header(), expected_try_count);
     assert_eq!(res.version, ForkName::Electra);
     assert_eq!(res.data.message.header().block_hash().0[0], 1);
     assert_eq!(res.data.message.header().parent_hash().0, B256::ZERO);
@@ -64,52 +159,6 @@ async fn test_get_header() -> Result<()> {
     Ok(())
 }

-#[tokio::test]
-async fn test_get_header_ssz() -> Result<()> {
-    setup_test_env();
-    let signer = random_secret();
-    let pubkey = signer.public_key();
-
-    let chain = Chain::Holesky;
-    let pbs_port = 3210;
-    let relay_port = pbs_port + 1;
-
-    // Run a mock relay
-    let mock_state = Arc::new(MockRelayState::new(chain, signer));
-    let mock_relay = generate_mock_relay(relay_port, pubkey)?;
-    tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port));
-
-    // Run the PBS service
-    let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), vec![mock_relay.clone()]);
-    let state = PbsState::new(config);
-    tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));
-
-    // leave some time to start servers
-    tokio::time::sleep(Duration::from_millis(100)).await;
-
-    let mock_validator = MockValidator::new(pbs_port)?;
-    info!("Sending get header");
-    let res =
-        mock_validator.do_get_header(None, Some(EncodingType::Ssz), ForkName::Electra).await?;
-    assert_eq!(res.status(), StatusCode::OK);
-
-    let fork = get_consensus_version_header(res.headers()).expect("missing fork version header");
-    assert_eq!(fork, ForkName::Electra);
-    let data = SignedBuilderBid::from_ssz_bytes_by_fork(&res.bytes().await?, fork).unwrap();
-
-    assert_eq!(mock_state.received_get_header(), 1);
-    assert_eq!(data.message.header().block_hash().0[0], 1);
-    assert_eq!(data.message.header().parent_hash().0, B256::ZERO);
-    assert_eq!(*data.message.value(), U256::from(10));
-    assert_eq!(*data.message.pubkey(), BlsPublicKeyBytes::from(mock_state.signer.public_key()));
-    assert_eq!(data.message.header().timestamp(), timestamp_of_slot_start_sec(0, chain));
-    assert_eq!(
-        data.signature,
-        sign_builder_root(chain, &mock_state.signer, data.message.tree_hash_root())
-    );
-    Ok(())
-}
-
 #[tokio::test]
 async fn test_get_header_returns_204_if_relay_down() -> Result<()> {
     setup_test_env();
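A note on the `expected_try_count` parameter threaded through these matrix tests: the mock counts once per HTTP request it sees, so when PBS sends SSZ to a JSON-only relay the rejected SSZ attempt plus the JSON retry land the counter on 2 (exercised by the `*_ssz_into_json` submit-block tests later in this diff; all get_header cases stay at 1). A toy model of that counting, with a hypothetical counter:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Simulate a relay's request counter: the SSZ attempt always counts, and a
/// JSON-only relay forces one extra, counted, fallback request.
fn submit(counter: &AtomicU64, relay_speaks_ssz: bool) -> u64 {
    counter.fetch_add(1, Ordering::Relaxed); // first attempt, SSZ
    if !relay_speaks_ssz {
        counter.fetch_add(1, Ordering::Relaxed); // fallback attempt, JSON
    }
    counter.load(Ordering::Relaxed)
}

fn main() {
    assert_eq!(submit(&AtomicU64::new(0), true), 1);
    assert_eq!(submit(&AtomicU64::new(0), false), 2);
}
```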
@@ -137,7 +186,7 @@ async fn test_get_header_returns_204_if_relay_down() -> Result<()> {
     let mock_validator = MockValidator::new(pbs_port)?;
     info!("Sending get header");
-    let res = mock_validator.do_get_header(None, None, ForkName::Electra).await?;
+    let res = mock_validator.do_get_header(None, HashSet::new(), ForkName::Electra).await?;

     assert_eq!(res.status(), StatusCode::NO_CONTENT); // 204 error
     assert_eq!(mock_state.received_get_header(), 0); // no header received
diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs
index 34bc76de..6b5afe44 100644
--- a/tests/tests/pbs_mux.rs
+++ b/tests/tests/pbs_mux.rs
@@ -1,4 +1,8 @@
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+    time::Duration,
+};

 use cb_common::{
     config::{HTTP_TIMEOUT_SECONDS_DEFAULT, MUXER_HTTP_MAX_LENGTH, RuntimeMuxConfig},
@@ -196,7 +200,7 @@ async fn test_mux() -> Result<()> {
     let mock_validator = MockValidator::new(pbs_port)?;
     info!("Sending get header with default");
     assert_eq!(
-        mock_validator.do_get_header(None, None, ForkName::Electra).await?.status(),
+        mock_validator.do_get_header(None, HashSet::new(), ForkName::Electra).await?.status(),
         StatusCode::OK
     );
     assert_eq!(mock_state.received_get_header(), 1); // only default relay was used
@@ -205,7 +209,7 @@ async fn test_mux() -> Result<()> {
     info!("Sending get header with mux");
     assert_eq!(
         mock_validator
-            .do_get_header(Some(validator_pubkey), None, ForkName::Electra)
+            .do_get_header(Some(validator_pubkey), HashSet::new(), ForkName::Electra)
             .await?
             .status(),
         StatusCode::OK
@@ -226,7 +230,12 @@ async fn test_mux() -> Result<()> {
     info!("Sending submit block v1");
     assert_eq!(
         mock_validator
-            .do_submit_block_v1(None, EncodingType::Json, EncodingType::Json, ForkName::Electra)
+            .do_submit_block_v1(
+                None,
+                HashSet::from([EncodingType::Json]),
+                EncodingType::Json,
+                ForkName::Electra
+            )
             .await?
             .status(),
         StatusCode::OK
@@ -237,7 +246,12 @@ async fn test_mux() -> Result<()> {
     info!("Sending submit block v2");
     assert_eq!(
         mock_validator
-            .do_submit_block_v2(None, EncodingType::Json, EncodingType::Json, ForkName::Electra)
+            .do_submit_block_v2(
+                None,
+                HashSet::from([EncodingType::Json]),
+                EncodingType::Json,
+                ForkName::Electra
+            )
             .await?
             .status(),
         StatusCode::ACCEPTED
diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs
index 1c8d3cdc..b6a32db7 100644
--- a/tests/tests/pbs_mux_refresh.rs
+++ b/tests/tests/pbs_mux_refresh.rs
@@ -1,4 +1,4 @@
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashSet, sync::Arc, time::Duration};

 use cb_common::{
     config::{MuxConfig, MuxKeysLoader, PbsMuxes},
@@ -73,6 +73,7 @@ async fn test_auto_refresh() -> Result<()> {
     let loader = MuxKeysLoader::Registry {
         enable_refreshing: true,
         node_operator_id: 1,
+        lido_module_id: None,
         registry: cb_common::config::NORegistry::SSV,
     };
     let muxes = PbsMuxes {
@@ -109,8 +110,9 @@ async fn test_auto_refresh() -> Result<()> {
     // relay only since it hasn't been seen in the mux yet
     let mock_validator = MockValidator::new(pbs_port)?;
     info!("Sending get header");
-    let res =
-        mock_validator.do_get_header(Some(new_mux_pubkey.clone()), None, ForkName::Electra).await?;
+    let res = mock_validator
+        .do_get_header(Some(new_mux_pubkey.clone()), HashSet::new(), ForkName::Electra)
+        .await?;
     assert_eq!(res.status(), StatusCode::OK);
     assert_eq!(default_relay_state.received_get_header(), 1); // default relay was used
     assert_eq!(mux_relay_state.received_get_header(), 0); // mux relay was not used
@@ -138,16 +140,18 @@ async fn test_auto_refresh() -> Result<()> {
     assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}")));

     // Try to run a get_header on the new pubkey - now it should use the mux relay
-    let res =
-        mock_validator.do_get_header(Some(new_mux_pubkey.clone()), None, ForkName::Electra).await?;
+    let res = mock_validator
+        .do_get_header(Some(new_mux_pubkey.clone()), HashSet::new(), ForkName::Electra)
+        .await?;
     assert_eq!(res.status(), StatusCode::OK);
     assert_eq!(default_relay_state.received_get_header(), 1); // default relay was not used here
     assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was used

     // Now try to do a get_header with the old pubkey - it should only use the
     // default relay
-    let res =
-        mock_validator.do_get_header(Some(default_pubkey.clone()), None, ForkName::Electra).await?;
+    let res = mock_validator
+        .do_get_header(Some(default_pubkey.clone()), HashSet::new(), ForkName::Electra)
+        .await?;
     assert_eq!(res.status(), StatusCode::OK);
     assert_eq!(default_relay_state.received_get_header(), 2); // default relay was used
     assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used
@@ -165,7 +169,7 @@ async fn test_auto_refresh() -> Result<()> {
     // Try to do a get_header with the removed pubkey - it should only use the
     // default relay
     let res = mock_validator
-        .do_get_header(Some(existing_mux_pubkey.clone()), None, ForkName::Electra)
+        .do_get_header(Some(existing_mux_pubkey.clone()), HashSet::new(), ForkName::Electra)
         .await?;
     assert_eq!(res.status(), StatusCode::OK);
     assert_eq!(default_relay_state.received_get_header(), 3); // default relay was used
diff --git a/tests/tests/pbs_post_blinded_blocks.rs b/tests/tests/pbs_post_blinded_blocks.rs
index 79725b3e..b5a16e89 100644
--- a/tests/tests/pbs_post_blinded_blocks.rs
+++ b/tests/tests/pbs_post_blinded_blocks.rs
@@ -1,4 +1,4 @@
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashSet, sync::Arc, time::Duration};

 use cb_common::{
     pbs::{BuilderApiVersion, GetPayloadInfo, PayloadAndBlobs, SubmitBlindedBlockResponse},
@@ -19,7 +19,15 @@ use tracing::info;

 #[tokio::test]
 async fn test_submit_block_v1() -> Result<()> {
-    let res = submit_block_impl(3800, BuilderApiVersion::V1, EncodingType::Json).await?;
+    let res = submit_block_impl(
+        3800,
+        BuilderApiVersion::V1,
+        HashSet::from([EncodingType::Json]),
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        EncodingType::Json,
+        1,
+    )
+    .await?;
     assert_eq!(res.status(), StatusCode::OK);

     let signed_blinded_block = load_test_signed_blinded_block();
@@ -34,7 +42,15 @@ async fn test_submit_block_v1() -> Result<()> {

 #[tokio::test]
 async fn test_submit_block_v2() -> Result<()> {
-    let res = submit_block_impl(3850, BuilderApiVersion::V2, EncodingType::Json).await?;
+    let res = submit_block_impl(
+        3810,
+        BuilderApiVersion::V2,
+        HashSet::from([EncodingType::Json]),
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        EncodingType::Json,
+        1,
+    )
+    .await?;
     assert_eq!(res.status(), StatusCode::ACCEPTED);
     assert_eq!(res.bytes().await?.len(), 0);
     Ok(())
@@ -42,7 +58,15 @@ async fn test_submit_block_v2() -> Result<()> {

 #[tokio::test]
 async fn test_submit_block_v1_ssz() -> Result<()> {
-    let res = submit_block_impl(3810, BuilderApiVersion::V1, EncodingType::Ssz).await?;
+    let res = submit_block_impl(
+        3820,
+        BuilderApiVersion::V1,
+        HashSet::from([EncodingType::Ssz]),
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        EncodingType::Ssz,
+        1,
+    )
+    .await?;
     assert_eq!(res.status(), StatusCode::OK);

     let signed_blinded_block = load_test_signed_blinded_block();
@@ -58,12 +82,116 @@ async fn test_submit_block_v1_ssz() -> Result<()> {

 #[tokio::test]
 async fn test_submit_block_v2_ssz() -> Result<()> {
-    let res = submit_block_impl(3860, BuilderApiVersion::V2, EncodingType::Ssz).await?;
+    let res = submit_block_impl(
+        3830,
+        BuilderApiVersion::V2,
+        HashSet::from([EncodingType::Ssz]),
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        EncodingType::Ssz,
+        1,
+    )
+    .await?;
     assert_eq!(res.status(), StatusCode::ACCEPTED);
     assert_eq!(res.bytes().await?.len(), 0);
     Ok(())
 }

+/// Test that a v1 submit block request in SSZ is converted to JSON if the relay
+/// only supports JSON
+#[tokio::test]
+async fn test_submit_block_v1_ssz_into_json() -> Result<()> {
+    let res = submit_block_impl(
+        3840,
+        BuilderApiVersion::V1,
+        HashSet::from([EncodingType::Ssz]),
+        HashSet::from([EncodingType::Json]),
+        EncodingType::Ssz,
+        2,
+    )
+    .await?;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let signed_blinded_block = load_test_signed_blinded_block();
+
+    let response_body =
+        PayloadAndBlobs::from_ssz_bytes_by_fork(&res.bytes().await?, ForkName::Electra).unwrap();
+    assert_eq!(
+        response_body.execution_payload.block_hash(),
+        signed_blinded_block.block_hash().into()
+    );
+    Ok(())
+}
+
+/// Test that a v2 submit block request in SSZ is converted to JSON if the relay
+/// only supports JSON
+#[tokio::test]
+async fn test_submit_block_v2_ssz_into_json() -> Result<()> {
+    let res = submit_block_impl(
+        3850,
+        BuilderApiVersion::V2,
+        HashSet::from([EncodingType::Ssz]),
+        HashSet::from([EncodingType::Json]),
+        EncodingType::Ssz,
+        2,
+    )
+    .await?;
+    assert_eq!(res.status(), StatusCode::ACCEPTED);
+    assert_eq!(res.bytes().await?.len(), 0);
+    Ok(())
+}
+
+/// Test v1 requesting multiple types when the relay supports SSZ, which should
+/// return SSZ
+#[tokio::test]
+async fn test_submit_block_v1_multitype_ssz() -> Result<()> {
+    let res = submit_block_impl(
+        3860,
+        BuilderApiVersion::V1,
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        HashSet::from([EncodingType::Ssz]),
+        EncodingType::Ssz,
+        1,
+    )
+    .await?;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let signed_blinded_block = load_test_signed_blinded_block();
+
+    let response_body =
+        PayloadAndBlobs::from_ssz_bytes_by_fork(&res.bytes().await?, ForkName::Electra).unwrap();
+    assert_eq!(
+        response_body.execution_payload.block_hash(),
+        signed_blinded_block.block_hash().into()
+    );
+    Ok(())
+}
+
+/// Test v1 requesting multiple types when the relay supports JSON, which should
+/// still return SSZ
+#[tokio::test]
+async fn test_submit_block_v1_multitype_json() -> Result<()> {
+    let res = submit_block_impl(
+        3870,
+        BuilderApiVersion::V1,
+        HashSet::from([EncodingType::Ssz, EncodingType::Json]),
+        HashSet::from([EncodingType::Json]),
+        EncodingType::Ssz,
+        2,
+    )
+    .await?;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let signed_blinded_block = load_test_signed_blinded_block();
+
+    let response_body =
+        PayloadAndBlobs::from_ssz_bytes_by_fork(&res.bytes().await?, ForkName::Electra).unwrap();
+    assert_eq!(
+        response_body.execution_payload.block_hash(),
+        signed_blinded_block.block_hash().into()
+    );
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_submit_block_too_large() -> Result<()> {
     setup_test_env();
@@ -87,7 +215,12 @@ async fn test_submit_block_too_large() -> Result<()> {
     let mock_validator = MockValidator::new(pbs_port)?;
     info!("Sending submit block");
     let res = mock_validator
-        .do_submit_block_v1(None, EncodingType::Json, EncodingType::Json, ForkName::Electra)
+        .do_submit_block_v1(
+            None,
+            HashSet::from([EncodingType::Json]),
+            EncodingType::Json,
+            ForkName::Electra,
+        )
         .await;

     // response size exceeds max size: max: 20971520
@@ -99,29 +232,34 @@
 async fn submit_block_impl(
     pbs_port: u16,
     api_version: BuilderApiVersion,
+    accept_types: HashSet<EncodingType>,
+    relay_types: HashSet<EncodingType>,
     serialization_mode: EncodingType,
+    expected_try_count: u64,
 ) -> Result<Response> {
-    let accept = serialization_mode;
-
+    // Setup test environment
     setup_test_env();
     let signer = random_secret();
     let pubkey = signer.public_key();
-
     let chain = Chain::Holesky;
+    let relay_port = pbs_port + 1;

     // Run a mock relay
-    let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?];
-    let mock_state = Arc::new(MockRelayState::new(chain, signer));
-    tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1));
+    let mut mock_state = MockRelayState::new(chain, signer);
+    mock_state.supported_content_types = Arc::new(relay_types);
+    let mock_state = Arc::new(mock_state);
+    let mock_relay = generate_mock_relay(relay_port, pubkey)?;
+    tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port));

     // Run the PBS service
-    let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays);
+    let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), vec![mock_relay]);
     let state = PbsState::new(config);
     tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state));

     // leave some time to start servers
     tokio::time::sleep(Duration::from_millis(100)).await;

+    // Send the submit block request
     let signed_blinded_block = load_test_signed_blinded_block();
     let mock_validator = MockValidator::new(pbs_port)?;
     info!("Sending submit block");
@@ -130,7 +268,7 @@ async fn submit_block_impl(
             mock_validator
                 .do_submit_block_v1(
                     Some(signed_blinded_block),
-                    accept,
+                    accept_types,
                     serialization_mode,
                     ForkName::Electra,
                 )
@@ -140,13 +278,13 @@ async fn submit_block_impl(
             mock_validator
                 .do_submit_block_v2(
                     Some(signed_blinded_block),
-                    accept,
+                    accept_types,
                     serialization_mode,
                     ForkName::Electra,
                 )
                 .await?
         }
     };

-    assert_eq!(mock_state.received_submit_block(), 1);
+    assert_eq!(mock_state.received_submit_block(), expected_try_count);

     Ok(res)
 }