Skip to content

Commit a9ff43d

Browse files
author
Lazy Luis
authored
Merge pull request #66 from sideprotocol/websocket_reconnect
Optimize web socket connection and DKG procession
2 parents 9001c7e + 5be806c commit a9ff43d

File tree

18 files changed

+438
-388
lines changed

18 files changed

+438
-388
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
edition = "2021"
33
name = "shuttler"
4-
version = "2.0.0-rc3"
4+
version = "2.0.0-rc4"
55

66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

src/apps/core.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
helper::{
1515
encoding::to_base64,
1616
now,
17-
store::{DefaultStore, MemStore, Store},
17+
store::{DefaultStore, Store},
1818
}, protocols::refresh::RefreshInput,
1919
};
2020

@@ -210,8 +210,12 @@ type SignatureShareStore =
210210
type SignerNonceStore = DefaultStore<String, BTreeMap<Index, round1::SigningNonces>>;
211211

212212
pub type Round1Store =
213-
MemStore<String, BTreeMap<Identifier, Vec<frost_adaptor_signature::keys::dkg::round1::Package>>>;
214-
pub type Round2Store = MemStore<String, BTreeMap<Identifier, Vec<Vec<u8>>>>;
213+
DefaultStore<String, BTreeMap<Identifier, Vec<frost_adaptor_signature::keys::dkg::round1::Package>>>;
214+
pub type Round2Store = DefaultStore<String, BTreeMap<Identifier, Vec<Vec<u8>>>>;
215+
216+
217+
pub type Round1SecetStore = DefaultStore<String, Vec<frost_adaptor_signature::keys::dkg::round1::SecretPackage>>;
218+
pub type Round2SecetStore = DefaultStore<String, Vec<frost_adaptor_signature::keys::dkg::round2::SecretPackage>>;
215219

216220
pub struct Context {
217221
pub swarm: Swarm<ShuttlerBehaviour>,
@@ -232,6 +236,8 @@ pub struct Context {
232236
// dkg stores
233237
pub db_round1: Round1Store,
234238
pub db_round2: Round2Store,
239+
pub sec_round1: Round1SecetStore,
240+
pub sec_round2: Round2SecetStore,
235241
}
236242

237243
impl Context {
@@ -265,13 +271,24 @@ impl Context {
265271
commitment_store: CommitmentStore::new(conf.get_database_with_name("commitments")),
266272
signature_store: SignatureShareStore::new(conf.get_database_with_name("signature_shares")),
267273
general_store: DefaultStore::new(conf.get_database_with_name("general")),
274+
275+
db_round1: Round1Store::new(conf.get_database_with_name("round1")),
276+
db_round2: Round2Store::new(conf.get_database_with_name("round2")),
277+
sec_round1: Round1SecetStore::new(conf.get_database_with_name("sec_round1")),
278+
sec_round2: Round2SecetStore::new(conf.get_database_with_name("sec_round2")),
279+
268280
conf,
269281

270-
db_round1: Round1Store::new(),
271-
db_round2: Round2Store::new(),
272282
}
273283
}
274284

285+
pub fn clean_dkg_cache(&self, task_id: &String) {
286+
self.db_round1.remove(task_id);
287+
self.db_round2.remove(task_id);
288+
self.sec_round1.remove(task_id);
289+
self.sec_round2.remove(task_id);
290+
}
291+
275292
pub fn clean_task_cache(&self, task_id: &String) {
276293
self.task_store.remove(task_id);
277294
self.nonce_store.remove(task_id);

src/apps/lending.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use cosmrs::Any;
44
use frost_adaptor_signature::VerifyingKey;
55
use side_proto::side::tss::{MsgCompleteDkg, MsgCompleteRefreshing, MsgSubmitSignatures, SigningType};
6-
use tracing::debug;
76

87
use crate::config::{VaultKeypair, APP_NAME_LENDING};
98
use crate::helper::encoding::{from_base64, hash, pubkey_to_identifier};
@@ -64,7 +63,6 @@ impl DKGAdaptor for KeygenHander {
6463
match event {
6564
SideEvent::BlockEvent(events) => {
6665
if events.contains_key("initiate_dkg.id") {
67-
println!("Events: {:?}", events);
6866

6967
let live_peers = mem_store::alive_participants();
7068

@@ -75,16 +73,27 @@ impl DKGAdaptor for KeygenHander {
7573
.zip(events.get("initiate_dkg.batch_size")?) {
7674

7775
let mut participants = vec![];
76+
let mut down_peers = vec![];
77+
let mut names = vec![];
7878
for p in ps.split(",") {
7979
if let Ok(keybytes) = from_base64(p) {
8080
let identifier = pubkey_to_identifier(&keybytes);
8181
// not have enough participants
82+
let moniker = mem_store::get_participant_moniker(&identifier);
8283
if !live_peers.contains(&identifier) {
83-
break;
84-
}
84+
down_peers.push(moniker.clone());
85+
}
86+
names.push(moniker);
87+
8588
participants.push(identifier);
8689
}
8790
};
91+
92+
tracing::debug!("Task {} has {} offline participants {:?} {:?}, threshold {}", id, down_peers.len(), down_peers, names, t);
93+
if down_peers.len() > 0 {
94+
continue;
95+
}
96+
8897
if let Ok(threshold) = t.parse() {
8998
if threshold as usize * 3 >= participants.len() * 2 {
9099
if let Ok(batch_size) = b.parse() {
@@ -116,7 +125,7 @@ impl DKGAdaptor for KeygenHander {
116125
pub_keys.push(hexkey);
117126
});
118127

119-
debug!("Oracle pubkey >>>: {:?}", pub_keys);
128+
// debug!("Oracle pubkey >>>: {:?}", pub_keys);
120129

121130
// save dkg id and keys for refresh
122131
ctx.general_store.save(&format!("{}", task.id).as_str(), &pub_keys.join(","));

src/apps/relayer/bridge.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ pub async fn send_withdraw_tx(
528528
info!("Submitting withdrawal tx: {:?}", msg);
529529

530530
let any_msg = Any::from_msg(&msg).unwrap();
531-
send_cosmos_transaction(relayer.config(), any_msg).await
531+
send_cosmos_transaction(&relayer.identifier, relayer.config(), any_msg).await
532532
}
533533

534534
pub async fn send_deposit_tx(
@@ -549,7 +549,7 @@ pub async fn send_deposit_tx(
549549
info!("Submitting deposit tx: {:?}", msg);
550550

551551
let any_msg = Any::from_msg(&msg).unwrap();
552-
send_cosmos_transaction(&relayer.config(), any_msg).await
552+
send_cosmos_transaction(&relayer.identifier, &relayer.config(), any_msg).await
553553
}
554554

555555
pub(crate) fn get_last_scanned_height(relayer: &Relayer) -> u64 {
@@ -629,7 +629,7 @@ pub async fn submit_fee_rate_to_side(relayer: &Relayer, fee_rate: i64) {
629629

630630
info!("Submitting fee rate: {:?}", msg_submit_fee_rate);
631631
let any_msg = Any::from_msg(&msg_submit_fee_rate).unwrap();
632-
match send_cosmos_transaction(relayer.config(), any_msg).await {
632+
match send_cosmos_transaction(&relayer.identifier, relayer.config(), any_msg).await {
633633
Ok(resp) => {
634634
let tx_response = resp.into_inner().tx_response.unwrap();
635635
if tx_response.code != 0 {

src/apps/relayer/lending.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ pub async fn send_deposit_tx(
404404
info!("submit deposit tx to side: {:?}", msg);
405405

406406
let any_msg = Any::from_msg(&msg).unwrap();
407-
send_cosmos_transaction(&relayer.config(), any_msg).await
407+
send_cosmos_transaction(&relayer.identifier, &relayer.config(), any_msg).await
408408
}
409409

410410
pub async fn handle_cet(relayer: &Relayer, loan_id: String, cet_type: String) {

src/apps/relayer/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
use std::time::Duration;
44

55
use bitcoincore_rpc::{Auth, Client};
6+
use ed25519_compact::SecretKey;
67
use futures::join;
7-
use crate::{config::Config, helper::{client_fee_provider::FeeProviderClient, client_ordinals::OrdinalsClient}};
8+
use crate::{config::Config, helper::{client_fee_provider::FeeProviderClient, client_ordinals::OrdinalsClient, encoding::pubkey_to_identifier}};
89

910
pub mod bridge;
1011
pub mod lending;
@@ -18,6 +19,7 @@ pub struct Relayer {
1819
pub fee_provider_client: FeeProviderClient,
1920
pub db_relayer: sled::Db,
2021
pub ticker: tokio::time::Interval,
22+
pub identifier: frost_adaptor_signature::Identifier,
2123
}
2224

2325
impl Relayer {
@@ -40,6 +42,20 @@ impl Relayer {
4042
let db_relayer = sled::open(conf.get_database_with_name("relayer")).expect("Counld not create database!");
4143
let ticker = tokio::time::interval(Duration::from_secs(conf.loop_interval as u64));
4244

45+
46+
// load private key from priv_validator_key_path
47+
let priv_validator_key = conf.load_validator_key();
48+
49+
let mut b = priv_validator_key
50+
.priv_key
51+
.ed25519_signing_key()
52+
.unwrap()
53+
.as_bytes()
54+
.to_vec();
55+
b.extend(priv_validator_key.pub_key.to_bytes());
56+
let node_key = SecretKey::new(b.as_slice().try_into().unwrap());
57+
let identifier = pubkey_to_identifier(node_key.public_key().as_slice());
58+
4359
Self {
4460
// priv_validator_key: validator_key,
4561
bitcoin_client,
@@ -48,6 +64,7 @@ impl Relayer {
4864
config: conf,
4965
db_relayer,
5066
ticker,
67+
identifier
5168
}
5269
}
5370

src/apps/shuttler.rs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,11 @@ impl<'a> Shuttler<'a> {
165165
// Tx Sender for Tx Quene
166166
let (tx_sender, tx_receiver) = std::sync::mpsc::channel::<Any>();
167167
let conf2 = conf.clone();
168+
let identifier2 = identifier.clone();
168169
spawn(async move {
169170
while let Ok(message) = tx_receiver.recv() {
170-
println!("Received: {:?}", message);
171-
match send_cosmos_transaction(&conf2, message).await {
171+
// println!("Received: {:?}", message);
172+
match send_cosmos_transaction(&identifier2, &conf2, message).await {
172173
Ok(resp) => {
173174
if let Some(inner) = resp.into_inner().tx_response {
174175
debug!("Submited {}, {}, {}", inner.txhash, inner.code, inner.raw_log)
@@ -179,7 +180,6 @@ impl<'a> Shuttler<'a> {
179180
}
180181
});
181182

182-
183183
// Common Setting: Context and Heart Beat
184184
let mut context = Context::new(swarm, tx_sender, identifier, node_key, conf.clone());
185185
let mut ticker = tokio::time::interval_at(get_next_full_hour(), Duration::from_secs(5 * 60));
@@ -206,18 +206,31 @@ impl<'a> Shuttler<'a> {
206206

207207
loop {
208208
select! {
209-
Some(msg) = client.receive_message() => {
210-
// tracing::info!("msg {:?}", msg);
211-
if self.handle_block_event(&mut context, msg) {
212-
tracing::error!("websocket connection closed, reconnecting...");
213-
tokio::time::sleep(Duration::from_secs(5)).await;
214-
if client.reconnect().await.is_err() {
215-
tracing::error!("Failed to reconnect to websocket");
216-
break;
217-
} else {
218-
tracing::info!("Reconnected to websocket");
209+
Some(recv) = client.receive_message() => {
210+
match recv {
211+
Ok(msg) => {
212+
if self.handle_block_event(&mut context, msg) {
213+
tracing::error!("websocket connection closed, will reconnect in 5s.");
214+
tokio::time::sleep(Duration::from_secs(5)).await;
215+
if client.reconnect().await.is_err() {
216+
tracing::error!("Failed to reconnect to websocket");
217+
break;
218+
} else {
219+
tracing::info!("Reconnected to websocket");
220+
}
221+
};
222+
},
223+
Err(e) => {
224+
tracing::error!("websocket error: {:?}, will reconnect in 5s", e);
225+
tokio::time::sleep(Duration::from_secs(5)).await;
226+
if client.reconnect().await.is_err() {
227+
tracing::error!("Failed to reconnect to websocket");
228+
break;
229+
} else {
230+
tracing::info!("Reconnected to websocket");
231+
}
219232
}
220-
};
233+
}
221234
}
222235
// recv = sidechain_event_stream.next() => {
223236
// match recv {
@@ -248,7 +261,7 @@ impl<'a> Shuttler<'a> {
248261
// info!(" @@(Received) Discovered new peer: {peer_id} with info: {connection_id} {:?}", info);
249262
info.listen_addrs.iter().for_each(|addr| {
250263
if !addr.to_string().starts_with("/ip4/127.0.0.1") {
251-
tracing::debug!("Discovered: {addr}/p2p/{peer_id}");
264+
// tracing::debug!("Discovered: {addr}/p2p/{peer_id}");
252265
context.swarm
253266
.behaviour_mut()
254267
.kad
@@ -276,7 +289,10 @@ impl<'a> Shuttler<'a> {
276289
} else {
277290
let _ = context.swarm.disconnect_peer_id(peer_id);
278291
}
279-
info!("Connected peers {:?}", context.swarm.connected_peers().collect::<Vec<_>>());
292+
293+
info!("Connected to {}", peer_id)
294+
295+
// info!("Connected peers {:?}", context.swarm.connected_peers().map(|i|).collect::<Vec<_>>());
280296
},
281297
SwarmEvent::ConnectionClosed { peer_id, cause, .. } => {
282298
info!("Disconnected {peer_id}: {:?}", cause);
@@ -336,7 +352,7 @@ impl<'a> Shuttler<'a> {
336352
},
337353
tokio_tungstenite::tungstenite::Message::Close(_close) => {
338354
tracing::error!("connection closed");
339-
return false
355+
return true
340356
},
341357
_ => return false
342358
};
@@ -350,7 +366,7 @@ impl<'a> Shuttler<'a> {
350366
tendermint_rpc::event::v0_38::DeEventData::NewBlock { block, result_finalize_block , ..} => {
351367
if let Some(b) = block {
352368
let height = b.header.height.value();
353-
debug!("Received New block: #{:?}, {:?}", height, mem_store::alive_participants_monikers());
369+
debug!("Block: #{:?}, offline: {:?}", height, mem_store::offline_participants_monikers());
354370
sending_heart_beat(ctx, height);
355371
}
356372
if let Some(finalize_block) = result_finalize_block {
@@ -420,8 +436,7 @@ impl<'a> Shuttler<'a> {
420436
tasks.push(task);
421437
}
422438
};
423-
});
424-
439+
});
425440
}
426441

427442
if let Some(lending) = self.apps.iter().find(|a| a.name() == APP_NAME_LENDING) {

src/config/candidate.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Candidate {
6060
params.allowed_dkg_participants.iter().for_each(|v| {
6161
if let Ok(pk ) = from_base64(&v.consensus_pubkey) {
6262
let id = pubkey_to_identifier(&pk);
63-
debug!("added {:?} in white list", id);
63+
debug!("Access granted to {} {}", v.consensus_pubkey, v.moniker);
6464
mem_store::add_moniker(&id, v.moniker.clone());
6565
self.peers.push(identifier_to_peer_id(&id ));
6666
self.identifiers.push( id );

0 commit comments

Comments
 (0)