From eb26502a5dfc121032fbb42bea5551336891798d Mon Sep 17 00:00:00 2001 From: nadav mizrahi Date: Mon, 3 Mar 2025 14:47:08 +0200 Subject: [PATCH] NC | lifecycle | add abort multipart upload handler Signed-off-by: nadav mizrahi --- src/cmd/manage_nsfs.js | 7 ++- src/manage_nsfs/manage_nsfs_constants.js | 4 +- src/manage_nsfs/manage_nsfs_validations.js | 2 + src/manage_nsfs/nc_lifecycle.js | 67 +++++++++++++++++----- 4 files changed, 63 insertions(+), 17 deletions(-) diff --git a/src/cmd/manage_nsfs.js b/src/cmd/manage_nsfs.js index 53b8aa0c15..6be5d3a946 100644 --- a/src/cmd/manage_nsfs.js +++ b/src/cmd/manage_nsfs.js @@ -92,7 +92,7 @@ async function main(argv = minimist(process.argv.slice(2))) { } else if (type === TYPES.CONNECTION) { await connection_management(action, user_input); } else if (type === TYPES.LIFECYCLE) { - await lifecycle_management(); + await lifecycle_management(argv); } else { throw_cli_error(ManageCLIError.InvalidType); } @@ -880,8 +880,9 @@ async function list_connections() { * lifecycle_management runs the nc lifecycle management * @returns {Promise} */ -async function lifecycle_management() { - await noobaa_cli_lifecycle.run_lifecycle(config_fs); +async function lifecycle_management(args) { + const disable_service_validation = args.disable_service_validation === 'true'; + await noobaa_cli_lifecycle.run_lifecycle(config_fs, disable_service_validation); } exports.main = main; diff --git a/src/manage_nsfs/manage_nsfs_constants.js b/src/manage_nsfs/manage_nsfs_constants.js index 530fdbb8b9..a7da234b43 100644 --- a/src/manage_nsfs/manage_nsfs_constants.js +++ b/src/manage_nsfs/manage_nsfs_constants.js @@ -96,7 +96,7 @@ const VALID_OPTIONS_CONNECTION = { 'status': new Set(['name', 'decrypt', ...CLI_MUTUAL_OPTIONS]), }; -const VALID_OPTIONS_LIFECYCLE = new Set([...CLI_MUTUAL_OPTIONS]); +const VALID_OPTIONS_LIFECYCLE = new Set(['disable_service_validation', ...CLI_MUTUAL_OPTIONS]); const VALID_OPTIONS_WHITELIST = new Set(['ips', ...CLI_MUTUAL_OPTIONS]); @@ -154,6 +154,8 @@ const OPTION_TYPE = { expected_hosts: 'string', custom_upgrade_scripts_dir: 'string', skip_verification: 'boolean', + // lifecycle options + disable_service_validation: 'string', //connection notification_protocol: 'string', agent_request_object: 'string', diff --git a/src/manage_nsfs/manage_nsfs_validations.js b/src/manage_nsfs/manage_nsfs_validations.js index d6318aac7e..921c251220 100644 --- a/src/manage_nsfs/manage_nsfs_validations.js +++ b/src/manage_nsfs/manage_nsfs_validations.js @@ -165,6 +165,8 @@ function validate_no_extra_options(type, action, input_options, is_options_from_ valid_options = VALID_OPTIONS.notification_options[action]; } else if (type === TYPES.CONNECTION) { valid_options = VALID_OPTIONS.connection_options[action]; + } else if (type === TYPES.LIFECYCLE) { + valid_options = VALID_OPTIONS.lifecycle_options; } else { valid_options = VALID_OPTIONS.whitelist_options; } diff --git a/src/manage_nsfs/nc_lifecycle.js b/src/manage_nsfs/nc_lifecycle.js index f77187ce69..e172b77056 100644 --- a/src/manage_nsfs/nc_lifecycle.js +++ b/src/manage_nsfs/nc_lifecycle.js @@ -9,6 +9,7 @@ const config = require('../../config'); const nb_native = require('../util/nb_native'); const NsfsObjectSDK = require('../sdk/nsfs_object_sdk'); const ManageCLIError = require('./manage_nsfs_cli_errors').ManageCLIError; +const path = require('path'); const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./manage_nsfs_cli_utils'); // TODO: @@ -22,10 +23,10 @@ const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./ * @param {import('../sdk/config_fs').ConfigFS} config_fs * @returns {Promise} */ -async function run_lifecycle(config_fs) { +async function run_lifecycle(config_fs, disable_service_validation) { const options = { silent_if_missing: true }; const system_json = await config_fs.get_system_config_file(options); - await throw_if_noobaa_not_active(config_fs, system_json); + if (!disable_service_validation) await throw_if_noobaa_not_active(config_fs, system_json); const bucket_names = await config_fs.list_buckets(); const concurrency = 10; // TODO - think about it @@ -33,10 +34,11 @@ async function run_lifecycle(config_fs) { const bucket_json = await config_fs.get_bucket_by_name(bucket_name, options); const account = { email: '', nsfs_account_config: config_fs.fs_context, access_keys: [] }; const object_sdk = new NsfsObjectSDK('', config_fs, account, bucket_json.versioning, config_fs.config_root, system_json); + object_sdk._simple_load_requesting_account(); await P.all(_.map(bucket_json.lifecycle_configuration_rules, async (lifecycle_rule, j) => { dbg.log0('NC LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket_json.name, 'rule', lifecycle_rule, 'j', j); - return handle_bucket_rule(lifecycle_rule, j, bucket_json, object_sdk); + return await handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk); } )); }); @@ -60,6 +62,18 @@ async function throw_if_noobaa_not_active(config_fs, system_json) { } } +function _build_lifecycle_filter(params) { + /** + * @param {string} key + * @param {nb.NativeFSStats} stat + */ + return function(key, stat) { + if (params.prefix && !key.startsWith(params.prefix)) return false; + if (params.days && (Date.now() - Number(stat.mtimeNsBigint) / 1e6) / 24 / 60 / 60 / 1000 < params.days) return false; + return true; + } +} + /** * handle_bucket_rule processes the lifecycle rule for a bucket * @param {*} lifecycle_rule @@ -73,10 +87,13 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj const should_process_lifecycle_rule = validate_rule_enabled(lifecycle_rule, bucket_json, now); if (!should_process_lifecycle_rule) return; dbg.log0('LIFECYCLE PROCESSING bucket:', bucket_json.name, '(bucket id:', bucket_json._id, ') rule', util.inspect(lifecycle_rule)); - const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule); + const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, config_fs.fs_context); const delete_objects_reply = await object_sdk.delete_multiple_objects({ bucket: bucket_json.name, - objects: delete_candidates // probably need to convert to the format expected by delete_multiple_objects + objects: delete_candidates.delete_objects // probably need to convert to the format expected by delete_multiple_objects + }); + await delete_candidates.abort_mpus?.forEach(async element => { + await object_sdk.abort_object_upload(element); }); // TODO - implement notifications for the deleted objects await update_lifecycle_rules_last_sync(config_fs, bucket_json, j, delete_objects_reply.num_objects_deleted); @@ -87,8 +104,9 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj * @param {Object} bucket_json * @param {*} lifecycle_rule */ -async function get_delete_candidates(bucket_json, lifecycle_rule) { +async function get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) { // let reply_objects = []; // TODO: needed for the notification log file + const candidates = {delete_objects: []}; if (lifecycle_rule.expiration) { await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json); if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) { @@ -99,8 +117,10 @@ async function get_delete_candidates(bucket_json, lifecycle_rule) { await get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json); } if (lifecycle_rule.abort_incomplete_multipart_upload) { - await get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json); + candidates.abort_mpus = await get_candidates_by_abort_incomplete_multipart_upload_rule( + lifecycle_rule, bucket_json, object_sdk, fs_context); } + return candidates; } /** @@ -170,8 +190,8 @@ async function get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, b * TODO: * POSIX - need to support both noncurrent_days and newer_noncurrent_versions * GPFS - implement noncurrent_days using GPFS ILM policy as an optimization - * @param {*} lifecycle_rule - * @param {Object} bucket_json + * @param {*} lifecycle_rule + * @param {Object} bucket_json */ async function get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json) { // TODO - implement @@ -179,11 +199,32 @@ async function get_candidates_by_noncurrent_version_expiration_rule(lifecycle_ru /** * get_candidates_by_abort_incomplete_multipart_upload_rule processes the abort incomplete multipart upload rule - * @param {*} lifecycle_rule - * @param {Object} bucket_json + * @param {*} lifecycle_rule + * @param {Object} bucket_json */ -async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json) { - // TODO - implement +async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json, object_sdk, fs_context) { + const nsfs = await object_sdk._get_bucket_namespace(bucket_json.name); + const mpu_path = nsfs._mpu_root_path(); + const prefix = lifecycle_rule.filter.prefix; + const expiration = lifecycle_rule.abort_incomplete_multipart_upload.days_after_initiation; + const res = []; + + const filter_func = _build_lifecycle_filter({prefix, days: expiration}); + let dir_handle = await nb_native().fs.opendir(fs_context, mpu_path); + for (;;) { + const dir_entry = await dir_handle.read(fs_context); + if (!dir_entry) break; + const create_path = path.join(mpu_path, dir_entry.name, 'create_object_upload'); + const { data: create_params_buffer } = await nb_native().fs.readFile(fs_context, create_path); + const create_params_parsed = JSON.parse(create_params_buffer.toString()); + const stat = await nb_native().fs.stat(fs_context, path.join(mpu_path, dir_entry.name)); + if (filter_func(create_params_parsed.key, stat)) { + res.push({obj_id: dir_entry.name, key: create_params_parsed.key, bucket: bucket_json.name}); + } + } + await dir_handle.close(fs_context); + dir_handle = null; + return res; } /**