Skip to content

Commit c6127b8

Browse files
committed
feat(sync):daemon support for advanced sync
1 parent c640979 commit c6127b8

File tree

2 files changed

+102
-7
lines changed

2 files changed

+102
-7
lines changed

aw-sync/src/main.rs

+101-6
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
// - [x] Setup local sync bucket
33
// - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
44
// - [x] Import buckets and sync events from remotes
5-
// - [ ] Add CLI arguments
5+
// - [x] Add CLI arguments
66
// - [x] For which local server to use
77
// - [x] For which sync dir to use
8-
// - [ ] Date to start syncing from
8+
// - [x] Date to start syncing from
99

1010
#[macro_use]
1111
extern crate log;
@@ -60,7 +60,29 @@ struct Opts {
6060
enum Commands {
6161
/// Daemon subcommand
6262
/// Starts aw-sync as a daemon, which will sync every 5 minutes.
63-
Daemon {},
63+
Daemon {
64+
/// Use advanced sync mode
65+
/// (automatically enabled when any advanced options are provided)
66+
#[clap(long)]
67+
advanced: bool,
68+
69+
/// Date to start syncing from.
70+
/// If not specified, start from beginning.
71+
/// Format: YYYY-MM-DD
72+
#[clap(long, value_parser=parse_start_date)]
73+
start_date: Option<DateTime<Utc>>,
74+
75+
/// Specify buckets to sync using a comma-separated list.
76+
/// If not specified, all buckets will be synced.
77+
#[clap(long, value_parser=parse_list)]
78+
buckets: Option<Vec<String>>,
79+
80+
/// Full path to sync db file
81+
/// Useful for syncing buckets from a specific db file in the sync directory.
82+
/// Must be a valid absolute path to a file in the sync directory.
83+
#[clap(long)]
84+
sync_db: Option<PathBuf>,
85+
},
6486

6587
/// Sync subcommand (basic)
6688
///
@@ -140,11 +162,30 @@ fn main() -> Result<(), Box<dyn Error>> {
140162
let client = AwClient::new(&opts.host, port, "aw-sync")?;
141163

142164
// if opts.command is None, then we're using the default subcommand (Sync)
143-
match opts.command.unwrap_or(Commands::Daemon {}) {
165+
match opts.command.unwrap_or(Commands::Daemon {
166+
advanced: false,
167+
start_date: None,
168+
buckets: None,
169+
sync_db: None,
170+
}) {
144171
// Start daemon
145-
Commands::Daemon {} => {
172+
Commands::Daemon {
173+
advanced,
174+
start_date,
175+
buckets,
176+
sync_db,
177+
} => {
146178
info!("Starting daemon...");
147-
daemon(&client)?;
179+
// Infer advanced mode if any advanced options are provided
180+
let use_advanced = start_date.is_some() || buckets.is_some() || sync_db.is_some();
181+
182+
if use_advanced {
183+
info!("Using advanced sync mode");
184+
daemon_advanced(&client, start_date, buckets, sync_db)?;
185+
} else {
186+
info!("Using basic sync mode");
187+
daemon(&client)?;
188+
}
148189
}
149190
// Perform basic sync
150191
Commands::Sync { host } => {
@@ -167,6 +208,7 @@ fn main() -> Result<(), Box<dyn Error>> {
167208
sync_wrapper::push(&client)?
168209
}
169210
// Perform two-way sync
211+
// Only way to sync non-window buckets
170212
Commands::SyncAdvanced {
171213
start_date,
172214
buckets,
@@ -247,3 +289,56 @@ fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
247289

248290
Ok(())
249291
}
292+
293+
fn daemon_advanced(
294+
client: &AwClient,
295+
start_date: Option<DateTime<Utc>>,
296+
buckets: Option<Vec<String>>,
297+
sync_db: Option<PathBuf>,
298+
) -> Result<(), Box<dyn Error>> {
299+
let (tx, rx) = channel();
300+
301+
ctrlc::set_handler(move || {
302+
let _ = tx.send(());
303+
})?;
304+
305+
let sync_dir = dirs::get_sync_dir()?;
306+
if let Some(db_path) = &sync_db {
307+
info!("Using sync db: {}", &db_path.display());
308+
309+
if !db_path.is_absolute() {
310+
Err("Sync db path must be absolute")?
311+
}
312+
if !db_path.starts_with(&sync_dir) {
313+
Err("Sync db path must be in sync directory")?
314+
}
315+
}
316+
317+
let sync_spec = sync::SyncSpec {
318+
path: sync_dir,
319+
path_db: sync_db,
320+
buckets,
321+
start: start_date,
322+
};
323+
324+
loop {
325+
if let Err(e) = sync::sync_run(client, &sync_spec, sync::SyncMode::Both) {
326+
error!("Error during sync cycle: {}", e);
327+
return Err(e);
328+
}
329+
330+
info!("Advanced sync pass done, sleeping for 5 minutes");
331+
332+
match rx.recv_timeout(Duration::from_secs(300)) {
333+
Ok(_) | Err(RecvTimeoutError::Disconnected) => {
334+
info!("Termination signal received, shutting down.");
335+
break;
336+
}
337+
Err(RecvTimeoutError::Timeout) => {
338+
// Continue the loop if the timeout occurs
339+
}
340+
}
341+
}
342+
343+
Ok(())
344+
}

0 commit comments

Comments
 (0)