@@ -5,8 +5,8 @@ import { Worker } from '../src/worker.js';
5
5
import { Topic } from '@restorecommerce/kafka-client' ;
6
6
import { createChannel , createClient } from '@restorecommerce/grpc-client' ;
7
7
import {
8
- JobServiceClient as SchedulingServiceClient ,
9
8
JobServiceDefinition as SchedulingServiceDefinition ,
9
+ JobServiceClient as SchedulingServiceClient ,
10
10
JobOptions_Priority ,
11
11
Backoff_Type ,
12
12
JobReadRequest ,
@@ -25,7 +25,8 @@ import {
25
25
jobPolicySetRQ ,
26
26
permitJobRule ,
27
27
validateJobDonePayload ,
28
- cfg
28
+ cfg ,
29
+ getSchedulingServiceClient
29
30
} from './utils.js' ;
30
31
import { updateConfig } from '@restorecommerce/acs-client' ;
31
32
import * as _ from 'lodash-es' ;
@@ -110,9 +111,10 @@ const proto: any = ProtoUtils.getProtoFromPkgDefinition(
110
111
pkgDef
111
112
) ;
112
113
113
- const mockServer = new GrpcMockServer ( cfg . get ( 'client:acs-srv:address' ) ) ;
114
- const startGrpcMockServer = async ( methodWithOutput : MethodWithOutput [ ] ) => {
114
+ let mockServerACS : GrpcMockServer ;
115
+ const startACSGrpcMockServer = async ( methodWithOutput : MethodWithOutput [ ] ) => {
115
116
// create mock implementation based on the method name and output
117
+ mockServerACS = new GrpcMockServer ( cfg . get ( 'client:acs-srv:address' ) ) ;
116
118
const implementations = {
117
119
isAllowed : ( call : any , callback : any ) => {
118
120
const isAllowedResponse = methodWithOutput . filter ( e => e . method === 'IsAllowed' ) ;
@@ -141,30 +143,31 @@ const startGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => {
141
143
}
142
144
} ;
143
145
try {
144
- mockServer . addService ( PROTO_PATH , PKG_NAME , SERVICE_NAME , implementations , {
146
+ mockServerACS . addService ( PROTO_PATH , PKG_NAME , SERVICE_NAME , implementations , {
145
147
includeDirs : [ 'node_modules/@restorecommerce/protos/' ] ,
146
148
keepCase : true ,
147
149
longs : String ,
148
150
enums : String ,
149
151
defaults : true ,
150
152
oneofs : true
151
153
} ) ;
152
- await mockServer . start ( ) ;
153
- logger . info ( `Mock ACS Server started on ${ mockServer . serverAddress } ` ) ;
154
+ await mockServerACS . start ( ) ;
155
+ logger . info ( `Mock ACS Server started on ${ mockServerACS . serverAddress } ` ) ;
154
156
} catch ( err ) {
155
157
logger . error ( 'Error starting mock ACS server' , err ) ;
156
158
}
159
+ return mockServerACS ;
157
160
} ;
158
161
159
162
const IDS_PROTO_PATH = 'io/restorecommerce/user.proto' ;
160
163
const IDS_PKG_NAME = 'io.restorecommerce.user' ;
161
164
const IDS_SERVICE_NAME = 'UserService' ;
162
165
163
- const mockServerIDS = new GrpcMockServer ( cfg . get ( 'client:user:address' ) ) ;
164
-
165
166
// Mock server for ids - findByToken
167
+ let mockServerIDS : GrpcMockServer ;
166
168
const startIDSGrpcMockServer = async ( methodWithOutput : MethodWithOutput [ ] ) => {
167
169
// create mock implementation based on the method name and output
170
+ mockServerIDS = new GrpcMockServer ( cfg . get ( 'client:user:address' ) ) ;
168
171
const implementations = {
169
172
findByToken : ( call : any , callback : any ) => {
170
173
if ( call . request . token === 'admin_token' ) {
@@ -187,16 +190,17 @@ const startIDSGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => {
187
190
} catch ( err ) {
188
191
logger . error ( 'Error starting mock IDS server' , err ) ;
189
192
}
193
+ return mockServerIDS ;
190
194
} ;
191
195
192
196
193
- const stopGrpcMockServer = async ( ) => {
194
- await mockServer . stop ( ) ;
197
+ const stopACSGrpcMockServer = async ( ) => {
198
+ await mockServerACS ? .stop ( ) ;
195
199
logger . info ( 'Mock ACS Server closed successfully' ) ;
196
200
} ;
197
201
198
202
const stopIDSGrpcMockServer = async ( ) => {
199
- await mockServerIDS . stop ( ) ;
203
+ await mockServerIDS ? .stop ( ) ;
200
204
logger . info ( 'Mock IDS Server closed successfully' ) ;
201
205
} ;
202
206
@@ -228,7 +232,7 @@ describe(`testing scheduling-srv ${testSuffix}: gRPC`, () => {
228
232
// start acs mock service with PERMIT rule
229
233
jobPolicySetRQ . policy_sets ! [ 0 ] ! . policies ! [ 0 ] ! . effect = Effect . PERMIT ;
230
234
jobPolicySetRQ . policy_sets ! [ 0 ] ! . policies ! [ 0 ] ! . rules = [ permitJobRule ] ;
231
- await startGrpcMockServer ( [
235
+ await startACSGrpcMockServer ( [
232
236
{
233
237
method : 'WhatIsAllowed' ,
234
238
output : jobPolicySetRQ
@@ -265,13 +269,7 @@ describe(`testing scheduling-srv ${testSuffix}: gRPC`, () => {
265
269
// store user with tokens and role associations to Redis index `db-findByToken`
266
270
await tokenRedisClient . set ( 'admin-token' , JSON . stringify ( acsSubject ) ) ;
267
271
268
- const schedulingClientCfg = cfg . get ( 'client:schedulingClient' ) ;
269
- if ( schedulingClientCfg ) {
270
- grpcSchedulingSrv = createClient ( {
271
- ...schedulingClientCfg ,
272
- logger
273
- } , SchedulingServiceDefinition , createChannel ( schedulingClientCfg . address ) ) ;
274
- }
272
+ grpcSchedulingSrv = getSchedulingServiceClient ( logger ) ;
275
273
const toDelete = ( await grpcSchedulingSrv . read ( JobReadRequest . fromPartial ( { subject } ) , { } ) ) . total_count ;
276
274
const offset = await jobEvents . $offset ( - 1 ) ;
277
275
@@ -289,19 +287,24 @@ describe(`testing scheduling-srv ${testSuffix}: gRPC`, () => {
289
287
}
290
288
} ) ;
291
289
afterEach ( async ( ) => {
292
- await jobEvents . removeAllListeners ( 'queuedJob' ) ;
293
- await jobEvents . removeAllListeners ( 'jobsCreated' ) ;
294
- await jobEvents . removeAllListeners ( 'jobsDeleted' ) ;
290
+ await Promise . allSettled ( [
291
+ jobEvents . removeAllListeners ( 'queuedJob' ) ,
292
+ jobEvents . removeAllListeners ( 'jobsCreated' ) ,
293
+ jobEvents . removeAllListeners ( 'jobsDeleted' ) ,
294
+ ] ) ;
295
295
} ) ;
296
296
after ( async function ( ) : Promise < any > {
297
297
this . timeout ( 20000 ) ;
298
- await stopGrpcMockServer ( ) ;
299
- await stopIDSGrpcMockServer ( ) ;
300
- await jobEvents . removeAllListeners ( 'queuedJob' ) ;
301
- await jobEvents . removeAllListeners ( 'jobsCreated' ) ;
302
- await jobEvents . removeAllListeners ( 'jobsDeleted' ) ;
303
- await worker . schedulingService . clear ( ) ;
304
- await worker . stop ( ) ;
298
+ await Promise . allSettled ( [
299
+ stopACSGrpcMockServer ( ) ,
300
+ stopIDSGrpcMockServer ( ) ,
301
+ jobEvents . removeAllListeners ( 'queuedJob' ) ,
302
+ jobEvents . removeAllListeners ( 'jobsCreated' ) ,
303
+ jobEvents . removeAllListeners ( 'jobsDeleted' ) ,
304
+ worker . schedulingService . clear ( ) ,
305
+ ] ) . then (
306
+ worker . stop
307
+ ) ;
305
308
} ) ;
306
309
describe ( `create a one-time job ${ testSuffix } ` , function postJob ( ) : void {
307
310
this . timeout ( 30000 ) ;
0 commit comments