Skip to content

Commit 35f9390

Browse files
Add timestamp to v2 header API (#772)
Co-authored-by: Aleksandar Terentić <[email protected]>
1 parent 86f9e7a commit 35f9390

File tree

12 files changed

+73
-15
lines changed

12 files changed

+73
-15
lines changed

core/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## 1.2.0
44

5+
- Add timestamp to v2 header API
56
- Increase maximum kademlia record size to allow row with 512 cells
67
- Fix issue with multiple telemetry gauge callbacks
78

core/src/api/types.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,11 @@ impl Reply for Block {
324324
}
325325
}
326326

327-
impl TryFrom<AvailHeader> for HeaderMessage {
327+
impl TryFrom<(AvailHeader, u64)> for HeaderMessage {
328328
type Error = Report;
329329

330-
fn try_from(header: AvailHeader) -> Result<Self, Self::Error> {
331-
let header: Header = header.try_into()?;
330+
fn try_from((header, received_at): (AvailHeader, u64)) -> Result<Self, Self::Error> {
331+
let header: Header = (header, received_at).try_into()?;
332332
Ok(Self {
333333
block_number: header.number,
334334
header,
@@ -345,6 +345,7 @@ pub struct Header {
345345
extrinsics_root: H256,
346346
extension: Extension,
347347
digest: Digest,
348+
received_at: u64,
348349
}
349350

350351
impl Reply for Header {
@@ -401,10 +402,10 @@ struct Extension {
401402
app_lookup: CompactDataLookup,
402403
}
403404

404-
impl TryFrom<AvailHeader> for Header {
405+
impl TryFrom<(AvailHeader, u64)> for Header {
405406
type Error = Report;
406407

407-
fn try_from(header: AvailHeader) -> Result<Self> {
408+
fn try_from((header, received_at): (AvailHeader, u64)) -> Result<Self> {
408409
Ok(Header {
409410
hash: Encode::using_encoded(&header, blake2_256).into(),
410411
parent_hash: header.parent_hash,
@@ -413,6 +414,7 @@ impl TryFrom<AvailHeader> for Header {
413414
extrinsics_root: header.extrinsics_root,
414415
extension: header.extension.try_into()?,
415416
digest: header.digest.try_into()?,
417+
received_at,
416418
})
417419
}
418420
}
@@ -483,7 +485,11 @@ impl TryFrom<RpcEvent> for Option<PublishMessage> {
483485

484486
fn try_from(value: RpcEvent) -> Result<Self, Self::Error> {
485487
match value {
486-
RpcEvent::HeaderUpdate { header, .. } => header
488+
RpcEvent::HeaderUpdate {
489+
header,
490+
received_at: _,
491+
received_at_timestamp,
492+
} => (header, received_at_timestamp)
487493
.try_into()
488494
.map(Box::new)
489495
.map(PublishMessage::HeaderVerified)
@@ -903,6 +909,7 @@ mod tests {
903909
digest: Digest {
904910
logs: vec![DigestItem::RuntimeEnvironmentUpdated],
905911
},
912+
received_at: 0,
906913
},
907914
}))
908915
}

core/src/api/v2/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ Content-Type: application/json
168168
"{log}", ...
169169
]
170170
},
171+
"received_at": "{received-at-timestamp}"
171172
}
172173
```
173174

@@ -595,6 +596,7 @@ When header verification is finished, the message is pushed to the light client
595596
]
596597
}
597598
}
599+
"received_at": "{received-at-timestamp}",
598600
}
599601
}
600602
}

core/src/api/v2/handlers.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use crate::{
1414
Subscription, SubscriptionId, Transaction, Version, WsClients,
1515
},
1616
},
17-
data::{AppDataKey, BlockHeaderKey, Database, RpcNodeKey, VerifiedCellCountKey},
17+
data::{
18+
AppDataKey, BlockHeaderKey, BlockHeaderReceivedAtKey, Database, RpcNodeKey,
19+
VerifiedCellCountKey,
20+
},
1821
utils::calculate_confidence,
1922
};
2023

@@ -109,6 +112,10 @@ pub async fn block_header(
109112
.and_then(|extension| block_status(sync_start_block, db.clone(), block_number, extension))
110113
.ok_or(Error::not_found())?;
111114

115+
let received_at = db
116+
.get(BlockHeaderReceivedAtKey(block_number))
117+
.ok_or_else(Error::not_found)?;
118+
112119
if matches!(
113120
block_status,
114121
BlockStatus::Unavailable | BlockStatus::Pending | BlockStatus::VerifyingHeader
@@ -118,7 +125,7 @@ pub async fn block_header(
118125

119126
db.get(BlockHeaderKey(block_number))
120127
.ok_or_else(|| eyre!("Header not found"))
121-
.and_then(|header| header.try_into())
128+
.and_then(|header| (header, received_at).try_into())
122129
.map_err(Error::internal_server_error)
123130
}
124131

core/src/api/v2/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,9 @@ mod tests {
217217
},
218218
data::{
219219
self, AchievedConfidenceKey, AchievedSyncConfidenceKey, AppDataKey, BlockHeaderKey,
220-
Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, MemoryDB, RpcNodeKey,
221-
VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey,
220+
BlockHeaderReceivedAtKey, Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey,
221+
MemoryDB, RpcNodeKey, VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey,
222+
VerifiedSyncDataKey,
222223
},
223224
network::rpc::Node,
224225
types::BlockRange,
@@ -236,6 +237,7 @@ mod tests {
236237
};
237238
use hyper::StatusCode;
238239
use std::{collections::HashSet, str::FromStr, sync::Arc};
240+
239241
use test_case::test_case;
240242
use uuid::Uuid;
241243

@@ -405,6 +407,7 @@ mod tests {
405407
db.put(VerifiedHeaderKey, BlockRange::init(9));
406408
db.put(LatestSyncKey, 5);
407409
db.put(BlockHeaderKey(block_number), header());
410+
db.put(BlockHeaderReceivedAtKey(block_number), 1737039274);
408411
let route = super::block_header_route(config, db);
409412
let response = warp::test::request()
410413
.method("GET")
@@ -473,6 +476,7 @@ mod tests {
473476
db.put(LatestHeaderKey, 1);
474477
db.put(VerifiedHeaderKey, BlockRange::init(1));
475478
db.put(BlockHeaderKey(1), header());
479+
db.put(BlockHeaderReceivedAtKey(1), 1737039274);
476480
let route = super::block_header_route(config, db);
477481
let response = warp::test::request()
478482
.method("GET")
@@ -481,7 +485,7 @@ mod tests {
481485
.await;
482486
assert_eq!(
483487
response.body(),
484-
r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]}}"#
488+
r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]},"received_at":1737039274}"#
485489
);
486490
}
487491

core/src/crawl_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ pub async fn run(
7979
while let Ok(rpc::OutputEvent::HeaderUpdate {
8080
header,
8181
received_at,
82+
..
8283
}) = message_rx.recv().await
8384
{
8485
let block = match types::BlockVerified::try_from((header, None)) {

core/src/data.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,18 @@ impl RecordKey for SignerNonceKey {
310310
SIGNER_NONCE.into()
311311
}
312312
}
313+
314+
pub struct BlockHeaderReceivedAtKey(pub u32);
315+
316+
impl RecordKey for BlockHeaderReceivedAtKey {
317+
type Type = u64;
318+
319+
fn space(&self) -> Option<&'static str> {
320+
Some(APP_STATE_CF)
321+
}
322+
323+
fn key(&self) -> String {
324+
let BlockHeaderReceivedAtKey(block_num) = self;
325+
format!("{BLOCK_HEADER_RECEIVED_AT}:{block_num}")
326+
}
327+
}

core/src/data/keys.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,5 @@ pub const CLIENT_ID_KEY: &str = "client_id";
3535
pub const P2P_KEYPAIR_KEY: &str = "p2p_keypair";
3636
/// Key for storing signer nonce
3737
pub const SIGNER_NONCE: &str = "signer_nonce";
38+
/// Key for storing block header received at timestamp
39+
pub const BLOCK_HEADER_RECEIVED_AT: &str = "block_header_received_at";

core/src/fat_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ pub async fn run(
249249
RpcEvent::HeaderUpdate {
250250
header,
251251
received_at,
252+
..
252253
} => (header, received_at),
253254
// skip ConnectedHost event
254255
RpcEvent::ConnectedHost(_) => continue,

core/src/light_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ pub async fn run(
209209
RpcEvent::HeaderUpdate {
210210
header,
211211
received_at,
212+
..
212213
} => (header, received_at),
213214
// skip ConnectedHost event
214215
RpcEvent::ConnectedHost(_) => continue,

core/src/network/rpc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub enum OutputEvent {
4646
HeaderUpdate {
4747
header: AvailHeader,
4848
received_at: Instant,
49+
received_at_timestamp: u64,
4950
},
5051
}
5152

core/src/network/rpc/subscriptions.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,18 @@ use avail_rust::{
66
use codec::Encode;
77
use color_eyre::{eyre::eyre, Result};
88
#[cfg(not(target_arch = "wasm32"))]
9-
use std::time::Instant;
9+
use std::time::{Instant, SystemTime, UNIX_EPOCH};
1010
use tokio::sync::broadcast::Sender;
1111
use tokio_stream::StreamExt;
1212
use tracing::{debug, info, trace};
1313
#[cfg(target_arch = "wasm32")]
14-
use web_time::Instant;
14+
use web_time::{Instant, SystemTime, UNIX_EPOCH};
1515

1616
use super::{Client, OutputEvent, Subscription};
1717
use crate::{
1818
data::{
19-
Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, IsFinalitySyncedKey,
20-
LatestHeaderKey, VerifiedHeaderKey,
19+
BlockHeaderReceivedAtKey, Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey,
20+
IsFinalitySyncedKey, LatestHeaderKey, VerifiedHeaderKey,
2121
},
2222
finality::{check_finality, ValidatorSet},
2323
types::{BlockRange, GrandpaJustification},
@@ -98,9 +98,18 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
9898
match subscription {
9999
Subscription::Header(header) => {
100100
let received_at = Instant::now();
101+
let received_at_timestamp = SystemTime::now()
102+
.duration_since(UNIX_EPOCH)
103+
.expect("Time went backwards")
104+
.as_secs();
101105
self.db.put(LatestHeaderKey, header.clone().number);
102106
info!("Header no.: {}", header.number);
103107

108+
self.db.put(
109+
BlockHeaderReceivedAtKey(header.number),
110+
received_at_timestamp,
111+
);
112+
104113
// if new validator set becomes active, replace the current one
105114
if self.block_data.next_valset.is_some() {
106115
self.block_data.current_valset = self.block_data.next_valset.take().unwrap();
@@ -163,6 +172,11 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
163172
let (header, received_at, valset) =
164173
self.block_data.unverified_headers.swap_remove(pos);
165174

175+
let received_at_timestamp = self
176+
.db
177+
.get(BlockHeaderReceivedAtKey(header.number))
178+
.expect("Block header timestamp is in the database");
179+
166180
let is_final = check_finality(&valset, &justification);
167181

168182
is_final.expect("Finality check failed");
@@ -216,6 +230,7 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
216230
.send(OutputEvent::HeaderUpdate {
217231
header,
218232
received_at,
233+
received_at_timestamp,
219234
})
220235
.unwrap();
221236
}
@@ -240,6 +255,7 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
240255
.send(OutputEvent::HeaderUpdate {
241256
header,
242257
received_at,
258+
received_at_timestamp,
243259
})
244260
.unwrap();
245261
} else {

0 commit comments

Comments
 (0)