Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple config, env and cli #247

Merged
merged 9 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions core/src/keypair_loader.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
use anyhow::Context;
use solana_sdk::signature::Keypair;
use std::env;

// note this is duplicated from lite-rpc module
pub async fn load_identity_keypair(identity_from_cli: &String) -> Option<Keypair> {
if let Ok(identity_env_var) = env::var("IDENTITY") {
if let Ok(identity_bytes) = serde_json::from_str::<Vec<u8>>(identity_env_var.as_str()) {
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
} else {
// must be a file
let identity_file = tokio::fs::read_to_string(identity_env_var.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
} else if identity_from_cli.is_empty() {
None
} else {
let identity_file = tokio::fs::read_to_string(identity_from_cli.as_str())
pub async fn load_identity_keypair(
identity_from_cli: Option<String>,
) -> anyhow::Result<Option<Keypair>> {
let identity_str = if let Some(identity_from_cli) = identity_from_cli {
tokio::fs::read_to_string(identity_from_cli)
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file).unwrap();
Some(Keypair::from_bytes(identity_bytes.as_slice()).unwrap())
}
.context("Cannot find the identity file provided")?
} else if let Ok(identity_env_var) = env::var("IDENTITY") {
identity_env_var
} else {
return Ok(None);
};

let identity_bytes: Vec<u8> =
serde_json::from_str(&identity_str).context("Invalid identity format expected Vec<u8>")?;

Ok(Some(
Keypair::from_bytes(identity_bytes.as_slice()).context("Invalid identity")?,
))
}
130 changes: 111 additions & 19 deletions lite-rpc/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,134 @@
use std::env;

use crate::{
DEFAULT_FANOUT_SIZE, DEFAULT_GRPC_ADDR, DEFAULT_RETRY_TIMEOUT, DEFAULT_RPC_ADDR,
DEFAULT_WS_ADDR, MAX_RETRIES,
};
use anyhow::Context;
use clap::Parser;
use dotenv::dotenv;
use tokio::io::AsyncReadExt;

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, default_value_t = String::from(DEFAULT_RPC_ADDR))]
/// config.json
#[arg(short, long)]
pub config: Option<String>,
/// identity keypair
#[arg(short = 'k', long)]
pub identity_keypair: Option<String>,
}

#[derive(Debug, serde::Deserialize)]
pub struct Config {
#[serde(default = "Config::default_rpc_addr")]
pub rpc_addr: String,
#[arg(short, long, default_value_t = String::from(DEFAULT_WS_ADDR))]
#[serde(default = "Config::default_ws_addr")]
pub ws_addr: String,
#[arg(short = 'l', long, default_value_t = String::from("[::]:8890"))]
#[serde(default = "Config::default_lite_rpc_http_addr")]
pub lite_rpc_http_addr: String,
#[arg(short = 's', long, default_value_t = String::from("[::]:8891"))]
#[serde(default = "Config::default_lite_rpc_ws_addr")]
pub lite_rpc_ws_addr: String,
/// tpu fanout
#[arg(short = 'f', long, default_value_t = DEFAULT_FANOUT_SIZE) ]
#[serde(default = "Config::default_fanout_size")]
pub fanout_size: u64,
/// enable logging to postgres
#[arg(short = 'p', long)]
#[serde(default)]
pub enable_postgres: bool,
/// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))]
#[serde(default)]
pub identity_keypair: Option<String>,
#[serde(default = "Config::default_prometheus_addr")]
pub prometheus_addr: String,
#[arg(short = 'k', long, default_value_t = String::new())]
pub identity_keypair: String,
#[arg(long, default_value_t = MAX_RETRIES)]
#[serde(default = "Config::default_maximum_retries_per_tx")]
pub maximum_retries_per_tx: usize,
#[arg(long, default_value_t = DEFAULT_RETRY_TIMEOUT)]
#[serde(default = "Config::default_transaction_retry_after_secs")]
pub transaction_retry_after_secs: u64,
#[arg(long)]
#[serde(default)]
pub quic_proxy_addr: Option<String>,
#[arg(short = 'g', long)]
#[serde(default)]
pub use_grpc: bool,
/// grpc address
#[arg(long, default_value_t = String::from(DEFAULT_GRPC_ADDR))]
#[serde(default = "Config::default_grpc_addr")]
pub grpc_addr: String,
#[arg(long)]
#[serde(default)]
pub grpc_x_token: Option<String>,
}

impl Config {
pub async fn load() -> anyhow::Result<Self> {
dotenv().ok();

let args = Args::parse();

let config_path = if let Some(config) = &args.config {
config
} else {
let default_config_path = "config.json";

// check if config.json exists in current directory
if tokio::fs::metadata(default_config_path).await.is_err() {
return Ok(serde_json::from_str("{}").unwrap());
godmodegalactus marked this conversation as resolved.
Show resolved Hide resolved
}

default_config_path
};

let mut config = String::new();

tokio::fs::File::open(config_path)
.await
.context("Error opening config file")?
.read_to_string(&mut config)
.await
.context("Error reading config file")?;

let mut config: Config =
serde_json::from_str(&config).context("Error parsing config file")?;

if args.identity_keypair.is_some() {
config.identity_keypair = args.identity_keypair;
}

config.enable_postgres = env::var("PG_ENABLED").is_ok();
aniketfuryrocks marked this conversation as resolved.
Show resolved Hide resolved

Ok(config)
}

pub fn lite_rpc_ws_addr() -> String {
"[::]:8891".to_string()
}

pub fn default_lite_rpc_http_addr() -> String {
"[::]:8890".to_string()
}

pub fn default_rpc_addr() -> String {
DEFAULT_RPC_ADDR.to_string()
}

pub fn default_ws_addr() -> String {
DEFAULT_WS_ADDR.to_string()
}

pub fn default_lite_rpc_ws_addr() -> String {
"[::]:8891".to_string()
}

pub const fn default_fanout_size() -> u64 {
DEFAULT_FANOUT_SIZE
}

pub fn default_prometheus_addr() -> String {
"[::]:9091".to_string()
}

pub const fn default_maximum_retries_per_tx() -> usize {
MAX_RETRIES
}

pub const fn default_transaction_retry_after_secs() -> u64 {
DEFAULT_RETRY_TIMEOUT
}

pub fn default_grpc_addr() -> String {
DEFAULT_GRPC_ADDR.to_string()
}
}
35 changes: 9 additions & 26 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ pub mod rpc_tester;
use std::time::Duration;

use anyhow::bail;
use clap::Parser;
use dashmap::DashMap;
use dotenv::dotenv;
use lite_rpc::bridge::LiteBridge;
use lite_rpc::cli::Config;
use lite_rpc::postgres_logger::PostgresLogger;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION};

use crate::rpc_tester::RpcTester;
Expand Down Expand Up @@ -43,7 +42,6 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::signature::Keypair;
use solana_sdk::signer::Signer;
use std::env;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -81,8 +79,8 @@ pub async fn start_postgres(
Ok((Some(postgres_send), postgres))
}

pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
let Args {
pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
let Config {
lite_rpc_ws_addr,
lite_rpc_http_addr,
fanout_size,
Expand All @@ -99,8 +97,8 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
} = args;

let validator_identity = Arc::new(
load_identity_keypair(&identity_keypair)
.await
load_identity_keypair(identity_keypair)
.await?
.unwrap_or_else(Keypair::new),
);

Expand Down Expand Up @@ -239,34 +237,19 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
}
}

fn get_args() -> Args {
let mut args = Args::parse();

dotenv().ok();

args.enable_postgres = args.enable_postgres
|| if let Ok(enable_postgres_env_var) = env::var("PG_ENABLED") {
enable_postgres_env_var != "false"
} else {
false
};

args
}

#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let args = get_args();
let config = Config::load().await?;

let ctrl_c_signal = tokio::signal::ctrl_c();
let Args { rpc_addr, .. } = &args;
let Config { rpc_addr, .. } = &config;
// rpc client
let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone()));
let rpc_tester = tokio::spawn(RpcTester::new(rpc_client.clone()).start());

let main = start_lite_rpc(args.clone(), rpc_client);
let main = start_lite_rpc(config, rpc_client);

tokio::select! {
err = rpc_tester => {
Expand Down
3 changes: 2 additions & 1 deletion quic-forward-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub async fn main() -> anyhow::Result<()> {
dotenv().ok();

let proxy_listener_addr = proxy_listen_addr.parse().unwrap();
let validator_identity = ValidatorIdentity::new(load_identity_keypair(&identity_keypair).await);
let validator_identity =
ValidatorIdentity::new(load_identity_keypair(Some(identity_keypair)).await?);

let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost());
let main_services = QuicForwardProxy::new(proxy_listener_addr, tls_config, validator_identity)
Expand Down
Loading