@@ -128,17 +128,19 @@ function logTransform2 (jsonObj) {
128
128
return ( `${ line . join ( ',' ) } \n` )
129
129
}
130
130
131
- async function processLogs ( bucket , filename , callback ) {
131
+ async function processLogs ( bucket , filename ) {
132
132
console . log ( 'Node version is: ' + process . version )
133
133
console . log ( 'BUCKET ' + bucket )
134
134
console . log ( 'FILENAME ' + filename )
135
- let processedFile = filename . split ( '.' ) [ 0 ]
136
- processedFile = processedFile . split ( '_' ) [ 0 ] . concat ( '_' , processedFile . split ( '_' ) [ 1 ] )
135
+
136
+ const filePrefix = filename . split ( '.' ) [ 0 ]
137
+ const processedFile = filePrefix . split ( '_' ) . slice ( 0 , 2 ) . join ( '_' )
138
+
137
139
console . log ( 'PROCESSEDFILENAME ' + processedFile )
138
- createPipeline ( bucket , filename , processedFile , callback )
140
+ return createPipeline ( bucket , filename , processedFile )
139
141
}
140
142
141
- function createPipeline ( bucket , filename , processedFile , callback ) {
143
+ async function createPipeline ( bucket , filename , processedFile ) {
142
144
const storage = new Storage ( {
143
145
keyFilename : 'metrics-processor-service-key.json'
144
146
} )
@@ -147,47 +149,53 @@ function createPipeline (bucket, filename, processedFile, callback) {
147
149
const readBucket = storage . bucket ( bucket )
148
150
const writeBucket = storage . bucket ( 'processed-logs-nodejs' )
149
151
150
- readBucket . file ( filename ) . download ( function ( err , contents ) {
151
- if ( err ) {
152
- console . log ( 'ERROR IN DOWNLOAD ' , filename , err )
153
- // callback(500)
154
- callback ( )
155
- } else {
156
- const stringContents = contents . toString ( )
157
- console . log ( 'String length: ' , stringContents . length )
158
- const contentsArray = stringContents . split ( '\n' )
159
- console . log ( 'Array Length: ' , contentsArray . length )
160
- let results = ''
161
- for ( const line of contentsArray ) {
162
- if ( line . length === 0 ) {
163
- continue
152
+ try {
153
+ const contents = await readBucket . file ( filename ) . download ( )
154
+
155
+ const stringContents = contents . toString ( )
156
+ console . log ( `String length: ${ stringContents . length } ` )
157
+ const contentsArray = stringContents . split ( '\n' ) . filter ( line => line . length > 0 )
158
+ console . log ( `Array Length: ${ contentsArray . length } ` )
159
+
160
+ let results = ''
161
+ for ( const line of contentsArray ) {
162
+ try {
163
+ const jsonparse = JSON . parse ( line )
164
+ const printout = logTransform2 ( jsonparse )
165
+ if ( printout !== undefined ) {
166
+ results = results . concat ( printout )
164
167
}
165
- try {
166
- const jsonparse = JSON . parse ( line )
167
- const printout = logTransform2 ( jsonparse )
168
- if ( printout !== undefined ) { results = results . concat ( printout ) }
169
- } catch ( err ) { console . log ( err ) }
168
+ } catch ( err ) {
169
+ console . log ( err )
170
170
}
171
+ }
171
172
172
- writeBucket . file ( processedFile ) . save ( results , function ( err ) {
173
- if ( err ) {
174
- console . log ( 'ERROR UPLOADING: ' , err )
175
- const used = process . memoryUsage ( )
176
- for ( const key in used ) {
177
- console . log ( `${ key } ${ Math . round ( used [ key ] / 1024 / 1024 * 100 ) / 100 } MB` )
178
- }
179
- callback ( 500 )
180
- } else {
181
- console . log ( 'Upload complete' )
182
- const used = process . memoryUsage ( )
183
- for ( const key in used ) {
184
- console . log ( `${ key } ${ Math . round ( used [ key ] / 1024 / 1024 * 100 ) / 100 } MB` )
185
- }
186
- callback ( 200 )
187
- }
188
- } )
173
+ try {
174
+ await writeBucket . file ( processedFile ) . save ( results ) ;
175
+ console . log ( `Upload complete: ${ processedFile } ` )
176
+ return {
177
+ statusCode : 200 ,
178
+ message : `Upload complete: ${ processedFile } `
179
+ }
180
+ } catch ( err ) {
181
+ console . log ( `ERROR UPLOADING ${ processedFile } - ${ err } ` )
182
+ return {
183
+ statusCode : 500 ,
184
+ message : `Error uploading file: ${ processedFile } `
185
+ }
186
+ } finally {
187
+ const used = process . memoryUsage ( )
188
+ for ( const key in used ) {
189
+ console . log ( `${ key } ${ Math . round ( used [ key ] / 1024 / 1024 * 100 ) / 100 } MB` )
190
+ }
189
191
}
190
- } )
192
+ } catch ( err ) {
193
+ console . log ( 'ERROR IN DOWNLOAD ' , filename , err )
194
+ return {
195
+ statusCode : 500 ,
196
+ message : `Error downloading file: ${ filename } `
197
+ }
198
+ }
191
199
}
192
200
193
201
app . post ( '/' , async ( req , res ) => {
@@ -215,9 +223,8 @@ app.post('/', async (req, res) => {
215
223
const bucket = req . body . message . attributes . bucketId
216
224
const filename = req . body . message . attributes . objectId
217
225
console . log ( 'EVENT TYPE: ' , eventType )
218
- processLogs ( bucket , filename , function ( status ) {
219
- res . status ( status ) . send ( )
220
- } )
226
+ const { statusCode, message } = await processLogs ( bucket , filename )
227
+ res . status ( statusCode ) . send ( message )
221
228
} )
222
229
223
230
const port = process . env . PORT || 8080
0 commit comments