Skip to content

Commit

Permalink
feat: add ability to join random squad (#232)
Browse files Browse the repository at this point in the history
adds a new parameter `--num-squads` and renames `--squad` to
`--squad-prefix`.

After creating or reading in the p2pool node's peer_id, it uses this to
determine which squad to join, using the pattern:
`<squad_prefix>_[0..num_squads-1]`

Also worth noting that the PeerInfo broadcast message has changed to be
a network wide topic, and seed peers will also subscribe and process
these.
  • Loading branch information
stringhandler authored Jan 9, 2025
1 parent 0cece29 commit 04ea87c
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 243 deletions.
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ axum = "0.7.7"
blake2 = "0.10.6"
chrono = "0.4.38"
clap = { version = "4.5.7", features = ["derive"] }
convert_case = "0.6.0"
digest = "0.10.7"
dirs = "4.0.0"
hex = "0.4.3"
Expand Down
7 changes: 5 additions & 2 deletions src/cli/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ pub(crate) struct StartArgs {
/// Squad to enter (a team of miners).
/// A squad can have any name.
#[arg(
long, alias = "tribe", value_name = "squad", default_value = "default", value_parser = validate_squad
long, alias = "squad", value_name = "squad", default_value = "default", value_parser = validate_squad
)]
pub squad: String,
pub squad_prefix: String,

#[arg(long, value_name = "num-squads", default_value = "1")]
pub num_squads: usize,

/// Private key folder.
///
Expand Down
4 changes: 2 additions & 2 deletions src/cli/commands/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{
server::{
self as main_server,
http::stats_collector::{StatsBroadcastClient, StatsCollector},
p2p::Squad,
server::Server,
},
sharechain::{in_memory::InMemoryShareChain, BlockValidationParams},
Expand Down Expand Up @@ -61,7 +60,8 @@ pub async fn server(
config_builder.with_share_window(share_window);
}

config_builder.with_squad(Squad::from(args.squad.clone()));
config_builder.with_squad_prefix(args.squad_prefix.clone());
config_builder.with_num_squads(args.num_squads);

// set default tari network specific seed peer address
let mut seed_peers = vec![];
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn main() -> anyhow::Result<()> {
if cfg!(debug_assertions) {
// In debug mode, we want to see the panic message
eprintln!("Panic occurred at {}: {}", location, message);
process::exit(500);
std::process::exit(500);
}
}));

Expand Down
11 changes: 8 additions & 3 deletions src/server/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{path::PathBuf, time::Duration};

use libp2p::identity::Keypair;

use crate::server::{http, p2p, p2p::Squad};
use crate::server::{http, p2p};

/// Config is the server configuration struct.
#[derive(Clone)]
Expand Down Expand Up @@ -76,8 +76,13 @@ impl ConfigBuilder {
self
}

pub fn with_squad(&mut self, squad: Squad) -> &mut Self {
self.config.p2p_service.squad = squad;
pub fn with_squad_prefix(&mut self, squad: String) -> &mut Self {
self.config.p2p_service.squad_prefix = squad;
self
}

pub fn with_num_squads(&mut self, num_squads: usize) -> &mut Self {
self.config.p2p_service.num_squads = num_squads;
self
}

Expand Down
12 changes: 6 additions & 6 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
server::{
grpc::{error::Error, util, util::convert_coinbase_extra, MAX_ACCEPTABLE_GRPC_TIMEOUT},
http::stats_collector::StatsBroadcastClient,
p2p::{client::ServiceClient, messages::NotifyNewTipBlock, Squad},
p2p::{client::ServiceClient, messages::NotifyNewTipBlock},
},
sharechain::{p2block::P2Block, BlockValidationParams, ShareChain},
};
Expand Down Expand Up @@ -74,13 +74,13 @@ where S: ShareChain
sha3_block_height_difficulty_cache: Arc<RwLock<HashMap<u64, Difficulty>>>,
randomx_block_height_difficulty_cache: Arc<RwLock<HashMap<u64, Difficulty>>>,
stats_max_difficulty_since_last_success: Arc<RwLock<Difficulty>>,
squad: Squad,
template_store_sha3x: RwLock<HashMap<FixedHash, P2Block>>,
list_of_templates_sha3x: RwLock<VecDeque<FixedHash>>,
template_store_rx: RwLock<HashMap<FixedHash, P2Block>>,
list_of_templates_rx: RwLock<VecDeque<FixedHash>>,
are_we_synced_with_randomx_p2pool: Arc<AtomicBool>,
are_we_synced_with_sha3x_p2pool: Arc<AtomicBool>,
squad: String,
}

impl<S> ShaP2PoolGrpc<S>
Expand All @@ -98,9 +98,9 @@ where S: ShareChain
consensus_manager: ConsensusManager,
genesis_block_hash: FixedHash,
stats_broadcast: StatsBroadcastClient,
squad: Squad,
are_we_synced_with_randomx_p2pool: Arc<AtomicBool>,
are_we_synced_with_sha3x_p2pool: Arc<AtomicBool>,
squad: String,
) -> Result<Self, Error> {
Ok(Self {
local_peer_id,
Expand All @@ -119,13 +119,13 @@ where S: ShareChain
sha3_block_height_difficulty_cache: Arc::new(RwLock::new(HashMap::new())),
randomx_block_height_difficulty_cache: Arc::new(RwLock::new(HashMap::new())),
stats_max_difficulty_since_last_success: Arc::new(RwLock::new(Difficulty::min())),
squad,
template_store_sha3x: RwLock::new(HashMap::new()),
list_of_templates_sha3x: RwLock::new(VecDeque::with_capacity(MAX_STORED_TEMPLATES_SHA3X + 1)),
template_store_rx: RwLock::new(HashMap::new()),
list_of_templates_rx: RwLock::new(VecDeque::with_capacity(MAX_STORED_TEMPLATES_RX + 1)),
are_we_synced_with_randomx_p2pool,
are_we_synced_with_sha3x_p2pool,
squad,
})
}

Expand Down Expand Up @@ -227,8 +227,8 @@ where S: ShareChain
self.are_we_synced_with_sha3x_p2pool.load(Ordering::SeqCst),
),
};
let coinbase_extra =
convert_coinbase_extra(self.squad.clone(), grpc_req.coinbase_extra).unwrap_or_default();
let squad = self.squad.clone();
let coinbase_extra = convert_coinbase_extra(squad, grpc_req.coinbase_extra).unwrap_or_default();
let mut new_tip_block = (*share_chain
.generate_new_tip_block(&wallet_payment_address, coinbase_extra.clone())
.await
Expand Down
7 changes: 2 additions & 5 deletions src/server/grpc/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use tari_shutdown::ShutdownSignal;
use tokio::select;
use tonic::transport::Channel;

use crate::server::{
grpc::error::{Error, TonicError},
p2p::Squad,
};
use crate::server::grpc::error::{Error, TonicError};

/// Utility function to connect to a Base node and try infinitely when it fails until gets connected.
pub async fn connect_base_node(
Expand Down Expand Up @@ -53,7 +50,7 @@ pub async fn connect_base_node(
Ok(client)
}

pub fn convert_coinbase_extra(squad: Squad, custom_coinbase_extra: String) -> Result<Vec<u8>, TryFromIntError> {
pub fn convert_coinbase_extra(squad: String, custom_coinbase_extra: String) -> Result<Vec<u8>, TryFromIntError> {
let type_length_value_marker = 0xFFu8;
let squad_type_marker = 0x02u8;
let custom_message_type_marker = 0x01u8;
Expand Down
23 changes: 21 additions & 2 deletions src/server/http/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) struct StatsCollector {
request_tx: tokio::sync::mpsc::Sender<StatsRequest>,
request_rx: tokio::sync::mpsc::Receiver<StatsRequest>,
first_stat_received: Option<EpochTime>,
last_squad: Option<String>,
miner_rx_accepted: u64,
miner_sha_accepted: u64,
// miner_rejected: u64,
Expand Down Expand Up @@ -53,6 +54,7 @@ impl StatsCollector {
stats_broadcast_receiver,
request_rx: rx,
request_tx: tx,
last_squad: None,
first_stat_received: None,
miner_rx_accepted: 0,
miner_sha_accepted: 0,
Expand Down Expand Up @@ -87,6 +89,9 @@ impl StatsCollector {

fn handle_stat(&mut self, sample: StatData) {
match sample {
StatData::SquadChanged { squad, .. } => {
self.last_squad = Some(squad);
},
StatData::MinerBlockAccepted { pow_algo, .. } => match pow_algo {
PowAlgorithm::Sha3x => {
self.miner_sha_accepted += 1;
Expand Down Expand Up @@ -183,12 +188,13 @@ impl StatsCollector {
let formatter = Formatter::new();

info!(target: LOG_TARGET,
"========= Uptime: {}. v{} Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
"========= Uptime: {}. v{}, Sqd: {}, Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner accepts(rx/sha): {}/{}. Pool accepts (rx/sha) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{} Last gossip: {}==== ",
humantime::format_duration(Duration::from_secs(
EpochTime::now().as_u64().checked_sub(
self.first_stat_received.unwrap_or(EpochTime::now()).as_u64())
.unwrap_or_default())),
env!("CARGO_PKG_VERSION"),
self.last_squad.as_deref().unwrap_or("Not set"),
self.randomx_chain_height.saturating_sub(self.randomx_chain_length),
self.randomx_chain_height,
self.sha3x_chain_height.saturating_sub(self.sha3x_chain_length),
Expand Down Expand Up @@ -283,6 +289,10 @@ pub(crate) struct GetStatsResponse {

#[derive(Clone)]
pub(crate) enum StatData {
SquadChanged {
squad: String,
timestamp: EpochTime,
},
TargetDifficultyChanged {
target_difficulty: Difficulty,
pow_algo: PowAlgorithm,
Expand Down Expand Up @@ -328,6 +338,7 @@ pub(crate) enum StatData {
impl StatData {
pub fn timestamp(&self) -> EpochTime {
match self {
StatData::SquadChanged { timestamp, .. } => *timestamp,
StatData::MinerBlockAccepted { timestamp, .. } => *timestamp,
StatData::PoolBlockAccepted { timestamp, .. } => *timestamp,
StatData::ChainChanged { timestamp, .. } => *timestamp,
Expand Down Expand Up @@ -370,7 +381,7 @@ impl StatsBroadcastClient {
}

pub fn broadcast(&self, data: StatData) -> Result<(), anyhow::Error> {
let _ = self
let _unused = self
.tx
.send(data)
.inspect_err(|_e| error!(target: LOG_TARGET, "ShareChainError broadcasting stats"));
Expand Down Expand Up @@ -409,6 +420,14 @@ impl StatsBroadcastClient {
self.broadcast(data)
}

pub fn send_squad_changed(&self, squad: String) -> Result<(), anyhow::Error> {
let data = StatData::SquadChanged {
squad,
timestamp: EpochTime::now(),
};
self.broadcast(data)
}

pub fn send_chain_changed(&self, pow_algo: PowAlgorithm, height: u64, length: u64) -> Result<(), anyhow::Error> {
let data = StatData::ChainChanged {
algo: pow_algo,
Expand Down
2 changes: 1 addition & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ pub mod grpc;
pub mod http;
pub mod p2p;

pub const PROTOCOL_VERSION: u64 = 28;
pub const PROTOCOL_VERSION: u64 = 29;
Loading

0 comments on commit 04ea87c

Please sign in to comment.