diff --git a/src/cmd/manage_nsfs.js b/src/cmd/manage_nsfs.js
index 53b8aa0c15..6be5d3a946 100644
--- a/src/cmd/manage_nsfs.js
+++ b/src/cmd/manage_nsfs.js
@@ -92,7 +92,7 @@ async function main(argv = minimist(process.argv.slice(2))) {
         } else if (type === TYPES.CONNECTION) {
             await connection_management(action, user_input);
         } else if (type === TYPES.LIFECYCLE) {
-            await lifecycle_management();
+            await lifecycle_management(argv);
         } else {
             throw_cli_error(ManageCLIError.InvalidType);
         }
@@ -880,8 +880,9 @@ async function list_connections() {
  * lifecycle_management runs the nc lifecycle management
  * @returns {Promise}
  */
-async function lifecycle_management() {
-    await noobaa_cli_lifecycle.run_lifecycle(config_fs);
+async function lifecycle_management(args) {
+    const disable_service_validation = args.disable_service_validation === 'true';
+    await noobaa_cli_lifecycle.run_lifecycle(config_fs, disable_service_validation);
 }
 
 exports.main = main;
diff --git a/src/manage_nsfs/manage_nsfs_constants.js b/src/manage_nsfs/manage_nsfs_constants.js
index 530fdbb8b9..a7da234b43 100644
--- a/src/manage_nsfs/manage_nsfs_constants.js
+++ b/src/manage_nsfs/manage_nsfs_constants.js
@@ -96,7 +96,7 @@ const VALID_OPTIONS_CONNECTION = {
     'status': new Set(['name', 'decrypt', ...CLI_MUTUAL_OPTIONS]),
 };
 
-const VALID_OPTIONS_LIFECYCLE = new Set([...CLI_MUTUAL_OPTIONS]);
+const VALID_OPTIONS_LIFECYCLE = new Set(['disable_service_validation', ...CLI_MUTUAL_OPTIONS]);
 
 const VALID_OPTIONS_WHITELIST = new Set(['ips', ...CLI_MUTUAL_OPTIONS]);
 
@@ -154,6 +154,8 @@ const OPTION_TYPE = {
     expected_hosts: 'string',
     custom_upgrade_scripts_dir: 'string',
     skip_verification: 'boolean',
+    // lifecycle options
+    disable_service_validation: 'string',
     //connection
     notification_protocol: 'string',
     agent_request_object: 'string',
diff --git a/src/manage_nsfs/manage_nsfs_validations.js b/src/manage_nsfs/manage_nsfs_validations.js
index d6318aac7e..921c251220 100644
--- a/src/manage_nsfs/manage_nsfs_validations.js
+++ b/src/manage_nsfs/manage_nsfs_validations.js
@@ -165,6 +165,8 @@ function validate_no_extra_options(type, action, input_options, is_options_from_
         valid_options = VALID_OPTIONS.notification_options[action];
     } else if (type === TYPES.CONNECTION) {
         valid_options = VALID_OPTIONS.connection_options[action];
+    } else if (type === TYPES.LIFECYCLE) {
+        valid_options = VALID_OPTIONS.lifecycle_options;
     } else {
         valid_options = VALID_OPTIONS.whitelist_options;
     }
diff --git a/src/manage_nsfs/nc_lifecycle.js b/src/manage_nsfs/nc_lifecycle.js
index f77187ce69..eff309f314 100644
--- a/src/manage_nsfs/nc_lifecycle.js
+++ b/src/manage_nsfs/nc_lifecycle.js
@@ -9,6 +9,7 @@ const config = require('../../config');
 const nb_native = require('../util/nb_native');
 const NsfsObjectSDK = require('../sdk/nsfs_object_sdk');
 const ManageCLIError = require('./manage_nsfs_cli_errors').ManageCLIError;
+const path = require('path');
 const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./manage_nsfs_cli_utils');
 
 // TODO:
@@ -22,21 +23,24 @@ const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./
  * @param {import('../sdk/config_fs').ConfigFS} config_fs
+ * @param {boolean} disable_service_validation when true, skip the check that the noobaa service is active
  * @returns {Promise}
  */
-async function run_lifecycle(config_fs) {
+async function run_lifecycle(config_fs, disable_service_validation) {
     const options = { silent_if_missing: true };
     const system_json = await config_fs.get_system_config_file(options);
-    await throw_if_noobaa_not_active(config_fs, system_json);
+    if (!disable_service_validation) await throw_if_noobaa_not_active(config_fs, system_json);
 
     const bucket_names = await config_fs.list_buckets();
-    const concurrency = 10; // TODO - think about it
+    const concurrency = 10; // TODO - think about it
     await P.map_with_concurrency(concurrency, bucket_names, async bucket_name => {
         const bucket_json = await config_fs.get_bucket_by_name(bucket_name, options);
         const account = { email: '', nsfs_account_config: config_fs.fs_context, access_keys: [] };
         const object_sdk = new NsfsObjectSDK('', config_fs, account, bucket_json.versioning, config_fs.config_root, system_json);
+        //TODO temporary - need to check if we want to use a real account
+        object_sdk._simple_load_requesting_account();
         await P.all(_.map(bucket_json.lifecycle_configuration_rules,
             async (lifecycle_rule, j) => {
                 dbg.log0('NC LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket_json.name, 'rule', lifecycle_rule, 'j', j);
-                return handle_bucket_rule(lifecycle_rule, j, bucket_json, object_sdk);
+                return await handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk);
             }
         ));
     });
@@ -60,11 +64,89 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
     }
 }
 
+/**
+ * get file time since last modified in days
+ * @param {nb.NativeFSStats} stat
+ * @returns {number} fractional number of days elapsed since stat.mtimeNsBigint
+ */
+function _get_file_age_days(stat) {
+    //TODO how much do we care about rounding errors? (it is by days after all)
+    return (Date.now() - Number(stat.mtimeNsBigint) / 1e6) / 24 / 60 / 60 / 1000;
+}
+
+/**
+ * checks if tag query_tag is in the list tag_set
+ * @param {Object} query_tag
+ * @param {Array} tag_set
+ * @returns {boolean} true if some tag in tag_set has the same key and value as query_tag
+ */
+function _list_contain_tag(query_tag, tag_set) {
+    for (const t of tag_set) {
+        if (t.key === query_tag.key && t.value === query_tag.value) return true;
+    }
+    return false;
+}
+
+/**
+ * checks if object has all the tags in filter_tags
+ * @param {Object} object_info
+ * @param {Array} filter_tags
+ * @returns {boolean} false if the object has no tags or is missing any of filter_tags
+ */
+function _file_contain_tags(object_info, filter_tags) {
+    if (object_info.tags === undefined) return false;
+    for (const tag of filter_tags) {
+        if (!_list_contain_tag(tag, object_info.tags)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+/**
+ * builds the object info (key, age in days, tags) that the lifecycle filter checks for a multipart upload
+ * @param {*} create_params_parsed
+ * @param {nb.NativeFSStats} stat
+ * @returns {{key: *, age: number, tags: *}}
+ */
+function _get_lifecycle_object_info_for_mpu(create_params_parsed, stat) {
+    return {
+        key: create_params_parsed.key,
+        age: _get_file_age_days(stat),
+        tags: create_params_parsed.tagging,
+    };
+}
+
+
+/**
+ * @typedef {{
+ *     filter: Object
+ *     expiration: Number
+ * }} filter_params
+ *
+ * builds a predicate that checks an object info against the rule prefix, minimum age in days, tags and size limits
+ * @param {filter_params} params
+ * @returns {function(Object): boolean}
+ */
+function _build_lifecycle_filter(params) {
+    /**
+     * @param {Object} object_info
+     */
+    return function(object_info) {
+        if (params.filter?.prefix && !object_info.key.startsWith(params.filter.prefix)) return false;
+        if (params.expiration && object_info.age < params.expiration) return false;
+        if (params.filter?.tags && !_file_contain_tags(object_info, params.filter.tags)) return false;
+        if (params.filter?.object_size_greater_than && object_info.size < params.filter.object_size_greater_than) return false;
+        if (params.filter?.object_size_less_than && object_info.size > params.filter.object_size_less_than) return false;
+        return true;
+    };
+}
+
 /**
  * handle_bucket_rule processes the lifecycle rule for a bucket
- * @param {*} lifecycle_rule
- * @param {*} j
- * @param {Object} bucket_json
+ * @param {*} lifecycle_rule
+ * @param {*} j
+ * @param {Object} bucket_json
  * @param {Object} object_sdk
  */
 async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk) {
@@ -73,10 +155,14 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
     const should_process_lifecycle_rule = validate_rule_enabled(lifecycle_rule, bucket_json, now);
     if (!should_process_lifecycle_rule) return;
     dbg.log0('LIFECYCLE PROCESSING bucket:', bucket_json.name, '(bucket id:', bucket_json._id, ') rule', util.inspect(lifecycle_rule));
-    const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule);
+    const candidates = await get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, config_fs.fs_context);
     const delete_objects_reply = await object_sdk.delete_multiple_objects({
         bucket: bucket_json.name,
-        objects: delete_candidates // probably need to convert to the format expected by delete_multiple_objects
+        objects: candidates.delete_candidates // probably need to convert to the format expected by delete_multiple_objects
     });
+    // forEach ignores the promises returned by async callbacks - await each abort so failures propagate
+    for (const element of candidates.abort_mpus ?? []) {
+        await object_sdk.abort_object_upload(element);
+    }
     // TODO - implement notifications for the deleted objects
     await update_lifecycle_rules_last_sync(config_fs, bucket_json, j, delete_objects_reply.num_objects_deleted);
@@ -84,11 +170,15 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
 
 /**
  * get_delete_candidates gets the delete candidates for the lifecycle rule
- * @param {Object} bucket_json
- * @param {*} lifecycle_rule
+ * @param {Object} bucket_json
+ * @param {*} lifecycle_rule
+ * @param {Object} object_sdk
+ * @param {Object} fs_context
+ * @returns {Promise<{delete_candidates: Array, abort_mpus?: Array}>}
  */
-async function get_delete_candidates(bucket_json, lifecycle_rule) {
+async function get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) {
     // let reply_objects = []; // TODO: needed for the notification log file
+    const candidates = {delete_candidates: []};
     if (lifecycle_rule.expiration) {
         await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json);
         if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) {
@@ -99,8 +189,10 @@ async function get_delete_candidates(bucket_json, lifecycle_rule) {
         await get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json);
     }
     if (lifecycle_rule.abort_incomplete_multipart_upload) {
-        await get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json);
+        candidates.abort_mpus = await get_candidates_by_abort_incomplete_multipart_upload_rule(
+            lifecycle_rule, bucket_json, object_sdk, fs_context);
     }
+    return candidates;
 }
 
 /**
@@ -170,8 +262,8 @@ async function get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, b
  * TODO:
  * POSIX - need to support both noncurrent_days and newer_noncurrent_versions
  * GPFS - implement noncurrent_days using GPFS ILM policy as an optimization
- * @param {*} lifecycle_rule
- * @param {Object} bucket_json
+ * @param {*} lifecycle_rule
+ * @param {Object} bucket_json
  */
 async function get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json) {
     // TODO - implement
@@ -179,11 +271,52 @@ async function get_candidates_by_noncurrent_version_expiration_rule(lifecycle_ru
 
 /**
  * get_candidates_by_abort_incomplete_multipart_upload_rule processes the abort incomplete multipart upload rule
- * @param {*} lifecycle_rule
- * @param {Object} bucket_json
+ * @param {*} lifecycle_rule
+ * @param {Object} bucket_json
+ * @param {Object} object_sdk
+ * @param {Object} fs_context
+ * @returns {Promise<Array<{obj_id: string, key: string, bucket: string}>>} uploads that pass the rule filter
  */
-async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json) {
-    // TODO - implement
+async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json, object_sdk, fs_context) {
+    const nsfs = await object_sdk._get_bucket_namespace(bucket_json.name);
+    const mpu_path = nsfs._mpu_root_path();
+    const filter = lifecycle_rule.filter;
+    const expiration = lifecycle_rule.abort_incomplete_multipart_upload.days_after_initiation;
+    const res = [];
+
+    const filter_func = _build_lifecycle_filter({filter, expiration});
+    let dir_handle;
+    //TODO this is almost identical to list_uploads except for error handling and support for pagination. should modify list-upload and use it in here instead
+    try {
+        dir_handle = await nb_native().fs.opendir(fs_context, mpu_path);
+    } catch (err) {
+        if (err.code !== 'ENOENT') throw err;
+        // the multipart uploads directory does not exist - there is nothing to abort
+        return res;
+    }
+    try {
+        for (;;) {
+            const dir_entry = await dir_handle.read(fs_context);
+            if (!dir_entry) break;
+            try {
+                const create_path = path.join(mpu_path, dir_entry.name, 'create_object_upload');
+                const { data: create_params_buffer } = await nb_native().fs.readFile(fs_context, create_path);
+                const create_params_parsed = JSON.parse(create_params_buffer.toString());
+                const stat = await nb_native().fs.stat(fs_context, path.join(mpu_path, dir_entry.name));
+                const object_lifecycle_info = _get_lifecycle_object_info_for_mpu(create_params_parsed, stat);
+                if (filter_func(object_lifecycle_info)) {
+                    res.push({obj_id: dir_entry.name, key: create_params_parsed.key, bucket: bucket_json.name});
+                }
+            } catch (err) {
+                // skip only entries that are missing or are not a directory, any other error is rethrown
+                if (err.code !== 'ENOENT' && err.code !== 'ENOTDIR') throw err;
+            }
+        }
+    } finally {
+        // always release the directory handle, also when an error is rethrown from the loop
+        await dir_handle.close(fs_context);
+    }
+    return res;
 }
 
 /**
diff --git a/src/test/unit_tests/jest_tests/test_nc_lifecycle_cli.test.js b/src/test/unit_tests/jest_tests/test_nc_lifecycle_cli.test.js
new file mode 100644
index 0000000000..7d0bfda0ad
--- /dev/null
+++ b/src/test/unit_tests/jest_tests/test_nc_lifecycle_cli.test.js
@@ -0,0 +1,159 @@
+/* Copyright (C) 2016 NooBaa */
+'use strict';
+
+// disabling init_rand_seed as it takes longer than the actual test execution
+process.env.DISABLE_INIT_RANDOM_SEED = 'true';
+
+const path = require('path');
+const fs_utils = require('../../../util/fs_utils');
+const { ConfigFS } = require('../../../sdk/config_fs');
+const { TMP_PATH, set_nc_config_dir_in_config, TEST_TIMEOUT, exec_manage_cli } = require('../../system_tests/test_utils');
+const BucketSpaceFS = require('../../../sdk/bucketspace_fs');
+const { TYPES, ACTIONS } = require('../../../manage_nsfs/manage_nsfs_constants');
+const NamespaceFS = require('../../../sdk/namespace_fs');
+const endpoint_stats_collector = require('../../../sdk/endpoint_stats_collector');
+const os_utils = require('../../../util/os_utils');
+
+const new_umask = process.env.NOOBAA_ENDPOINT_UMASK || 0o000;
+const old_umask = process.umask(new_umask);
+console.log('test_nc_lifecycle_cli: replacing old umask: ', old_umask.toString(8), 'with new umask: ', new_umask.toString(8));
+
+const config_root = path.join(TMP_PATH, 'config_root_nc_lifecycle');
+const root_path = path.join(TMP_PATH, 'root_path_nc_lifecycle/');
+const config_fs = new ConfigFS(config_root);
+
+function make_dummy_object_sdk(account_json) {
+    return {
+        requesting_account: account_json
+    };
+}
+
+
+describe('noobaa cli - lifecycle', () => {
+    const bucketspace_fs = new BucketSpaceFS({ config_root }, undefined);
+    const test_bucket = 'test-bucket';
+    const test_bucket2 = 'test-bucket2';
+    const test_bucket_path = `${root_path}/${test_bucket}`;
+    const test_key1 = 'test_key1';
+    const prefix = 'test/';
+    const test_prefix_key = `${prefix}test_key1`;
+    const account_options1 = {uid: 2002, gid: 2002, new_buckets_path: root_path, name: 'user2', config_root, allow_bucket_creation: "true"};
+    let dummy_sdk;
+    let ns_src;
+
+    beforeAll(async () => {
+        await fs_utils.create_fresh_path(config_root, 0o777);
+        set_nc_config_dir_in_config(config_root);
+        await fs_utils.create_fresh_path(root_path, 0o777);
+        const res = await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, account_options1);
+        const json_account = JSON.parse(res).response.reply;
+        console.log(json_account);
+        dummy_sdk = make_dummy_object_sdk(json_account);
+        await bucketspace_fs.create_bucket({ name: test_bucket }, dummy_sdk);
+        await bucketspace_fs.create_bucket({ name: test_bucket2 }, dummy_sdk);
+        const bucket_json = await config_fs.get_bucket_by_name(test_bucket, undefined);
+
+        ns_src = new NamespaceFS({
+            bucket_path: test_bucket_path,
+            bucket_id: bucket_json._id,
+            namespace_resource_id: undefined,
+            access_mode: undefined,
+            versioning: undefined,
+            force_md5_etag: false,
+            stats: endpoint_stats_collector.instance(),
+        });
+
+    });
+
+    afterEach(async () => {
+        await bucketspace_fs.delete_bucket_lifecycle({ name: test_bucket });
+        await bucketspace_fs.delete_bucket_lifecycle({ name: test_bucket2 });
+    });
+
+    afterAll(async () => {
+        await fs_utils.folder_delete(`${root_path}/${test_bucket}`);
+        await fs_utils.folder_delete(`${root_path}/${test_bucket2}`);
+        await fs_utils.folder_delete(root_path);
+        await fs_utils.folder_delete(config_root);
+    }, TEST_TIMEOUT);
+
+    it('lifecycle_cli - abort mpu by number of days ', async () => {
+        const lifecycle_rule = [
+            {
+                "id": "abort mpu after 3 days",
+                "status": "Enabled",
+                "filter": {"prefix": ""},
+                "abort_incomplete_multipart_upload": {
+                    "days_after_initiation": 3
+                }
+            }
+        ];
+        await bucketspace_fs.set_bucket_lifecycle_configuration_rules({ name: test_bucket, rules: lifecycle_rule });
+        const res = await ns_src.create_object_upload({ key: test_key1, bucket: test_bucket }, dummy_sdk);
+        await ns_src.create_object_upload({ key: test_key1, bucket: test_bucket }, dummy_sdk);
+        await update_mpu_mtime(res.obj_id);
+        await exec_manage_cli(TYPES.LIFECYCLE, '', {disable_service_validation: "true", config_root}, undefined, undefined);
+        const mpu_list = await ns_src.list_uploads({ bucket: test_bucket }, dummy_sdk);
+        expect(mpu_list.objects.length).toBe(1); //removed the mpu that was created 5 days ago
+    });
+
+    it('lifecycle_cli - abort mpu by prefix and number of days ', async () => {
+        const lifecycle_rule = [
+            {
+                "id": "abort mpu after 3 days for prefix",
+                "status": "Enabled",
+                "filter": {"prefix": prefix},
+                "abort_incomplete_multipart_upload": {
+                    "days_after_initiation": 3
+                }
+            }
+        ];
+        await bucketspace_fs.set_bucket_lifecycle_configuration_rules({ name: test_bucket, rules: lifecycle_rule });
+        let res = await ns_src.create_object_upload({ key: test_key1, bucket: test_bucket }, dummy_sdk);
+        await update_mpu_mtime(res.obj_id);
+        res = await ns_src.create_object_upload({ key: test_prefix_key, bucket: test_bucket }, dummy_sdk);
+        await update_mpu_mtime(res.obj_id);
+        await exec_manage_cli(TYPES.LIFECYCLE, '', {disable_service_validation: "true", config_root}, undefined, undefined);
+        const mpu_list = await ns_src.list_uploads({ bucket: test_bucket }, dummy_sdk);
+        expect(mpu_list.objects.length).toBe(2); //only removed test_prefix_key
+    });
+
+    it('lifecycle_cli - abort mpu by tags ', async () => {
+        const tag_set = [{key: "key1", value: "val1"}, {key: "key2", value: "val2"}];
+        const different_tag_set = [{key: "key5", value: "val5"}];
+        const lifecycle_rule = [
+            {
+                "id": "abort mpu after 3 days for tags",
+                "status": "Enabled",
+                "filter": {
+                    "prefix": '',
+                    "tags": tag_set
+                },
+                "abort_incomplete_multipart_upload": {
+                    "days_after_initiation": 3
+                }
+            }
+        ];
+        await bucketspace_fs.set_bucket_lifecycle_configuration_rules({ name: test_bucket, rules: lifecycle_rule });
+        let res = await ns_src.create_object_upload(
+            {key: test_key1, bucket: test_bucket, tagging: [...tag_set, ...different_tag_set]},
+            dummy_sdk);
+        await update_mpu_mtime(res.obj_id);
+        res = await ns_src.create_object_upload({ key: test_key1, bucket: test_bucket, tagging: different_tag_set}, dummy_sdk);
+        await update_mpu_mtime(res.obj_id);
+        await exec_manage_cli(TYPES.LIFECYCLE, '', {disable_service_validation: "true", config_root}, undefined, undefined);
+        const mpu_list = await ns_src.list_uploads({ bucket: test_bucket }, dummy_sdk);
+        expect(mpu_list.objects.length).toBe(3); //two from previous tests + one new undeleted mpu
+    });
+
+    async function update_mpu_mtime(obj_id) {
+        const mpu_path = ns_src._mpu_path({obj_id});
+        return await update_file_mtime(mpu_path);
+    }
+
+});
+
+async function update_file_mtime(target_path) {
+    const update_file_mtime_cmp = `touch -d "5 days ago" ${target_path}`;
+    await os_utils.exec(update_file_mtime_cmp, { return_stdout: true });
+}