1
1
import * as _ from 'lodash-es' ;
2
2
import { errors } from '@restorecommerce/chassis-srv' ;
3
3
import * as kafkaClient from '@restorecommerce/kafka-client' ;
4
- import { AuthZAction , ACSAuthZ , updateConfig , DecisionResponse , Operation , PolicySetRQResponse , ACSResource , CtxResource } from '@restorecommerce/acs-client' ;
4
+ import {
5
+ AuthZAction ,
6
+ ACSAuthZ ,
7
+ updateConfig ,
8
+ DecisionResponse ,
9
+ Operation ,
10
+ PolicySetRQResponse ,
11
+ CtxResource ,
12
+ CustomQueryArgs
13
+ } from '@restorecommerce/acs-client' ;
5
14
import {
6
15
JobServiceImplementation as SchedulingServiceServiceImplementation ,
7
- JobFailed , JobDone , DeepPartial , JobList , JobListResponse , Data ,
16
+ JobFailed , JobDone , DeepPartial , JobList , JobListResponse ,
8
17
Backoff_Type , JobOptions_Priority , JobReadRequest , JobReadRequest_SortOrder ,
9
- JobResponse
18
+ JobResponse , Job ,
10
19
} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job.js' ;
11
20
import { createClient , RedisClientType } from 'redis' ;
12
21
import { NewJob , Priority } from './types.js' ;
@@ -17,9 +26,15 @@ import * as uuid from 'uuid';
17
26
import { Logger } from 'winston' ;
18
27
import { Response_Decision } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/access_control.js' ;
19
28
import { Attribute } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/attribute.js' ;
20
- import { DeleteRequest , DeleteResponse , ResourceListResponse } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base.js' ;
21
- import { Queue , QueueOptions , Job } from 'bullmq' ;
29
+ import {
30
+ DeleteRequest ,
31
+ DeleteResponse ,
32
+ ResourceListResponse
33
+ } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/resource_base.js' ;
34
+ import { Queue , QueueOptions , Job as BullJob } from 'bullmq' ;
22
35
import { Status } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/status.js' ;
36
+ import { Subject } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/auth.js' ;
37
+ import { Meta } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/meta.js' ;
23
38
24
39
const { parseExpression } = pkg ;
25
40
const JOB_DONE_EVENT = 'jobDone' ;
@@ -34,34 +49,26 @@ const QUEUE_CLEANUP = 'queueCleanup';
34
49
*/
35
50
export class SchedulingService implements SchedulingServiceServiceImplementation {
36
51
37
- jobEvents : kafkaClient . Topic ;
38
- logger : Logger ;
39
-
40
52
queuesConfigList : any ;
41
53
queuesList : Queue [ ] ;
42
54
defaultQueueName : string ;
43
-
44
- redisClient : RedisClientType < any , any > ;
45
55
resourceEventsEnabled : boolean ;
46
56
canceledJobs : Set < string > ;
47
- bullOptions : any ;
48
- cfg : any ;
49
- authZ : ACSAuthZ ;
50
57
authZCheck : boolean ;
51
58
repeatJobIdRedisClient : RedisClientType < any , any > ;
52
59
53
-
54
60
constructor (
55
- jobEvents : kafkaClient . Topic ,
56
- private redisConfig : any , logger : any , redisClient : RedisClientType < any , any > ,
57
- bullOptions : any , cfg : any , authZ : ACSAuthZ ) {
58
- this . jobEvents = jobEvents ;
61
+ private readonly jobEvents : kafkaClient . Topic ,
62
+ private readonly redisConfig : any ,
63
+ private readonly logger : Logger ,
64
+ private readonly redisClient : RedisClientType < any , any > ,
65
+ private readonly bullOptions : any ,
66
+ private readonly cfg : any ,
67
+ private readonly authZ : ACSAuthZ
68
+ ) {
59
69
this . resourceEventsEnabled = true ;
60
- this . bullOptions = bullOptions ;
61
- this . logger = logger ;
62
70
this . queuesList = [ ] ;
63
71
this . queuesConfigList = [ ] ;
64
- this . redisClient = redisClient ;
65
72
66
73
const repeatJobIdCfg = cfg . get ( 'redis' ) ;
67
74
repeatJobIdCfg . database = cfg . get ( 'redis:db-indexes:db-repeatJobId' ) ;
@@ -74,7 +81,6 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
74
81
} ) . catch ( err => logger . error ( 'Redis client error for repeatable job store' , { err } ) ) ;
75
82
76
83
this . canceledJobs = new Set < string > ( ) ;
77
- this . cfg = cfg ;
78
84
this . authZ = authZ ;
79
85
this . authZCheck = this . cfg . get ( 'authorization:enabled' ) ;
80
86
@@ -236,7 +242,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
236
242
// for jobs created via Kafka currently there are no acs checks
237
243
this . disableAC ( ) ;
238
244
const createDispatch = [ ] ;
239
- let result : Job [ ] = [ ] ;
245
+ let result : BullJob [ ] = [ ] ;
240
246
const logger = this . logger ;
241
247
const create = this . create ;
242
248
// Get the jobs
@@ -379,7 +385,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
379
385
* @param millis
380
386
* @param opts
381
387
*/
382
- getNextMillis ( millis , opts ) {
388
+ getNextMillis ( millis : number , opts : any ) {
383
389
if ( opts ?. every ) {
384
390
return Math . floor ( millis / opts . every ) * opts . every + opts . every ;
385
391
}
@@ -405,7 +411,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
405
411
}
406
412
}
407
413
408
- private md5 ( str ) {
414
+ private md5 ( str : string ) {
409
415
return crypto
410
416
. createHash ( 'md5' )
411
417
. update ( str )
@@ -419,7 +425,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
419
425
* @param repeat - job repeate options
420
426
* @param jobId - job id
421
427
*/
422
- async storeRepeatKey ( repeatId , scsJobId , options ) {
428
+ async storeRepeatKey ( repeatId : string , scsJobId : string , options : any ) {
423
429
try {
424
430
if ( repeatId && scsJobId ) {
425
431
this . logger . info ( 'Repeat key mapped to external SCS JobId' , { repeatId, scsJobId } ) ;
@@ -492,7 +498,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
492
498
}
493
499
}
494
500
495
- const result : Job [ ] = [ ] ;
501
+ const result : BullJob [ ] = [ ] ;
496
502
// Scheduling jobs
497
503
for ( let i = 0 ; i < jobs . length ; i += 1 ) {
498
504
const job = jobs [ i ] ;
@@ -536,7 +542,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
536
542
537
543
if ( ! job . data . meta ) {
538
544
const now = new Date ( ) ;
539
- const metaObj = {
545
+ const metaObj : Meta = {
540
546
created : now ,
541
547
modified : now ,
542
548
modified_by : '' ,
@@ -573,7 +579,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
573
579
} ;
574
580
575
581
if ( ( bullOptions as any ) . timeout === 1 ) {
576
- delete bullOptions [ ' timeout' ] ;
582
+ delete ( bullOptions as any ) . timeout ;
577
583
}
578
584
579
585
// Match the Job Type with the Queue Name and add the Job to this Queue.
@@ -638,10 +644,10 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
638
644
return jobListResponse ;
639
645
}
640
646
641
- private filterByOwnerShip ( customArgsObj , result ) {
647
+ private filterByOwnerShip ( customArgsObj : CustomQueryArgs , result : BullJob [ ] ) : BullJob [ ] {
642
648
// applying filter based on custom arguments (filterByOwnerShip)
643
- let filteredResult : Job [ ] = [ ] ;
644
- const customArgs = ( customArgsObj ) ?. custom_arguments ;
649
+ const filteredResult = new Array < BullJob > ( ) ;
650
+ const customArgs = customArgsObj ?. custom_arguments ;
645
651
if ( customArgs ?. value ) {
646
652
let customArgsFilter ;
647
653
try {
@@ -675,8 +681,8 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
675
681
}
676
682
}
677
683
}
678
- } ) ;
679
- filteredResult = filteredResult . concat ( filteredResp ) ;
684
+ } ) as BullJob [ ] ;
685
+ filteredResult . push ( ... filteredResp ) ;
680
686
}
681
687
return filteredResult ;
682
688
} else {
@@ -738,24 +744,18 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
738
744
return { operation_status : acsResponse . operation_status } ;
739
745
}
740
746
741
- let result = new Array < Job > ( ) ;
747
+ let result : Array < BullJob > ;
742
748
if ( _ . isEmpty ( request ) || _ . isEmpty ( request . filter )
743
749
&& ( ! request . filter || ! request . filter . job_ids
744
750
|| _ . isEmpty ( request . filter . job_ids ) )
745
751
&& ( ! request . filter || ! request . filter . type ||
746
752
_ . isEmpty ( request . filter . type ) ) ) {
747
753
result = await this . _getJobList ( ) ;
748
- let custom_arguments ;
749
- if ( acsResponse ?. custom_query_args ?. length > 0 ) {
750
- custom_arguments = acsResponse . custom_query_args [ 0 ] . custom_arguments ;
751
- }
752
- result = this . filterByOwnerShip ( { custom_arguments } , result ) ;
754
+ const custom_arguments = acsResponse . custom_query_args ?. [ 0 ] ;
755
+ result = this . filterByOwnerShip ( custom_arguments , result ) ;
753
756
} else {
754
- const logger = this . logger ;
755
- let jobIDs = request . filter . job_ids || [ ] ;
756
- if ( ! _ . isArray ( jobIDs ) ) {
757
- jobIDs = [ jobIDs ] ;
758
- }
757
+ result = new Array < BullJob > ( ) ;
758
+ const jobIDs = Array . isArray ( request . filter . job_ids ) ? request . filter . job_ids : [ request . filter . job_ids ] ;
759
759
const typeFilterName = request . filter . type ;
760
760
761
761
// Search in all the queues and retrieve jobs after JobID
@@ -846,11 +846,8 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
846
846
if ( typeFilterName ) {
847
847
result = result . filter ( job => job . name === typeFilterName ) ;
848
848
}
849
- let custom_arguments ;
850
- if ( acsResponse ?. custom_query_args ?. length > 0 ) {
851
- custom_arguments = acsResponse . custom_query_args [ 0 ] . custom_arguments ;
852
- }
853
- result = this . filterByOwnerShip ( { custom_arguments } , result ) ;
849
+ const custom_arguments = acsResponse . custom_query_args ?. [ 0 ] ?. custom_arguments ;
850
+ result = this . filterByOwnerShip ( custom_arguments , result ) ;
854
851
}
855
852
856
853
result = result . filter ( valid => ! ! valid ) ;
@@ -910,7 +907,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
910
907
return jobListResponse ;
911
908
}
912
909
913
- async _getJobList ( ) : Promise < Job [ ] > {
910
+ async _getJobList ( ) : Promise < BullJob [ ] > {
914
911
let jobsList : any [ ] = [ ] ;
915
912
for ( const queue of this . queuesList ) {
916
913
const getJobsResult = await queue . getJobs ( this . bullOptions [ 'activeAndFutureJobTypes' ] ) ;
@@ -1165,7 +1162,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
1165
1162
const mappedJobs = request ?. items ?. reduce ( ( obj , job ) => {
1166
1163
obj [ job . id ] = job ;
1167
1164
return obj ;
1168
- } , { } ) ;
1165
+ } , { } as Record < string , Job > ) ;
1169
1166
1170
1167
const jobData = await this . read ( JobReadRequest . fromPartial (
1171
1168
{
@@ -1181,7 +1178,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
1181
1178
subject
1182
1179
} ) , { } ) ;
1183
1180
1184
- const result = [ ] ;
1181
+ const result = new Array < Job > ( ) ;
1185
1182
1186
1183
jobData ?. items ?. forEach ( async ( job ) => {
1187
1184
const mappedJob = mappedJobs [ job ?. payload ?. id ] ;
@@ -1198,7 +1195,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
1198
1195
} ;
1199
1196
1200
1197
if ( endJob . when && endJob . options ) {
1201
- delete endJob . options [ ' delay' ] ;
1198
+ delete ( endJob . options as any ) . delay ;
1202
1199
}
1203
1200
1204
1201
result . push ( endJob ) ;
@@ -1363,7 +1360,7 @@ export class SchedulingService implements SchedulingServiceServiceImplementation
1363
1360
* @param action resource action
1364
1361
* @param subject subject name
1365
1362
*/
1366
- async createMetadata ( resources : any , action : string , subject ) : Promise < any > {
1363
+ async createMetadata ( resources : any , action : string , subject : Subject ) : Promise < any > {
1367
1364
const orgOwnerAttributes : Attribute [ ] = [ ] ;
1368
1365
if ( resources && ! _ . isArray ( resources ) ) {
1369
1366
resources = [ resources ] ;
0 commit comments