Skip to content

add-multi-format-support #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 11 additions & 35 deletions bin/events-cmds/count.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ exports.command = 'count [bucket] [prefix]'
exports.describe = 'Count the events in [bucket] for [prefix]'

const _ = require('highland');
const lodash = require('lodash');
const now = require('moment')().utc();

exports.builder = {
Expand All @@ -28,6 +29,15 @@ exports.builder = {
alias: 'sp',
description: 'source account profile',
default: process.env.AWS_PROFILE || 'default'
},
format: {
alias: 'f',
choices: [
'kinesis', // events wrapped in kinesis records
'eventbridge', // event wrapped in cloudwatch event format, delimited by EOL
'raw' // raw event format, delimited by EOL
],
default: 'kinesis' // original
}
}

Expand All @@ -36,42 +46,8 @@ exports.handler = (argv) => {

common.print(argv);

const get = (obj) => {
return common.s3.get(obj)
.flatMap((data) => {
const key = data.key;
const records = data.obj.Records;

return _((push, next) => {

let record = records.shift();

if (record) {
try {
let event = new Buffer(record.kinesis.data, 'base64').toString('utf8');
event = JSON.parse(event);

push(null, {
key: key,
record: record,
event: event
});
} catch (err) {
console.log(err);
push(err);
}

next();
} else {
push(null, _.nil);
}
})
})
;
}

common.s3.paginate()
.map(obj => get(obj))
.map(obj => common.s3.getEvents(obj))
.parallel(argv.parallel)

.reduce({}, (counters, cur) => {
Expand Down
46 changes: 11 additions & 35 deletions bin/events-cmds/display.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ exports.command = 'display [bucket] [prefix]'
exports.describe = 'Display the events in [bucket] for [prefix]'

const _ = require('highland');
const lodash = require('lodash');
const now = require('moment')().utc();

exports.builder = {
Expand Down Expand Up @@ -32,6 +33,15 @@ exports.builder = {
alias: 'sp',
description: 'source account profile',
default: process.env.AWS_PROFILE || 'default'
},
format: {
alias: 'f',
choices: [
'kinesis', // events wrapped in kinesis records
'eventbridge', // event wrapped in cloudwatch event format, delimited by EOL
'raw' // raw event format, delimited by EOL
],
default: 'kinesis' // original
}
}

Expand All @@ -40,44 +50,10 @@ exports.handler = (argv) => {

common.print(argv);

const get = (obj) => {
return common.s3.get(obj)
.flatMap((data) => {
const key = data.key;
const records = data.obj.Records;

return _((push, next) => {

let record = records.shift();

if (record) {
try {
let event = new Buffer(record.kinesis.data, 'base64').toString('utf8');
event = JSON.parse(event);

push(null, {
key: key,
record: record,
event: event
});
} catch (err) {
console.log(err);
push(err);
}

next();
} else {
push(null, _.nil);
}
})
})
;
}

const filterByType = obj => argv.type === '*' || argv.type === obj.event.type;

common.s3.paginate()
.map(obj => get(obj))
.map(obj => common.s3.getEvents(obj))
.parallel(argv.parallel)

.filter(filterByType)
Expand Down
53 changes: 16 additions & 37 deletions bin/events-cmds/replay.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ exports.command = 'replay [bucket] [prefix]'
exports.describe = 'Replay the events in [bucket] for [prefix]'

const _ = require('highland');
const lodash = require('lodash');
const now = require('moment')().utc();

exports.builder = {
Expand Down Expand Up @@ -57,48 +58,24 @@ exports.builder = {
alias: 'tp',
description: 'target account profile',
default: process.env.AWS_PROFILE || 'default'
}
},
// TODO target format
format: {
alias: 'f',
choices: [
'kinesis', // events wrapped in kinesis records
'eventbridge', // event wrapped in cloudwatch event format, delimited by EOL
'raw' // raw event format, delimited by EOL
],
default: 'kinesis' // original
},
}

exports.handler = (argv) => {
const common = require('../../lib/common')(argv);

common.print(argv);

const get = (obj) => {
return common.s3.get(obj)
.flatMap((data) => {
const key = data.key;
const records = data.obj.Records;

return _((push, next) => {

let record = records.shift();

if (record) {
try {
let event = new Buffer(record.kinesis.data, 'base64').toString('utf8');
event = JSON.parse(event);

push(null, {
key: key,
record: record,
event: event
});
} catch (err) {
console.log(err);
push(err);
}

next();
} else {
push(null, _.nil);
}
})
})
;
}

const filterByType = obj => argv.type === '*' || argv.type === obj.event.type;

let batched = [];
Expand All @@ -117,7 +94,9 @@ exports.handler = (argv) => {
}
else {

let buf = new Buffer(JSON.stringify({
// TODO format x as a Record (kinesis or dynamo) or cw event

let buf = Buffer.from(JSON.stringify({
Records: batched.concat(x)
}));

Expand All @@ -134,7 +113,7 @@ exports.handler = (argv) => {

const replay = function(s) {
return s
.map(obj => get(obj))
.map(obj => common.s3.getEvents(obj))
.parallel(argv.parallel)

.filter(filterByType)
Expand Down
48 changes: 46 additions & 2 deletions lib/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const debug = require('debug')('common');
aws.config.setPromisesDependency(BbPromise);

function Common(opts) {
this.options = opts;
this.s3 = new S3(opts);
this.lambda = new Lambda(opts);
}
Expand All @@ -19,6 +20,9 @@ module.exports = (opts) => {
}

Common.prototype.print = data => console.log(JSON.stringify(data, null, 2));
Common.prototype.isKinesis = () => this.options.format === 'kinsis';
Common.prototype.isEventBridge = () => this.options.format === 'eventbridge';
Common.prototype.isRaw = () => this.options.format === 'raw';

function S3(opts) {
this.options = opts;
Expand Down Expand Up @@ -97,13 +101,17 @@ S3.prototype.get = function (obj) {

// debug('decompress file: %j', data);
// let gunzip = _.wrapCallback(zlib.gunzip);
return this.gunzip(new Buffer(data.Body));
if (self.isKinesis()) {
return this.gunzip(Buffer.from(data.Body));
} else {
return Buffer.from(data.Body).toString();
}
})
.then(data => {
// debug('parse file: %j', data.toString());
return {
key: obj.Key,
obj: JSON.parse(data)
obj: self.isKinesis() ? JSON.parse(data) : data
};
})
.catch(err => {
Expand All @@ -114,6 +122,42 @@ S3.prototype.get = function (obj) {
;
}

S3.prototype.getEvents = function (obj) {
return this.s3.get(obj)
.flatMap((data) => {
const key = data.key;
const records = this.isKinesis() ? data.obj.Records : lodash.castArray(data.obj.split('\n'));

return _((push, next) => {

let record = records.shift();

if (record) {
try {
let event = this.isKinesis() ?
Buffer.from(record.kinesis.data, 'base64').toString('utf8') :
record;
event = JSON.parse(event);

push(null, {
key: key,
record: this.isKinesis() ? record : {}, // TODO cw
event: event // TODO cw
});
} catch (err) {
console.log(err);
push(err);
}

next();
} else {
push(null, _.nil);
}
})
})
;
}

function Lambda(opts) {
this.options = opts;

Expand Down