Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NC | lifecycle | add option to abort multiple uploads #8853

Merged
merged 1 commit into from
Mar 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async function main(argv = minimist(process.argv.slice(2))) {
} else if (type === TYPES.CONNECTION) {
await connection_management(action, user_input);
} else if (type === TYPES.LIFECYCLE) {
await lifecycle_management();
await lifecycle_management(argv);
} else {
throw_cli_error(ManageCLIError.InvalidType);
}
Expand Down Expand Up @@ -880,8 +880,9 @@ async function list_connections() {
* lifecycle_management runs the nc lifecycle management
* @returns {Promise<void>}
*/
async function lifecycle_management() {
await noobaa_cli_lifecycle.run_lifecycle(config_fs);
async function lifecycle_management(args) {
const disable_service_validation = args.disable_service_validation === 'true';
await noobaa_cli_lifecycle.run_lifecycle(config_fs, disable_service_validation);
}

exports.main = main;
Expand Down
4 changes: 3 additions & 1 deletion src/manage_nsfs/manage_nsfs_constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const VALID_OPTIONS_CONNECTION = {
'status': new Set(['name', 'decrypt', ...CLI_MUTUAL_OPTIONS]),
};

const VALID_OPTIONS_LIFECYCLE = new Set([...CLI_MUTUAL_OPTIONS]);
const VALID_OPTIONS_LIFECYCLE = new Set(['disable_service_validation', ...CLI_MUTUAL_OPTIONS]);

const VALID_OPTIONS_WHITELIST = new Set(['ips', ...CLI_MUTUAL_OPTIONS]);

Expand Down Expand Up @@ -154,6 +154,8 @@ const OPTION_TYPE = {
expected_hosts: 'string',
custom_upgrade_scripts_dir: 'string',
skip_verification: 'boolean',
// lifecycle options
disable_service_validation: 'string',
//connection
notification_protocol: 'string',
agent_request_object: 'string',
Expand Down
2 changes: 2 additions & 0 deletions src/manage_nsfs/manage_nsfs_validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ function validate_no_extra_options(type, action, input_options, is_options_from_
valid_options = VALID_OPTIONS.notification_options[action];
} else if (type === TYPES.CONNECTION) {
valid_options = VALID_OPTIONS.connection_options[action];
} else if (type === TYPES.LIFECYCLE) {
valid_options = VALID_OPTIONS.lifecycle_options;
} else {
valid_options = VALID_OPTIONS.whitelist_options;
}
Expand Down
153 changes: 134 additions & 19 deletions src/manage_nsfs/nc_lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const config = require('../../config');
const nb_native = require('../util/nb_native');
const NsfsObjectSDK = require('../sdk/nsfs_object_sdk');
const ManageCLIError = require('./manage_nsfs_cli_errors').ManageCLIError;
const path = require('path');
const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./manage_nsfs_cli_utils');

// TODO:
Expand All @@ -22,21 +23,23 @@ const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @returns {Promise<Void>}
*/
async function run_lifecycle(config_fs) {
async function run_lifecycle(config_fs, disable_service_validation) {
const options = { silent_if_missing: true };
const system_json = await config_fs.get_system_config_file(options);
await throw_if_noobaa_not_active(config_fs, system_json);
if (!disable_service_validation) await throw_if_noobaa_not_active(config_fs, system_json);

const bucket_names = await config_fs.list_buckets();
const concurrency = 10; // TODO - think about it
const concurrency = 10; // TODO - think about it
await P.map_with_concurrency(concurrency, bucket_names, async bucket_name => {
const bucket_json = await config_fs.get_bucket_by_name(bucket_name, options);
const account = { email: '', nsfs_account_config: config_fs.fs_context, access_keys: [] };
const object_sdk = new NsfsObjectSDK('', config_fs, account, bucket_json.versioning, config_fs.config_root, system_json);
//TODO temporary - need to check if we want to use a real account
object_sdk._simple_load_requesting_account();
await P.all(_.map(bucket_json.lifecycle_configuration_rules,
async (lifecycle_rule, j) => {
dbg.log0('NC LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket_json.name, 'rule', lifecycle_rule, 'j', j);
return handle_bucket_rule(lifecycle_rule, j, bucket_json, object_sdk);
return await handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk);
}
));
});
Expand All @@ -60,11 +63,84 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
}
}

/**
* get file time since last modified in days
* @param {nb.NativeFSStats} stat
*/
function _get_file_age_days(stat) {
//TODO how much do we care about rounding errors? (it is by days after all)
return (Date.now() - Number(stat.mtimeNsBigint) / 1e6) / 24 / 60 / 60 / 1000;
}

/**
* checks if tag query_tag is in the list tag_set
* @param {Object} query_tag
* @param {Array<Object>} tag_set
*/
function _list_contain_tag(query_tag, tag_set) {
for (const t of tag_set) {
if (t.key === query_tag.key && t.value === query_tag.value) return true;
}
return false;
}

/**
* checks if object has all the tags in filter_tags
* @param {Object} object_info
* @param {Array<Object>} filter_tags
* @returns
*/
function _file_contain_tags(object_info, filter_tags) {
if (object_info.tags === undefined) return false;
for (const tag of filter_tags) {
if (!_list_contain_tag(tag, object_info.tags)) {
return false;
}
}
return true;
}

/**
* @param {*} create_params_parsed
* @param {nb.NativeFSStats} stat
*/
function _get_lifecycle_object_info_for_mpu(create_params_parsed, stat) {
return {
key: create_params_parsed.key,
age: _get_file_age_days(stat),
tags: create_params_parsed.tagging,
};
}


/**
* @typedef {{
* filter: Object
* expiration: Number
* }} filter_params
*
* @param {filter_params} params
* @returns
*/
function _build_lifecycle_filter(params) {
/**
* @param {Object} object_info
*/
return function(object_info) {
if (params.filter?.prefix && !object_info.key.startsWith(params.filter.prefix)) return false;
if (params.expiration && object_info.age < params.expiration) return false;
if (params.filter?.tags && !_file_contain_tags(object_info, params.filter.tags)) return false;
if (params.filter?.object_size_greater_than && object_info.size < params.filter.object_size_greater_than) return false;
if (params.filter?.object_size_less_than && object_info.size > params.filter.object_size_less_than) return false;
return true;
};
}

/**
* handle_bucket_rule processes the lifecycle rule for a bucket
* @param {*} lifecycle_rule
* @param {*} j
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {*} j
* @param {Object} bucket_json
* @param {Object} object_sdk
*/
async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk) {
Expand All @@ -73,22 +149,26 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
const should_process_lifecycle_rule = validate_rule_enabled(lifecycle_rule, bucket_json, now);
if (!should_process_lifecycle_rule) return;
dbg.log0('LIFECYCLE PROCESSING bucket:', bucket_json.name, '(bucket id:', bucket_json._id, ') rule', util.inspect(lifecycle_rule));
const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule);
const candidates = await get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, config_fs.fs_context);
const delete_objects_reply = await object_sdk.delete_multiple_objects({
bucket: bucket_json.name,
objects: delete_candidates // probably need to convert to the format expected by delete_multiple_objects
objects: candidates.delete_candidates // probably need to convert to the format expected by delete_multiple_objects
});
await candidates.abort_mpus?.forEach(async element => {
await object_sdk.abort_object_upload(element);
});
// TODO - implement notifications for the deleted objects
await update_lifecycle_rules_last_sync(config_fs, bucket_json, j, delete_objects_reply.num_objects_deleted);
}

/**
* get_delete_candidates gets the delete candidates for the lifecycle rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
*/
async function get_delete_candidates(bucket_json, lifecycle_rule) {
async function get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) {
// let reply_objects = []; // TODO: needed for the notification log file
const candidates = {delete_candidates: []};
if (lifecycle_rule.expiration) {
await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json);
if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) {
Expand All @@ -99,8 +179,10 @@ async function get_delete_candidates(bucket_json, lifecycle_rule) {
await get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json);
}
if (lifecycle_rule.abort_incomplete_multipart_upload) {
await get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json);
candidates.abort_mpus = await get_candidates_by_abort_incomplete_multipart_upload_rule(
lifecycle_rule, bucket_json, object_sdk, fs_context);
}
return candidates;
}

/**
Expand Down Expand Up @@ -170,20 +252,53 @@ async function get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, b
* TODO:
* POSIX - need to support both noncurrent_days and newer_noncurrent_versions
* GPFS - implement noncurrent_days using GPFS ILM policy as an optimization
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
*/
async function get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json) {
// TODO - implement
}

/**
* get_candidates_by_abort_incomplete_multipart_upload_rule processes the abort incomplete multipart upload rule
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
*/
async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json) {
// TODO - implement
async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json, object_sdk, fs_context) {
const nsfs = await object_sdk._get_bucket_namespace(bucket_json.name);
const mpu_path = nsfs._mpu_root_path();
const filter = lifecycle_rule.filter;
const expiration = lifecycle_rule.abort_incomplete_multipart_upload.days_after_initiation;
const res = [];

const filter_func = _build_lifecycle_filter({filter, expiration});
let dir_handle;
//TODO this is almost identical to list_uploads except for error handling and support for pagination. should modify list-upload and use it in here instead
try {
dir_handle = await nb_native().fs.opendir(fs_context, mpu_path);
} catch (err) {
if (err.code !== 'ENOENT') throw err;
return;
}
for (;;) {
try {
const dir_entry = await dir_handle.read(fs_context);
if (!dir_entry) break;
const create_path = path.join(mpu_path, dir_entry.name, 'create_object_upload');
const { data: create_params_buffer } = await nb_native().fs.readFile(fs_context, create_path);
const create_params_parsed = JSON.parse(create_params_buffer.toString());
const stat = await nb_native().fs.stat(fs_context, path.join(mpu_path, dir_entry.name));
const object_lifecycle_info = _get_lifecycle_object_info_for_mpu(create_params_parsed, stat);
if (filter_func(object_lifecycle_info)) {
res.push({obj_id: dir_entry.name, key: create_params_parsed.key, bucket: bucket_json.name});
}
} catch (err) {
if (err.code !== 'ENOENT' || err.code !== 'ENOTDIR') throw err;
}
}
await dir_handle.close(fs_context);
dir_handle = null;
return res;
}

/**
Expand Down
Loading