From 4e34ddc5f07a6c813ecd23b244a6472c20a63ffd Mon Sep 17 00:00:00 2001 From: Gerald Baulig Date: Fri, 31 Jan 2025 15:11:33 +0100 Subject: [PATCH] fix(cfg): add importExt to config - replaces .ext of import path For some environments you have to replace the extention from .cjs to .js or else, e.g. for test --- .mocharc.json | 6 ++-- Dockerfile | 4 +-- buildImage.bash | 11 ++++++- buildImageBuildkit.bash | 10 ------- cfg/config_test.json | 5 ++++ src/worker.ts | 10 ++++--- test/grpc_acs.spec.ts | 63 +++++++++++++++++++++-------------------- test/kafka_acs.spec.ts | 50 ++++++++++++++++++-------------- test/utils.ts | 19 +++++++++++++ 9 files changed, 107 insertions(+), 71 deletions(-) delete mode 100755 buildImageBuildkit.bash diff --git a/.mocharc.json b/.mocharc.json index d4ff823..871d456 100644 --- a/.mocharc.json +++ b/.mocharc.json @@ -1,11 +1,11 @@ { "extension": [ - "ts" + "ts" ], "spec": "test/**/*.spec.ts", "require": "ts-node/register", "node-option": [ - "experimental-specifier-resolution=node", - "loader=ts-node/esm" + "experimental-specifier-resolution=node", + "loader=ts-node/esm" ] } \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 9487f68..26006f0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ ### Build -FROM node:22.11.0-alpine3.20 as build +FROM node:22.11.0-alpine3.20 AS build ENV NO_UPDATE_NOTIFIER=true USER node @@ -17,7 +17,7 @@ RUN npm run build ### Deployment -FROM node:22.11.0-alpine3.20 as deployment +FROM node:22.11.0-alpine3.20 AS deployment ENV NO_UPDATE_NOTIFIER=true diff --git a/buildImage.bash b/buildImage.bash index aed7a8d..ed8446e 100755 --- a/buildImage.bash +++ b/buildImage.bash @@ -1 +1,10 @@ -docker build -t restorecommerce/scheduling-srv . +#!/bin/bash + +SERVICE_NAME="scheduling-srv" + +DOCKER_BUILDKIT=1 docker build \ + --tag restorecommerce/$SERVICE_NAME \ + -f ./Dockerfile \ + --cache-from restorecommerce/$SERVICE_NAME \ + --build-arg APP_HOME=/home/node/$SERVICE_NAME \ + . \ No newline at end of file diff --git a/buildImageBuildkit.bash b/buildImageBuildkit.bash deleted file mode 100755 index 0433fd9..0000000 --- a/buildImageBuildkit.bash +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -SERVICE_NAME="scheduling-srv" - -DOCKER_BUILDKIT=1 docker build \ - --tag restorecommerce/$SERVICE_NAME \ - -f ./buildkit.Dockerfile \ - --cache-from restorecommerce/$SERVICE_NAME \ - --build-arg APP_HOME=/home/node/$SERVICE_NAME \ - . diff --git a/cfg/config_test.json b/cfg/config_test.json index 7f1b5df..a45519e 100644 --- a/cfg/config_test.json +++ b/cfg/config_test.json @@ -266,6 +266,11 @@ "prefix": "acs:" } }, + "externalJobs": { + "sourcePath": "./lib/external-jobs/", + "importPath": "./external-jobs/", + "importExt": ".js" + }, "errors": { "USER_NOT_LOGGED_IN": { "code": "401", diff --git a/src/worker.ts b/src/worker.ts index 78c43cd..d977cdd 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,3 +1,4 @@ +import path from 'path'; import * as _ from 'lodash-es'; import * as chassis from '@restorecommerce/chassis-srv'; import { Events, Topic, registerProtoMeta } from '@restorecommerce/kafka-client'; @@ -349,10 +350,11 @@ export class Worker { if (externalJobFiles?.length > 0) { await Promise.allSettled(externalJobFiles.map(async (externalFile) => { if (externalFile.endsWith('.js') || externalFile.endsWith('.cjs')) { - const importPath = [ + const importExt = cfg.get('externalJobs:importExt') ?? path.extname(externalFile); + const importPath = './' + path.join( process.env.EXTERNAL_JOBS_REQUIRE_DIR ?? cfg.get('externalJobs:importPath') ?? './external-jobs/', - externalFile - ].join('').replace(/\.cjs$/, '.js'); + externalFile.replace(/\.\w+$/, importExt), + ); try { const fileImport = await import(importPath); // check for double default @@ -361,7 +363,7 @@ export class Worker { } else { await fileImport.default(cfg, logger, events, runWorker); } - this.logger?.info('imported:', importPath); + this.logger?.info('Imported:', importPath); } catch (err: any) { this.logger?.error(`Error scheduling external job ${importPath}`, { err }); diff --git a/test/grpc_acs.spec.ts b/test/grpc_acs.spec.ts index bb899c0..1440584 100644 --- a/test/grpc_acs.spec.ts +++ b/test/grpc_acs.spec.ts @@ -5,8 +5,8 @@ import { Worker } from '../src/worker.js'; import { Topic } from '@restorecommerce/kafka-client'; import { createChannel, createClient } from '@restorecommerce/grpc-client'; import { - JobServiceClient as SchedulingServiceClient, JobServiceDefinition as SchedulingServiceDefinition, + JobServiceClient as SchedulingServiceClient, JobOptions_Priority, Backoff_Type, JobReadRequest, @@ -25,7 +25,8 @@ import { jobPolicySetRQ, permitJobRule, validateJobDonePayload, - cfg + cfg, + getSchedulingServiceClient } from './utils.js'; import { updateConfig } from '@restorecommerce/acs-client'; import * as _ from 'lodash-es'; @@ -110,9 +111,10 @@ const proto: any = ProtoUtils.getProtoFromPkgDefinition( pkgDef ); -const mockServer = new GrpcMockServer(cfg.get('client:acs-srv:address')); -const startGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { +let mockServerACS: GrpcMockServer; +const startACSGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { // create mock implementation based on the method name and output + mockServerACS = new GrpcMockServer(cfg.get('client:acs-srv:address')); const implementations = { isAllowed: (call: any, callback: any) => { const isAllowedResponse = methodWithOutput.filter(e => e.method === 'IsAllowed'); @@ -141,7 +143,7 @@ const startGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { } }; try { - mockServer.addService(PROTO_PATH, PKG_NAME, SERVICE_NAME, implementations, { + mockServerACS.addService(PROTO_PATH, PKG_NAME, SERVICE_NAME, implementations, { includeDirs: ['node_modules/@restorecommerce/protos/'], keepCase: true, longs: String, @@ -149,22 +151,23 @@ const startGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { defaults: true, oneofs: true }); - await mockServer.start(); - logger.info(`Mock ACS Server started on ${mockServer.serverAddress}`); + await mockServerACS.start(); + logger.info(`Mock ACS Server started on ${mockServerACS.serverAddress}`); } catch (err) { logger.error('Error starting mock ACS server', err); } + return mockServerACS; }; const IDS_PROTO_PATH = 'io/restorecommerce/user.proto'; const IDS_PKG_NAME = 'io.restorecommerce.user'; const IDS_SERVICE_NAME = 'UserService'; -const mockServerIDS = new GrpcMockServer(cfg.get('client:user:address')); - // Mock server for ids - findByToken +let mockServerIDS: GrpcMockServer; const startIDSGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { // create mock implementation based on the method name and output + mockServerIDS = new GrpcMockServer(cfg.get('client:user:address')); const implementations = { findByToken: (call: any, callback: any) => { if (call.request.token === 'admin_token') { @@ -187,16 +190,17 @@ const startIDSGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { } catch (err) { logger.error('Error starting mock IDS server', err); } + return mockServerIDS; }; -const stopGrpcMockServer = async () => { - await mockServer.stop(); +const stopACSGrpcMockServer = async () => { + await mockServerACS?.stop(); logger.info('Mock ACS Server closed successfully'); }; const stopIDSGrpcMockServer = async () => { - await mockServerIDS.stop(); + await mockServerIDS?.stop(); logger.info('Mock IDS Server closed successfully'); }; @@ -228,7 +232,7 @@ describe(`testing scheduling-srv ${testSuffix}: gRPC`, () => { // start acs mock service with PERMIT rule jobPolicySetRQ.policy_sets![0]!.policies![0]!.effect = Effect.PERMIT; jobPolicySetRQ.policy_sets![0]!.policies![0]!.rules = [permitJobRule]; - await startGrpcMockServer([ + await startACSGrpcMockServer([ { method: 'WhatIsAllowed', output: jobPolicySetRQ @@ -265,13 +269,7 @@ describe(`testing scheduling-srv ${testSuffix}: gRPC`, () => { // store user with tokens and role associations to Redis index `db-findByToken` await tokenRedisClient.set('admin-token', JSON.stringify(acsSubject)); - const schedulingClientCfg = cfg.get('client:schedulingClient'); - if (schedulingClientCfg) { - grpcSchedulingSrv = createClient({ - ...schedulingClientCfg, - logger - }, SchedulingServiceDefinition, createChannel(schedulingClientCfg.address)); - } + grpcSchedulingSrv = getSchedulingServiceClient(logger); const toDelete = (await grpcSchedulingSrv.read(JobReadRequest.fromPartial({ subject }), {})).total_count; const offset = await jobEvents.$offset(-1); @@ -289,19 +287,24 @@ describe(`testing scheduling-srv ${testSuffix}: gRPC`, () => { } }); afterEach(async () => { - await jobEvents.removeAllListeners('queuedJob'); - await jobEvents.removeAllListeners('jobsCreated'); - await jobEvents.removeAllListeners('jobsDeleted'); + await Promise.allSettled([ + jobEvents.removeAllListeners('queuedJob'), + jobEvents.removeAllListeners('jobsCreated'), + jobEvents.removeAllListeners('jobsDeleted'), + ]); }); after(async function (): Promise { this.timeout(20000); - await stopGrpcMockServer(); - await stopIDSGrpcMockServer(); - await jobEvents.removeAllListeners('queuedJob'); - await jobEvents.removeAllListeners('jobsCreated'); - await jobEvents.removeAllListeners('jobsDeleted'); - await worker.schedulingService.clear(); - await worker.stop(); + await Promise.allSettled([ + stopACSGrpcMockServer(), + stopIDSGrpcMockServer(), + jobEvents.removeAllListeners('queuedJob'), + jobEvents.removeAllListeners('jobsCreated'), + jobEvents.removeAllListeners('jobsDeleted'), + worker.schedulingService.clear(), + ]).then( + worker.stop + ); }); describe(`create a one-time job ${testSuffix}`, function postJob(): void { this.timeout(30000); diff --git a/test/kafka_acs.spec.ts b/test/kafka_acs.spec.ts index ce13af3..d6a61fc 100644 --- a/test/kafka_acs.spec.ts +++ b/test/kafka_acs.spec.ts @@ -102,10 +102,10 @@ const proto: any = ProtoUtils.getProtoFromPkgDefinition( pkgDef ); -const mockServer = new GrpcMockServer(cfg.get('client:acs-srv:address')); - -const startGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { +let mockServerACS: GrpcMockServer; +const startACSGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { // create mock implementation based on the method name and output + mockServerACS = new GrpcMockServer(cfg.get('client:acs-srv:address')); const implementations = { isAllowed: (call: any, callback: any) => { const isAllowedResponse = methodWithOutput.filter(e => e.method === 'IsAllowed'); @@ -120,7 +120,7 @@ const startGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { } }; try { - mockServer.addService(PROTO_PATH, PKG_NAME, SERVICE_NAME, implementations, { + mockServerACS.addService(PROTO_PATH, PKG_NAME, SERVICE_NAME, implementations, { includeDirs: ['node_modules/@restorecommerce/protos/'], keepCase: true, longs: String, @@ -128,22 +128,24 @@ const startGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { defaults: true, oneofs: true }); - await mockServer.start(); - logger.info(`Mock ACS Server started on ${mockServer.serverAddress}`); + await mockServerACS.start(); + logger.info(`Mock ACS Server started on ${mockServerACS.serverAddress}`); } catch (err) { logger.error('Error starting mock ACS server', err); } + mockServerACS; }; const IDS_PROTO_PATH = 'io/restorecommerce/user.proto'; const IDS_PKG_NAME = 'io.restorecommerce.user'; const IDS_SERVICE_NAME = 'UserService'; -const mockServerIDS = new GrpcMockServer(cfg.get('client:user:address')); // Mock server for ids - findByToken +let mockServerIDS: GrpcMockServer; const startIDSGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { // create mock implementation based on the method name and output + mockServerIDS = new GrpcMockServer(cfg.get('client:user:address')); const implementations = { findByToken: (call: any, callback: any) => { if (call.request.token === 'admin_token') { @@ -166,15 +168,16 @@ const startIDSGrpcMockServer = async (methodWithOutput: MethodWithOutput[]) => { } catch (err) { logger.error('Error starting mock IDS server', err); } + return mockServerIDS; }; -const stopGrpcMockServer = async () => { - await mockServer.stop(); +const stopACSGrpcMockServer = async () => { + await mockServerACS?.stop(); logger.info('Mock ACS Server closed successfully'); }; const stopIDSGrpcMockServer = async () => { - await mockServerIDS.stop(); + await mockServerIDS?.stop(); logger.info('Mock IDS Server closed successfully'); }; @@ -197,7 +200,7 @@ describe(`testing scheduling-srv ${testSuffix}: Kafka`, async () => { jobPolicySetRQ!.policy_sets![0]!.policies![0]!.rules = [permitJobRule]; // start mock acs-srv - needed for read operation since acs-client makes a req to acs-srv // to get applicable policies although acs-lookup is disabled - await startGrpcMockServer([{ method: 'WhatIsAllowed', output: jobPolicySetRQ }, + await startACSGrpcMockServer([{ method: 'WhatIsAllowed', output: jobPolicySetRQ }, { method: 'IsAllowed', output: { decision: 'PERMIT' } }]); jobTopic = await worker.events.topic(JOB_EVENTS_TOPIC); @@ -256,19 +259,24 @@ describe(`testing scheduling-srv ${testSuffix}: Kafka`, async () => { } }); afterEach(async () => { - await jobTopic.removeAllListeners('queuedJob'); - await jobTopic.removeAllListeners('jobsCreated'); - await jobTopic.removeAllListeners('jobsDeleted'); + await Promise.allSettled([ + jobTopic.removeAllListeners('queuedJob'), + jobTopic.removeAllListeners('jobsCreated'), + jobTopic.removeAllListeners('jobsDeleted'), + ]); }); after(async function (): Promise { this.timeout(20000); - await stopGrpcMockServer(); - await stopIDSGrpcMockServer(); - await jobTopic.removeAllListeners('queuedJob'); - await jobTopic.removeAllListeners('jobsCreated'); - await jobTopic.removeAllListeners('jobsDeleted'); - await worker.schedulingService.clear(); - await worker.stop(); + await Promise.allSettled([ + stopACSGrpcMockServer(), + stopIDSGrpcMockServer(), + jobTopic.removeAllListeners('queuedJob'), + jobTopic.removeAllListeners('jobsCreated'), + jobTopic.removeAllListeners('jobsDeleted'), + worker.schedulingService.clear(), + ]).then( + worker.stop + ); }); describe('create a one-time job', function postJob(): void { this.timeout(15000); diff --git a/test/utils.ts b/test/utils.ts index 8729ad3..3acbff2 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -5,9 +5,28 @@ import { Logger } from 'winston'; import { PolicySetRQResponse } from '@restorecommerce/acs-client'; import { Effect } from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/rule.js'; import { createServiceConfig } from '@restorecommerce/service-config'; +import { createChannel, createClient } from '@restorecommerce/grpc-client'; +import { + JobServiceDefinition as SchedulingServiceDefinition, + JobServiceClient as SchedulingServiceClient, +} from '@restorecommerce/rc-grpc-clients/dist/generated-server/io/restorecommerce/job.js'; export const cfg = createServiceConfig(process.cwd()); +let _client: SchedulingServiceClient +export function getSchedulingServiceClient(logger: Logger) { + const schedulingClientCfg = cfg.get('client:schedulingClient'); + _client = createClient( + { + ...schedulingClientCfg, + logger + }, + SchedulingServiceDefinition, + createChannel(schedulingClientCfg.address) + ); + return _client; +} + export function validateScheduledJob(job: any, expectedSchedule: string, logger: Logger): void { should.exist(job.data); should.exist(job.data.payload);