|
| 1 | +//! Contains the engine api client. |
| 2 | +
|
| 3 | +use std::{collections::HashMap, time::Duration}; |
| 4 | + |
| 5 | +use again::RetryPolicy; |
| 6 | +use futures::future::TryFutureExt; |
| 7 | +use reqwest::{header, Client}; |
| 8 | +use serde::{de::DeserializeOwned, Deserialize, Serialize}; |
| 9 | +use serde_json::Value; |
| 10 | + |
| 11 | +use alloy_rpc_types_engine::{ |
| 12 | + Claims, ExecutionPayloadEnvelopeV3, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, |
| 13 | + JwtSecret, PayloadAttributes, PayloadId, PayloadStatus, |
| 14 | +}; |
| 15 | + |
| 16 | +use crate::{ |
| 17 | + Engine, DEFAULT_AUTH_PORT, ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_GET_PAYLOAD_V2, |
| 18 | + ENGINE_NEW_PAYLOAD_V2, JSONRPC_VERSION, STATIC_ID, |
| 19 | +}; |
| 20 | + |
| 21 | +/// An external op-geth engine api client |
| 22 | +#[derive(Debug, Clone)] |
| 23 | +pub struct EngineApi { |
| 24 | + /// Base request url |
| 25 | + pub base_url: String, |
| 26 | + /// The url port |
| 27 | + pub port: u16, |
| 28 | + /// HTTP Client |
| 29 | + pub client: Option<Client>, |
| 30 | + /// A JWT secret used to authenticate with the engine api |
| 31 | + secret: JwtSecret, |
| 32 | +} |
| 33 | + |
| 34 | +/// Generic Engine API response |
| 35 | +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] |
| 36 | +#[serde(rename_all = "camelCase")] |
| 37 | +pub struct EngineApiResponse<P> { |
| 38 | + /// JSON RPC version |
| 39 | + jsonrpc: String, |
| 40 | + /// Request ID |
| 41 | + id: u64, |
| 42 | + /// JSON RPC payload |
| 43 | + result: Option<P>, |
| 44 | + /// JSON RPC error payload |
| 45 | + error: Option<EngineApiErrorPayload>, |
| 46 | +} |
| 47 | + |
| 48 | +/// Engine API error payload |
| 49 | +#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] |
| 50 | +pub struct EngineApiErrorPayload { |
| 51 | + /// The error code |
| 52 | + pub code: i64, |
| 53 | + /// The error message |
| 54 | + pub message: String, |
| 55 | + /// Optional additional error data |
| 56 | + pub data: Option<Value>, |
| 57 | +} |
| 58 | + |
| 59 | +impl std::fmt::Display for EngineApiErrorPayload { |
| 60 | + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 61 | + write!(f, "Engine API Error: code: {}, message: {}", self.code, self.message) |
| 62 | + } |
| 63 | +} |
| 64 | + |
| 65 | +/// An engine api error |
| 66 | +#[derive(Debug, thiserror::Error)] |
| 67 | +pub enum EngineApiError { |
| 68 | + /// An error converting the raw value to json. |
| 69 | + #[error("Error converting value to json")] |
| 70 | + SerdeError(#[from] serde_json::Error), |
| 71 | + /// Missing http client |
| 72 | + #[error("Missing http client")] |
| 73 | + MissingHttpClient, |
| 74 | + /// Failed to encode the JWT Claims |
| 75 | + #[error("Failed to encode JWT Claims")] |
| 76 | + JwtEncode, |
| 77 | + /// A reqwest error |
| 78 | + #[error("Reqwest error: {0}")] |
| 79 | + ReqwestError(#[from] reqwest::Error), |
| 80 | + /// An [EngineApiErrorPayload] returned by the engine api |
| 81 | + #[error("Engine API error")] |
| 82 | + EngineApiPayload(Option<EngineApiErrorPayload>), |
| 83 | +} |
| 84 | + |
| 85 | +impl EngineApi { |
| 86 | + /// Creates a new [`EngineApi`] with a base url and secret. |
| 87 | + pub fn new(base_url: &str, secret_str: &str) -> Self { |
| 88 | + let secret = JwtSecret::from_hex(secret_str).unwrap(); |
| 89 | + |
| 90 | + // Gracefully parse the port from the base url |
| 91 | + let parts: Vec<&str> = base_url.split(':').collect(); |
| 92 | + let port = parts[parts.len() - 1].parse::<u16>().unwrap_or(DEFAULT_AUTH_PORT); |
| 93 | + let base_url = if parts.len() <= 2 { parts[0].to_string() } else { parts.join(":") }; |
| 94 | + |
| 95 | + let client = reqwest::Client::builder() |
| 96 | + .default_headers({ |
| 97 | + header::HeaderMap::from_iter([( |
| 98 | + header::CONTENT_TYPE, |
| 99 | + header::HeaderValue::from_static("application/json"), |
| 100 | + )]) |
| 101 | + }) |
| 102 | + .timeout(Duration::from_secs(5)) |
| 103 | + .build() |
| 104 | + .expect("reqwest::Client could not be built, TLS backend could not be initialized"); |
| 105 | + |
| 106 | + Self { base_url, port, client: Some(client), secret } |
| 107 | + } |
| 108 | + |
| 109 | + /// Constructs the base engine api url for the given address |
| 110 | + pub fn auth_url_from_addr(addr: &str, port: Option<u16>) -> String { |
| 111 | + let stripped = addr.strip_prefix("http://").unwrap_or(addr); |
| 112 | + let stripped = addr.strip_prefix("https://").unwrap_or(stripped); |
| 113 | + let port = port.unwrap_or(DEFAULT_AUTH_PORT); |
| 114 | + format!("http://{stripped}:{port}") |
| 115 | + } |
| 116 | + |
| 117 | + /// Returns if the provided secret matches the secret used to authenticate with the engine api. |
| 118 | + pub fn check_secret(&self, secret: &str) -> bool { |
| 119 | + self.secret.validate(secret).is_ok() |
| 120 | + } |
| 121 | + |
| 122 | + /// Creates an engine api from environment variables |
| 123 | + pub fn from_env() -> Self { |
| 124 | + let base_url = std::env::var("ENGINE_API_URL").unwrap_or_else(|_| { |
| 125 | + panic!( |
| 126 | + "ENGINE_API_URL environment variable not set. \ |
| 127 | + Please set this to the base url of the engine api" |
| 128 | + ) |
| 129 | + }); |
| 130 | + let secret_key = std::env::var("JWT_SECRET").unwrap_or_else(|_| { |
| 131 | + panic!( |
| 132 | + "JWT_SECRET environment variable not set. \ |
| 133 | + Please set this to the 256 bit hex-encoded secret key used to authenticate with the engine api. \ |
| 134 | + This should be the same as set in the `--auth.secret` flag when executing go-ethereum." |
| 135 | + ) |
| 136 | + }); |
| 137 | + let base_url = EngineApi::auth_url_from_addr(&base_url, None); |
| 138 | + Self::new(&base_url, &secret_key) |
| 139 | + } |
| 140 | + |
| 141 | + /// Construct base body |
| 142 | + pub fn base_body(&self) -> HashMap<String, Value> { |
| 143 | + let mut map = HashMap::new(); |
| 144 | + map.insert("jsonrpc".to_string(), Value::String(JSONRPC_VERSION.to_string())); |
| 145 | + map.insert("id".to_string(), Value::Number(STATIC_ID.into())); |
| 146 | + map |
| 147 | + } |
| 148 | + |
| 149 | + /// Helper to construct a post request through the client |
| 150 | + async fn post<P>(&self, method: &str, params: Vec<Value>) -> Result<P, EngineApiError> |
| 151 | + where |
| 152 | + P: DeserializeOwned, |
| 153 | + { |
| 154 | + // Construct the request params |
| 155 | + let mut body = self.base_body(); |
| 156 | + body.insert("method".to_string(), Value::String(method.to_string())); |
| 157 | + body.insert("params".to_string(), Value::Array(params)); |
| 158 | + |
| 159 | + // Send the client request |
| 160 | + let client = self.client.as_ref().ok_or(EngineApiError::MissingHttpClient)?; |
| 161 | + |
| 162 | + // Clone the secret so we can use it in the retry policy. |
| 163 | + let secret_clone = self.secret; |
| 164 | + |
| 165 | + let policy = RetryPolicy::fixed(Duration::ZERO).with_max_retries(5); |
| 166 | + |
| 167 | + // Send the request |
| 168 | + let res = policy |
| 169 | + .retry(|| async { |
| 170 | + // Construct the JWT Authorization Token |
| 171 | + let claims = Claims::with_current_timestamp(); |
| 172 | + let jwt = secret_clone.encode(&claims).map_err(|_| EngineApiError::JwtEncode)?; |
| 173 | + |
| 174 | + // Send the request |
| 175 | + client |
| 176 | + .post(&self.base_url) |
| 177 | + .header(header::AUTHORIZATION, format!("Bearer {}", jwt)) |
| 178 | + .json(&body) |
| 179 | + .send() |
| 180 | + .map_err(EngineApiError::ReqwestError) |
| 181 | + // .timeout(Duration::from_secs(2)) |
| 182 | + .await? |
| 183 | + .json::<EngineApiResponse<P>>() |
| 184 | + .map_err(EngineApiError::ReqwestError) |
| 185 | + // .timeout(Duration::from_secs(2)) |
| 186 | + // .map_err(|e| EngineApiError::ReqwestError(e)) |
| 187 | + .await |
| 188 | + }) |
| 189 | + .await?; |
| 190 | + |
| 191 | + if let Some(res) = res.result { |
| 192 | + return Ok(res); |
| 193 | + } |
| 194 | + |
| 195 | + Err(EngineApiError::EngineApiPayload(res.error)) |
| 196 | + } |
| 197 | + |
| 198 | + /// Calls the engine to verify it's available to receive requests |
| 199 | + pub async fn is_available(&self) -> bool { |
| 200 | + self.post::<Value>("eth_chainId", vec![]).await.is_ok() |
| 201 | + } |
| 202 | +} |
| 203 | + |
| 204 | +#[async_trait::async_trait] |
| 205 | +impl Engine for EngineApi { |
| 206 | + type Error = EngineApiError; |
| 207 | + |
| 208 | + /// Sends an `engine_forkchoiceUpdatedV2` (V3 post Ecotone) message to the engine. |
| 209 | + async fn forkchoice_updated( |
| 210 | + &self, |
| 211 | + forkchoice_state: ForkchoiceState, |
| 212 | + payload_attributes: Option<PayloadAttributes>, |
| 213 | + ) -> Result<ForkchoiceUpdated, Self::Error> { |
| 214 | + let payload_attributes_param = match payload_attributes { |
| 215 | + Some(payload_attributes) => serde_json::to_value(payload_attributes)?, |
| 216 | + None => Value::Null, |
| 217 | + }; |
| 218 | + let forkchoice_state_param = serde_json::to_value(forkchoice_state)?; |
| 219 | + let params = vec![forkchoice_state_param, payload_attributes_param]; |
| 220 | + let res = self.post(ENGINE_FORKCHOICE_UPDATED_V2, params).await?; |
| 221 | + Ok(res) |
| 222 | + } |
| 223 | + |
| 224 | + /// Sends an `engine_newPayloadV2` (V3 post Ecotone) message to the engine. |
| 225 | + async fn new_payload( |
| 226 | + &self, |
| 227 | + execution_payload: ExecutionPayloadV3, |
| 228 | + ) -> Result<PayloadStatus, Self::Error> { |
| 229 | + let params = vec![serde_json::to_value(execution_payload)?]; |
| 230 | + let res = self.post(ENGINE_NEW_PAYLOAD_V2, params).await?; |
| 231 | + Ok(res) |
| 232 | + } |
| 233 | + |
| 234 | + /// Sends an `engine_getPayloadV2` (V3 post Ecotone) message to the engine. |
| 235 | + async fn get_payload(&self, payload_id: PayloadId) -> Result<ExecutionPayloadV3, Self::Error> { |
| 236 | + let encoded = format!("{:x}", payload_id.0); |
| 237 | + let padded = format!("0x{:0>16}", encoded); |
| 238 | + let params = vec![Value::String(padded)]; |
| 239 | + let res = self.post::<ExecutionPayloadEnvelopeV3>(ENGINE_GET_PAYLOAD_V2, params).await?; |
| 240 | + Ok(res.execution_payload) |
| 241 | + } |
| 242 | +} |
| 243 | + |
| 244 | +#[cfg(test)] |
| 245 | +mod tests { |
| 246 | + use alloy_rpc_types_engine::Claims; |
| 247 | + use std::time::SystemTime; |
| 248 | + |
| 249 | + // use std::str::FromStr; |
| 250 | + // use ethers_core::types::H256; |
| 251 | + |
| 252 | + use super::*; |
| 253 | + |
| 254 | + const AUTH_ADDR: &str = "0.0.0.0"; |
| 255 | + const SECRET: &str = "f79ae8046bc11c9927afe911db7143c51a806c4a537cc08e0d37140b0192f430"; |
| 256 | + |
| 257 | + #[tokio::test] |
| 258 | + async fn test_engine_get_payload() { |
| 259 | + // Construct the engine api client |
| 260 | + let base_url = EngineApi::auth_url_from_addr(AUTH_ADDR, Some(8551)); |
| 261 | + assert_eq!(base_url, "http://0.0.0.0:8551"); |
| 262 | + let engine_api = EngineApi::new(&base_url, SECRET); |
| 263 | + assert_eq!(engine_api.base_url, "http://0.0.0.0:8551"); |
| 264 | + assert_eq!(engine_api.port, 8551); |
| 265 | + |
| 266 | + // Construct mock server params |
| 267 | + let secret = JwtSecret::from_hex(SECRET).unwrap(); |
| 268 | + let iat = SystemTime::UNIX_EPOCH; |
| 269 | + let iat_secs = iat.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); |
| 270 | + let claims = Claims { iat: iat_secs, exp: Some(iat_secs + 60) }; |
| 271 | + let jwt = secret.encode(&claims).unwrap(); |
| 272 | + assert_eq!(jwt, String::from("eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjAsImV4cCI6NjB9.rJv_krfkQefjWnZxrpnDimR1NN1UEUffK3hQzD1KInA")); |
| 273 | + // let bearer = format!("Bearer {jwt}"); |
| 274 | + // let expected_body = r#"{"jsonrpc": "2.0", "method": "engine_getPayloadV1", "params": |
| 275 | + // [""], "id": 1}"#; let mock_response = ExecutionPayloadResponse { |
| 276 | + // jsonrpc: "2.0".to_string(), |
| 277 | + // id: 1, |
| 278 | + // result: ExecutionPayload { |
| 279 | + // parent_hash: H256::from( |
| 280 | + // } |
| 281 | + // }; |
| 282 | + |
| 283 | + // Create the mock server |
| 284 | + // let server = ServerBuilder::default() |
| 285 | + // .set_id_provider(RandomStringIdProvider::new(16)) |
| 286 | + // .set_middleware(middleware) |
| 287 | + // .build(addr.parse::<SocketAddr>().unwrap()) |
| 288 | + // .await |
| 289 | + // .unwrap(); |
| 290 | + |
| 291 | + // Query the engine api client |
| 292 | + // let execution_payload = engine_api.get_payload(PayloadId::default()).await.unwrap(); |
| 293 | + // let expected_block_hash = |
| 294 | + // H256::from_str("0xdc0818cf78f21a8e70579cb46a43643f78291264dda342ae31049421c82d21ae") |
| 295 | + // .unwrap(); |
| 296 | + // assert_eq!(expected_block_hash, execution_payload.block_hash); |
| 297 | + |
| 298 | + // Stop the server |
| 299 | + // server.stop().unwrap(); |
| 300 | + // server.stopped().await; |
| 301 | + } |
| 302 | +} |
0 commit comments