Skip to content

Commit 8ef1f26

Browse files
committed
feat: handling submit by chunks
1 parent f314bde commit 8ef1f26

File tree

3 files changed

+106
-30
lines changed

3 files changed

+106
-30
lines changed

src/job-client.js

+81-30
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ const logger = require('log4js').getLogger("modzy.job-client");
33
const fs = require('fs');
44
const FormData = require('form-data');
55
const ApiError = require('./api-error.js');
6+
const parseUrl = require('url').parse;
7+
const humanReadToBytes = require("./size");
8+
const {byteArrayToChunks, fileToChunks} = require("./utils");
69

710
/**
811
* Utility class that masks the interaction with the Job API
@@ -150,6 +153,7 @@ class JobClient{
150153
*/
151154
submitJobFiles(modelId, versionId, fileSources) {
152155
let job = {};
156+
let chunkSize = 1024*1024;
153157
return this.submitJob(
154158
{
155159
"model": {
@@ -160,12 +164,25 @@ class JobClient{
160164
).then(
161165
(openJob)=>{
162166
job = openJob;
163-
let inputPomise = Promise.resolve(openJob);
167+
return this.getFeatures();
168+
}
169+
).then(
170+
(features)=>{
171+
try{
172+
return humanReadToBytes(features["inputChunkMaximumSize"]);
173+
} catch (error){
174+
logger.warn(`unexpected error extracting inputChunkMaximumSize from ${features}, error: ${error}`);
175+
return 1024*1024;//default 1Mi
176+
}
177+
}
178+
).then(
179+
(maxChunkSize)=>{
180+
let inputPomise = Promise.resolve(job);
164181
Object.keys(fileSources).forEach(
165182
inputItemKey => {
166183
Object.keys(fileSources[inputItemKey]).forEach(
167184
dataItemKey => {
168-
inputPomise = inputPomise.then( () => this.appendInput(openJob, inputItemKey, dataItemKey, fileSources[inputItemKey][dataItemKey]) );
185+
inputPomise = inputPomise.then( () => this.appendInput(job, inputItemKey, dataItemKey, fileSources[inputItemKey][dataItemKey], maxChunkSize) );
169186
}
170187
);
171188
}
@@ -177,7 +194,7 @@ class JobClient{
177194
return this.closeJob(job);
178195
}
179196
).catch(
180-
(apiError) => {
197+
(apiError) => {
181198
//Try to cancel the job
182199
return this.cancelJob(job.jobIdentifier)
183200
.then((_)=>{throw(apiError);})
@@ -361,37 +378,46 @@ class JobClient{
361378
);
362379
}
363380

364-
appendInput(job, inputItemKey, dataItemKey, value){
381+
appendInput(job, inputItemKey, dataItemKey, inputValue, chunkSize){
365382
const requestURL = `${this.baseURL}/${job.jobIdentifier}/${inputItemKey}/${dataItemKey}`;
366-
logger.debug(`appendInput(${job.jobIdentifier}, ${inputItemKey}, ${dataItemKey}) POST ${requestURL}`);
367-
const data = new FormData();
368-
if( value.byteLength !== undefined ){
369-
data.append("input", value, dataItemKey );
370-
} else{
371-
//If is a file we need to trick axios
372-
data.append("input", fs.createReadStream(value), { knownLength: fs.statSync(value).size } );
383+
384+
let iterator;
385+
if( inputValue.byteLength !== undefined ){
386+
iterator = byteArrayToChunks(inputValue, chunkSize);
387+
} else {
388+
iterator = fileToChunks(inputValue, chunkSize);
373389
}
390+
return this.appendInputChunk(requestURL, iterator, chunkSize, dataItemKey, 0);
391+
}
374392

375-
return axios.post(
376-
requestURL,
377-
data,
378-
{
379-
headers: {
380-
...data.getHeaders(),
381-
"Content-Length": data.getLengthSync(),
382-
'Authorization': `ApiKey ${this.apiKey}`
383-
}
384-
}
385-
)
393+
appendInputChunk(requestURL, asyncGenerator, chunkSize, dataItemKey, chunkCount){
394+
return asyncGenerator
395+
.next()
386396
.then(
387-
( response )=>{
388-
logger.info(`appendInput(${job.jobIdentifier}, ${inputItemKey}, ${dataItemKey}) :: ${response.status} ${response.statusText}`);
389-
return job;
390-
}
391-
)
392-
.catch(
393-
( error )=>{
394-
throw( new ApiError( error ) );
397+
(entry)=>{
398+
if( entry && entry.value ){
399+
return new Promise(
400+
(resolve, reject)=>{
401+
logger.debug(`appendInputChunk(${requestURL}) [${chunkCount}] POST ${entry.value.length} bytes`);
402+
const requestObj = parseUrl(requestURL);
403+
requestObj.headers = { 'Authorization': `ApiKey ${this.apiKey}`};
404+
const data = new FormData({maxDataSize: chunkSize} );
405+
data.append("input", entry.value, dataItemKey );
406+
data.submit(requestObj, function(error, response){
407+
logger.info(`appendInputChunk(${requestURL}) [${chunkCount}] :: ${response.statusCode} ${response.statusMessage}`);
408+
if( error || response.statusCode >= 400){
409+
reject( new ApiError( error, requestURL, response.statusCode, response.statusMessage ) );
410+
}
411+
resolve(response.resume());
412+
});
413+
}
414+
).then(
415+
(_)=>{
416+
return this.appendInputChunk(requestURL, asyncGenerator, chunkSize, dataItemKey,chunkCount+1);
417+
}
418+
);
419+
}
420+
return null;
395421
}
396422
);
397423
}
@@ -444,6 +470,31 @@ class JobClient{
444470
);
445471
}
446472

473+
/**
474+
* Call the Modzy API Service that return the jobs features
475+
* @return {Object} a updated job instance
476+
* @throws {ApiError} If there is something wrong with the sevice or the call
477+
*/
478+
getFeatures(){
479+
const requestURL = `${this.baseURL}/features`;
480+
logger.debug(`getFeatures() GET ${requestURL}`);
481+
return axios.get(
482+
requestURL,
483+
{headers: {'Authorization':`ApiKey ${this.apiKey}`}}
484+
)
485+
.then(
486+
( response )=>{
487+
logger.info(`getFeatures() :: ${response.status} ${response.statusText}`);
488+
return response.data;
489+
}
490+
)
491+
.catch(
492+
( error )=>{
493+
throw( new ApiError( error ) );
494+
}
495+
);
496+
}
497+
447498
/**
448499
* Call the Modzy API Service that cancel the Job by it's identifier
449500
* @param {string} jobId - Identifier of the job

src/size.js

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
// Matches a human-readable size: a (possibly fractional) number followed by
// an optional unit suffix of up to two letters, e.g. "2048", "1.5Ki", "300K".
const SIZE_PATTERN = /^(\d+(\.\d+)?)([a-zA-Z]{0,2})$/;

// Byte multiplier for each supported unit suffix.
const DataUnit = {
    "": 1,                             // plain bytes (no suffix)
    "i": 1,                            // BYTES
    "K": 1000,                         // KILOBYTES
    "M": 1000 * 1000,                  // MEGABYTES
    "G": 1000 * 1000 * 1000,           // GIGABYTES
    "T": 1000 * 1000 * 1000 * 1000,    // TERABYTES
    "Ki": 1024,                        // KIBIBYTES
    "Mi": 1024 * 1024,                 // MEBIBYTES
    "Gi": 1024 * 1024 * 1024,          // GIBIBYTES
    "Ti": 1024 * 1024 * 1024 * 1024,   // TEBIBYTES
    "KB": 1024,                        // KIBIBYTES
    "MB": 1024 * 1024,                 // MEBIBYTES
    "GB": 1024 * 1024 * 1024,          // GIBIBYTES
    "TB": 1024 * 1024 * 1024 * 1024,   // TEBIBYTES
}

/**
 * Convert a human-readable size string (e.g. "1Mi", "1.5Ki", "300K", "2048")
 * into a number of bytes.
 * @param {string} humanSize - the size, with an optional unit suffix
 * @return {number} the size in bytes, rounded to the nearest integer
 * @throws {Error} if the string does not match SIZE_PATTERN or the unit is
 *     unknown (throwing, rather than returning NaN, lets callers fall back
 *     to a default chunk size in their catch block)
 */
function humanReadToBytes(humanSize){
    const match = String(humanSize).match(SIZE_PATTERN);
    if( match === null ){
        throw new Error(`cannot parse size "${humanSize}"`);
    }
    const multiplier = DataUnit[match[3]];
    if( multiplier === undefined ){
        throw new Error(`unknown size unit "${match[3]}" in "${humanSize}"`);
    }
    // parseFloat (not parseInt) so fractional sizes like "1.5Mi" are honored.
    return Math.round(parseFloat(match[1]) * multiplier);
}

// Guarded so the module also loads in an ES-module context (e.g. test runner);
// under CommonJS this is the normal export.
if (typeof module !== "undefined") module.exports = humanReadToBytes;

src/utils.js

Whitespace-only changes.

0 commit comments

Comments
 (0)