add support for expiring objects on GET
Signed-off-by: Utkarsh Srivastava <[email protected]>

add support for migration queue length based migration kickoffs

Signed-off-by: Utkarsh Srivastava <[email protected]>
tangledbytes committed Feb 19, 2025
1 parent 48ffb40 commit 5ec1ef9
Showing 4 changed files with 116 additions and 1 deletion.
10 changes: 10 additions & 0 deletions config.js
@@ -901,6 +901,16 @@ config.NSFS_LOW_FREE_SPACE_MB_UNLEASH = 10 * 1024;
// operations safely.
config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;

// NSFS_GLACIER_FORCE_EXPIRE_ON_GET if set to true will cause any restored item in the GLACIER
// storage class to expire as soon as the first GET request is received for it, or
// when the previously set restore time is exceeded, whichever comes first.
config.NSFS_GLACIER_FORCE_EXPIRE_ON_GET = false;

// NSFS_GLACIER_DESIRED_MIGRATE_QUEUE_SIZE controls how large the migration queue/batch
// may grow. Once exceeded, a migration run is kicked off regardless of the configured
// interval.
config.NSFS_GLACIER_DESIRED_MIGRATE_QUEUE_SIZE = 50;

// anonymous account name
config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';

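A quick illustration of the "whichever comes first" semantics of the new flag. This is a minimal standalone sketch, not NooBaa code; the function name and constant below are stand-ins for illustration only.

'use strict';

// Mirrors the default added in config.js above.
const NSFS_GLACIER_FORCE_EXPIRE_ON_GET = false;

// A restored GLACIER object is treated as expired either when its recorded
// restore expiry has passed, or - with forced expiry enabled - as soon as it
// is read, whichever happens first.
function restore_is_expired_on_get(restore_expiry_iso, now = new Date()) {
    if (NSFS_GLACIER_FORCE_EXPIRE_ON_GET) return true;
    return now >= new Date(restore_expiry_iso);
}

console.log(restore_is_expired_on_get('1970-01-01T00:00:00.000Z')); // true - expiry already in the past
console.log(restore_is_expired_on_get('2999-01-01T00:00:00.000Z')); // false - restore still valid
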
18 changes: 17 additions & 1 deletion src/manage_nsfs/manage_nsfs_glacier.js
@@ -20,7 +20,8 @@ async function process_migrations() {

if (
await backend.low_free_space() ||
await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, GlacierBackend.MIGRATE_TIMESTAMP_FILE)
await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, GlacierBackend.MIGRATE_TIMESTAMP_FILE) ||
await migrate_queue_too_long()
) {
await run_glacier_migrations(fs_context, backend);
await record_current_time(fs_context, GlacierBackend.MIGRATE_TIMESTAMP_FILE);
@@ -154,6 +155,21 @@ async function time_exceeded(fs_context, interval, timestamp_file) {
return false;
}

/**
 * migrate_queue_too_long returns true if the approximate number of entries
 * pending migration in the active migrate log exceeds the configured
 * NSFS_GLACIER_DESIRED_MIGRATE_QUEUE_SIZE threshold.
*
* @returns {Promise<boolean>}
*/
async function migrate_queue_too_long() {
const log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, GlacierBackend.MIGRATE_WAL_NAME, { locking: null });
const approx_entries = await log.approx_entries({ samples: 10 });

return approx_entries > config.NSFS_GLACIER_DESIRED_MIGRATE_QUEUE_SIZE;
}

/**
* record_current_time stores the current timestamp in ISO format into
* the given timestamp file
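For reference, the gating logic above now fires on any one of three independent triggers: low free space, an exceeded migrate interval, or a migrate queue that has grown past the desired size. A self-contained sketch of that decision (names and inputs are illustrative, not the actual module):

'use strict';

const config = {
    NSFS_GLACIER_DESIRED_MIGRATE_QUEUE_SIZE: 50,
};

// Any one of the three triggers is enough to start a migration run.
function should_run_migration({ low_free_space, interval_exceeded, approx_queue_entries }) {
    return low_free_space ||
        interval_exceeded ||
        approx_queue_entries > config.NSFS_GLACIER_DESIRED_MIGRATE_QUEUE_SIZE;
}

console.log(should_run_migration({
    low_free_space: false,
    interval_exceeded: false,
    approx_queue_entries: 75,
})); // true - the queue is longer than the desired 50 entries
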
26 changes: 26 additions & 0 deletions src/sdk/namespace_fs.js
@@ -1142,6 +1142,8 @@ class NamespaceFS {
}
}

await this._expire_object_if_desired(fs_context, file_path, file, stat);

await file.close(fs_context);
file = null;
object_sdk.throw_if_aborted();
@@ -3415,6 +3417,30 @@ class NamespaceFS {
return Math.max(in_bytes, free_from_percentage);
}

/**
 * _expire_object_if_desired expires an object if its storage class is set
 * to GLACIER and NooBaa is configured for forced GET-based eviction
 * (NSFS_GLACIER_FORCE_EXPIRE_ON_GET)
* @param {nb.NativeFSContext} fs_context
* @param {string} file_path
* @param {nb.NativeFile} file
* @param {nb.NativeFSStats} stat
*/
async _expire_object_if_desired(fs_context, file_path, file, stat) {
if (!config.NSFS_GLACIER_FORCE_EXPIRE_ON_GET) return;
if (
s3_utils.parse_storage_class(stat.xattr[GlacierBackend.STORAGE_CLASS_XATTR]) !== s3_utils.STORAGE_CLASS_GLACIER
) return;

// Remove all the restore related xattrs
await file.replacexattr(fs_context, {
// Set date to 1970-01-01 to force expiry
[GlacierBackend.XATTR_RESTORE_EXPIRY]: new Date(0).toISOString()
}, GlacierBackend.XATTR_RESTORE_REQUEST);

await this.append_to_migrate_wal(file_path);
}

async append_to_migrate_wal(entry) {
if (!config.NSFS_GLACIER_LOGS_ENABLED) return;

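To show what the replacexattr call in _expire_object_if_desired amounts to, here is a small in-memory sketch. The xattr key strings and the force_expire helper are assumptions for illustration; the real keys live on GlacierBackend.

'use strict';

// Illustrative xattr keys (assumed values, not the actual GlacierBackend constants).
const XATTR_RESTORE_REQUEST = 'user.noobaa.restore.request';
const XATTR_RESTORE_EXPIRY = 'user.noobaa.restore.expiry';

// Stand-in for the xattr replacement: force the restore expiry into the past
// (the epoch) and drop the restore-request marker.
function force_expire(xattr) {
    const updated = { ...xattr, [XATTR_RESTORE_EXPIRY]: new Date(0).toISOString() };
    delete updated[XATTR_RESTORE_REQUEST];
    return updated;
}

const before = {
    [XATTR_RESTORE_REQUEST]: '1',
    [XATTR_RESTORE_EXPIRY]: '2025-03-01T00:00:00.000Z',
};
console.log(force_expire(before));
// { 'user.noobaa.restore.expiry': '1970-01-01T00:00:00.000Z' }

After this, the object looks expired to the restore machinery, and the entry appended to the migrate WAL queues it for the next migration run.
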
63 changes: 63 additions & 0 deletions src/util/persistent_logger.js
@@ -208,6 +208,69 @@ class PersistentLogger {
}
}

/**
 * approx_entries returns the approximate number of entries in the currently
 * active log file, based on the chosen strategy and sample size
* @param {{
* strategy?: "TOP_K",
* samples?: number
* }} cfg
* @returns {Promise<number>}
*/
async approx_entries(cfg) {
const { strategy = "TOP_K", samples = 10 } = cfg;

// Open the reader with NO lock so that we don't interfere
// with the current writer
//
// We don't need strict consistency guarantees here either, so it's okay
// to read partial writes
const reader = new NewlineReader(
this.fs_context,
this.active_path,
{ lock: null, skip_overflow_lines: true },
);

try {
let avg_length;
if (strategy === "TOP_K") {
avg_length = await this._get_top_k_entries_avg_length(reader, samples);
} else {
throw new Error("unsupported strategy:" + strategy);
}

const stat = await reader.fh.stat(this.fs_context);
return Math.round(stat.size / avg_length);
} finally {
await reader.close();
}
}

/**
 * _get_top_k_entries_avg_length takes a NewlineReader and a sample count
 * and returns the average length of the entries in that sample
* @param {NewlineReader} reader
* @param {number} samples
*/
async _get_top_k_entries_avg_length(reader, samples) {
let count = 0;
let total_length = 0;
let entry = await reader.nextline();

while (entry !== null && count < samples) {
count += 1;
total_length += entry.length;

entry = await reader.nextline();
}

if (count < samples) {
dbg.log1("not enough samples in the active log file:", this.active_path, count);
}

return Math.round(total_length / count);
}

async _replace_active(log_noent) {
const inactive_file = `${this.namespace}.${Date.now()}.log`;
const inactive_file_path = path.join(this.dir, inactive_file);
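The estimate in approx_entries is simply file size divided by an average entry length sampled from the top of the file. Below is a standalone sketch of the same "TOP_K" idea operating on an in-memory string instead of a NewlineReader; the helper name is assumed.

'use strict';

// Estimate the number of entries by averaging the length of the first
// `samples` entries (including their newline) and dividing the total size
// by that average.
function approx_entries(log_text, samples = 10) {
    const lines = log_text.split('\n').filter(Boolean);
    const top_k = lines.slice(0, samples);
    if (top_k.length === 0) return 0;

    const avg_length = top_k.reduce((sum, line) => sum + line.length + 1, 0) / top_k.length;
    return Math.round(log_text.length / avg_length);
}

const wal = Array.from({ length: 1000 }, (_, i) => `/bucket/object-${i}\n`).join('');
console.log(approx_entries(wal)); // 1111 - close to the real 1000; sampling only the head skews it slightly

This is why the doc comment calls the result approximate: entries later in the file that are longer or shorter than the sampled ones pull the true count away from the estimate.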
