2
2
// - [x] Setup local sync bucket
3
3
// - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
4
4
// - [x] Import buckets and sync events from remotes
5
- // - [ ] Add CLI arguments
5
+ // - [x ] Add CLI arguments
6
6
// - [x] For which local server to use
7
7
// - [x] For which sync dir to use
8
- // - [ ] Date to start syncing from
8
+ // - [x ] Date to start syncing from
9
9
10
10
#[ macro_use]
11
11
extern crate log;
@@ -60,35 +60,47 @@ struct Opts {
60
60
enum Commands {
61
61
/// Daemon subcommand
62
62
/// Starts aw-sync as a daemon, which will sync every 5 minutes.
63
- Daemon { } ,
63
+ Daemon {
64
+ /// Date to start syncing from.
65
+ /// If not specified, start from beginning.
66
+ /// Format: YYYY-MM-DD
67
+ #[ clap( long, value_parser=parse_start_date) ]
68
+ start_date : Option < DateTime < Utc > > ,
69
+
70
+ /// Specify buckets to sync using a comma-separated list.
71
+ /// Use "*" to sync all buckets.
72
+ /// By default, all buckets are synced.
73
+ #[ clap( long) ]
74
+ buckets : Option < String > ,
75
+
76
+ /// Full path to sync db file
77
+ /// Useful for syncing buckets from a specific db file in the sync directory.
78
+ /// Must be a valid absolute path to a file in the sync directory.
79
+ #[ clap( long) ]
80
+ sync_db : Option < PathBuf > ,
81
+ } ,
64
82
65
- /// Sync subcommand (basic)
83
+ /// Sync subcommand
66
84
///
67
- /// Pulls remote buckets then pushes local buckets.
85
+ /// Syncs data between local aw-server and sync directory.
86
+ /// First pulls remote buckets from the sync directory to the local aw-server.
87
+ /// Then pushes local buckets from the aw-server to the local sync directory.
68
88
Sync {
69
89
/// Host(s) to pull from, comma separated. Will pull from all hosts if not specified.
70
90
#[ clap( long, value_parser=parse_list) ]
71
91
host : Option < Vec < String > > ,
72
- } ,
73
92
74
- /// Sync subcommand (advanced)
75
- ///
76
- /// Pulls remote buckets then pushes local buckets.
77
- /// First pulls remote buckets in the sync directory to the local aw-server.
78
- /// Then pushes local buckets from the aw-server to the local sync directory.
79
- #[ clap( arg_required_else_help = true ) ]
80
- SyncAdvanced {
81
93
/// Date to start syncing from.
82
94
/// If not specified, start from beginning.
83
- /// NOTE: might be unstable, as count cannot be used to verify integrity of sync.
84
95
/// Format: YYYY-MM-DD
85
96
#[ clap( long, value_parser=parse_start_date) ]
86
97
start_date : Option < DateTime < Utc > > ,
87
98
88
99
/// Specify buckets to sync using a comma-separated list.
89
- /// If not specified, all buckets will be synced.
90
- #[ clap( long, value_parser=parse_list) ]
91
- buckets : Option < Vec < String > > ,
100
+ /// Use "*" to sync all buckets.
101
+ /// By default, all buckets are synced.
102
+ #[ clap( long) ]
103
+ buckets : Option < String > ,
92
104
93
105
/// Mode to sync in. Can be "push", "pull", or "both".
94
106
/// Defaults to "both".
@@ -111,6 +123,18 @@ fn parse_start_date(arg: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
111
123
}
112
124
113
125
fn parse_list ( arg : & str ) -> Result < Vec < String > , clap:: Error > {
126
+ // If the argument is empty or just whitespace, return an empty Vec
127
+ // This handles the case when --buckets is used without a value
128
+ if arg. trim ( ) . is_empty ( ) {
129
+ return Ok ( vec ! [ ] ) ;
130
+ }
131
+
132
+ // Special case: if arg is just "*", return a vector with a single "*" string
133
+ if arg. trim ( ) == "*" {
134
+ return Ok ( vec ! [ "*" . to_string( ) ] ) ;
135
+ }
136
+
137
+ // Otherwise, split by comma as usual
114
138
Ok ( arg. split ( ',' ) . map ( |s| s. to_string ( ) ) . collect ( ) )
115
139
}
116
140
@@ -139,60 +163,94 @@ fn main() -> Result<(), Box<dyn Error>> {
139
163
140
164
let client = AwClient :: new ( & opts. host , port, "aw-sync" ) ?;
141
165
142
- // if opts.command is None, then we're using the default subcommand (Sync)
143
- match opts. command . unwrap_or ( Commands :: Daemon { } ) {
166
+ // if opts.command is None, then we're using the default subcommand (Daemon)
167
+ match opts. command . unwrap_or ( Commands :: Daemon {
168
+ start_date : None ,
169
+ buckets : None ,
170
+ sync_db : None ,
171
+ } ) {
144
172
// Start daemon
145
- Commands :: Daemon { } => {
173
+ Commands :: Daemon {
174
+ start_date,
175
+ buckets,
176
+ sync_db,
177
+ } => {
146
178
info ! ( "Starting daemon..." ) ;
147
- daemon ( & client) ?;
148
- }
149
- // Perform basic sync
150
- Commands :: Sync { host } => {
151
- // Pull
152
- match host {
153
- Some ( hosts) => {
154
- for host in hosts. iter ( ) {
155
- info ! ( "Pulling from host: {}" , host) ;
156
- sync_wrapper:: pull ( host, & client) ?;
157
- }
158
- }
159
- None => {
160
- info ! ( "Pulling from all hosts" ) ;
161
- sync_wrapper:: pull_all ( & client) ?;
162
- }
163
- }
164
179
165
- // Push
166
- info ! ( "Pushing local data" ) ;
167
- sync_wrapper:: push ( & client) ?
180
+ // Use an empty vector to sync all buckets for these cases:
181
+ // 1. When --buckets '*' is supplied
182
+ // 2. When no bucket argument is provided (default)
183
+ let effective_buckets = if buckets. as_deref ( ) == Some ( "*" ) || buckets. is_none ( ) {
184
+ Some ( vec ! [ ] )
185
+ } else if let Some ( buckets_str) = buckets {
186
+ Some ( buckets_str. split ( ',' ) . map ( |s| s. to_string ( ) ) . collect ( ) )
187
+ } else {
188
+ None
189
+ } ;
190
+
191
+ daemon ( & client, start_date, effective_buckets, sync_db) ?;
168
192
}
169
- // Perform two-way sync
170
- Commands :: SyncAdvanced {
193
+ // Perform sync
194
+ Commands :: Sync {
195
+ host,
171
196
start_date,
172
197
buckets,
173
198
mode,
174
199
sync_db,
175
200
} => {
176
- let sync_dir = dirs:: get_sync_dir ( ) ?;
177
- if let Some ( db_path) = & sync_db {
178
- info ! ( "Using sync db: {}" , & db_path. display( ) ) ;
201
+ // Use an empty vector to sync all buckets for these cases:
202
+ // 1. When --buckets '*' is supplied
203
+ // 2. When no bucket argument is provided (default)
204
+ let effective_buckets = if buckets. as_deref ( ) == Some ( "*" ) || buckets. is_none ( ) {
205
+ Some ( vec ! [ ] )
206
+ } else if let Some ( buckets_str) = buckets {
207
+ Some ( buckets_str. split ( ',' ) . map ( |s| s. to_string ( ) ) . collect ( ) )
208
+ } else {
209
+ None
210
+ } ;
179
211
180
- if !db_path. is_absolute ( ) {
181
- Err ( "Sync db path must be absolute" ) ?
182
- }
183
- if !db_path. starts_with ( & sync_dir) {
184
- Err ( "Sync db path must be in sync directory" ) ?
212
+ // If advanced options are provided, use advanced sync mode
213
+ if start_date. is_some ( ) || effective_buckets. is_some ( ) || sync_db. is_some ( ) {
214
+ let sync_dir = dirs:: get_sync_dir ( ) ?;
215
+ if let Some ( db_path) = & sync_db {
216
+ info ! ( "Using sync db: {}" , & db_path. display( ) ) ;
217
+
218
+ if !db_path. is_absolute ( ) {
219
+ Err ( "Sync db path must be absolute" ) ?
220
+ }
221
+ if !db_path. starts_with ( & sync_dir) {
222
+ Err ( "Sync db path must be in sync directory" ) ?
223
+ }
185
224
}
186
- }
187
225
188
- let sync_spec = sync:: SyncSpec {
189
- path : sync_dir,
190
- path_db : sync_db,
191
- buckets,
192
- start : start_date,
193
- } ;
226
+ let sync_spec = sync:: SyncSpec {
227
+ path : sync_dir,
228
+ path_db : sync_db,
229
+ buckets : effective_buckets,
230
+ start : start_date,
231
+ } ;
232
+
233
+ sync:: sync_run ( & client, & sync_spec, mode) ?
234
+ } else {
235
+ // Simple host-based sync mode (backwards compatibility)
236
+ // Pull
237
+ match host {
238
+ Some ( hosts) => {
239
+ for host in hosts. iter ( ) {
240
+ info ! ( "Pulling from host: {}" , host) ;
241
+ sync_wrapper:: pull ( host, & client) ?;
242
+ }
243
+ }
244
+ None => {
245
+ info ! ( "Pulling from all hosts" ) ;
246
+ sync_wrapper:: pull_all ( & client) ?;
247
+ }
248
+ }
194
249
195
- sync:: sync_run ( & client, & sync_spec, mode) ?
250
+ // Push
251
+ info ! ( "Pushing local data" ) ;
252
+ sync_wrapper:: push ( & client) ?
253
+ }
196
254
}
197
255
198
256
// List all buckets
@@ -207,23 +265,45 @@ fn main() -> Result<(), Box<dyn Error>> {
207
265
Ok ( ( ) )
208
266
}
209
267
210
- fn daemon ( client : & AwClient ) -> Result < ( ) , Box < dyn Error > > {
268
+ fn daemon (
269
+ client : & AwClient ,
270
+ start_date : Option < DateTime < Utc > > ,
271
+ buckets : Option < Vec < String > > ,
272
+ sync_db : Option < PathBuf > ,
273
+ ) -> Result < ( ) , Box < dyn Error > > {
211
274
let ( tx, rx) = channel ( ) ;
212
275
213
276
ctrlc:: set_handler ( move || {
214
277
let _ = tx. send ( ( ) ) ;
215
278
} ) ?;
216
279
280
+ let sync_dir = dirs:: get_sync_dir ( ) ?;
281
+ if let Some ( db_path) = & sync_db {
282
+ info ! ( "Using sync db: {}" , & db_path. display( ) ) ;
283
+
284
+ if !db_path. is_absolute ( ) {
285
+ Err ( "Sync db path must be absolute" ) ?
286
+ }
287
+ if !db_path. starts_with ( & sync_dir) {
288
+ Err ( "Sync db path must be in sync directory" ) ?
289
+ }
290
+ }
291
+
292
+ let sync_spec = sync:: SyncSpec {
293
+ path : sync_dir,
294
+ buckets,
295
+ path_db : sync_db,
296
+ start : start_date,
297
+ } ;
298
+
217
299
loop {
218
- if let Err ( e) = daemon_sync_cycle ( client) {
300
+ if let Err ( e) = sync :: sync_run ( client, & sync_spec , sync :: SyncMode :: Both ) {
219
301
error ! ( "Error during sync cycle: {}" , e) ;
220
- // Re-throw the error
221
302
return Err ( e) ;
222
303
}
223
304
224
305
info ! ( "Sync pass done, sleeping for 5 minutes" ) ;
225
306
226
- // Wait for either the sleep duration or a termination signal
227
307
match rx. recv_timeout ( Duration :: from_secs ( 300 ) ) {
228
308
Ok ( _) | Err ( RecvTimeoutError :: Disconnected ) => {
229
309
info ! ( "Termination signal received, shutting down." ) ;
@@ -237,13 +317,3 @@ fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
237
317
238
318
Ok ( ( ) )
239
319
}
240
-
241
- fn daemon_sync_cycle ( client : & AwClient ) -> Result < ( ) , Box < dyn Error > > {
242
- info ! ( "Pulling from all hosts" ) ;
243
- sync_wrapper:: pull_all ( client) ?;
244
-
245
- info ! ( "Pushing local data" ) ;
246
- sync_wrapper:: push ( client) ?;
247
-
248
- Ok ( ( ) )
249
- }
0 commit comments