diff --git a/integration-tests/worker/CHANGELOG.md b/integration-tests/worker/CHANGELOG.md index 8d8ed46e6..c1fc62ddd 100644 --- a/integration-tests/worker/CHANGELOG.md +++ b/integration-tests/worker/CHANGELOG.md @@ -1,5 +1,13 @@ # @openfn/integration-tests-worker +## 1.0.65 + +### Patch Changes + +- Updated dependencies [ef1fb63] +- Updated dependencies [606f23b] + - @openfn/ws-worker@1.9.0 + ## 1.0.64 ### Patch Changes diff --git a/integration-tests/worker/package.json b/integration-tests/worker/package.json index 74c79ec98..61c1d2193 100644 --- a/integration-tests/worker/package.json +++ b/integration-tests/worker/package.json @@ -1,7 +1,7 @@ { "name": "@openfn/integration-tests-worker", "private": true, - "version": "1.0.64", + "version": "1.0.65", "description": "Lightning WOrker integration tests", "author": "Open Function Group ", "license": "ISC", diff --git a/integration-tests/worker/test/runs.test.ts b/integration-tests/worker/test/runs.test.ts index 7c15b29df..220d51295 100644 --- a/integration-tests/worker/test/runs.test.ts +++ b/integration-tests/worker/test/runs.test.ts @@ -25,7 +25,8 @@ test.before(async () => { repoDir: path.resolve('tmp/repo/attempts'), }, { - collectionsVersion: '1.0.0-next-f802225c', + collectionsVersion: '0.5.0', + collectionsUrl: 'http://localhost:4321/collections', runPublicKey: keys.public, } )); @@ -45,6 +46,7 @@ const humanMb = (sizeInBytes: number) => Math.round(sizeInBytes / 1024 / 1024); const run = async (t, attempt) => { return new Promise(async (done, reject) => { lightning.on('step:complete', ({ payload }) => { + console.log(payload); t.is(payload.reason, 'success'); // TODO friendlier job names for this would be nice (rather than run ids) diff --git a/packages/ws-worker/CHANGELOG.md b/packages/ws-worker/CHANGELOG.md index bfb16057d..4cf3c4345 100644 --- a/packages/ws-worker/CHANGELOG.md +++ b/packages/ws-worker/CHANGELOG.md @@ -1,5 +1,12 @@ # ws-worker +## 1.8.2 + +### Patch Changes + +- ef1fb63: Fix an issue running collections from an auto-loaded version +- 606f23b: Allow steps to specify their own adaptor version + ## 1.8.1 ### Patch Changes diff --git a/packages/ws-worker/README.md b/packages/ws-worker/README.md index 829669c73..a417cb856 100644 --- a/packages/ws-worker/README.md +++ b/packages/ws-worker/README.md @@ -66,6 +66,20 @@ To manually trigger a claim, post to `/claim`: curl -X POST http://localhost:2222/claim ``` +## Collections + +To enable collections with a local lightning: + +``` +pnpm start -collections-url http://localhost:4000/collections +``` + +To use the monorepo adaptor version: + +``` +pnpm start --collections-version local --collections-url http://localhost:4000/collections +``` + ## Architecture Lightning is expected to maintain a queue of runs. The Worker pulls those runs from the queue, via websocket, and sends them off to the Engine for execution. diff --git a/packages/ws-worker/package.json b/packages/ws-worker/package.json index 173208a43..3f997016c 100644 --- a/packages/ws-worker/package.json +++ b/packages/ws-worker/package.json @@ -1,6 +1,6 @@ { "name": "@openfn/ws-worker", - "version": "1.8.1", + "version": "1.8.2", "description": "A Websocket Worker to connect Lightning to a Runtime Engine", "main": "dist/index.js", "type": "module", diff --git a/packages/ws-worker/src/server.ts b/packages/ws-worker/src/server.ts index d04a4e3e6..b0b925ea1 100644 --- a/packages/ws-worker/src/server.ts +++ b/packages/ws-worker/src/server.ts @@ -60,9 +60,6 @@ export interface ServerApp extends Koa { engine: RuntimeEngine; options: ServerOptions; workloop?: Workloop; - // What version of the collections adaptor should we use? - // Can be set through CLI, or else it'll look up latest on startup - collectionsVersion?: string; execute: ({ id, token }: ClaimRun) => Promise; destroy: () => void; @@ -174,7 +171,7 @@ async function setupCollections(options: ServerOptions, logger: Logger) { 'npm view @openfn/language-collections@latest version' ); logger.log('Using collections version from @latest: ', version); - return version; + return version.trim(); } function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { @@ -322,7 +319,7 @@ function createServer(engine: RuntimeEngine, options: ServerOptions = {}) { if (options.lightning) { setupCollections(options, logger).then((version) => { - app.collectionsVersion = version; + app.options.collectionsVersion = version; connect(app, logger, options); }); } else { diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index 45a5e52e7..39629935e 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -88,8 +88,16 @@ export default ( const job = step as Job; if (job.expression?.match(/(collections\.)/)) { hasCollections = true; - job.adaptors ??= []; - job.adaptors.push(`@openfn/language-collections@${collectionsVersion}`); + if ( + !job.adaptors?.find((v) => + v.startsWith('@openfn/language-collections') + ) + ) { + job.adaptors ??= []; + job.adaptors.push( + `@openfn/language-collections@${collectionsVersion}` + ); + } } }); return hasCollections; diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index ed8f9da31..e193a7273 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -614,7 +614,36 @@ test('append the collections adaptor to jobs that use it', (t) => { t.deepEqual(b.adaptors, ['common', '@openfn/language-collections@1.0.0']); }); -test('append the collections credential to jobs that use it', (t) => { +test('do not append the collections adaptor to jobs that already have it', (t) => { + const run: Partial = { + id: 'w', + jobs: [ + createNode({ + id: 'a', + body: 'collections.each("c", "k", (state) => state)', + adaptor: '@openfn/language-collections@latest', + }), + ], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('t', 'a')], + }; + + const { plan } = convertPlan(run as LightningPlan, { + collectionsVersion: '1.0.0', + }); + + const [_t, a] = plan.workflow.steps; + + // @ts-ignore + t.deepEqual(a.adaptors, ['@openfn/language-collections@latest']); + + t.deepEqual(plan.workflow.credentials, { + collections_token: true, + collections_endpoint: true, + }); +}); + +test('append the collections credential to workflows that use it', (t) => { const run: Partial = { id: 'w', jobs: [