Skip to content

Commit

Permalink
NC | lifecycle | add abort multipart upload handler
Browse files Browse the repository at this point in the history
Signed-off-by: nadav mizrahi <[email protected]>
  • Loading branch information
nadavMiz committed Mar 5, 2025
1 parent 4bb9068 commit eb26502
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 17 deletions.
7 changes: 4 additions & 3 deletions src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async function main(argv = minimist(process.argv.slice(2))) {
} else if (type === TYPES.CONNECTION) {
await connection_management(action, user_input);
} else if (type === TYPES.LIFECYCLE) {
await lifecycle_management();
await lifecycle_management(argv);
} else {
throw_cli_error(ManageCLIError.InvalidType);
}
Expand Down Expand Up @@ -880,8 +880,9 @@ async function list_connections() {
* lifecycle_management runs the nc lifecycle management
* @returns {Promise<void>}
*/
async function lifecycle_management() {
await noobaa_cli_lifecycle.run_lifecycle(config_fs);
async function lifecycle_management(args) {
const disable_service_validation = args.disable_service_validation === 'true';
await noobaa_cli_lifecycle.run_lifecycle(config_fs, disable_service_validation);
}

exports.main = main;
Expand Down
4 changes: 3 additions & 1 deletion src/manage_nsfs/manage_nsfs_constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const VALID_OPTIONS_CONNECTION = {
'status': new Set(['name', 'decrypt', ...CLI_MUTUAL_OPTIONS]),
};

const VALID_OPTIONS_LIFECYCLE = new Set([...CLI_MUTUAL_OPTIONS]);
const VALID_OPTIONS_LIFECYCLE = new Set(['disable_service_validation', ...CLI_MUTUAL_OPTIONS]);

const VALID_OPTIONS_WHITELIST = new Set(['ips', ...CLI_MUTUAL_OPTIONS]);

Expand Down Expand Up @@ -154,6 +154,8 @@ const OPTION_TYPE = {
expected_hosts: 'string',
custom_upgrade_scripts_dir: 'string',
skip_verification: 'boolean',
// lifecycle options
disable_service_validation: 'string',
//connection
notification_protocol: 'string',
agent_request_object: 'string',
Expand Down
2 changes: 2 additions & 0 deletions src/manage_nsfs/manage_nsfs_validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ function validate_no_extra_options(type, action, input_options, is_options_from_
valid_options = VALID_OPTIONS.notification_options[action];
} else if (type === TYPES.CONNECTION) {
valid_options = VALID_OPTIONS.connection_options[action];
} else if (type === TYPES.LIFECYCLE) {
valid_options = VALID_OPTIONS.lifecycle_options;
} else {
valid_options = VALID_OPTIONS.whitelist_options;
}
Expand Down
67 changes: 54 additions & 13 deletions src/manage_nsfs/nc_lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const config = require('../../config');
const nb_native = require('../util/nb_native');
const NsfsObjectSDK = require('../sdk/nsfs_object_sdk');
const ManageCLIError = require('./manage_nsfs_cli_errors').ManageCLIError;
const path = require('path');
const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./manage_nsfs_cli_utils');

// TODO:
Expand All @@ -22,21 +23,22 @@ const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @returns {Promise<Void>}
*/
async function run_lifecycle(config_fs) {
async function run_lifecycle(config_fs, disable_service_validation) {
const options = { silent_if_missing: true };
const system_json = await config_fs.get_system_config_file(options);
await throw_if_noobaa_not_active(config_fs, system_json);
if (!disable_service_validation) await throw_if_noobaa_not_active(config_fs, system_json);

const bucket_names = await config_fs.list_buckets();
const concurrency = 10; // TODO - think about it
await P.map_with_concurrency(concurrency, bucket_names, async bucket_name => {
const bucket_json = await config_fs.get_bucket_by_name(bucket_name, options);
const account = { email: '', nsfs_account_config: config_fs.fs_context, access_keys: [] };
const object_sdk = new NsfsObjectSDK('', config_fs, account, bucket_json.versioning, config_fs.config_root, system_json);
object_sdk._simple_load_requesting_account();
await P.all(_.map(bucket_json.lifecycle_configuration_rules,
async (lifecycle_rule, j) => {
dbg.log0('NC LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket_json.name, 'rule', lifecycle_rule, 'j', j);
return handle_bucket_rule(lifecycle_rule, j, bucket_json, object_sdk);
return await handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk);
}
));
});
Expand All @@ -60,6 +62,18 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
}
}

function _build_lifecycle_filter(params) {
/**
* @param {string} key
* @param {nb.NativeFSStats} stat
*/
return function(key, stat) {
if (params.prefix && !key.startsWith(params.prefix)) return false;
if (params.days && (Date.now() - Number(stat.mtimeNsBigint) / 1e6) / 24 / 60 / 60 / 1000 < params.days) return false;
return true;
}
}

/**
* handle_bucket_rule processes the lifecycle rule for a bucket
* @param {*} lifecycle_rule
Expand All @@ -73,10 +87,13 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
const should_process_lifecycle_rule = validate_rule_enabled(lifecycle_rule, bucket_json, now);
if (!should_process_lifecycle_rule) return;
dbg.log0('LIFECYCLE PROCESSING bucket:', bucket_json.name, '(bucket id:', bucket_json._id, ') rule', util.inspect(lifecycle_rule));
const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule);
const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, config_fs.fs_context);
const delete_objects_reply = await object_sdk.delete_multiple_objects({
bucket: bucket_json.name,
objects: delete_candidates // probably need to convert to the format expected by delete_multiple_objects
objects: delete_candidates.delete_objects // probably need to convert to the format expected by delete_multiple_objects
});
await delete_candidates.abort_mpus?.forEach(async element => {
await object_sdk.abort_object_upload(element);
});
// TODO - implement notifications for the deleted objects
await update_lifecycle_rules_last_sync(config_fs, bucket_json, j, delete_objects_reply.num_objects_deleted);
Expand All @@ -87,8 +104,9 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
* @param {Object} bucket_json
* @param {*} lifecycle_rule
*/
async function get_delete_candidates(bucket_json, lifecycle_rule) {
async function get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) {
// let reply_objects = []; // TODO: needed for the notification log file
const candidates = {delete_objects: []};
if (lifecycle_rule.expiration) {
await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json);
if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) {
Expand All @@ -99,8 +117,10 @@ async function get_delete_candidates(bucket_json, lifecycle_rule) {
await get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json);
}
if (lifecycle_rule.abort_incomplete_multipart_upload) {
await get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json);
candidates.abort_mpus = await get_candidates_by_abort_incomplete_multipart_upload_rule(
lifecycle_rule, bucket_json, object_sdk, fs_context);
}
return candidates;
}

/**
Expand Down Expand Up @@ -170,20 +190,41 @@ async function get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, b
* TODO:
* POSIX - need to support both noncurrent_days and newer_noncurrent_versions
* GPFS - implement noncurrent_days using GPFS ILM policy as an optimization
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
*/
async function get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json) {
// TODO - implement
}

/**
* get_candidates_by_abort_incomplete_multipart_upload_rule processes the abort incomplete multipart upload rule
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
*/
async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json) {
// TODO - implement
async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json, object_sdk, fs_context) {
const nsfs = await object_sdk._get_bucket_namespace(bucket_json.name);
const mpu_path = nsfs._mpu_root_path();
const prefix = lifecycle_rule.filter.prefix;
const expiration = lifecycle_rule.abort_incomplete_multipart_upload.days_after_initiation;
const res = [];

const filter_func = _build_lifecycle_filter({prefix, days: expiration});
let dir_handle = await nb_native().fs.opendir(fs_context, mpu_path);
for (;;) {
const dir_entry = await dir_handle.read(fs_context);
if (!dir_entry) break;
const create_path = path.join(mpu_path, dir_entry.name, 'create_object_upload');
const { data: create_params_buffer } = await nb_native().fs.readFile(fs_context, create_path);
const create_params_parsed = JSON.parse(create_params_buffer.toString());
const stat = await nb_native().fs.stat(fs_context, path.join(mpu_path, dir_entry.name));
if (filter_func(create_params_parsed.key, stat)) {
res.push({obj_id: dir_entry.name, key: create_params_parsed.key, bucket: bucket_json.name});
}
}
await dir_handle.close(fs_context);
dir_handle = null;
return res;
}

/**
Expand Down

0 comments on commit eb26502

Please sign in to comment.