Skip to content

Commit

Permalink
feat: ws-worker respond to wakeup call (#877)
Browse files Browse the repository at this point in the history
* feat: trigger instant claim on work-available event

* feat: update lightning mock to send work-available event via ws

* fix: whitelist timestamp key in lighning mock

* chore: fix type issues

* tests: should recieve worker:queue message events from lightning

* tests: add onMessage mock to socket mock

* tests: manually trigger claim on connected workers

* feat: emit messages for worker:queue directly

* minor tweaks
- make ?wakeup easier in the mock
- catch claim errors in the worker after work-available
- fix a typo in the readme

* changesets

* lint

* version bumps

* simplify wake-up tests

* simplify wakup in mock

* [email protected]

---------

Co-authored-by: Joe Clark <[email protected]>
  • Loading branch information
doc-han and josephjclark authored Feb 21, 2025
1 parent 7cbc8cc commit 36dc3db
Show file tree
Hide file tree
Showing 18 changed files with 269 additions and 33 deletions.
9 changes: 9 additions & 0 deletions integration-tests/worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# @openfn/integration-tests-worker

## 1.0.77

### Patch Changes

- Updated dependencies [87f10f7]
- Updated dependencies [87f10f7]
- @openfn/lightning-mock@2.1.0
- @openfn/ws-worker@1.11.0

## 1.0.76

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/worker/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@openfn/integration-tests-worker",
"private": true,
"version": "1.0.76",
"version": "1.0.77",
"description": "Lightning WOrker integration tests",
"author": "Open Function Group <[email protected]>",
"license": "ISC",
Expand Down
6 changes: 6 additions & 0 deletions packages/lightning-mock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @openfn/lightning-mock

## 2.1.0

### Minor Changes

- 87f10f7: Add ?wakeup paramter to trigger work-available events

## 2.0.31

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ The server exposes a small dev API allowing you to post an Run.
You can add an run (`{ jobs, triggers, edges }`) to the queue with:

```
curl http://localhost:8888/run --json @tmp/run.json
curl -X POST http://localhost:8888/run --json @tmp/run.json
```

Here's an example run:
Expand Down
2 changes: 1 addition & 1 deletion packages/lightning-mock/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/lightning-mock",
"version": "2.0.31",
"version": "2.1.0",
"private": true,
"description": "A mock Lightning server",
"main": "dist/index.js",
Expand Down
19 changes: 19 additions & 0 deletions packages/lightning-mock/src/api-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import type {
import { ServerState } from './server';
import { RUN_COMPLETE } from './events';
import type { DevServer, LightningEvents } from './types';
import { PhoenixEvent } from './socket-server';

type Api = {
startRun(runId: string): void;
messageClients(message: PhoenixEvent): void;
};

const setupDevAPI = (
Expand All @@ -40,6 +42,10 @@ const setupDevAPI = (

app.getDataclip = (id: string) => state.dataclips[id];

app.messageSocketClients = (message: PhoenixEvent) => {
api.messageClients(message);
};

app.enqueueRun = (run: LightningPlan, workerId = 'rte') => {
state.runs[run.id] = run;
state.results[run.id] = {
Expand Down Expand Up @@ -169,6 +175,19 @@ const setupRestAPI = (

app.enqueueRun(run);

// triggering wakeup in all connected workers
if ('wakeup' in ctx.query) {
logger.info(
'WAKE UP! Sending work-available event to all listening workers'
);
app.messageSocketClients({
topic: 'worker:queue',
event: 'work-available',
payload: {},
join_ref: '',
ref: '',
});
}
ctx.response.status = 200;
});

Expand Down
11 changes: 9 additions & 2 deletions packages/lightning-mock/src/api-sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ const createSocketAPI = (

return {
startRun,
messageClients: wss.sendToClients.bind(this),
close: () => {
server.close();
(wss as any).close();
Expand Down Expand Up @@ -352,8 +353,14 @@ const createSocketAPI = (
runId: string
) {
const { ref, join_ref, topic } = evt;
const { final_dataclip_id, reason, error_type, error_message, ...rest } =
evt.payload;
const {
final_dataclip_id,
reason,
error_type,
error_message,
timestamp, // whitelist timestamp
...rest
} = evt.payload;

logger?.info('Completed run ', runId);
logger?.debug(final_dataclip_id);
Expand Down
15 changes: 15 additions & 0 deletions packages/lightning-mock/src/socket-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type MockSocketServer = typeof WebSocketServer & {
topic: Topic,
events: Record<string, EventHandler>
) => { unsubscribe: () => void };
sendToClients: (message: PhoenixEvent) => void;
};

function createServer({
Expand All @@ -95,6 +96,8 @@ function createServer({
phoenix: new Set([() => null]),
};

const clients: Set<DevSocket> = new Set();

const wsServer =
server ||
new WebSocketServer({
Expand Down Expand Up @@ -202,6 +205,9 @@ function createServer({

logger?.debug(`>> [${topic}] ${event} ${ref} :: ${stringify(payload)}`);

// tracking connected worker:queue workers
if (topic === 'worker:queue' && event === 'phx_join') clients.add(ws);

if (event in events) {
// handle system/phoenix events
// @ts-ignore
Expand All @@ -227,10 +233,19 @@ function createServer({
}
}
});

ws.on('close', () => clients.delete(ws));
});

const mockServer = wsServer as MockSocketServer;

mockServer.sendToClients = async (message) => {
clients.forEach((client) => {
// @ts-ignore
client.sendJSON(message);
});
};

// debug API
// TODO should this in fact be (topic, event, fn)?
mockServer.listenToChannel = (topic: Topic, fn: EventHandler) => {
Expand Down
9 changes: 6 additions & 3 deletions packages/lightning-mock/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import type {
Credential,
} from '@openfn/lexicon/lightning';
import type { ServerState } from './server';
import { PhoenixEvent } from './socket-server';

export type LightningEvents = 'log' | 'run-complete';
export type LightningEvents = 'log' | 'run-complete' | string; // not complete!

export type DevServer = Koa & {
state: ServerState;
Expand All @@ -20,13 +21,15 @@ export type DevServer = Koa & {
getQueueLength(): number;
getResult(runId: string): any;
getState(): ServerState;
messageSocketClients(message: PhoenixEvent): void;
on(event: LightningEvents, fn: (evt: any) => void): void;
once(event: LightningEvents, fn: (evt: any) => void): void;
onSocketEvent(
event: LightningEvents,
runId: string,
fn: (evt: any) => void
): void;
fn: (evt: any) => void,
once?: boolean
): () => void;
registerRun(run: LightningPlan): void;
removeAllListeners(): void;
reset(): void;
Expand Down
10 changes: 10 additions & 0 deletions packages/ws-worker/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# ws-worker

## 1.11.0

### Minor Changes

- 87f10f7: Respond to `work:available` events.

When the worker receives `work:available` in the worker queue, it'll instantly trigger a claim event.

This claim is independent of the workloop and does not affect backoff in any way.

## 1.10.0

### Minor Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/ws-worker/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@openfn/ws-worker",
"version": "1.10.0",
"version": "1.11.0",
"description": "A Websocket Worker to connect Lightning to a Runtime Engine",
"main": "dist/index.js",
"type": "module",
Expand Down
5 changes: 5 additions & 0 deletions packages/ws-worker/src/channels/worker-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const connectToWorkerQueue = (

const channel = socket.channel('worker:queue') as Channel;

channel.onMessage = (ev, load) => {
events.emit('message', ev, load);
return load;
};

channel
.join()
.receive('ok', () => {
Expand Down
3 changes: 3 additions & 0 deletions packages/ws-worker/src/events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import * as l from '@openfn/lexicon/lightning';

// events from lightning to workers
export const WORK_AVAILABLE = 'work-available';

// These are worker-lightning events, used in the websocket
export const CLAIM = 'claim';
export const GET_PLAN = 'fetch:plan';
Expand Down
4 changes: 4 additions & 0 deletions packages/ws-worker/src/mock/sockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ export class MockSocket {
this.callbacks.onError = callback;
}

onMessage(callback: EventHandler): void {
this.callbacks.onMessage = callback;
}

onClose(callback: EventHandler): void {
// TODO this isn't actually hooked up right now
this.callbacks.onClose = callback;
Expand Down
14 changes: 12 additions & 2 deletions packages/ws-worker/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Router from '@koa/router';
import { humanId } from 'human-id';
import { createMockLogger, Logger } from '@openfn/logger';
import { ClaimRun } from '@openfn/lexicon/lightning';
import { INTERNAL_RUN_COMPLETE } from './events';
import { INTERNAL_RUN_COMPLETE, WORK_AVAILABLE } from './events';
import destroy from './api/destroy';
import startWorkloop, { Workloop } from './api/workloop';
import claim from './api/claim';
Expand Down Expand Up @@ -135,6 +135,15 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
logger.debug(e);
};

// handles messages for the worker:queue
const onMessage = (event: string) => {
if (event === WORK_AVAILABLE) {
claim(app, logger, { maxWorkers: options.maxWorkflows }).catch(() => {
// do nothing - it's fine if claim throws here
});
}
};

connectToWorkerQueue(
options.lightning!,
app.id,
Expand All @@ -144,7 +153,8 @@ function connect(app: ServerApp, logger: Logger, options: ServerOptions = {}) {
)
.on('connect', onConnect)
.on('disconnect', onDisconnect)
.on('error', onError);
.on('error', onError)
.on('message', onMessage);
}

async function setupCollections(options: ServerOptions, logger: Logger) {
Expand Down
Loading

0 comments on commit 36dc3db

Please sign in to comment.