Skip to content

Commit 8963c73

Browse files
committed
feat:finish archiver
Signed-off-by: Chen Kai <[email protected]>
1 parent e584c6f commit 8963c73

File tree

6 files changed

+49
-21
lines changed

6 files changed

+49
-21
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,4 @@ tracing-appender = "0.2.3"
3838
uuid = { version = "1.10.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
3939
warp = "0.3.2"
4040
ssz_rs = "0.9.0"
41+
ctrlc = { version = "3.2.3", features = ["termination"] }

Dockerfile

Whitespace-only changes.

bin/archiver/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ tracing-subscriber.workspace = true
2525
uuid.workspace = true
2626
warp.workspace = true
2727
tracing-appender.workspace = true
28+
ctrlc.workspace = true
2829
blob-archiver-storage = { path = "../../crates/storage" }
2930
blob-archiver-beacon = { path = "../../crates/beacon" }
3031

bin/archiver/src/main.rs

+40-19
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::time::Duration;
66

77
use again::RetryPolicy;
88
use clap::Parser;
9+
use ctrlc::set_handler;
910
use eth2::types::{BlockId, Hash256};
1011
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
1112
use serde::{Deserialize, Serialize};
@@ -16,12 +17,12 @@ use tracing_subscriber::layer::SubscriberExt;
1617
use tracing_subscriber::util::SubscriberInitExt;
1718
use tracing_subscriber::{fmt, EnvFilter};
1819

20+
use blob_archiver_beacon::beacon_client;
1921
use blob_archiver_beacon::beacon_client::BeaconClientEth2;
20-
use blob_archiver_beacon::{beacon_client, blob_test_helper};
2122
use blob_archiver_storage::fs::FSStorage;
22-
use blob_archiver_storage::s3::S3Config;
23+
use blob_archiver_storage::s3::{S3Config, S3Storage};
2324
use blob_archiver_storage::storage;
24-
use blob_archiver_storage::storage::StorageType;
25+
use blob_archiver_storage::storage::{Storage, StorageType};
2526

2627
use crate::archiver::{Archiver, Config, STARTUP_FETCH_BLOB_MAXIMUM_RETRIES};
2728

@@ -34,27 +35,47 @@ static INIT: std::sync::Once = std::sync::Once::new();
3435
async fn main() {
3536
let args = CliArgs::parse();
3637

37-
let config = args.to_config();
38-
println!("{:#?}", config);
39-
init_logging(0, None, None);
38+
let config: Config = args.to_config();
39+
init_logging(
40+
config.log_config.verbose,
41+
config.log_config.log_dir.clone(),
42+
config
43+
.log_config
44+
.log_rotation
45+
.clone()
46+
.map(|s| to_rotation(s.as_str())),
47+
);
4048
let beacon_client = BeaconNodeHttpClient::new(
41-
SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
42-
Timeouts::set_all(Duration::from_secs(30)),
49+
SensitiveUrl::from_str(config.beacon_config.beacon_endpoint.as_str()).unwrap(),
50+
Timeouts::set_all(config.beacon_config.beacon_client_timeout),
4351
);
44-
let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
45-
let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
46-
let beacon_client_eth2 = BeaconClientEth2 { beacon_client };
47-
let config = Config {
48-
poll_interval: Duration::from_secs(5),
49-
listen_addr: "".to_string(),
50-
origin_block: *blob_test_helper::ORIGIN_BLOCK,
51-
beacon_config: Default::default(),
52-
storage_config: Default::default(),
53-
log_config: Default::default(),
52+
let storage: Arc<Mutex<dyn Storage>> = if config.storage_config.storage_type == StorageType::FS
53+
{
54+
Arc::new(Mutex::new(
55+
FSStorage::new(config.storage_config.fs_dir.clone().unwrap())
56+
.await
57+
.unwrap(),
58+
))
59+
} else {
60+
Arc::new(Mutex::new(
61+
S3Storage::new(config.storage_config.s3_config.clone().unwrap())
62+
.await
63+
.unwrap(),
64+
))
5465
};
66+
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
67+
set_handler(move || {
68+
tracing::info!("shutting down");
69+
shutdown_tx
70+
.send(true)
71+
.expect("could not send shutdown signal");
72+
})
73+
.expect("could not register shutdown handler");
74+
75+
let beacon_client_eth2 = BeaconClientEth2 { beacon_client };
5576
let archiver = Archiver::new(
5677
Arc::new(Mutex::new(beacon_client_eth2)),
57-
Arc::new(Mutex::new(storage)),
78+
storage,
5879
config,
5980
shutdown_rx,
6081
);

crates/storage/src/s3.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::path::Path;
22
use std::time::Duration;
33

4+
use crate::storage::{
5+
BackfillProcesses, BlobData, LockFile, Storage, StorageReader, StorageWriter,
6+
};
47
use async_trait::async_trait;
58
use aws_sdk_s3::config::retry::RetryConfig;
69
use aws_sdk_s3::config::timeout::TimeoutConfig;
@@ -14,8 +17,6 @@ use serde::{Deserialize, Serialize};
1417
use tracing::info;
1518
use tracing::log::trace;
1619

17-
use crate::storage::{BackfillProcesses, BlobData, LockFile, StorageReader, StorageWriter};
18-
1920
use crate::storage::BACKFILL_LOCK;
2021

2122
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
@@ -89,6 +90,9 @@ impl S3Storage {
8990
}
9091
}
9192

93+
#[async_trait]
94+
impl Storage for S3Storage {}
95+
9296
#[async_trait]
9397
impl StorageReader for S3Storage {
9498
async fn read_blob_data(&self, hash: &Hash256) -> Result<BlobData> {

0 commit comments

Comments
 (0)