Skip to content

Commit

Permalink
Merge pull request #8286 from jackyalbo/jacky-pers_log
Browse files Browse the repository at this point in the history
Fixing bucket logging after refactor
  • Loading branch information
jackyalbo authored Aug 19, 2024
2 parents 528a6e2 + 8c21f68 commit 46ccb82
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 16 deletions.
16 changes: 9 additions & 7 deletions src/manage_nsfs/manage_nsfs_logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@

const config = require('../../config');
const { throw_cli_error, write_stdout_response} = require('../manage_nsfs/manage_nsfs_cli_utils');
const nc_mkm = require('../manage_nsfs/nc_master_key_manager').get_instance();
const ManageCLIError = require('../manage_nsfs/manage_nsfs_cli_errors').ManageCLIError;
const ManageCLIResponse = require('../manage_nsfs/manage_nsfs_cli_responses').ManageCLIResponse;
const { export_logs_to_target } = require('../util/bucket_logs_utils');
const http_utils = require('../util/http_utils');
const AWS = require('aws-sdk');

// This command goes over the logs in the persistent log and move the entries to log objects in the target buckets
async function export_bucket_logging(config_fs) {
let config_fs;
/** This command goes over the logs in the persistent log and move the entries to log objects in the target buckets
/* @param {import('../sdk/config_fs').ConfigFS} shared_config_fs
*/
async function export_bucket_logging(shared_config_fs) {
config_fs = shared_config_fs;
const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`;
const noobaa_con = new AWS.S3({
endpoint,
Expand All @@ -21,7 +24,7 @@ async function export_bucket_logging(config_fs) {
agent: http_utils.get_unsecured_agent(endpoint)
}
});
const success = await export_logs_to_target(config_fs, noobaa_con, get_bucket_owner_keys);
const success = await export_logs_to_target(config_fs.fs_context, noobaa_con, get_bucket_owner_keys);
if (success) {
write_stdout_response(ManageCLIResponse.LoggingExported);
} else {
Expand All @@ -31,15 +34,14 @@ async function export_bucket_logging(config_fs) {

/**
* return bucket owner's access and secret key
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @param {string} log_bucket_name
* @returns {Promise<Object>}
*/
async function get_bucket_owner_keys(config_fs, log_bucket_name) {
async function get_bucket_owner_keys(log_bucket_name) {
const log_bucket_config_data = await config_fs.get_bucket_by_name(log_bucket_name);
const log_bucket_owner = log_bucket_config_data.bucket_owner;
const owner_config_data = await config_fs.get_account_by_name(log_bucket_owner, { show_secrets: true, decrypt_secret_key: true });
return nc_mkm.decrypt_access_keys(owner_config_data);
return owner_config_data.access_keys;
}

exports.export_bucket_logging = export_bucket_logging;
7 changes: 5 additions & 2 deletions src/test/system_tests/test_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ function get_coretest_path() {
* @param {object} options
* @returns {Promise<string>}
*/
async function exec_manage_cli(type, action, options, is_silent) {
async function exec_manage_cli(type, action, options, is_silent, env) {
let flags = ``;
for (const key in options) {
if (options[key] !== undefined) {
Expand All @@ -264,7 +264,10 @@ async function exec_manage_cli(type, action, options, is_silent) {

const command = `node src/cmd/manage_nsfs ${type} ${action} ${flags}`;
try {
const res = await os_utils.exec(command, { return_stdout: true });
const res = await os_utils.exec(command, {
return_stdout: true,
env,
});
return res;
} catch (err) {
console.error('test_utils.exec_manage_cli error', err);
Expand Down
1 change: 1 addition & 0 deletions src/test/unit_tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ require('./test_upgrade_scripts.js');
require('./test_tiering_ttl_worker');
// require('./test_tiering_upload');
//require('./test_s3_worm');
require('./test_bucket_logging');

// UPGRADE
// require('./test_postgres_upgrade'); // TODO currently working with mongo -> once changing to postgres - need to uncomment
Expand Down
1 change: 1 addition & 0 deletions src/test/unit_tests/nc_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require('./test_nsfs_glacier_backend');
require('./test_s3_bucket_policy');
require('./test_nsfs_versioning');
require('./test_bucketspace_versioning');
require('./test_nc_bucket_logging');

// TODO: uncomment when supported
//require('./test_s3_ops');
Expand Down
File renamed without changes.
50 changes: 50 additions & 0 deletions src/test/unit_tests/test_nc_bucket_logging.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (C) 2024 NooBaa */
'use strict';

const path = require('path');
const mocha = require('mocha');
const assert = require('assert');
const fs_utils = require('../../util/fs_utils');
const nb_native = require('../../util/nb_native');
const { get_process_fs_context } = require('../../util/native_fs_utils');
const { ManageCLIResponse } = require('../../manage_nsfs/manage_nsfs_cli_responses');
const { exec_manage_cli, get_coretest_path, TMP_PATH } = require('../system_tests/test_utils');
const { TYPES, ACTIONS } = require('../../manage_nsfs/manage_nsfs_constants');

const DEFAULT_FS_CONFIG = get_process_fs_context();

const coretest_path = get_coretest_path();
const coretest = require(coretest_path);
coretest.setup({});

mocha.describe('cli logging flow', async function() {
this.timeout(50000); // eslint-disable-line no-invalid-this
const bucket_path = path.join(TMP_PATH, 'log_bucket');
const pers_log_path = path.join(TMP_PATH, 'pers_logs');

mocha.before(async () => {
await fs_utils.create_fresh_path(pers_log_path);
await fs_utils.create_fresh_path(bucket_path);
await fs_utils.file_must_exist(bucket_path);
await exec_manage_cli(TYPES.ACCOUNT, ACTIONS.ADD, { name: 'logbucketowner', user: 'root', new_buckets_path: bucket_path});
await exec_manage_cli(TYPES.BUCKET, ACTIONS.ADD, { name: 'logbucket', path: bucket_path, owner: 'logbucketowner'});
const data = '{"noobaa_bucket_logging":"true","op":"GET","bucket_owner":"[email protected]",' +
'"source_bucket":"s3-bucket",' +
'"object_key":"/s3-bucket?list-type=2&prefix=&delimiter=%2F&encoding-type=url",' +
'"log_bucket":"logbucket",' +
'"log_prefix":"","remote_ip":"100.64.0.2",' +
'"request_uri":"/s3-bucket?list-type=2&prefix=&delimiter=%2F&encoding-type=url",' +
'"http_status":102,"request_id":"lztyrl5k-7enflf-19sm"}';
await nb_native().fs.writeFile(DEFAULT_FS_CONFIG, path.join(pers_log_path + '/', 'bucket_logging.log'),
Buffer.from(data + '\n'));
});

mocha.it('cli run logging', async function() {
const res = await exec_manage_cli(TYPES.LOGGING, '', {}, false, { 'GUARANTEED_LOGS_PATH': pers_log_path});
const entries = await nb_native().fs.readdir(DEFAULT_FS_CONFIG, bucket_path);
const log_objects = entries.filter(entry => !entry.name.startsWith('.'));
assert.equal(log_objects.length, 1); // 1 new log_object should have been uploaded to the bucket
const parsed = JSON.parse(res);
assert.equal(parsed.response.code, ManageCLIResponse.LoggingExported.code);
});
});
13 changes: 6 additions & 7 deletions src/util/bucket_logs_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils');
const Semaphore = require('../util/semaphore');
const P = require('../util/promise');
const nb_native = require('../util/nb_native');
const AWS = require('aws-sdk');

const sem = new Semaphore(config.BUCKET_LOG_CONCURRENCY);

Expand All @@ -25,7 +26,7 @@ const BUCKET_NAME_DEL = "_";
/**
* This function will process the persistent log of bucket logging
* and will upload the log files in using provided noobaa connection
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @param {nb.NativeFSContext} fs_context
* @param {AWS.S3} s3_connection
* @param {function} bucket_to_owner_keys_func
*/
Expand All @@ -50,17 +51,16 @@ async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_
* This function gets a persistent log file, will go over it's entries one by one,
* and will upload the entry to the target_bucket using the provided s3 connection
* in order to know which user to use to upload to each bucket we will need to provide bucket_to_owner_keys_func
* @param {import('../sdk/config_fs').ConfigFS} config_fs
* @param {nb.NativeFSContext} fs_context
* @param {AWS.S3} s3_connection
* @param {string} log_file
* @param {function} bucket_to_owner_keys_func
* @returns {Promise<Boolean>}
*/
async function _upload_to_targets(config_fs, s3_connection, log_file, bucket_to_owner_keys_func) {
async function _upload_to_targets(fs_context, s3_connection, log_file, bucket_to_owner_keys_func) {
const bucket_streams = {};
const promises = [];
try {
const fs_context = config_fs.fs_context;
const file = new LogFile(fs_context, log_file);
dbg.log1('uploading file to target buckets', log_file);
await file.collect_and_process(async entry => {
Expand All @@ -75,13 +75,12 @@ async function _upload_to_targets(config_fs, s3_connection, log_file, bucket_to_
const upload_stream = new stream.PassThrough();
let access_keys;
try {
access_keys = await bucket_to_owner_keys_func(config_fs, target_bucket);
access_keys = await bucket_to_owner_keys_func(target_bucket);
} catch (err) {
dbg.warn('Error when trying to resolve bucket keys', err);
if (err.rpc_code === 'NO_SUCH_BUCKET') return; // If the log_bucket doesn't exist any more - nowhere to upload - just skip
}
s3_connection.config.credentials.accessKeyId = access_keys[0].access_key;
s3_connection.config.credentials.secretAccessKey = access_keys[0].secret_key;
s3_connection.config.credentials = new AWS.Credentials(access_keys[0].access_key, access_keys[0].secret_key);
const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
promises.push(sem.surround(() => P.retry({
attempts: 3,
Expand Down

0 comments on commit 46ccb82

Please sign in to comment.