Skip to content

Commit

Permalink
feat: add wallet state (#6763)
Browse files Browse the repository at this point in the history
Description
---
Adds a wallet state grpc call which returns the balance, scanned height
and wallet state
  • Loading branch information
SWvheerden authored Jan 27, 2025
1 parent 8f65540 commit d88f7d6
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 10 deletions.
10 changes: 10 additions & 0 deletions applications/minotari_app_grpc/proto/wallet.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import "network.proto";
service Wallet {
// This returns the current version
rpc GetVersion (GetVersionRequest) returns (GetVersionResponse);
// This returns the current state of the wallet
rpc GetState (GetStateRequest) returns (GetStateResponse);
// This checks if the wallet is healthy and running
rpc CheckConnectivity(GetConnectivityRequest) returns (CheckConnectivityResponse);
// Check for new updates
Expand Down Expand Up @@ -241,13 +243,21 @@ message GetCompletedTransactionsResponse {

message GetBalanceRequest {}

message GetStateRequest {}

message GetBalanceResponse {
uint64 available_balance = 1;
uint64 pending_incoming_balance = 2;
uint64 pending_outgoing_balance = 3;
uint64 timelocked_balance = 4;
}

message GetStateResponse {
uint64 scanned_height = 1;
GetBalanceResponse balance = 2;
NetworkStatusResponse network = 3;
}

message GetUnspentAmountsResponse {
repeated uint64 amount = 1;
}
Expand Down
2 changes: 1 addition & 1 deletion applications/minotari_console_wallet/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2022 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

mod get_balance_debounced;
mod wallet_debouncer;
mod wallet_grpc_server;

use minotari_app_grpc::tari_rpc::TransactionEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use minotari_wallet::{
service::Balance,
},
transaction_service::handle::{TransactionEvent, TransactionServiceHandle},
utxo_scanner_service::handle::{UtxoScannerEvent, UtxoScannerHandle},
};
use tari_shutdown::ShutdownSignal;
use tokio::sync::Mutex;
Expand All @@ -44,22 +45,25 @@ const LOG_TARGET: &str = "wallet::ui::grpc::get_balance_debounced";
/// balance needs to be updated. When ever a client requests the balance, it will be fetched from the backend if the
/// flag is set and clear the flag, otherwise the cached balance will be returned.
#[derive(Clone)]
pub struct GetBalanceDebounced {
pub struct WalletDebouncer {
balance: Arc<Mutex<Balance>>,
scanned_height: Arc<Mutex<u64>>,
refresh_needed: Arc<Mutex<bool>>,
output_manager_service: OutputManagerHandle,
transaction_service: TransactionServiceHandle,
wallet_connectivity: WalletConnectivityHandle,
utxo_scanner_handle: UtxoScannerHandle,
shutdown_signal: ShutdownSignal,
event_monitor_started: Arc<Mutex<bool>>,
}

impl GetBalanceDebounced {
/// Create a new GetBalanceDebounced instance.
impl WalletDebouncer {
/// Create a new WalletDebouncer instance.
pub fn new(
output_manager_service: OutputManagerHandle,
transaction_service: TransactionServiceHandle,
wallet_connectivity: WalletConnectivityHandle,
utxo_scanner_handle: UtxoScannerHandle,
shutdown_signal: ShutdownSignal,
) -> Self {
Self {
Expand All @@ -70,9 +74,11 @@ impl GetBalanceDebounced {
time_locked_balance: None,
})),
refresh_needed: Arc::new(Mutex::new(true)),
scanned_height: Arc::new(Mutex::new(0)),
output_manager_service,
transaction_service,
wallet_connectivity,
utxo_scanner_handle,
shutdown_signal,
event_monitor_started: Arc::new(Mutex::new(false)),
}
Expand Down Expand Up @@ -138,11 +144,27 @@ impl GetBalanceDebounced {
}
}

async fn update_scanned_height(&self, scanned_height: u64) {
let mut lock = self.scanned_height.lock().await;
if *lock != scanned_height {
trace!(target: LOG_TARGET, "set_scanned_height '{}'", scanned_height);
*lock = scanned_height;
}
}

pub async fn get_scanned_height(&mut self) -> u64 {
if !self.is_event_monitor_started().await {
self.start_event_monitor().await;
}
*self.scanned_height.lock().await
}

async fn monitor_events(&self) {
let mut shutdown_signal = self.shutdown_signal.clone();
let mut transaction_service_events = self.transaction_service.get_event_stream();
let mut base_node_changed = self.wallet_connectivity.clone().get_current_base_node_watcher();
let mut output_manager_service_events = self.output_manager_service.get_event_stream();
let mut utxo_scanner_events = self.utxo_scanner_handle.clone().get_event_receiver();

loop {
tokio::select! {
Expand Down Expand Up @@ -187,6 +209,29 @@ impl GetBalanceDebounced {
},
}
},
result = utxo_scanner_events.recv() => {
match result {
Ok(event) => {
match event {
UtxoScannerEvent::Progress {
current_height,..
}=> {
self.update_scanned_height(current_height).await;
}
UtxoScannerEvent::Completed {
final_height,
..
}=> {
self.update_scanned_height(final_height).await;
},
_ => {}
}
},
Err(e) => {
warn!(target: LOG_TARGET, "Problem with utxo scanner: {}",e);
},
}
},
_ = shutdown_signal.wait() => {
info!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ use minotari_app_grpc::tari_rpc::{
GetConnectivityRequest,
GetIdentityRequest,
GetIdentityResponse,
GetStateRequest,
GetStateResponse,
GetTransactionInfoRequest,
GetTransactionInfoResponse,
GetUnspentAmountsResponse,
Expand Down Expand Up @@ -122,7 +124,7 @@ use tokio::{
use tonic::{Request, Response, Status};

use crate::{
grpc::{convert_to_transaction_event, get_balance_debounced::GetBalanceDebounced, TransactionWrapper},
grpc::{convert_to_transaction_event, wallet_debouncer::WalletDebouncer, TransactionWrapper},
notifier::{CANCELLED, CONFIRMATION, MINED, QUEUED, RECEIVED, SENT},
};

Expand All @@ -146,23 +148,24 @@ async fn send_transaction_event(
pub struct WalletGrpcServer {
wallet: WalletSqlite,
rules: ConsensusManager,
get_balance_debounced: Arc<Mutex<GetBalanceDebounced>>,
debouncer: Arc<Mutex<WalletDebouncer>>,
}

impl WalletGrpcServer {
#[allow(dead_code)]
pub fn new(wallet: WalletSqlite) -> Result<Self, ConsensusBuilderError> {
let rules = ConsensusManager::builder(wallet.network.as_network()).build()?;
let get_balance = GetBalanceDebounced::new(
let debouncer = WalletDebouncer::new(
wallet.output_manager_service.clone(),
wallet.transaction_service.clone(),
wallet.wallet_connectivity.clone(),
wallet.utxo_scanner_service.clone(),
wallet.comms.shutdown_signal(),
);
Ok(Self {
wallet,
rules,
get_balance_debounced: Arc::new(Mutex::new(get_balance)),
debouncer: Arc::new(Mutex::new(debouncer)),
})
}

Expand Down Expand Up @@ -281,16 +284,56 @@ impl wallet_server::Wallet for WalletGrpcServer {
async fn get_balance(&self, _request: Request<GetBalanceRequest>) -> Result<Response<GetBalanceResponse>, Status> {
let start = std::time::Instant::now();
let balance = {
let mut get_balance = self.get_balance_debounced.lock().await;
let mut get_balance = self.debouncer.lock().await;
match get_balance.get_balance().await {
Ok(b) => b,
Err(e) => return Err(Status::not_found(format!("GetBalanceDebounced error! {}", e))),
Err(e) => return Err(Status::not_found(format!("WalletDebouncer error! {}", e))),
}
};
trace!(target: LOG_TARGET, "'get_balance' completed in {:.2?}", start.elapsed());
Ok(Response::new(balance))
}

async fn get_state(&self, _request: Request<GetStateRequest>) -> Result<Response<GetStateResponse>, Status> {
let start = std::time::Instant::now();
let (balance, scanned_height) = {
let mut debouncer = self.debouncer.lock().await;
let balance = match debouncer.get_balance().await {
Ok(b) => b,
Err(e) => return Err(Status::not_found(format!("WalletDebouncer error! {}", e))),
};
let scanned_height = debouncer.get_scanned_height().await;
(Some(balance), scanned_height)
};

let status = self
.comms()
.connectivity()
.get_connectivity_status()
.await
.map_err(|err| Status::internal(err.to_string()))?;
let mut base_node_service = self.wallet.base_node_service.clone();

let network = Some(tari_rpc::NetworkStatusResponse {
status: tari_rpc::ConnectivityStatus::from(status) as i32,
avg_latency_ms: base_node_service
.get_base_node_latency()
.await
.map_err(|err| Status::internal(err.to_string()))?
.map(|d| u32::try_from(d.as_millis()).unwrap_or(u32::MAX))
.unwrap_or_default(),
num_node_connections: u32::try_from(status.num_connected_nodes())
.map_err(|_| Status::internal("Count not convert u64 to usize".to_string()))?,
});

trace!(target: LOG_TARGET, "'get_state' completed in {:.2?}", start.elapsed());
Ok(Response::new(GetStateResponse {
scanned_height,
balance,
network,
}))
}

async fn get_unspent_amounts(
&self,
_: Request<tari_rpc::Empty>,
Expand Down

0 comments on commit d88f7d6

Please sign in to comment.