Skip to content
This repository was archived by the owner on Feb 3, 2025. It is now read-only.

Commit

Permalink
Sync scorer data in the background
Browse files Browse the repository at this point in the history
  • Loading branch information
benthecarman committed Feb 8, 2024
1 parent c90790b commit a9afd33
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 47 deletions.
89 changes: 55 additions & 34 deletions mutiny-core/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@ use crate::{
};
use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::Network;
#[cfg(target_arch = "wasm32")]
use instant::Instant;
use lightning::ln::msgs::NodeAnnouncement;
use lightning::routing::gossip::NodeId;
use lightning::util::logger::Logger;
use lightning::util::ser::ReadableArgs;
use lightning::{log_debug, log_error, log_info, log_warn};
use lightning::{log_debug, log_error, log_info, log_trace, log_warn};
use reqwest::Client;
use reqwest::{Method, Url};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;

use crate::logging::MutinyLogger;
use crate::node::{NetworkGraph, RapidGossipSync};
Expand Down Expand Up @@ -105,7 +109,7 @@ pub struct Scorer {
pub value: String,
}

pub async fn get_remote_scorer_bytes(
async fn get_remote_scorer_bytes(
auth_client: &MutinyAuthClient,
base_url: &str,
) -> Result<Vec<u8>, MutinyError> {
Expand All @@ -126,6 +130,29 @@ pub async fn get_remote_scorer_bytes(
Ok(decoded)
}

/// Gets the remote scorer from the server, parses it and returns it as a [`HubPreferentialScorer`]
pub async fn get_remote_scorer(
auth_client: &MutinyAuthClient,
base_url: &str,
network_graph: Arc<NetworkGraph>,
logger: Arc<MutinyLogger>,
) -> Result<HubPreferentialScorer, MutinyError> {
let start = Instant::now();
let scorer_bytes = get_remote_scorer_bytes(auth_client, base_url).await?;
let mut readable_bytes = lightning::io::Cursor::new(scorer_bytes);
let params = decay_params();
let args = (params, network_graph, logger.clone());
let scorer = ProbScorer::read(&mut readable_bytes, args)?;

log_trace!(
logger,
"Retrieved remote scorer in {}ms",
start.elapsed().as_millis()
);

Ok(HubPreferentialScorer::new(scorer))
}

fn write_gossip_data(
storage: &impl MutinyStorage,
last_sync_timestamp: u32,
Expand All @@ -142,9 +169,7 @@ fn write_gossip_data(
}

pub async fn get_gossip_sync(
_storage: &impl MutinyStorage,
remote_scorer_url: Option<String>,
auth_client: Option<Arc<MutinyAuthClient>>,
storage: &impl MutinyStorage,
network: Network,
logger: Arc<MutinyLogger>,
) -> Result<(RapidGossipSync, HubPreferentialScorer), MutinyError> {
Expand All @@ -158,37 +183,28 @@ pub async fn get_gossip_sync(
gossip_data.last_sync_timestamp
);

let start = Instant::now();

// get network graph
let gossip_sync = RapidGossipSync::new(gossip_data.network_graph.clone(), logger.clone());

// Try to get remote scorer if remote_scorer_url and auth_client are available
if let (Some(url), Some(client)) = (remote_scorer_url, &auth_client) {
match get_remote_scorer_bytes(client, &url).await {
Ok(scorer_bytes) => {
let mut readable_bytes = lightning::io::Cursor::new(scorer_bytes);
let params = decay_params();
let args = (
params,
Arc::clone(&gossip_data.network_graph),
Arc::clone(&logger),
);
if let Ok(remote_scorer) = ProbScorer::read(&mut readable_bytes, args) {
log_debug!(logger, "retrieved remote scorer");
let remote_scorer = HubPreferentialScorer::new(remote_scorer);
gossip_data.scorer = Some(remote_scorer);
} else {
log_error!(
logger,
"failed to parse remote scorer, keeping the local one"
);
}
}
Err(_) => {
log_error!(
logger,
"failed to retrieve remote scorer, keeping the local one"
);
}
let scorer_hex: Option<String> = storage.get_data(PROB_SCORER_KEY)?;

if let Some(hex) = scorer_hex {
let scorer_bytes: Vec<u8> = Vec::from_hex(&hex)?;
let mut readable_bytes = lightning::io::Cursor::new(scorer_bytes);
let params = decay_params();
let args = (
params,
Arc::clone(&gossip_data.network_graph),
Arc::clone(&logger),
);
if let Ok(scorer) = ProbScorer::read(&mut readable_bytes, args) {
log_debug!(logger, "retrieved local scorer");
let scorer = HubPreferentialScorer::new(scorer);
gossip_data.scorer = Some(scorer);
} else {
log_error!(logger, "failed to parse local scorer");
}
}

Expand All @@ -201,6 +217,11 @@ pub async fn get_gossip_sync(
}
};

log_trace!(
&logger,
"Gossip sync/Scorer initialized in {}ms",
start.elapsed().as_millis()
);
Ok((gossip_sync, prob_scorer))
}

Expand Down Expand Up @@ -550,7 +571,7 @@ mod test {
let storage = MemoryStorage::default();

let logger = Arc::new(MutinyLogger::default());
let _gossip_sync = get_gossip_sync(&storage, None, None, Network::Regtest, logger.clone())
let _gossip_sync = get_gossip_sync(&storage, Network::Regtest, logger.clone())
.await
.unwrap();

Expand Down
59 changes: 46 additions & 13 deletions mutiny-core/src/nodemanager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::auth::MutinyAuthClient;
use crate::event::HTLCStatus;
use crate::labels::LabelStorage;
use crate::logging::LOGGING_KEY;
Expand Down Expand Up @@ -381,14 +382,8 @@ impl<S: MutinyStorage> NodeManagerBuilder<S> {

let chain = Arc::new(MutinyChain::new(tx_sync, wallet.clone(), logger.clone()));

let (gossip_sync, scorer) = get_gossip_sync(
&self.storage,
c.scorer_url,
c.auth_client.clone(),
c.network,
logger.clone(),
)
.await?;
let (gossip_sync, scorer) =
get_gossip_sync(&self.storage, c.network, logger.clone()).await?;

let scorer = Arc::new(utils::Mutex::new(scorer));

Expand Down Expand Up @@ -521,6 +516,8 @@ impl<S: MutinyStorage> NodeManagerBuilder<S> {
#[cfg(target_arch = "wasm32")]
websocket_proxy_addr,
user_rgs_url: c.user_rgs_url,
scorer_url: c.scorer_url,
auth_client: c.auth_client,
esplora,
lsp_config,
logger,
Expand All @@ -547,6 +544,8 @@ pub struct NodeManager<S: MutinyStorage> {
#[cfg(target_arch = "wasm32")]
websocket_proxy_addr: String,
user_rgs_url: Option<String>,
scorer_url: Option<String>,
auth_client: Option<Arc<MutinyAuthClient>>,
esplora: Arc<AsyncClient>,
pub(crate) wallet: Arc<OnChainWallet<S>>,
gossip_sync: Arc<RapidGossipSync>,
Expand Down Expand Up @@ -620,11 +619,6 @@ impl<S: MutinyStorage> NodeManager<S> {
/// Creates a background process that will sync the wallet with the blockchain.
/// This will also update the fee estimates every 10 minutes.
pub fn start_sync(nm: Arc<NodeManager<S>>) {
// If we are stopped, don't sync
if nm.stop.load(Ordering::Relaxed) {
return;
}

utils::spawn(async move {
let mut synced = false;
loop {
Expand All @@ -639,6 +633,12 @@ impl<S: MutinyStorage> NodeManager<S> {
} else {
log_info!(nm.logger, "RGS Synced!");
}

if let Err(e) = nm.sync_scorer().await {
log_error!(nm.logger, "Failed to sync scorer: {e}");
} else {
log_info!(nm.logger, "Scorer Synced!");
}
}

// we don't need to re-sync fees every time
Expand Down Expand Up @@ -1207,6 +1207,39 @@ impl<S: MutinyStorage> NodeManager<S> {
Ok(())
}

/// Downloads the latest score data from the server and replaces the current scorer.
/// Will be skipped if in safe mode.
async fn sync_scorer(&self) -> Result<(), MutinyError> {
// Skip syncing scorer if we are in safe mode.
if self.safe_mode {
log_info!(self.logger, "Skipping scorer sync in safe mode");
return Ok(());
}

if let (Some(auth), Some(url)) = (self.auth_client.as_ref(), self.scorer_url.as_deref()) {
let scorer = get_remote_scorer(
auth,
url,
self.gossip_sync.network_graph().clone(),
self.logger.clone(),
)
.await
.map_err(|e| {
log_error!(self.logger, "Failed to sync scorer: {e}");
e
})?;

// Replace the current scorer with the new one
let mut lock = self
.scorer
.try_lock()
.map_err(|_| MutinyError::WalletSyncError)?;
*lock = scorer;
}

Ok(())
}

/// Syncs the on-chain wallet and lightning wallet.
/// This will update the on-chain wallet with any new
/// transactions and update the lightning wallet with
Expand Down
7 changes: 7 additions & 0 deletions mutiny-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ impl<T> Mutex<T> {
lock: self.inner.borrow_mut(),
})
}

#[allow(clippy::result_unit_err)]
pub fn try_lock(&self) -> LockResult<MutexGuard<'_, T>> {
Ok(MutexGuard {
lock: self.inner.try_borrow_mut().map_err(|_| ())?,
})
}
}

impl<'a, T: 'a + ScoreLookUp + ScoreUpdate> LockableScore<'a> for Mutex<T> {
Expand Down

0 comments on commit a9afd33

Please sign in to comment.