Skip to content

Commit

Permalink
feat(stats): adding stats HTTP server to provide statistics about loc…
Browse files Browse the repository at this point in the history
…al p2pool (#25)

Description
---

Adding statistics and expose them over an HTTP server.
Stats:
- [x] Pool hashrate
- [ ] Network hashrate (can be checked when tribes are implemented)
- [x] Number of miners
- [x] Last block won
- [x] Connected (to the network)
- [x] Connected time
- [x] estimated earning in XTM in 1min, 1h, 24h, 7d, 30d for all miners
- [x] estimated earning of pool in XTM in 1min, 1h, 24h, 7d, 30d
- [ ] Submitted/Accepted/Rejected blocks

Motivation and Context
---

Tari Universe wants to use metrics to show/use states about the current
pool.

How Has This Been Tested?
---

- Running a miner and a p2pool node normally.
- Query http://127.0.0.1:19000/stats
- Checking output

Example output:
```json
{
  "connected": true,
  "connected_since": 1722519838,
  "num_of_miners": 2,
  "last_block_won": {
    "hash": "9c4fc0a3d996c6544e25b41ec0cf3a8bf771ac20fd0b7ae367e56ea597265d4f",
    "height": 5025,
    "timestamp": 1722518118,
    "miner_wallet_address": "f27mEvFXUZJCNFQM1MbkRwBnNJHwsK6JBxzrwVyYNm6bgGHpL1xDj7Gjevwj9caFrp23iLGUcysK6decdDL87sQCtL2"
  },
  "share_chain_height": 5196,
  "pool_hash_rate": [2536963],
  "pool_total_earnings": 27782378316,
  "pool_total_estimated_earnings": {
    "1min": 409165920,
    "1h": 24549955200,
    "1d": 589198924800,
    "1w": 4124392473600,
    "30d": 17675967744000
  },
  "total_earnings": {
    "f27mEvFXUZJCNFQM1MbkRwBnNJHwsK6JBxzrwVyYNm6bgGHpL1xDj7Gjevwj9caFrp23iLGUcysK6decdDL87sQCtL2": 5556473688,
    "f2CrtWaTZE3xWSxCkR1mR9Bszx321Ar2S1fPcBLNSNWiLBVwqkeankFjeTmdxYLuyeHg8oM4vSgsV1tjL4GSKEuy9pk": 8334710532
  },
  "estimated_earnings": {
    "f2CrtWaTZE3xWSxCkR1mR9Bszx321Ar2S1fPcBLNSNWiLBVwqkeankFjeTmdxYLuyeHg8oM4vSgsV1tjL4GSKEuy9pk": {
      "1min": 245499540,
      "1h": 14729972400,
      "1d": 353519337600,
      "1w": 2474635363200,
      "30d": 10605580128000
    },
    "f27mEvFXUZJCNFQM1MbkRwBnNJHwsK6JBxzrwVyYNm6bgGHpL1xDj7Gjevwj9caFrp23iLGUcysK6decdDL87sQCtL2": {
      "1min": 163666380,
      "1h": 9819982800,
      "1d": 235679587200,
      "1w": 1649757110400,
      "30d": 7070387616000
    }
  }
}
```

What process can a PR reviewer use to test or verify this change?
---

Do the steps at test.


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
ksrichard authored Aug 2, 2024
1 parent 94404b4 commit 62198a0
Show file tree
Hide file tree
Showing 20 changed files with 826 additions and 189 deletions.
249 changes: 230 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ moka = { version = "0.12.7", features = ["future"] }
rand = "0.8.0"
dirs = "4.0.0"
log4rs = "1.3.0"
axum = "0.7.5"
itertools = "0.13.0"
num = { version = "0.4.3", features = ["default", "num-bigint", "serde"] }

[package.metadata.cargo-machete]
ignored = ["log4rs"]
23 changes: 16 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use clap::{
builder::{styling::AnsiColor, Styles},
Parser,
};
use log::LevelFilter;

use tari_common::initialize_logging;

use crate::sharechain::in_memory::InMemoryShareChain;
Expand All @@ -27,14 +25,13 @@ fn cli_styles() -> Styles {
.valid(AnsiColor::BrightGreen.on_default())
}

#[allow(clippy::struct_excessive_bools)]
#[derive(Clone, Parser)]
#[command(version)]
#[command(styles = cli_styles())]
#[command(about = "⛏ Decentralized mining pool for Tari network ⛏", long_about = None)]
struct Cli {
/// Log level
#[arg(short, long, value_name = "log-level", default_value = Some("info"))]
log_level: LevelFilter,
base_dir: Option<PathBuf>,

/// (Optional) gRPC port to use.
#[arg(short, long, value_name = "grpc-port")]
Expand All @@ -44,6 +41,10 @@ struct Cli {
#[arg(short, long, value_name = "p2p-port")]
p2p_port: Option<u16>,

/// (Optional) stats server port to use.
#[arg(long, value_name = "stats-server-port")]
stats_server_port: Option<u16>,

/// (Optional) seed peers.
/// Any amount of seed peers can be added to join a p2pool network.
///
Expand Down Expand Up @@ -85,14 +86,18 @@ struct Cli {
#[arg(long, value_name = "mdns-disabled", default_value_t = false)]
mdns_disabled: bool,

base_dir: Option<PathBuf>,
/// Stats server disabled
///
/// If set, local stats HTTP server is disabled.
#[arg(long, value_name = "stats-server-disabled", default_value_t = false)]
stats_server_disabled: bool,
}

impl Cli {
pub fn base_dir(&self) -> PathBuf {
self.base_dir
.clone()
.unwrap_or_else(|| dirs::home_dir().unwrap().join(".p2pool/miner"))
.unwrap_or_else(|| dirs::home_dir().unwrap().join(".tari/p2pool"))
}
}

Expand Down Expand Up @@ -124,6 +129,10 @@ async fn main() -> anyhow::Result<()> {
config_builder.with_private_key_folder(cli.private_key_folder.clone());
config_builder.with_mining_enabled(!cli.mining_disabled);
config_builder.with_mdns_enabled(!cli.mdns_disabled);
config_builder.with_stats_server_enabled(!cli.stats_server_disabled);
if let Some(stats_server_port) = cli.stats_server_port {
config_builder.with_stats_server_port(stats_server_port);
}

// server start
let config = config_builder.build();
Expand Down
13 changes: 13 additions & 0 deletions src/server/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::{path::PathBuf, time::Duration};

use crate::server::http::stats;
use crate::server::{p2p, p2p::peer_store::PeerStoreConfig};

/// Config is the server configuration struct.
Expand All @@ -15,6 +16,7 @@ pub struct Config {
pub peer_store: PeerStoreConfig,
pub p2p_service: p2p::Config,
pub mining_enabled: bool,
pub stats_server: stats::server::Config,
}

impl Default for Config {
Expand All @@ -27,6 +29,7 @@ impl Default for Config {
peer_store: PeerStoreConfig::default(),
p2p_service: p2p::Config::default(),
mining_enabled: true,
stats_server: stats::server::Config::default(),
}
}
}
Expand Down Expand Up @@ -95,6 +98,16 @@ impl ConfigBuilder {
self
}

pub fn with_stats_server_enabled(&mut self, config: bool) -> &mut Self {
self.config.stats_server.enabled = config;
self
}

pub fn with_stats_server_port(&mut self, config: u16) -> &mut Self {
self.config.stats_server.port = config;
self
}

pub fn build(&self) -> Config {
self.config.clone()
}
Expand Down
40 changes: 20 additions & 20 deletions src/server/grpc/base_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,66 +109,48 @@ impl TariBaseNodeGrpc {

#[tonic::async_trait]
impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
type FetchMatchingUtxosStream = mpsc::Receiver<Result<tari_rpc::FetchMatchingUtxosResponse, Status>>;
type GetActiveValidatorNodesStream = mpsc::Receiver<Result<tari_rpc::GetActiveValidatorNodesResponse, Status>>;
type GetBlocksStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;
type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;
type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;
type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;
type GetSideChainUtxosStream = mpsc::Receiver<Result<tari_rpc::GetSideChainUtxosResponse, Status>>;
type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::GetTemplateRegistrationResponse, Status>>;
type GetTokensInCirculationStream = mpsc::Receiver<Result<ValueAtHeightResponse, Status>>;
type ListHeadersStream = mpsc::Receiver<Result<BlockHeaderResponse, Status>>;
type SearchKernelsStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;
type SearchUtxosStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;

async fn list_headers(
&self,
request: Request<ListHeadersRequest>,
) -> Result<Response<Self::ListHeadersStream>, Status> {
proxy_stream_result!(self, list_headers, request, LIST_HEADERS_PAGE_SIZE)
}

async fn get_header_by_hash(
&self,
request: Request<GetHeaderByHashRequest>,
) -> Result<Response<BlockHeaderResponse>, Status> {
proxy_simple_result!(self, get_header_by_hash, request)
}

type GetBlocksStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;
async fn get_blocks(&self, request: Request<GetBlocksRequest>) -> Result<Response<Self::GetBlocksStream>, Status> {
proxy_stream_result!(self, get_blocks, request, GET_BLOCKS_PAGE_SIZE)
}

async fn get_block_timing(&self, request: Request<HeightRequest>) -> Result<Response<BlockTimingResponse>, Status> {
proxy_simple_result!(self, get_block_timing, request)
}

async fn get_constants(&self, request: Request<BlockHeight>) -> Result<Response<ConsensusConstants>, Status> {
proxy_simple_result!(self, get_constants, request)
}

async fn get_block_size(
&self,
request: Request<BlockGroupRequest>,
) -> Result<Response<BlockGroupResponse>, Status> {
proxy_simple_result!(self, get_block_size, request)
}

async fn get_block_fees(
&self,
request: Request<BlockGroupRequest>,
) -> Result<Response<BlockGroupResponse>, Status> {
proxy_simple_result!(self, get_block_fees, request)
}

async fn get_version(&self, request: Request<Empty>) -> Result<Response<StringValue>, Status> {
proxy_simple_result!(self, get_version, request)
}

async fn check_for_updates(&self, request: Request<Empty>) -> Result<Response<SoftwareUpdate>, Status> {
proxy_simple_result!(self, check_for_updates, request)
}
type GetTokensInCirculationStream = mpsc::Receiver<Result<ValueAtHeightResponse, Status>>;

async fn get_tokens_in_circulation(
&self,
Expand All @@ -182,6 +164,8 @@ impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
)
}

type GetNetworkDifficultyStream = mpsc::Receiver<Result<tari_rpc::NetworkDifficultyResponse, Status>>;

async fn get_network_difficulty(
&self,
request: Request<HeightRequest>,
Expand Down Expand Up @@ -251,31 +235,41 @@ impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
proxy_simple_result!(self, get_tip_info, request)
}

type SearchKernelsStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;

async fn search_kernels(
&self,
request: Request<SearchKernelsRequest>,
) -> Result<Response<Self::SearchKernelsStream>, Status> {
proxy_stream_result!(self, search_kernels, request, GET_BLOCKS_PAGE_SIZE)
}

type SearchUtxosStream = mpsc::Receiver<Result<HistoricalBlock, Status>>;

async fn search_utxos(
&self,
request: Request<SearchUtxosRequest>,
) -> Result<Response<Self::SearchUtxosStream>, Status> {
proxy_stream_result!(self, search_utxos, request, GET_BLOCKS_PAGE_SIZE)
}

type FetchMatchingUtxosStream = mpsc::Receiver<Result<tari_rpc::FetchMatchingUtxosResponse, Status>>;

async fn fetch_matching_utxos(
&self,
request: Request<FetchMatchingUtxosRequest>,
) -> Result<Response<Self::FetchMatchingUtxosStream>, Status> {
proxy_stream_result!(self, fetch_matching_utxos, request, GET_BLOCKS_PAGE_SIZE)
}

type GetPeersStream = mpsc::Receiver<Result<tari_rpc::GetPeersResponse, Status>>;

async fn get_peers(&self, request: Request<GetPeersRequest>) -> Result<Response<Self::GetPeersStream>, Status> {
proxy_stream_result!(self, get_peers, request, GET_BLOCKS_PAGE_SIZE)
}

type GetMempoolTransactionsStream = mpsc::Receiver<Result<tari_rpc::GetMempoolTransactionsResponse, Status>>;

async fn get_mempool_transactions(
&self,
request: Request<GetMempoolTransactionsRequest>,
Expand Down Expand Up @@ -309,6 +303,8 @@ impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
proxy_simple_result!(self, get_mempool_stats, request)
}

type GetActiveValidatorNodesStream = mpsc::Receiver<Result<tari_rpc::GetActiveValidatorNodesResponse, Status>>;

async fn get_active_validator_nodes(
&self,
request: Request<GetActiveValidatorNodesRequest>,
Expand All @@ -323,13 +319,17 @@ impl tari_rpc::base_node_server::BaseNode for TariBaseNodeGrpc {
proxy_simple_result!(self, get_shard_key, request)
}

type GetTemplateRegistrationsStream = mpsc::Receiver<Result<tari_rpc::GetTemplateRegistrationResponse, Status>>;

async fn get_template_registrations(
&self,
request: Request<GetTemplateRegistrationsRequest>,
) -> Result<Response<Self::GetTemplateRegistrationsStream>, Status> {
proxy_stream_result!(self, get_template_registrations, request, 10)
}

type GetSideChainUtxosStream = mpsc::Receiver<Result<tari_rpc::GetSideChainUtxosResponse, Status>>;

async fn get_side_chain_utxos(
&self,
request: Request<GetSideChainUtxosRequest>,
Expand Down
29 changes: 7 additions & 22 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use log::{debug, info, warn};
use log::{info, warn};
use minotari_app_grpc::tari_rpc::{
base_node_client::BaseNodeClient, pow_algo::PowAlgos, sha_p2_pool_server::ShaP2Pool, GetNewBlockRequest,
GetNewBlockResponse, GetNewBlockTemplateWithCoinbasesRequest, HeightRequest, NewBlockTemplateRequest, PowAlgo,
SubmitBlockRequest, SubmitBlockResponse,
};
use tari_core::proof_of_work::sha3x_difficulty;
use tari_utilities::hex::Hex;
use tokio::sync::Mutex;
use tonic::{Code, Request, Response, Status};
use tonic::{Request, Response, Status};

use crate::{
server::{
Expand All @@ -27,45 +27,38 @@ const LOG_TARGET: &str = "p2pool::server::grpc::p2pool";
/// P2Pool specific gRPC service to provide `get_new_block` and `submit_block` functionalities.
pub struct ShaP2PoolGrpc<S>
where
S: ShareChain + Send + Sync + 'static,
S: ShareChain,
{
/// Base node client
client: Arc<Mutex<BaseNodeClient<tonic::transport::Channel>>>,
/// P2P service client
p2p_client: p2p::ServiceClient,
/// Current share chain
share_chain: Arc<S>,
sync_in_progress: Arc<AtomicBool>,
}

impl<S> ShaP2PoolGrpc<S>
where
S: ShareChain + Send + Sync + 'static,
S: ShareChain,
{
pub async fn new(
base_node_address: String,
p2p_client: p2p::ServiceClient,
share_chain: Arc<S>,
sync_in_progress: Arc<AtomicBool>,
) -> Result<Self, Error> {
Ok(Self {
client: Arc::new(Mutex::new(util::connect_base_node(base_node_address).await?)),
p2p_client,
share_chain,
sync_in_progress,
})
}

/// Submits a new block to share chain and broadcasts to the p2p network.
pub async fn submit_share_chain_block(&self, block: &Block) -> Result<(), Status> {
if self.sync_in_progress.load(Ordering::Relaxed) {
return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress..."));
}

if let Err(error) = self.share_chain.submit_block(block).await {
warn!(target: LOG_TARGET, "Failed to add new block: {error:?}");
}
debug!(target: LOG_TARGET, "Broadcast new block with height: {:?}", block.height());
info!(target: LOG_TARGET, "Broadcast new block: {:?}", block.hash().to_hex());
self.p2p_client
.broadcast_block(block)
.await
Expand All @@ -76,18 +69,14 @@ where
#[tonic::async_trait]
impl<S> ShaP2Pool for ShaP2PoolGrpc<S>
where
S: ShareChain + Send + Sync + 'static,
S: ShareChain,
{
/// Returns a new block (that can be mined) which contains all the shares generated
/// from the current share chain as coinbase transactions.
async fn get_new_block(
&self,
_request: Request<GetNewBlockRequest>,
) -> Result<Response<GetNewBlockResponse>, Status> {
if self.sync_in_progress.load(Ordering::Relaxed) {
return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress..."));
}

let mut pow_algo = PowAlgo::default();
pow_algo.set_pow_algo(PowAlgos::Sha3x);

Expand Down Expand Up @@ -137,10 +126,6 @@ where
&self,
request: Request<SubmitBlockRequest>,
) -> Result<Response<SubmitBlockResponse>, Status> {
if self.sync_in_progress.load(Ordering::Relaxed) {
return Err(Status::new(Code::Unavailable, "Share chain syncing is in progress..."));
}

let grpc_block = request.get_ref();
let grpc_request_payload = grpc_block
.block
Expand Down
4 changes: 4 additions & 0 deletions src/server/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

pub mod stats;
Loading

0 comments on commit 62198a0

Please sign in to comment.