Skip to content

Commit 0ba4d07

Browse files
committed
feat(sync):daemon support for advanced sync
1 parent c640979 commit 0ba4d07

File tree

5 files changed

+173
-86
lines changed

5 files changed

+173
-86
lines changed

aw-sync/README.md

+18-1
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,27 @@ Was originally prototyped as a PR to aw-server: https://github.com/ActivityWatch
1515
This will start a daemon which pulls and pushes events with the sync directory (`~/ActivityWatchSync` by default) every 5 minutes:
1616

1717
```sh
18+
# Basic sync daemon (syncs all buckets every 5 minutes)
1819
aw-sync
20+
21+
# Same as above
22+
aw-sync daemon
23+
24+
# Sync daemon with specific buckets only
25+
aw-sync daemon --buckets "aw-watcher-window,aw-watcher-afk" --start-date "2024-01-01"
26+
27+
# Sync all buckets once and exit
28+
aw-sync sync --start-date "2024-01-01"
1929
```
2030

21-
For more options, see `aw-sync --help`.
31+
For more options, see `aw-sync --help`. Some notable options:
32+
- `--buckets`: Specify which buckets to sync (comma-separated). By default, all buckets are synced.
33+
- Use `--buckets "bucket1,bucket2"` to sync specific buckets
34+
- Use `--buckets "*"` to explicitly sync all buckets
35+
- Not specifying this option syncs all buckets by default
36+
- `--start-date`: Only sync events after this date (YYYY-MM-DD)
37+
- `--sync-db`: Absolute path to a specific database file inside the sync directory
38+
- `--mode`: Choose sync mode: "push", "pull", or "both" (default: "both"). Only available on the `sync` subcommand; the daemon always syncs in both directions.
2239

2340
### Setting up sync
2441

aw-sync/src/main.rs

+144-74
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
// - [x] Setup local sync bucket
33
// - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
44
// - [x] Import buckets and sync events from remotes
5-
// - [ ] Add CLI arguments
5+
// - [x] Add CLI arguments
66
// - [x] For which local server to use
77
// - [x] For which sync dir to use
8-
// - [ ] Date to start syncing from
8+
// - [x] Date to start syncing from
99

1010
#[macro_use]
1111
extern crate log;
@@ -60,35 +60,47 @@ struct Opts {
6060
enum Commands {
6161
/// Daemon subcommand
6262
/// Starts aw-sync as a daemon, which will sync every 5 minutes.
63-
Daemon {},
63+
Daemon {
64+
/// Date to start syncing from.
65+
/// If not specified, start from beginning.
66+
/// Format: YYYY-MM-DD
67+
#[clap(long, value_parser=parse_start_date)]
68+
start_date: Option<DateTime<Utc>>,
69+
70+
/// Specify buckets to sync using a comma-separated list.
71+
/// Use "*" to sync all buckets.
72+
/// By default, all buckets are synced.
73+
#[clap(long)]
74+
buckets: Option<String>,
75+
76+
/// Full path to sync db file
77+
/// Useful for syncing buckets from a specific db file in the sync directory.
78+
/// Must be a valid absolute path to a file in the sync directory.
79+
#[clap(long)]
80+
sync_db: Option<PathBuf>,
81+
},
6482

65-
/// Sync subcommand (basic)
83+
/// Sync subcommand
6684
///
67-
/// Pulls remote buckets then pushes local buckets.
85+
/// Syncs data between local aw-server and sync directory.
86+
/// First pulls remote buckets from the sync directory to the local aw-server.
87+
/// Then pushes local buckets from the aw-server to the local sync directory.
6888
Sync {
6989
/// Host(s) to pull from, comma separated. Will pull from all hosts if not specified.
7090
#[clap(long, value_parser=parse_list)]
7191
host: Option<Vec<String>>,
72-
},
7392

74-
/// Sync subcommand (advanced)
75-
///
76-
/// Pulls remote buckets then pushes local buckets.
77-
/// First pulls remote buckets in the sync directory to the local aw-server.
78-
/// Then pushes local buckets from the aw-server to the local sync directory.
79-
#[clap(arg_required_else_help = true)]
80-
SyncAdvanced {
8193
/// Date to start syncing from.
8294
/// If not specified, start from beginning.
83-
/// NOTE: might be unstable, as count cannot be used to verify integrity of sync.
8495
/// Format: YYYY-MM-DD
8596
#[clap(long, value_parser=parse_start_date)]
8697
start_date: Option<DateTime<Utc>>,
8798

8899
/// Specify buckets to sync using a comma-separated list.
89-
/// If not specified, all buckets will be synced.
90-
#[clap(long, value_parser=parse_list)]
91-
buckets: Option<Vec<String>>,
100+
/// Use "*" to sync all buckets.
101+
/// By default, all buckets are synced.
102+
#[clap(long)]
103+
buckets: Option<String>,
92104

93105
/// Mode to sync in. Can be "push", "pull", or "both".
94106
/// Defaults to "both".
@@ -111,6 +123,18 @@ fn parse_start_date(arg: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
111123
}
112124

113125
fn parse_list(arg: &str) -> Result<Vec<String>, clap::Error> {
126+
// If the argument is empty or just whitespace, return an empty Vec
127+
// This handles the case when a list option such as --host is given an empty value
128+
if arg.trim().is_empty() {
129+
return Ok(vec![]);
130+
}
131+
132+
// Special case: if arg is just "*", return a vector with a single "*" string
133+
if arg.trim() == "*" {
134+
return Ok(vec!["*".to_string()]);
135+
}
136+
137+
// Otherwise, split by comma as usual
114138
Ok(arg.split(',').map(|s| s.to_string()).collect())
115139
}
116140

@@ -139,60 +163,94 @@ fn main() -> Result<(), Box<dyn Error>> {
139163

140164
let client = AwClient::new(&opts.host, port, "aw-sync")?;
141165

142-
// if opts.command is None, then we're using the default subcommand (Sync)
143-
match opts.command.unwrap_or(Commands::Daemon {}) {
166+
// if opts.command is None, then we're using the default subcommand (Daemon)
167+
match opts.command.unwrap_or(Commands::Daemon {
168+
start_date: None,
169+
buckets: None,
170+
sync_db: None,
171+
}) {
144172
// Start daemon
145-
Commands::Daemon {} => {
173+
Commands::Daemon {
174+
start_date,
175+
buckets,
176+
sync_db,
177+
} => {
146178
info!("Starting daemon...");
147-
daemon(&client)?;
148-
}
149-
// Perform basic sync
150-
Commands::Sync { host } => {
151-
// Pull
152-
match host {
153-
Some(hosts) => {
154-
for host in hosts.iter() {
155-
info!("Pulling from host: {}", host);
156-
sync_wrapper::pull(host, &client)?;
157-
}
158-
}
159-
None => {
160-
info!("Pulling from all hosts");
161-
sync_wrapper::pull_all(&client)?;
162-
}
163-
}
164-
165-
// Push
166-
info!("Pushing local data");
167-
sync_wrapper::push(&client)?
179+
180+
// Use an empty vector to sync all buckets for these cases:
181+
// 1. When --buckets '*' is supplied
182+
// 2. When no bucket argument is provided (default)
183+
let effective_buckets = if buckets.as_deref() == Some("*") || buckets.is_none() {
184+
Some(vec![])
185+
} else if let Some(buckets_str) = buckets {
186+
Some(buckets_str.split(',').map(|s| s.to_string()).collect())
187+
} else {
188+
None
189+
};
190+
191+
daemon(&client, start_date, effective_buckets, sync_db)?;
168192
}
169-
// Perform two-way sync
170-
Commands::SyncAdvanced {
193+
// Perform sync
194+
Commands::Sync {
195+
host,
171196
start_date,
172197
buckets,
173198
mode,
174199
sync_db,
175200
} => {
176-
let sync_dir = dirs::get_sync_dir()?;
177-
if let Some(db_path) = &sync_db {
178-
info!("Using sync db: {}", &db_path.display());
179-
180-
if !db_path.is_absolute() {
181-
Err("Sync db path must be absolute")?
182-
}
183-
if !db_path.starts_with(&sync_dir) {
184-
Err("Sync db path must be in sync directory")?
201+
// Use an empty vector to sync all buckets for these cases:
202+
// 1. When --buckets '*' is supplied
203+
// 2. When no bucket argument is provided (default)
204+
let effective_buckets = if buckets.as_deref() == Some("*") || buckets.is_none() {
205+
Some(vec![])
206+
} else if let Some(buckets_str) = buckets {
207+
Some(buckets_str.split(',').map(|s| s.to_string()).collect())
208+
} else {
209+
None
210+
};
211+
212+
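// If advanced options are provided, use advanced sync mode (NOTE(review): effective_buckets is always Some(..) here, so this branch is always taken and the host-based fallback below is unreachable)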
213+
if start_date.is_some() || effective_buckets.is_some() || sync_db.is_some() {
214+
let sync_dir = dirs::get_sync_dir()?;
215+
if let Some(db_path) = &sync_db {
216+
info!("Using sync db: {}", &db_path.display());
217+
218+
if !db_path.is_absolute() {
219+
Err("Sync db path must be absolute")?
220+
}
221+
if !db_path.starts_with(&sync_dir) {
222+
Err("Sync db path must be in sync directory")?
223+
}
185224
}
186-
}
187225

188-
let sync_spec = sync::SyncSpec {
189-
path: sync_dir,
190-
path_db: sync_db,
191-
buckets,
192-
start: start_date,
193-
};
226+
let sync_spec = sync::SyncSpec {
227+
path: sync_dir,
228+
path_db: sync_db,
229+
buckets: effective_buckets,
230+
start: start_date,
231+
};
232+
233+
sync::sync_run(&client, &sync_spec, mode)?
234+
} else {
235+
// Simple host-based sync mode (backwards compatibility)
236+
// Pull
237+
match host {
238+
Some(hosts) => {
239+
for host in hosts.iter() {
240+
info!("Pulling from host: {}", host);
241+
sync_wrapper::pull(host, &client)?;
242+
}
243+
}
244+
None => {
245+
info!("Pulling from all hosts");
246+
sync_wrapper::pull_all(&client)?;
247+
}
248+
}
194249

195-
sync::sync_run(&client, &sync_spec, mode)?
250+
// Push
251+
info!("Pushing local data");
252+
sync_wrapper::push(&client)?
253+
}
196254
}
197255

198256
// List all buckets
@@ -207,23 +265,45 @@ fn main() -> Result<(), Box<dyn Error>> {
207265
Ok(())
208266
}
209267

210-
fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
268+
fn daemon(
269+
client: &AwClient,
270+
start_date: Option<DateTime<Utc>>,
271+
buckets: Option<Vec<String>>,
272+
sync_db: Option<PathBuf>,
273+
) -> Result<(), Box<dyn Error>> {
211274
let (tx, rx) = channel();
212275

213276
ctrlc::set_handler(move || {
214277
let _ = tx.send(());
215278
})?;
216279

280+
let sync_dir = dirs::get_sync_dir()?;
281+
if let Some(db_path) = &sync_db {
282+
info!("Using sync db: {}", &db_path.display());
283+
284+
if !db_path.is_absolute() {
285+
Err("Sync db path must be absolute")?
286+
}
287+
if !db_path.starts_with(&sync_dir) {
288+
Err("Sync db path must be in sync directory")?
289+
}
290+
}
291+
292+
let sync_spec = sync::SyncSpec {
293+
path: sync_dir,
294+
buckets,
295+
path_db: sync_db,
296+
start: start_date,
297+
};
298+
217299
loop {
218-
if let Err(e) = daemon_sync_cycle(client) {
300+
if let Err(e) = sync::sync_run(client, &sync_spec, sync::SyncMode::Both) {
219301
error!("Error during sync cycle: {}", e);
220-
// Re-throw the error
221302
return Err(e);
222303
}
223304

224305
info!("Sync pass done, sleeping for 5 minutes");
225306

226-
// Wait for either the sleep duration or a termination signal
227307
match rx.recv_timeout(Duration::from_secs(300)) {
228308
Ok(_) | Err(RecvTimeoutError::Disconnected) => {
229309
info!("Termination signal received, shutting down.");
@@ -237,13 +317,3 @@ fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
237317

238318
Ok(())
239319
}
240-
241-
fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
242-
info!("Pulling from all hosts");
243-
sync_wrapper::pull_all(client)?;
244-
245-
info!("Pushing local data");
246-
sync_wrapper::push(client)?;
247-
248-
Ok(())
249-
}

aw-sync/src/sync.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -247,12 +247,18 @@ pub fn sync_datastores(
247247
.get_buckets()
248248
.unwrap()
249249
.iter_mut()
250-
// If buckets vec isn't empty, filter out buckets not in the buckets vec
250+
// Only filter buckets if specific bucket IDs are provided
251251
.filter(|tup| {
252252
let bucket = &tup.1;
253253
if let Some(buckets) = &sync_spec.buckets {
254-
buckets.iter().any(|b_id| b_id == &bucket.id)
254+
// If "*" is in the buckets list or no buckets specified, sync all buckets
255+
if buckets.iter().any(|b_id| b_id == "*") || buckets.is_empty() {
256+
true
257+
} else {
258+
buckets.iter().any(|b_id| b_id == &bucket.id)
259+
}
255260
} else {
261+
// By default, sync all buckets
256262
true
257263
}
258264
})

aw-sync/src/sync_wrapper.rs

+2-8
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
4848
let sync_spec = SyncSpec {
4949
path: sync_dir.clone(),
5050
path_db: Some(db.path().clone()),
51-
buckets: Some(vec![
52-
format!("aw-watcher-window_{}", host),
53-
format!("aw-watcher-afk_{}", host),
54-
]),
51+
buckets: None, // Sync all buckets by default
5552
start: None,
5653
};
5754
sync_run(client, &sync_spec, SyncMode::Pull)?;
@@ -67,10 +64,7 @@ pub fn push(client: &AwClient) -> Result<(), Box<dyn Error>> {
6764
let sync_spec = SyncSpec {
6865
path: sync_dir,
6966
path_db: None,
70-
buckets: Some(vec![
71-
format!("aw-watcher-window_{}", client.hostname),
72-
format!("aw-watcher-afk_{}", client.hostname),
73-
]),
67+
buckets: None, // Sync all buckets by default
7468
start: None,
7569
};
7670
sync_run(client, &sync_spec, SyncMode::Push)?;

0 commit comments

Comments
 (0)