Skip to content

Commit

Permalink
NC | lifecycle | add abort multipart upload handler
Browse files Browse the repository at this point in the history
Signed-off-by: nadav mizrahi <[email protected]>
  • Loading branch information
nadavMiz committed Mar 9, 2025
1 parent 887b78b commit 6e853e4
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 23 deletions.
7 changes: 4 additions & 3 deletions src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async function main(argv = minimist(process.argv.slice(2))) {
} else if (type === TYPES.CONNECTION) {
await connection_management(action, user_input);
} else if (type === TYPES.LIFECYCLE) {
await lifecycle_management();
await lifecycle_management(argv);
} else {
throw_cli_error(ManageCLIError.InvalidType);
}
Expand Down Expand Up @@ -880,8 +880,9 @@ async function list_connections() {
* lifecycle_management runs the nc lifecycle management
* @returns {Promise<void>}
*/
async function lifecycle_management() {
await noobaa_cli_lifecycle.run_lifecycle(config_fs);
async function lifecycle_management(args) {
const disable_service_validation = args.disable_service_validation === 'true';
await noobaa_cli_lifecycle.run_lifecycle(config_fs, disable_service_validation);
}

exports.main = main;
Expand Down
4 changes: 3 additions & 1 deletion src/manage_nsfs/manage_nsfs_constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const VALID_OPTIONS_CONNECTION = {
'status': new Set(['name', 'decrypt', ...CLI_MUTUAL_OPTIONS]),
};

const VALID_OPTIONS_LIFECYCLE = new Set([...CLI_MUTUAL_OPTIONS]);
const VALID_OPTIONS_LIFECYCLE = new Set(['disable_service_validation', ...CLI_MUTUAL_OPTIONS]);

const VALID_OPTIONS_WHITELIST = new Set(['ips', ...CLI_MUTUAL_OPTIONS]);

Expand Down Expand Up @@ -154,6 +154,8 @@ const OPTION_TYPE = {
expected_hosts: 'string',
custom_upgrade_scripts_dir: 'string',
skip_verification: 'boolean',
// lifecycle options
disable_service_validation: 'string',
//connection
notification_protocol: 'string',
agent_request_object: 'string',
Expand Down
2 changes: 2 additions & 0 deletions src/manage_nsfs/manage_nsfs_validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ function validate_no_extra_options(type, action, input_options, is_options_from_
valid_options = VALID_OPTIONS.notification_options[action];
} else if (type === TYPES.CONNECTION) {
valid_options = VALID_OPTIONS.connection_options[action];
} else if (type === TYPES.LIFECYCLE) {
valid_options = VALID_OPTIONS.lifecycle_options;
} else {
valid_options = VALID_OPTIONS.whitelist_options;
}
Expand Down
153 changes: 134 additions & 19 deletions src/manage_nsfs/nc_lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const config = require('../../config');
const nb_native = require('../util/nb_native');
const NsfsObjectSDK = require('../sdk/nsfs_object_sdk');
const ManageCLIError = require('./manage_nsfs_cli_errors').ManageCLIError;
const path = require('path');
const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./manage_nsfs_cli_utils');

// TODO:
Expand All @@ -22,21 +23,23 @@ const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @returns {Promise<Void>}
*/
async function run_lifecycle(config_fs) {
async function run_lifecycle(config_fs, disable_service_validation) {
const options = { silent_if_missing: true };
const system_json = await config_fs.get_system_config_file(options);
await throw_if_noobaa_not_active(config_fs, system_json);
if (!disable_service_validation) await throw_if_noobaa_not_active(config_fs, system_json);

const bucket_names = await config_fs.list_buckets();
const concurrency = 10; // TODO - think about it
const concurrency = 10; // TODO - think about it
await P.map_with_concurrency(concurrency, bucket_names, async bucket_name => {
const bucket_json = await config_fs.get_bucket_by_name(bucket_name, options);
const account = { email: '', nsfs_account_config: config_fs.fs_context, access_keys: [] };
const object_sdk = new NsfsObjectSDK('', config_fs, account, bucket_json.versioning, config_fs.config_root, system_json);
//TODO temporary - need to check if we want to use a real account
object_sdk._simple_load_requesting_account();
await P.all(_.map(bucket_json.lifecycle_configuration_rules,
async (lifecycle_rule, j) => {
dbg.log0('NC LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket_json.name, 'rule', lifecycle_rule, 'j', j);
return handle_bucket_rule(lifecycle_rule, j, bucket_json, object_sdk);
return await handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk);
}
));
});
Expand All @@ -60,11 +63,84 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
}
}

/**
 * Compute the time since a file was last modified, in (fractional) days.
 * @param {nb.NativeFSStats} stat - native stat carrying mtimeNsBigint (mtime in nanoseconds)
 * @returns {number} age in days since last modification
 */
function _get_file_age_days(stat) {
    const MS_PER_DAY = 24 * 60 * 60 * 1000;
    const mtime_ms = Number(stat.mtimeNsBigint) / 1e6;
    // fractional result is acceptable — lifecycle rules compare whole days,
    // so sub-day rounding error does not change the outcome
    return (Date.now() - mtime_ms) / MS_PER_DAY;
}

/**
 * Check whether query_tag appears (exact key and value match) in tag_set.
 * @param {Object} query_tag - tag to look for, shaped {key, value}
 * @param {Array<Object>} tag_set - list of {key, value} tags to search
 * @returns {boolean} true when an exact key+value match exists
 */
function _list_contain_tag(query_tag, tag_set) {
    return tag_set.some(tag => tag.key === query_tag.key && tag.value === query_tag.value);
}

/**
 * Check whether an object carries every tag listed in filter_tags.
 * @param {Object} object_info - object metadata; tags is an optional Array<{key, value}>
 * @param {Array<Object>} filter_tags - tags required by the lifecycle filter
 * @returns {boolean} true only when all filter tags are present on the object
 */
function _file_contain_tags(object_info, filter_tags) {
    // an untagged object can never satisfy a tag filter
    if (object_info.tags === undefined) return false;
    return filter_tags.every(tag => _list_contain_tag(tag, object_info.tags));
}

/**
 * Build the minimal object-info record used by the lifecycle filter for an
 * incomplete multipart upload (key, age in days, and the upload's tagging).
 * @param {*} create_params_parsed - parsed create_object_upload params of the MPU
 * @param {nb.NativeFSStats} stat - stat of the MPU directory (age source)
 * @returns {{key: string, age: number, tags: Object}}
 */
function _get_lifecycle_object_info_for_mpu(create_params_parsed, stat) {
    const age = _get_file_age_days(stat);
    // NOTE(review): no `size` field here — size-based filters would compare
    // against undefined for MPUs; confirm that is intended
    return {
        key: create_params_parsed.key,
        age,
        tags: create_params_parsed.tagging,
    };
}


/**
 * Build a predicate that decides whether an object matches a lifecycle rule.
 *
 * @typedef {{
 *      filter: Object
 *      expiration: Number
 * }} filter_params
 *
 * @param {filter_params} params
 * @returns {(object_info: Object) => boolean} predicate over {key, age, size, tags}
 */
function _build_lifecycle_filter(params) {
    /**
     * @param {Object} object_info
     */
    return function(object_info) {
        if (params.filter?.prefix && !object_info.key.startsWith(params.filter.prefix)) return false;
        if (params.expiration && object_info.age < params.expiration) return false;
        if (params.filter?.tags && !_file_contain_tags(object_info, params.filter.tags)) return false;
        // explicit undefined checks so a 0-byte threshold is not silently skipped,
        // and strict inequalities to match AWS ObjectSizeGreaterThan / ObjectSizeLessThan
        // semantics (size equal to the threshold does not match)
        if (params.filter?.object_size_greater_than !== undefined &&
            object_info.size <= params.filter.object_size_greater_than) return false;
        if (params.filter?.object_size_less_than !== undefined &&
            object_info.size >= params.filter.object_size_less_than) return false;
        return true;
    };
}

/**
* handle_bucket_rule processes the lifecycle rule for a bucket
* @param {*} lifecycle_rule
* @param {*} j
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {*} j
* @param {Object} bucket_json
* @param {Object} object_sdk
*/
async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk) {
Expand All @@ -73,22 +149,26 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
const should_process_lifecycle_rule = validate_rule_enabled(lifecycle_rule, bucket_json, now);
if (!should_process_lifecycle_rule) return;
dbg.log0('LIFECYCLE PROCESSING bucket:', bucket_json.name, '(bucket id:', bucket_json._id, ') rule', util.inspect(lifecycle_rule));
const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule);
const candidates = await get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, config_fs.fs_context);
const delete_objects_reply = await object_sdk.delete_multiple_objects({
bucket: bucket_json.name,
objects: delete_candidates // probably need to convert to the format expected by delete_multiple_objects
objects: candidates.delete_candidates // probably need to convert to the format expected by delete_multiple_objects
});
await candidates.abort_mpus?.forEach(async element => {
await object_sdk.abort_object_upload(element);
});
// TODO - implement notifications for the deleted objects
await update_lifecycle_rules_last_sync(config_fs, bucket_json, j, delete_objects_reply.num_objects_deleted);
}

/**
* get_delete_candidates gets the delete candidates for the lifecycle rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
*/
async function get_delete_candidates(bucket_json, lifecycle_rule) {
async function get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) {
// let reply_objects = []; // TODO: needed for the notification log file
const candidates = {delete_candidates: []};
if (lifecycle_rule.expiration) {
await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json);
if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) {
Expand All @@ -99,8 +179,10 @@ async function get_delete_candidates(bucket_json, lifecycle_rule) {
await get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json);
}
if (lifecycle_rule.abort_incomplete_multipart_upload) {
await get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json);
candidates.abort_mpus = await get_candidates_by_abort_incomplete_multipart_upload_rule(
lifecycle_rule, bucket_json, object_sdk, fs_context);
}
return candidates;
}

/**
Expand Down Expand Up @@ -170,20 +252,53 @@ async function get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, b
* TODO:
* POSIX - need to support both noncurrent_days and newer_noncurrent_versions
* GPFS - implement noncurrent_days using GPFS ILM policy as an optimization
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
*/
async function get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json) {
    // Intentionally a no-op for now: candidates for noncurrent-version
    // expiration are not collected yet (see the TODO notes in the doc above
    // about POSIX noncurrent_days and the GPFS ILM optimization).
    // TODO - implement
}

/**
 * get_candidates_by_abort_incomplete_multipart_upload_rule processes the
 * abort-incomplete-multipart-upload rule: scans the bucket's MPU root
 * directory and collects uploads whose age / filter match the rule.
 * @param {*} lifecycle_rule
 * @param {Object} bucket_json
 * @param {Object} object_sdk
 * @param {nb.NativeFSContext} fs_context
 * @returns {Promise<Array<{obj_id: string, key: string, bucket: string}>>}
 *          MPUs to abort (empty array when the MPU root does not exist)
 */
async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json, object_sdk, fs_context) {
    const nsfs = await object_sdk._get_bucket_namespace(bucket_json.name);
    const mpu_path = nsfs._mpu_root_path();
    const filter = lifecycle_rule.filter;
    const expiration = lifecycle_rule.abort_incomplete_multipart_upload.days_after_initiation;
    const res = [];

    const filter_func = _build_lifecycle_filter({ filter, expiration });
    let dir_handle;
    //TODO this is almost identical to list_uploads except for error handling and support for pagination. should modify list-upload and use it in here instead
    try {
        dir_handle = await nb_native().fs.opendir(fs_context, mpu_path);
    } catch (err) {
        // no MPU directory means no incomplete uploads — nothing to abort
        if (err.code !== 'ENOENT') throw err;
        return res;
    }
    try {
        for (;;) {
            // read outside the per-entry try: a failing read must not be
            // swallowed, or the loop could spin forever on the same error
            const dir_entry = await dir_handle.read(fs_context);
            if (!dir_entry) break;
            try {
                const create_path = path.join(mpu_path, dir_entry.name, 'create_object_upload');
                const { data: create_params_buffer } = await nb_native().fs.readFile(fs_context, create_path);
                const create_params_parsed = JSON.parse(create_params_buffer.toString());
                const stat = await nb_native().fs.stat(fs_context, path.join(mpu_path, dir_entry.name));
                const object_lifecycle_info = _get_lifecycle_object_info_for_mpu(create_params_parsed, stat);
                if (filter_func(object_lifecycle_info)) {
                    res.push({ obj_id: dir_entry.name, key: create_params_parsed.key, bucket: bucket_json.name });
                }
            } catch (err) {
                // tolerate entries that vanished mid-scan (upload completed or
                // aborted concurrently); was `||` which is always true and
                // rethrew every error, defeating the tolerance
                if (err.code !== 'ENOENT' && err.code !== 'ENOTDIR') throw err;
            }
        }
    } finally {
        // guarantee the directory handle is released even on unexpected errors
        await dir_handle.close(fs_context);
        dir_handle = null;
    }
    return res;
}

/**
Expand Down
Loading

0 comments on commit 6e853e4

Please sign in to comment.