Skip to content

Commit

Permalink
NC | lifecycle | add abort multipart upload handler
Browse files Browse the repository at this point in the history
Signed-off-by: nadav mizrahi <[email protected]>
  • Loading branch information
nadavMiz committed Mar 5, 2025
1 parent 4bb9068 commit 803f50b
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 29 deletions.
7 changes: 4 additions & 3 deletions src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async function main(argv = minimist(process.argv.slice(2))) {
} else if (type === TYPES.CONNECTION) {
await connection_management(action, user_input);
} else if (type === TYPES.LIFECYCLE) {
await lifecycle_management();
await lifecycle_management(argv);
} else {
throw_cli_error(ManageCLIError.InvalidType);
}
Expand Down Expand Up @@ -880,8 +880,9 @@ async function list_connections() {
* lifecycle_management runs the nc lifecycle management
* @returns {Promise<void>}
*/
async function lifecycle_management() {
await noobaa_cli_lifecycle.run_lifecycle(config_fs);
async function lifecycle_management(args) {
const disable_service_validation = args.disable_service_validation === 'true';
await noobaa_cli_lifecycle.run_lifecycle(config_fs, disable_service_validation);
}

exports.main = main;
Expand Down
4 changes: 3 additions & 1 deletion src/manage_nsfs/manage_nsfs_constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const VALID_OPTIONS_CONNECTION = {
'status': new Set(['name', 'decrypt', ...CLI_MUTUAL_OPTIONS]),
};

const VALID_OPTIONS_LIFECYCLE = new Set([...CLI_MUTUAL_OPTIONS]);
const VALID_OPTIONS_LIFECYCLE = new Set(['disable_service_validation', ...CLI_MUTUAL_OPTIONS]);

const VALID_OPTIONS_WHITELIST = new Set(['ips', ...CLI_MUTUAL_OPTIONS]);

Expand Down Expand Up @@ -154,6 +154,8 @@ const OPTION_TYPE = {
expected_hosts: 'string',
custom_upgrade_scripts_dir: 'string',
skip_verification: 'boolean',
// lifecycle options
disable_service_validation: 'string',
//connection
notification_protocol: 'string',
agent_request_object: 'string',
Expand Down
2 changes: 2 additions & 0 deletions src/manage_nsfs/manage_nsfs_validations.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ function validate_no_extra_options(type, action, input_options, is_options_from_
valid_options = VALID_OPTIONS.notification_options[action];
} else if (type === TYPES.CONNECTION) {
valid_options = VALID_OPTIONS.connection_options[action];
} else if (type === TYPES.LIFECYCLE) {
valid_options = VALID_OPTIONS.lifecycle_options;
} else {
valid_options = VALID_OPTIONS.whitelist_options;
}
Expand Down
107 changes: 91 additions & 16 deletions src/manage_nsfs/nc_lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const config = require('../../config');
const nb_native = require('../util/nb_native');
const NsfsObjectSDK = require('../sdk/nsfs_object_sdk');
const ManageCLIError = require('./manage_nsfs_cli_errors').ManageCLIError;
const path = require('path');
const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./manage_nsfs_cli_utils');

// TODO:
Expand All @@ -22,21 +23,22 @@ const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @returns {Promise<Void>}
*/
async function run_lifecycle(config_fs) {
async function run_lifecycle(config_fs, disable_service_validation) {
const options = { silent_if_missing: true };
const system_json = await config_fs.get_system_config_file(options);
await throw_if_noobaa_not_active(config_fs, system_json);
if (!disable_service_validation) await throw_if_noobaa_not_active(config_fs, system_json);

const bucket_names = await config_fs.list_buckets();
const concurrency = 10; // TODO - think about it
await P.map_with_concurrency(concurrency, bucket_names, async bucket_name => {
const bucket_json = await config_fs.get_bucket_by_name(bucket_name, options);
const account = { email: '', nsfs_account_config: config_fs.fs_context, access_keys: [] };
const object_sdk = new NsfsObjectSDK('', config_fs, account, bucket_json.versioning, config_fs.config_root, system_json);
object_sdk._simple_load_requesting_account();
await P.all(_.map(bucket_json.lifecycle_configuration_rules,
async (lifecycle_rule, j) => {
dbg.log0('NC LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket_json.name, 'rule', lifecycle_rule, 'j', j);
return handle_bucket_rule(lifecycle_rule, j, bucket_json, object_sdk);
return await handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk);
}
));
});
Expand All @@ -60,11 +62,47 @@ async function throw_if_noobaa_not_active(config_fs, system_json) {
}
}

function _get_file_age_days(stat) {
//TODO how much do we care about rounding errors? (it is by days after all)
return (Date.now() - Number(stat.mtimeNsBigint) / 1e6) / 24 / 60 / 60 / 1000;
}

function _list_contain_tag(query_tag, tag_set) {
for (const t of tag_set) {
if (t.key === query_tag.key && t.value === query_tag.value) return true;
}
return false;
}

function _file_contain_tags(stat, tags, nsfs) {
const xattr_tags = nsfs.get_tags_from_xattr(stat.xattr);
for (const tag of tags) {
if (!_list_contain_tag(tag, xattr_tags)) {
return false;
}
}
return true;
}

function _build_lifecycle_filter(params, nsfs) {
/**
* @param {string} key
* @param {nb.NativeFSStats} stat
*/
return function(key, stat) {
if (params.prefix && !key.startsWith(params.prefix)) return false;
//TODO how much do we care about rounding errors? (it is by days after all)
if (params.days && _get_file_age_days(stat) < params.days) return false;
if (params.tags && !_file_contain_tags(stat, params.tags, nsfs)) return false;
return true;
};
}

/**
* handle_bucket_rule processes the lifecycle rule for a bucket
* @param {*} lifecycle_rule
* @param {*} j
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {*} j
* @param {Object} bucket_json
* @param {Object} object_sdk
*/
async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, object_sdk) {
Expand All @@ -73,10 +111,13 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
const should_process_lifecycle_rule = validate_rule_enabled(lifecycle_rule, bucket_json, now);
if (!should_process_lifecycle_rule) return;
dbg.log0('LIFECYCLE PROCESSING bucket:', bucket_json.name, '(bucket id:', bucket_json._id, ') rule', util.inspect(lifecycle_rule));
const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule);
const delete_candidates = await get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, config_fs.fs_context);
const delete_objects_reply = await object_sdk.delete_multiple_objects({
bucket: bucket_json.name,
objects: delete_candidates // probably need to convert to the format expected by delete_multiple_objects
objects: delete_candidates.delete_objects // probably need to convert to the format expected by delete_multiple_objects
});
await delete_candidates.abort_mpus?.forEach(async element => {
await object_sdk.abort_object_upload(element);
});
// TODO - implement notifications for the deleted objects
await update_lifecycle_rules_last_sync(config_fs, bucket_json, j, delete_objects_reply.num_objects_deleted);
Expand All @@ -87,8 +128,9 @@ async function handle_bucket_rule(config_fs, lifecycle_rule, j, bucket_json, obj
* @param {Object} bucket_json
* @param {*} lifecycle_rule
*/
async function get_delete_candidates(bucket_json, lifecycle_rule) {
async function get_delete_candidates(bucket_json, lifecycle_rule, object_sdk, fs_context) {
// let reply_objects = []; // TODO: needed for the notification log file
const candidates = {delete_objects: []};
if (lifecycle_rule.expiration) {
await get_candidates_by_expiration_rule(lifecycle_rule, bucket_json);
if (lifecycle_rule.expiration.days || lifecycle_rule.expiration.expired_object_delete_marker) {
Expand All @@ -99,8 +141,10 @@ async function get_delete_candidates(bucket_json, lifecycle_rule) {
await get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json);
}
if (lifecycle_rule.abort_incomplete_multipart_upload) {
await get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json);
candidates.abort_mpus = await get_candidates_by_abort_incomplete_multipart_upload_rule(
lifecycle_rule, bucket_json, object_sdk, fs_context);
}
return candidates;
}

/**
Expand Down Expand Up @@ -170,20 +214,51 @@ async function get_candidates_by_expiration_delete_marker_rule(lifecycle_rule, b
* TODO:
* POSIX - need to support both noncurrent_days and newer_noncurrent_versions
* GPFS - implement noncurrent_days using GPFS ILM policy as an optimization
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
*/
async function get_candidates_by_noncurrent_version_expiration_rule(lifecycle_rule, bucket_json) {
// TODO - implement
}

/**
* get_candidates_by_abort_incomplete_multipart_upload_rule processes the abort incomplete multipart upload rule
* @param {*} lifecycle_rule
* @param {Object} bucket_json
* @param {*} lifecycle_rule
* @param {Object} bucket_json
*/
async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json) {
// TODO - implement
async function get_candidates_by_abort_incomplete_multipart_upload_rule(lifecycle_rule, bucket_json, object_sdk, fs_context) {
const nsfs = await object_sdk._get_bucket_namespace(bucket_json.name);
const mpu_path = nsfs._mpu_root_path();
const prefix = lifecycle_rule.filter.prefix;
const expiration = lifecycle_rule.abort_incomplete_multipart_upload.days_after_initiation;
const res = [];

const filter_func = _build_lifecycle_filter({prefix, days: expiration, nsfs});
let dir_handle;
try {
dir_handle = await nb_native().fs.opendir(fs_context, mpu_path);
} catch (err) {
if (err.code !== 'ENOENT') throw err;
return;
}
for (;;) {
try {
const dir_entry = await dir_handle.read(fs_context);
if (!dir_entry) break;
const create_path = path.join(mpu_path, dir_entry.name, 'create_object_upload');
const { data: create_params_buffer } = await nb_native().fs.readFile(fs_context, create_path);
const create_params_parsed = JSON.parse(create_params_buffer.toString());
const stat = await nb_native().fs.stat(fs_context, path.join(mpu_path, dir_entry.name));
if (filter_func(create_params_parsed.key, stat)) {
res.push({obj_id: dir_entry.name, key: create_params_parsed.key, bucket: bucket_json.name});
}
} catch (err) {
if (err.code !== 'ENOENT') throw err;
}
}
await dir_handle.close(fs_context);
dir_handle = null;
return res;
}

/**
Expand Down
19 changes: 10 additions & 9 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2082,7 +2082,7 @@ class NamespaceFS {
////////////////////

async get_object_tagging(params, object_sdk) {
const tag_set = [];
let tag_set = [];
let file_path;
let file;
const fs_context = this.prepare_fs_context(object_sdk);
Expand All @@ -2097,14 +2097,7 @@ class NamespaceFS {
file = await nb_native().fs.open(fs_context, file_path);
const stat = await file.stat(fs_context);
if (stat.xattr) {
for (const [xattr_key, xattr_value] of Object.entries(stat.xattr)) {
if (xattr_key.includes(XATTR_TAG)) {
tag_set.push({
key: xattr_key.replace(XATTR_TAG, ''),
value: xattr_value,
});
}
}
tag_set = this._get_tags_from_xattr(stat.xattr);
}
} catch (err) {
dbg.error(`NamespaceFS.get_object_tagging: failed in dir ${file_path} with error: `, err);
Expand Down Expand Up @@ -2478,6 +2471,14 @@ class NamespaceFS {
return xattr[XATTR_MD5_KEY];
}

_get_tags_from_xattr(xattr) {
const tags_xattr = Object.keys(xattr).filter(xattr_key => xattr_key.includes(XATTR_TAG));
return tags_xattr.map(key_xattr => ({
key: key_xattr.replace(XATTR_TAG, ''),
value: xattr[key_xattr]
}));
}

_number_of_tags_fs_xttr(xattr) {
return Object.keys(xattr).filter(xattr_key => xattr_key.includes(XATTR_TAG)).length;
}
Expand Down
74 changes: 74 additions & 0 deletions src/test/unit_tests/jest_tests/test_nc_lifecycle_cli.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/* Copyright (C) 2016 NooBaa */
'use strict';

// disabling init_rand_seed as it takes longer than the actual test execution
process.env.DISABLE_INIT_RANDOM_SEED = 'true';

const path = require('path');
const fs_utils = require('../../../util/fs_utils');
const { ConfigFS } = require('../../../sdk/config_fs');
const { TMP_PATH, set_nc_config_dir_in_config, TEST_TIMEOUT, exec_manage_cli } = require('../../system_tests/test_utils');
const BucketSpaceFS = require('../../../sdk/bucketspace_fs');
const { TYPES, ACTIONS } = require('../../../manage_nsfs/manage_nsfs_constants');

const new_umask = process.env.NOOBAA_ENDPOINT_UMASK || 0o000;
const old_umask = process.umask(new_umask);
console.log('test_nc_lifecycle_cli: replacing old umask: ', old_umask.toString(8), 'with new umask: ', new_umask.toString(8));

const config_root = path.join(TMP_PATH, 'config_root_nc_lifecycle');
const root_path = path.join(TMP_PATH, 'root_path_nc_lifecycle/');
const config_fs = new ConfigFS(config_root);

function make_dummy_object_sdk(account_json) {
return {
requesting_account: account_json
};
}


describe('noobaa cli - lifecycle', () => {
const bucketspace_fs = new BucketSpaceFS({ config_root }, undefined);
const test_bucket = 'test-bucket';
const test_bucket2 = 'test-bucket2';
const account_options1 = {uid: 2002, gid: 2002, new_buckets_path: root_path, name: 'user2', config_root, allow_bucket_creation: "true"};

beforeAll(async () => {
await fs_utils.create_fresh_path(config_root, 0o777);
set_nc_config_dir_in_config(config_root);
await fs_utils.create_fresh_path(root_path, 0o777);
const res = await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, account_options1);
const json_account = JSON.parse(res).response.reply;
console.log(json_account);
const dummy_sdk = make_dummy_object_sdk(json_account);
await bucketspace_fs.create_bucket({ name: test_bucket }, dummy_sdk);
await bucketspace_fs.create_bucket({ name: test_bucket2 }, dummy_sdk);
});

afterEach(async () => {
await bucketspace_fs.delete_bucket_lifecycle({ name: test_bucket });
await bucketspace_fs.delete_bucket_lifecycle({ name: test_bucket2 });
});

afterAll(async () => {
await fs_utils.folder_delete(`${root_path}/${test_bucket}`);
await fs_utils.folder_delete(`${root_path}/${test_bucket2}`);
await fs_utils.folder_delete(root_path);
await fs_utils.folder_delete(config_root);
}, TEST_TIMEOUT);

it('lifecycle_cli - abort mpu by number of days ', async () => {
const lifecycle_rule = [
{
"id": "abort mpu after 3 days",
"status": "Enabled",
"filter": {"prefix": ""},
"abort_incomplete_multipart_upload": {
"days_after_initiation": 3
}
}
];
await bucketspace_fs.set_bucket_lifecycle_configuration_rules({ name: test_bucket, rules: lifecycle_rule });
await exec_manage_cli(TYPES.LIFECYCLE, '', {disable_service_validation: "true", config_root});
});

});

0 comments on commit 803f50b

Please sign in to comment.