Skip to content

Commit

Permalink
NC | Lifecycle | Add lock to lifecycle worker
Browse files Browse the repository at this point in the history
Signed-off-by: Romy <[email protected]>
  • Loading branch information
romayalon committed Mar 9, 2025
1 parent 3e038b0 commit e524d4a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 27 deletions.
2 changes: 1 addition & 1 deletion src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ async function list_connections() {
* @returns {Promise<void>}
*/
async function lifecycle_management() {
await noobaa_cli_lifecycle.run_lifecycle(config_fs);
await noobaa_cli_lifecycle.run_lifecycle_under_lock(config_fs);
}

exports.main = main;
Expand Down
30 changes: 6 additions & 24 deletions src/manage_nsfs/manage_nsfs_glacier.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ const SCAN_LOCK = 'scan.lock';

async function process_migrations() {
const fs_context = native_fs_utils.get_process_fs_context();

await lock_and_run(fs_context, CLUSTER_LOCK, async () => {
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, CLUSTER_LOCK);
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
const backend = getGlacierBackend();

if (
Expand All @@ -40,8 +40,8 @@ async function run_glacier_migrations(fs_context, backend) {

async function process_restores() {
const fs_context = native_fs_utils.get_process_fs_context();

await lock_and_run(fs_context, CLUSTER_LOCK, async () => {
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, CLUSTER_LOCK);
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
const backend = getGlacierBackend();

if (
Expand All @@ -67,8 +67,8 @@ async function run_glacier_restore(fs_context, backend) {

async function process_expiry() {
const fs_context = native_fs_utils.get_process_fs_context();

await lock_and_run(fs_context, SCAN_LOCK, async () => {
const lock_path = path.join(config.NSFS_GLACIER_LOGS_DIR, SCAN_LOCK);
await native_fs_utils.lock_and_run(fs_context, lock_path, async () => {
const backend = getGlacierBackend();
if (
await backend.low_free_space() ||
Expand Down Expand Up @@ -212,24 +212,6 @@ function get_tz_date(hours, mins, secs, tz) {
return date;
}

/**
* lock_and_run acquires a flock and calls the given callback after
* acquiring the lock
* @param {nb.NativeFSContext} fs_context
* @param {string} lockfilename
* @param {Function} cb
*/
async function lock_and_run(fs_context, lockfilename, cb) {
const lockfd = await nb_native().fs.open(fs_context, path.join(config.NSFS_GLACIER_LOGS_DIR, lockfilename), 'w');

try {
await lockfd.fcntllock(fs_context, 'EXCLUSIVE');
await cb();
} finally {
await lockfd.close(fs_context);
}
}

exports.process_migrations = process_migrations;
exports.process_restores = process_restores;
exports.process_expiry = process_expiry;
22 changes: 20 additions & 2 deletions src/manage_nsfs/nc_lifecycle.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,38 @@

const dbg = require('../util/debug_module')(__filename);
const _ = require('lodash');
const path = require('path');
const util = require('util');
const P = require('../util/promise');
const config = require('../../config');
const nb_native = require('../util/nb_native');
const NsfsObjectSDK = require('../sdk/nsfs_object_sdk');
const native_fs_utils = require('../util/native_fs_utils');
const ManageCLIError = require('./manage_nsfs_cli_errors').ManageCLIError;
const { throw_cli_error, get_service_status, NOOBAA_SERVICE_NAME } = require('./manage_nsfs_cli_utils');

// TODO:
// TODO:
// implement
// 1. notifications
// 2. POSIX scanning and filtering per rule
// 3. GPFS ILM policy and apply for scanning and filtering optimization

const CLUSTER_LOCK = 'cluster.lock';

/**
* run_lifecycle_under_lock runs the lifecycle workflow under a file system lock
* lifecycle workflow is being locked to prevent multiple instances from running the lifecycle workflow
* @param {import('../sdk/config_fs').ConfigFS} config_fs
*/
async function run_lifecycle_under_lock(config_fs) {
const lock_path = path.join(config_fs.config_root, CLUSTER_LOCK);
await native_fs_utils.lock_and_run(config_fs.fs_context, lock_path, async () => {
dbg.log0('run_lifecycle_under_lock acquired lock - start lifecycle');
await run_lifecycle(config_fs);
dbg.log0('run_lifecycle_under_lock done lifecycle - released lock');
});
}

/**
* run_lifecycle runs the lifecycle workflow
* @param {import('../sdk/config_fs').ConfigFS} config_fs
Expand Down Expand Up @@ -204,5 +222,5 @@ async function update_lifecycle_rules_last_sync(config_fs, bucket_json, j, num_o
}

// EXPORTS
exports.run_lifecycle = run_lifecycle;
exports.run_lifecycle_under_lock = run_lifecycle_under_lock;

21 changes: 21 additions & 0 deletions src/util/native_fs_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,24 @@ function translate_error_codes(err, entity) {
return err;
}

/**
* lock_and_run acquires a flock and calls the given callback after
* acquiring the lock
* @param {nb.NativeFSContext} fs_context
* @param {string} lock_path
* @param {Function} cb
*/
async function lock_and_run(fs_context, lock_path, cb) {
const lockfd = await nb_native().fs.open(fs_context, lock_path, 'w');

try {
await lockfd.fcntllock(fs_context, 'EXCLUSIVE');
await cb();
} finally {
await lockfd.close(fs_context);
}
}

exports.get_umasked_mode = get_umasked_mode;
exports._make_path_dirs = _make_path_dirs;
exports._create_path = _create_path;
Expand Down Expand Up @@ -754,3 +772,6 @@ exports.get_bucket_tmpdir_full_path = get_bucket_tmpdir_full_path;
exports.get_bucket_tmpdir_name = get_bucket_tmpdir_name;
exports.entity_enum = entity_enum;
exports.translate_error_codes = translate_error_codes;

exports.lock_and_run = lock_and_run;

0 comments on commit e524d4a

Please sign in to comment.