diff --git a/bin/events-cmds/count.js b/bin/events-cmds/count.js
index c454f6c..defaf6f 100644
--- a/bin/events-cmds/count.js
+++ b/bin/events-cmds/count.js
@@ -2,6 +2,7 @@ exports.command = 'count [bucket] [prefix]'
 exports.describe = 'Count the events in [bucket] for [prefix]'
 
 const _ = require('highland');
+const lodash = require('lodash');
 const now = require('moment')().utc();
 
 exports.builder = {
@@ -28,6 +29,15 @@ exports.builder = {
     alias: 'sp',
     description: 'source account profile',
     default: process.env.AWS_PROFILE || 'default'
+  },
+  format: {
+    alias: 'f',
+    choices: [
+      'kinesis', // events wrapped in kinesis records
+      'eventbridge', // event wrapped in cloudwatch event format, delimited by EOL
+      'raw' // raw event format, delimited by EOL
+    ],
+    default: 'kinesis' // original
   }
 }
 
@@ -36,42 +46,8 @@ exports.handler = (argv) => {
 
   common.print(argv);
 
-  const get = (obj) => {
-    return common.s3.get(obj)
-      .flatMap((data) => {
-        const key = data.key;
-        const records = data.obj.Records;
-
-        return _((push, next) => {
-
-          let record = records.shift();
-
-          if (record) {
-            try {
-              let event = new Buffer(record.kinesis.data, 'base64').toString('utf8');
-              event = JSON.parse(event);
-
-              push(null, {
-                key: key,
-                record: record,
-                event: event
-              });
-            } catch (err) {
-              console.log(err);
-              push(err);
-            }
-
-            next();
-          } else {
-            push(null, _.nil);
-          }
-        })
-      })
-    ;
-  }
-
   common.s3.paginate()
-    .map(obj => get(obj))
+    .map(obj => common.s3.getEvents(obj))
     .parallel(argv.parallel)
 
     .reduce({}, (counters, cur) => {
diff --git a/bin/events-cmds/display.js b/bin/events-cmds/display.js
index 0fa3d71..8d7f966 100644
--- a/bin/events-cmds/display.js
+++ b/bin/events-cmds/display.js
@@ -2,6 +2,7 @@ exports.command = 'display [bucket] [prefix]'
 exports.describe = 'Display the events in [bucket] for [prefix]'
 
 const _ = require('highland');
+const lodash = require('lodash');
 const now = require('moment')().utc();
 
 exports.builder = {
@@ -32,6 +33,15 @@ exports.builder = {
     alias: 'sp',
     description: 'source account profile',
     default: process.env.AWS_PROFILE || 'default'
+  },
+  format: {
+    alias: 'f',
+    choices: [
+      'kinesis', // events wrapped in kinesis records
+      'eventbridge', // event wrapped in cloudwatch event format, delimited by EOL
+      'raw' // raw event format, delimited by EOL
+    ],
+    default: 'kinesis' // original
   }
 }
 
@@ -40,44 +50,10 @@ exports.handler = (argv) => {
 
   common.print(argv);
 
-  const get = (obj) => {
-    return common.s3.get(obj)
-      .flatMap((data) => {
-        const key = data.key;
-        const records = data.obj.Records;
-
-        return _((push, next) => {
-
-          let record = records.shift();
-
-          if (record) {
-            try {
-              let event = new Buffer(record.kinesis.data, 'base64').toString('utf8');
-              event = JSON.parse(event);
-
-              push(null, {
-                key: key,
-                record: record,
-                event: event
-              });
-            } catch (err) {
-              console.log(err);
-              push(err);
-            }
-
-            next();
-          } else {
-            push(null, _.nil);
-          }
-        })
-      })
-    ;
-  }
-
   const filterByType = obj => argv.type === '*' || argv.type === obj.event.type;
 
   common.s3.paginate()
-    .map(obj => get(obj))
+    .map(obj => common.s3.getEvents(obj))
     .parallel(argv.parallel)
     .filter(filterByType)
 
diff --git a/bin/events-cmds/replay.js b/bin/events-cmds/replay.js
index 49bf08d..cd661b0 100644
--- a/bin/events-cmds/replay.js
+++ b/bin/events-cmds/replay.js
@@ -2,6 +2,7 @@ exports.command = 'replay [bucket] [prefix]'
 exports.describe = 'Replay the events in [bucket] for [prefix]'
 
 const _ = require('highland');
+const lodash = require('lodash');
 const now = require('moment')().utc();
 
 exports.builder = {
@@ -57,7 +58,17 @@ exports.builder = {
     alias: 'tp',
     description: 'target account profile',
     default: process.env.AWS_PROFILE || 'default'
-  }
+  },
+  // TODO target format
+  format: {
+    alias: 'f',
+    choices: [
+      'kinesis', // events wrapped in kinesis records
+      'eventbridge', // event wrapped in cloudwatch event format, delimited by EOL
+      'raw' // raw event format, delimited by EOL
+    ],
+    default: 'kinesis' // original
+  },
 }
 
 exports.handler = (argv) => {
@@ -65,40 +76,6 @@ exports.handler = (argv) => {
 
   common.print(argv);
 
-  const get = (obj) => {
-    return common.s3.get(obj)
-      .flatMap((data) => {
-        const key = data.key;
-        const records = data.obj.Records;
-
-        return _((push, next) => {
-
-          let record = records.shift();
-
-          if (record) {
-            try {
-              let event = new Buffer(record.kinesis.data, 'base64').toString('utf8');
-              event = JSON.parse(event);
-
-              push(null, {
-                key: key,
-                record: record,
-                event: event
-              });
-            } catch (err) {
-              console.log(err);
-              push(err);
-            }
-
-            next();
-          } else {
-            push(null, _.nil);
-          }
-        })
-      })
-    ;
-  }
-
   const filterByType = obj => argv.type === '*' || argv.type === obj.event.type;
 
   let batched = [];
@@ -117,7 +94,9 @@ exports.handler = (argv) => {
 
     } else {
-      let buf = new Buffer(JSON.stringify({
+      // TODO format x as a Record (kinesis or dynamo) or cw event
+
+      let buf = Buffer.from(JSON.stringify({
         Records: batched.concat(x)
       }));
 
@@ -134,7 +113,7 @@ exports.handler = (argv) => {
 
   const replay = function(s) {
     return s
-      .map(obj => get(obj))
+      .map(obj => common.s3.getEvents(obj))
       .parallel(argv.parallel)
       .filter(filterByType)
 
diff --git a/lib/common.js b/lib/common.js
index 5780bd4..b83d0fe 100644
--- a/lib/common.js
+++ b/lib/common.js
@@ -10,6 +10,7 @@ const debug = require('debug')('common');
 aws.config.setPromisesDependency(BbPromise);
 
 function Common(opts) {
+  this.options = opts;
   this.s3 = new S3(opts);
   this.lambda = new Lambda(opts);
 }
@@ -19,6 +20,9 @@ module.exports = (opts) => {
 }
 
 Common.prototype.print = data => console.log(JSON.stringify(data, null, 2));
+// format helpers live on S3.prototype because they are called from the S3 methods below
+S3.prototype.isKinesis = function () { return this.options.format === 'kinesis'; };
+S3.prototype.isEventBridge = function () { return this.options.format === 'eventbridge'; };
+S3.prototype.isRaw = function () { return this.options.format === 'raw'; };
 
 function S3(opts) {
   this.options = opts;
@@ -97,13 +101,17 @@ S3.prototype.get = function (obj) {
       // debug('decompress file: %j', data);
       // let gunzip = _.wrapCallback(zlib.gunzip);
 
-      return this.gunzip(new Buffer(data.Body));
+      if (this.isKinesis()) {
+        return this.gunzip(Buffer.from(data.Body));
+      } else {
+        return Buffer.from(data.Body).toString();
+      }
     })
     .then(data => {
       // debug('parse file: %j', data.toString());
       return {
         key: obj.Key,
-        obj: this.isKinesis() ? JSON.parse(data) : data
+        obj: this.isKinesis() ? JSON.parse(data) : data
       };
     })
     .catch(err => {
@@ -114,6 +122,42 @@ S3.prototype.get = function (obj) {
 
+S3.prototype.getEvents = function (obj) {
+  return this.get(obj)
+    .flatMap((data) => {
+      const key = data.key;
+      // kinesis objects carry a Records array; raw/eventbridge objects are one JSON event per line
+      const records = this.isKinesis() ? data.obj.Records : data.obj.split('\n').filter(line => line.length > 0);
+
+      return _((push, next) => {
+
+        let record = records.shift();
+
+        if (record) {
+          try {
+            let event = this.isKinesis() ?
+              Buffer.from(record.kinesis.data, 'base64').toString('utf8') :
+              record;
+            event = JSON.parse(event);
+
+            push(null, {
+              key: key,
+              record: this.isKinesis() ? record : {}, // TODO cw
+              event: event // TODO cw
+            });
+          } catch (err) {
+            console.log(err);
+            push(err);
+          }
+
+          next();
+        } else {
+          push(null, _.nil);
+        }
+      })
+    })
+  ;
+}
+
 function Lambda(opts) {
   this.options = opts;
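
A minimal sketch of what the new --format raw path assumes (illustrative only, not part of the patch): the S3 object body is plain text with one JSON event per line, so getEvents splits on EOL and parses each line instead of unwrapping a gzipped Kinesis Records envelope. The sample events below are made up.

// One JSON event per line ("delimited by EOL"), as the raw/eventbridge formats
// expect; a trailing newline yields an empty last element that must be dropped.
const body = [
  JSON.stringify({ id: '1', type: 'thing-created' }),
  JSON.stringify({ id: '2', type: 'thing-updated' })
].join('\n') + '\n';

// Mirrors the non-kinesis branch of S3.prototype.getEvents above.
const events = body
  .split('\n')
  .filter(line => line.length > 0)
  .map(line => JSON.parse(line));

console.log(events[1].type); // 'thing-updated'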