Skip to content

Commit

Permalink
Finish restore-writer
Browse files Browse the repository at this point in the history
  • Loading branch information
zoltan-spotlightdata committed Sep 11, 2019
2 parents 1cc9e61 + 541b249 commit 4565306
Show file tree
Hide file tree
Showing 4 changed files with 408 additions and 1 deletion.
3 changes: 3 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ validateEnv();

import sleep from './helpers/utils/sleep';
import downloadAndExtractCache from './helpers/caches/downloadAndExtractCache';
import Writer from './writer';

const rp = request.defaults({
headers: {
Expand Down Expand Up @@ -251,6 +252,8 @@ export default class SpotlightPipeline {
}
}

export class SpotlightWriter extends Writer {}

export class NanowireError extends Error {
constructor(type, code, ...params) {
super(...params);
Expand Down
150 changes: 150 additions & 0 deletions lib/writer/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import fs from 'fs-extra';
import raven from 'raven';

import minio from '../vendor/minio-manager';

import elasticsearchConfig from '../config/elasticsearch';

import getCurrentDateStamp from '../helpers/utils/getCurrentDateStamp';

export default class Writer {
constructor(metadata) {
this.metadata = metadata;
this.pluginName = process.env.PLUGIN_ID;

const taskId = this.metadata.task._id;

this.taskWriteStream = fs.createWriteStream(`${__dirname}/../../task-${taskId}`, {
flags: 'a',
});

this.writeGroupOutput = this.writeGroupOutput.bind(this);
this.appendTaskOutput = this.appendTaskOutput.bind(this);
}

writeGroupOutput(groupOutput) {
return new Promise((resolve, reject) => {
const taskId = this.metadata.task._id;

let data = JSON.stringify({
update: {
_id: taskId,
_type: elasticsearchConfig.ES_GROUP_TYPE_GROUP,
_index: elasticsearchConfig.ES_GROUP_INDEX,
},
});

data += '\n';

data += JSON.stringify({
doc: {
meta: {
userId: this.metadata.task.userId,
projectId: this.metadata.task.projectId,
jobId: this.metadata.task.jobId,
taskId: this.metadata.task._id,
storedAt: getCurrentDateStamp(),
},
jsonLD: {
'@type': 'NaturalLanguageProcessing',
naturalLanguageProcessing: groupOutput,
},
},
doc_as_upsert: true,
});

data += '\n';

fs.writeFile(`${__dirname}/../../group-${taskId}`, data, err => {
if (err) {
raven.captureException(err);

return reject(err);
}

return resolve();
});
});
}

appendTaskOutput(taskOutput) {
const taskId = this.metadata.task._id;

let data = JSON.stringify({
update: {
_id: `${taskOutput['@id']}:${taskId}`,
_type: elasticsearchConfig.ES_GROUP_TYPE_TASK,
_index: elasticsearchConfig.ES_GROUP_INDEX,
_parent: taskId,
},
});

data += '\n';

data += JSON.stringify({
doc: taskOutput,
doc_as_upsert: true,
});

data += '\n';

this.taskWriteStream.write(data);
}

// Returns the misc object to update
// {
// "storePayloads": ["t-1/group/previous-plugin.bin","t-1/group/current-plugin.bin"]
// }
async store() {
const bucket = process.env.MINIO_BUCKET;
const jobId = this.metadata.job._id;
const taskId = this.metadata.task._id;

let storePayloads = [];

if (this.metadata.task.metadata) {
if (this.metadata.task.metadata.storePayloads) {
storePayloads = this.metadata.task.metadata.storePayloads;
}
}

this.taskWriteStream.end();

const storePromise = new Promise((resolve, reject) => {
const groupWriteStream = fs.createWriteStream(`${__dirname}/../../group-${taskId}`, {
flags: 'a',
});
const taskReadStream = fs.createReadStream(`${__dirname}/../../task-${taskId}`);

groupWriteStream.on('close', () => {
minio
.uploadFileUsingMinio(
bucket,
`${jobId}/${taskId}/group/${this.pluginName}.bin`,
`${__dirname}/../../group-${taskId}`
)
.then(() => resolve(`${jobId}/${taskId}/group/${this.pluginName}.bin`))
.catch(reject);
});

groupWriteStream.on('error', reject);

taskReadStream.pipe(groupWriteStream);
});

try {
const payloadLocation = await storePromise;

storePayloads.push(payloadLocation);

await fs.remove(`${__dirname}/../../group-${taskId}`);
await fs.remove(`${__dirname}/../../task-${taskId}`);

return {
storePayloads,
};
} catch (e) {
throw e;
}
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@spotlightdata/nanowire-plugin-js",
"version": "9.0.18",
"version": "9.0.19",
"description": "A Node.js client for Spotlight's Redis Pipeline",
"main": "dist/index.js",
"author": "Will Evans",
Expand Down
Loading

0 comments on commit 4565306

Please sign in to comment.