@@ -9,6 +9,7 @@ const config = require('../../config');
9
9
const nb_native = require ( '../util/nb_native' ) ;
10
10
const NsfsObjectSDK = require ( '../sdk/nsfs_object_sdk' ) ;
11
11
const ManageCLIError = require ( './manage_nsfs_cli_errors' ) . ManageCLIError ;
12
+ const path = require ( 'path' ) ;
12
13
const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require ( './manage_nsfs_cli_utils' ) ;
13
14
14
15
// TODO:
@@ -22,21 +23,23 @@ const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./
22
23
* @param {import('../sdk/config_fs').ConfigFS } config_fs
23
24
* @returns {Promise<Void> }
24
25
*/
25
- async function run_lifecycle ( config_fs ) {
26
+ async function run_lifecycle ( config_fs , disable_service_validation ) {
26
27
const options = { silent_if_missing : true } ;
27
28
const system_json = await config_fs . get_system_config_file ( options ) ;
28
- await throw_if_noobaa_not_active ( config_fs , system_json ) ;
29
+ if ( ! disable_service_validation ) await throw_if_noobaa_not_active ( config_fs , system_json ) ;
29
30
30
31
const bucket_names = await config_fs . list_buckets ( ) ;
31
- const concurrency = 10 ; // TODO - think about it
32
+ const concurrency = 10 ; // TODO - think about it
32
33
await P . map_with_concurrency ( concurrency , bucket_names , async bucket_name => {
33
34
const bucket_json = await config_fs . get_bucket_by_name ( bucket_name , options ) ;
34
35
const account = { email : '' , nsfs_account_config : config_fs . fs_context , access_keys : [ ] } ;
35
36
const object_sdk = new NsfsObjectSDK ( '' , config_fs , account , bucket_json . versioning , config_fs . config_root , system_json ) ;
37
+ //TODO temporary - need to check if we want to use a real account
38
+ object_sdk . _simple_load_requesting_account ( ) ;
36
39
await P . all ( _ . map ( bucket_json . lifecycle_configuration_rules ,
37
40
async ( lifecycle_rule , j ) => {
38
41
dbg . log0 ( 'NC LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:' , bucket_json . name , 'rule' , lifecycle_rule , 'j' , j ) ;
39
- return handle_bucket_rule ( lifecycle_rule , j , bucket_json , object_sdk ) ;
42
+ return await handle_bucket_rule ( config_fs , lifecycle_rule , j , bucket_json , object_sdk ) ;
40
43
}
41
44
) ) ;
42
45
} ) ;
@@ -60,11 +63,84 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
60
63
}
61
64
}
62
65
66
+ /**
67
+ * get file time since last modified in days
68
+ * @param {nb.NativeFSStats } stat
69
+ */
70
+ function _get_file_age_days ( stat ) {
71
+ //TODO how much do we care about rounding errors? (it is by days after all)
72
+ return ( Date . now ( ) - Number ( stat . mtimeNsBigint ) / 1e6 ) / 24 / 60 / 60 / 1000 ;
73
+ }
74
+
75
+ /**
76
+ * checks if tag query_tag is in the list tag_set
77
+ * @param {Object } query_tag
78
+ * @param {Array<Object> } tag_set
79
+ */
80
+ function _list_contain_tag ( query_tag , tag_set ) {
81
+ for ( const t of tag_set ) {
82
+ if ( t . key === query_tag . key && t . value === query_tag . value ) return true ;
83
+ }
84
+ return false ;
85
+ }
86
+
87
+ /**
88
+ * checks if object has all the tags in filter_tags
89
+ * @param {Object } object_info
90
+ * @param {Array<Object> } filter_tags
91
+ * @returns
92
+ */
93
+ function _file_contain_tags ( object_info , filter_tags ) {
94
+ if ( object_info . tags === undefined ) return false ;
95
+ for ( const tag of filter_tags ) {
96
+ if ( ! _list_contain_tag ( tag , object_info . tags ) ) {
97
+ return false ;
98
+ }
99
+ }
100
+ return true ;
101
+ }
102
+
103
+ /**
104
+ * @param {* } create_params_parsed
105
+ * @param {nb.NativeFSStats } stat
106
+ */
107
+ function _get_lifecycle_object_info_for_mpu ( create_params_parsed , stat ) {
108
+ return {
109
+ key : create_params_parsed . key ,
110
+ age : _get_file_age_days ( stat ) ,
111
+ tags : create_params_parsed . tagging ,
112
+ } ;
113
+ }
114
+
115
+
116
+ /**
117
+ * @typedef {{
118
+ * filter: Object
119
+ * expiration: Number
120
+ * }} filter_params
121
+ *
122
+ * @param {filter_params } params
123
+ * @returns
124
+ */
125
+ function _build_lifecycle_filter ( params ) {
126
+ /**
127
+ * @param {Object } object_info
128
+ */
129
+ return function ( object_info ) {
130
+ if ( params . filter ?. prefix && ! object_info . key . startsWith ( params . filter . prefix ) ) return false ;
131
+ if ( params . expiration && object_info . age < params . expiration ) return false ;
132
+ if ( params . filter ?. tags && ! _file_contain_tags ( object_info , params . filter . tags ) ) return false ;
133
+ if ( params . filter ?. object_size_greater_than && object_info . size < params . filter . object_size_greater_than ) return false ;
134
+ if ( params . filter ?. object_size_less_than && object_info . size > params . filter . object_size_less_than ) return false ;
135
+ return true ;
136
+ } ;
137
+ }
138
+
63
139
/**
64
140
* handle_bucket_rule processes the lifecycle rule for a bucket
65
- * @param {* } lifecycle_rule
66
- * @param {* } j
67
- * @param {Object } bucket_json
141
+ * @param {* } lifecycle_rule
142
+ * @param {* } j
143
+ * @param {Object } bucket_json
68
144
* @param {Object } object_sdk
69
145
*/
70
146
async function handle_bucket_rule ( config_fs , lifecycle_rule , j , bucket_json , object_sdk ) {
@@ -73,22 +149,26 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
73
149
const should_process_lifecycle_rule = validate_rule_enabled ( lifecycle_rule , bucket_json , now ) ;
74
150
if ( ! should_process_lifecycle_rule ) return ;
75
151
dbg . log0 ( 'LIFECYCLE PROCESSING bucket:' , bucket_json . name , '(bucket id:' , bucket_json . _id , ') rule' , util . inspect ( lifecycle_rule ) ) ;
76
- const delete_candidates = await get_delete_candidates ( bucket_json , lifecycle_rule ) ;
152
+ const candidates = await get_delete_candidates ( bucket_json , lifecycle_rule , object_sdk , config_fs . fs_context ) ;
77
153
const delete_objects_reply = await object_sdk . delete_multiple_objects ( {
78
154
bucket : bucket_json . name ,
79
- objects : delete_candidates // probably need to convert to the format expected by delete_multiple_objects
155
+ objects : candidates . delete_candidates // probably need to convert to the format expected by delete_multiple_objects
156
+ } ) ;
157
+ await candidates . abort_mpus ?. forEach ( async element => {
158
+ await object_sdk . abort_object_upload ( element ) ;
80
159
} ) ;
81
160
// TODO - implement notifications for the deleted objects
82
161
await update_lifecycle_rules_last_sync ( config_fs , bucket_json , j , delete_objects_reply . num_objects_deleted ) ;
83
162
}
84
163
85
164
/**
86
165
* get_delete_candidates gets the delete candidates for the lifecycle rule
87
- * @param {Object } bucket_json
88
- * @param {* } lifecycle_rule
166
+ * @param {Object } bucket_json
167
+ * @param {* } lifecycle_rule
89
168
*/
90
- async function get_delete_candidates ( bucket_json , lifecycle_rule ) {
169
+ async function get_delete_candidates ( bucket_json , lifecycle_rule , object_sdk , fs_context ) {
91
170
// let reply_objects = []; // TODO: needed for the notification log file
171
+ const candidates = { delete_candidates : [ ] } ;
92
172
if ( lifecycle_rule . expiration ) {
93
173
await get_candidates_by_expiration_rule ( lifecycle_rule , bucket_json ) ;
94
174
if ( lifecycle_rule . expiration . days || lifecycle_rule . expiration . expired_object_delete_marker ) {
@@ -99,8 +179,10 @@ async function get_delete_candidates(bucket_json, lifecycle_rule) {
99
179
await get_candidates_by_noncurrent_version_expiration_rule ( lifecycle_rule , bucket_json ) ;
100
180
}
101
181
if ( lifecycle_rule . abort_incomplete_multipart_upload ) {
102
- await get_candidates_by_abort_incomplete_multipart_upload_rule ( lifecycle_rule , bucket_json ) ;
182
+ candidates . abort_mpus = await get_candidates_by_abort_incomplete_multipart_upload_rule (
183
+ lifecycle_rule , bucket_json , object_sdk , fs_context ) ;
103
184
}
185
+ return candidates ;
104
186
}
105
187
106
188
/**
@@ -170,20 +252,53 @@ async function get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, b
170
252
* TODO:
171
253
* POSIX - need to support both noncurrent_days and newer_noncurrent_versions
172
254
* GPFS - implement noncurrent_days using GPFS ILM policy as an optimization
173
- * @param {* } lifecycle_rule
174
- * @param {Object } bucket_json
255
+ * @param {* } lifecycle_rule
256
+ * @param {Object } bucket_json
175
257
*/
176
258
async function get_candidates_by_noncurrent_version_expiration_rule ( lifecycle_rule , bucket_json ) {
177
259
// TODO - implement
178
260
}
179
261
180
262
/**
181
263
* get_candidates_by_abort_incomplete_multipart_upload_rule processes the abort incomplete multipart upload rule
182
- * @param {* } lifecycle_rule
183
- * @param {Object } bucket_json
264
+ * @param {* } lifecycle_rule
265
+ * @param {Object } bucket_json
184
266
*/
185
- async function get_candidates_by_abort_incomplete_multipart_upload_rule ( lifecycle_rule , bucket_json ) {
186
- // TODO - implement
267
+ async function get_candidates_by_abort_incomplete_multipart_upload_rule ( lifecycle_rule , bucket_json , object_sdk , fs_context ) {
268
+ const nsfs = await object_sdk . _get_bucket_namespace ( bucket_json . name ) ;
269
+ const mpu_path = nsfs . _mpu_root_path ( ) ;
270
+ const filter = lifecycle_rule . filter ;
271
+ const expiration = lifecycle_rule . abort_incomplete_multipart_upload . days_after_initiation ;
272
+ const res = [ ] ;
273
+
274
+ const filter_func = _build_lifecycle_filter ( { filter, expiration} ) ;
275
+ let dir_handle ;
276
+ //TODO this is almost identical to list_uploads except for error handling and support for pagination. should modify list-upload and use it in here instead
277
+ try {
278
+ dir_handle = await nb_native ( ) . fs . opendir ( fs_context , mpu_path ) ;
279
+ } catch ( err ) {
280
+ if ( err . code !== 'ENOENT' ) throw err ;
281
+ return ;
282
+ }
283
+ for ( ; ; ) {
284
+ try {
285
+ const dir_entry = await dir_handle . read ( fs_context ) ;
286
+ if ( ! dir_entry ) break ;
287
+ const create_path = path . join ( mpu_path , dir_entry . name , 'create_object_upload' ) ;
288
+ const { data : create_params_buffer } = await nb_native ( ) . fs . readFile ( fs_context , create_path ) ;
289
+ const create_params_parsed = JSON . parse ( create_params_buffer . toString ( ) ) ;
290
+ const stat = await nb_native ( ) . fs . stat ( fs_context , path . join ( mpu_path , dir_entry . name ) ) ;
291
+ const object_lifecycle_info = _get_lifecycle_object_info_for_mpu ( create_params_parsed , stat ) ;
292
+ if ( filter_func ( object_lifecycle_info ) ) {
293
+ res . push ( { obj_id : dir_entry . name , key : create_params_parsed . key , bucket : bucket_json . name } ) ;
294
+ }
295
+ } catch ( err ) {
296
+ if ( err . code !== 'ENOENT' || err . code !== 'ENOTDIR' ) throw err ;
297
+ }
298
+ }
299
+ await dir_handle . close ( fs_context ) ;
300
+ dir_handle = null ;
301
+ return res ;
187
302
}
188
303
189
304
/**
0 commit comments