Skip to content

Commit 2c761f6

Browse files
committed
feat(DynamicWorkerPool): idleTimeout for workers
1 parent e9b4616 commit 2c761f6

5 files changed

+166
-38
lines changed

src/worker-pool/dynamic-worker-pool.spec.ts

+45-18
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ describe('DynamicWorkerPool', () => {
2222
});
2323

2424
test('size of new pool should be 0', () => {
25-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 0, 5);
25+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 0, 5, 0);
2626
expect(testPool.size).toEqual(0);
2727
});
2828

2929
test('execute task should aquire new workers and execute tasks on them', async () => {
30-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 0, 2);
30+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 0, 2, 0);
3131

3232
const workerDisposeSpy = jest.spyOn(testWorkerInstance, 'dispose');
3333
const createWorkerSpy = jest.spyOn(workerFactory, 'createWorker');
@@ -47,7 +47,7 @@ describe('DynamicWorkerPool', () => {
4747
});
4848

4949
test('worker pool should hould minSize workers', async () => {
50-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 3, 5);
50+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 3, 5, 0);
5151

5252
const workerDisposeSpy = jest.spyOn(testWorkerInstance, 'dispose');
5353
const createWorkerSpy = jest.spyOn(workerFactory, 'createWorker');
@@ -72,7 +72,7 @@ describe('DynamicWorkerPool', () => {
7272
});
7373

7474
test('executeTask should reject if worker pool max size is reached', async () => {
75-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 0, 1);
75+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 0, 1, 0);
7676

7777
const workerDisposeSpy = jest.spyOn(testWorkerInstance, 'dispose');
7878
const createWorkerSpy = jest.spyOn(workerFactory, 'createWorker');
@@ -91,7 +91,7 @@ describe('DynamicWorkerPool', () => {
9191
});
9292

9393
test('worker pool should re-use existing workers in pool', async () => {
94-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 1, 1);
94+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 1, 1, 0);
9595

9696
const workerDisposeSpy = jest.spyOn(testWorkerInstance, 'dispose');
9797
const createWorkerSpy = jest.spyOn(workerFactory, 'createWorker');
@@ -110,7 +110,7 @@ describe('DynamicWorkerPool', () => {
110110
});
111111

112112
test('worker pool should emit an error on error$ if worker disposal fails', async () => {
113-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 0, 1);
113+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 0, 1, 0);
114114

115115
jest.spyOn(testWorkerInstance, 'dispose').mockImplementationOnce(async () => {
116116
throw new Error('DisposeError');
@@ -126,13 +126,13 @@ describe('DynamicWorkerPool', () => {
126126
});
127127

128128
test('executeTask on a stopped DynamicWorkerPool should reject', async () => {
129-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
129+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
130130
testPool.stop();
131131
await expect(testPool.executeTask('1')).rejects.toThrowError('Cannot aquire worker from stopped worker pool!');
132132
});
133133

134134
test('stop should terminate all current workers', async () => {
135-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
135+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
136136
const workerDisposeSpy = jest.spyOn(testWorkerInstance, 'dispose');
137137

138138
await Promise.all([
@@ -149,7 +149,7 @@ describe('DynamicWorkerPool', () => {
149149
});
150150

151151
test('busyWorkers$ should emit number of workers in use', async () => {
152-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
152+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
153153

154154
const busyWorkersHistoryPromise = testPool.busyWorkers$.pipe(bufferCount(13), first()).toPromise();
155155

@@ -167,7 +167,7 @@ describe('DynamicWorkerPool', () => {
167167
});
168168

169169
test('idleWorkers$ should emit number of idle workers', async () => {
170-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
170+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
171171

172172
const idleWorkersHistoryPromise = testPool.idleWorkers$.pipe(bufferCount(5), first()).toPromise();
173173

@@ -185,7 +185,7 @@ describe('DynamicWorkerPool', () => {
185185
});
186186

187187
test('availableWorkers$ should emit number of available workers', async () => {
188-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
188+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
189189

190190
const availableWorkersHistoryPromise = testPool.availableWorkers$.pipe(bufferCount(13), first()).toPromise();
191191

@@ -203,7 +203,7 @@ describe('DynamicWorkerPool', () => {
203203
});
204204

205205
test('availableWorkers$ should emit 0 for a stopped pool', async () => {
206-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
206+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
207207

208208
const availableWorkersHistoryPromise = testPool.availableWorkers$.pipe(bufferCount(4), first()).toPromise();
209209

@@ -215,13 +215,13 @@ describe('DynamicWorkerPool', () => {
215215
});
216216

217217
test('availableWorkers should return 0 for a stopped pool', async () => {
218-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
218+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
219219
await testPool.stop();
220220
expect(testPool.availableWorkers).toEqual(0);
221221
});
222222

223223
test('availableWorkers should return number of possible free workers', async () => {
224-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
224+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
225225
expect(testPool.availableWorkers).toEqual(5);
226226
const taskResultPromise = testPool.executeTask('1');
227227
expect(testPool.availableWorkers).toEqual(4);
@@ -230,13 +230,13 @@ describe('DynamicWorkerPool', () => {
230230
});
231231

232232
test('idleWorkers should return 0 for a stopped pool', async () => {
233-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
233+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
234234
await testPool.stop();
235235
expect(testPool.idleWorkers).toEqual(0);
236236
});
237237

238238
test('idleWorkers should return the number of created workers in idle state', async () => {
239-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
239+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
240240
expect(testPool.idleWorkers).toEqual(0);
241241
const taskResultPromise = testPool.executeTask('1');
242242
expect(testPool.idleWorkers).toEqual(0);
@@ -245,17 +245,44 @@ describe('DynamicWorkerPool', () => {
245245
});
246246

247247
test('busyWorkers should return 0 for a stopped pool', async () => {
248-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
248+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
249249
await testPool.stop();
250250
expect(testPool.busyWorkers).toEqual(0);
251251
});
252252

253253
test('busyWorkers should return the number of working workers', async () => {
254-
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5);
254+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 0);
255255
expect(testPool.busyWorkers).toEqual(0);
256256
const taskResultPromise = testPool.executeTask('1');
257257
expect(testPool.busyWorkers).toEqual(1);
258258
await taskResultPromise;
259259
expect(testPool.busyWorkers).toEqual(0);
260260
});
261+
262+
test('DynamicWorkerPool with idleTimeout should keep idle workers for specified time', async () => {
263+
const testPool = new DynamicWorkerPool<string, string, TestWorker>(workerFactory, 2, 5, 100);
264+
const workerDisposeSpy = jest.spyOn(testWorkerInstance, 'dispose');
265+
266+
await Promise.all([
267+
testPool.executeTask('1'),
268+
testPool.executeTask('2'),
269+
testPool.executeTask('3'),
270+
testPool.executeTask('4'),
271+
testPool.executeTask('5')
272+
]);
273+
274+
await new Promise((resolve) => setTimeout(resolve, 10));
275+
276+
expect(workerDisposeSpy).toHaveBeenCalledTimes(0);
277+
expect(testPool.size).toEqual(5);
278+
279+
await new Promise((resolve) => setTimeout(resolve, 100));
280+
281+
expect(testPool.size).toEqual(2);
282+
expect(workerDisposeSpy).toHaveBeenCalledTimes(3);
283+
284+
await testPool.stop();
285+
expect(testPool.size).toEqual(0);
286+
expect(workerDisposeSpy).toHaveBeenCalledTimes(5);
287+
});
261288
});

src/worker-pool/dynamic-worker-pool.ts

+40-16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { distinctUntilChanged, filter, first } from 'rxjs/operators';
44
import { ObservableQueue } from '../data-structures';
55
import { WorkerFactory } from '../worker-factory';
66
import { AbstractWorkerPool, AbstractWorkerPoolWorker } from './abstract-worker-pool';
7+
import { IdleWorkerDescription } from './idle-worker-description';
78

89
export interface DynamicWorkerPoolWorker<TTask, TResult> extends AbstractWorkerPoolWorker<TTask, TResult> {
910
dispose(): Promise<void>;
@@ -27,13 +28,14 @@ export class DynamicWorkerPool<
2728
private readonly availableWorkersSubject: BehaviorSubject<number>;
2829
private readonly errorSubject = new Subject<Error>();
2930
private readonly busyWorkersQueue = new ObservableQueue<TWorker>();
30-
private readonly idleWorkersQueue = new ObservableQueue<TWorker>();
31+
private readonly idleWorkersQueue = new ObservableQueue<IdleWorkerDescription<TWorker>>();
3132
private stopped = false;
3233

3334
constructor(
3435
private readonly workerFactory: WorkerFactory<TWorker>,
3536
private readonly minSize: number,
36-
private readonly maxSize: number
37+
private readonly maxSize: number,
38+
private readonly workersIdleTimeout: number
3739
) {
3840
super();
3941

@@ -66,26 +68,30 @@ export class DynamicWorkerPool<
6668

6769
await this.busyWorkers$
6870
.pipe(
69-
filter((x) => x === 0),
71+
filter((count) => count === 0),
7072
first()
7173
)
7274
.toPromise();
7375

74-
const existingWorkers = this.idleWorkersQueue.clear();
76+
const existingWorkerDescriptions = this.idleWorkersQueue.clear();
77+
existingWorkerDescriptions.forEach((d) => d.cancelIdleTimeout());
78+
79+
const existingWorkers = existingWorkerDescriptions.map(({ worker }) => worker);
7580
this.updatePoolSize();
7681
this.updateAvailableWorkers();
77-
await Promise.all(existingWorkers.map((w) => this.safeDispose(w)));
82+
await Promise.all(existingWorkers.map((worker) => this.safeDispose(worker)));
7883
}
7984

8085
protected aquireWorker(): TWorker {
8186
if (this.stopped) {
8287
throw new Error('Cannot aquire worker from stopped worker pool!');
8388
} else if (this.idleWorkersQueue.size > 0) {
84-
const worker = this.idleWorkersQueue.dequeue();
85-
this.busyWorkersQueue.enqueue(worker);
89+
const workerDescription = this.idleWorkersQueue.dequeue();
90+
workerDescription.cancelIdleTimeout();
91+
this.busyWorkersQueue.enqueue(workerDescription.worker);
8692
// NOTE: poolsize did not change since we reused an existing worker -> no need to update poolsize
8793
this.updateAvailableWorkers();
88-
return worker;
94+
return workerDescription.worker;
8995
} else if (this.size < this.maxSize) {
9096
const worker = this.workerFactory.createWorker();
9197
this.busyWorkersQueue.enqueue(worker);
@@ -98,16 +104,34 @@ export class DynamicWorkerPool<
98104
}
99105

100106
protected releaseWorker(worker: TWorker): void {
101-
const currentWorkersCount = this.size;
102-
this.busyWorkersQueue.drop(worker);
103-
if (currentWorkersCount > this.minSize) {
104-
// NOTE: safeDispose will never reject -> leave promise uncatched is safe
105-
this.safeDispose(worker);
106-
this.updatePoolSize();
107+
if (this.workersIdleTimeout < 1) {
108+
// Workers do not have an idle timeout -> remove or enqueue them immediately
109+
const currentWorkersCount = this.size;
110+
this.busyWorkersQueue.drop(worker);
111+
112+
if (currentWorkersCount > this.minSize) {
113+
// NOTE: safeDispose will never reject -> leave promise uncatched is safe
114+
this.safeDispose(worker);
115+
this.updatePoolSize();
116+
} else {
117+
this.idleWorkersQueue.enqueue(new IdleWorkerDescription<TWorker>(worker));
118+
// NOTE: poolsize did not change since we standbyed an existing worker -> no need to update poolsize
119+
}
107120
} else {
108-
this.idleWorkersQueue.enqueue(worker);
109-
// NOTE: poolsize did not change since we standbyed an existing worker -> no need to update poolsize
121+
// Workers do have an idle timeout -> enqueue them as idle worker and remove them if possible after timeout
122+
this.busyWorkersQueue.drop(worker);
123+
const idleWorkerDescription = new IdleWorkerDescription<TWorker>(worker, this.workersIdleTimeout);
124+
idleWorkerDescription.addTeardownLogic(() => {
125+
if (this.size > this.minSize) {
126+
this.idleWorkersQueue.drop(idleWorkerDescription);
127+
this.safeDispose(worker);
128+
this.updatePoolSize();
129+
this.updateAvailableWorkers();
130+
}
131+
});
132+
this.idleWorkersQueue.enqueue(idleWorkerDescription);
110133
}
134+
111135
this.updateAvailableWorkers();
112136
}
113137

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { IdleWorkerDescription } from './idle-worker-description';
2+
3+
describe('IdleWorkerDescription', () => {
4+
jest.useFakeTimers();
5+
6+
beforeEach(() => {
7+
jest.clearAllMocks();
8+
});
9+
10+
test('creating a new IdleWorkerDescription should start idleTimout if specified', () => {
11+
const idleWorkerDescription = new IdleWorkerDescription<unknown>(undefined, 3000);
12+
// required to avoid unused local variable warning
13+
expect(idleWorkerDescription).toBeDefined();
14+
expect(setTimeout).toHaveBeenCalledWith(expect.any(Function), 3000);
15+
});
16+
17+
test('cancelIdleTimeout should clear idle-teardown-logic timeout', () => {
18+
const idleWorkerDescription = new IdleWorkerDescription<unknown>(undefined, 3000);
19+
idleWorkerDescription.cancelIdleTimeout();
20+
expect(clearTimeout).toHaveBeenCalledTimes(1);
21+
});
22+
23+
test('cancelIdleTimeout should not have any effect if no timeout was specified', () => {
24+
const idleWorkerDescription = new IdleWorkerDescription<unknown>(undefined);
25+
idleWorkerDescription.cancelIdleTimeout();
26+
expect(clearTimeout).toHaveBeenCalledTimes(0);
27+
});
28+
29+
test('creating a new IdleWorkerDescription should not start idleTimout if not specified', () => {
30+
const idleWorkerDescription = new IdleWorkerDescription<unknown>(undefined);
31+
// required to avoid unused local variable warning
32+
expect(idleWorkerDescription).toBeDefined();
33+
expect(setTimeout).toHaveBeenCalledTimes(0);
34+
});
35+
36+
test('IdleWorkerDescription should execute teardown logics if idle timeout times out', () => {
37+
const idleWorkerDescription = new IdleWorkerDescription<unknown>(undefined, 3000);
38+
const tearDownLogics = [jest.fn(), jest.fn()];
39+
for (const tearDownLogic of tearDownLogics) {
40+
idleWorkerDescription.addTeardownLogic(tearDownLogic);
41+
}
42+
jest.runAllTimers();
43+
for (const tearDownLogic of tearDownLogics) {
44+
expect(tearDownLogic).toHaveBeenCalledTimes(1);
45+
}
46+
});
47+
});
+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
export type TeardownLogic = () => void;
2+
3+
// TODO: test
4+
export class IdleWorkerDescription<TWorker> {
5+
private readonly timeoutTimer: NodeJS.Timeout | undefined;
6+
private readonly teardownLogics: Array<TeardownLogic> = [];
7+
8+
constructor(public readonly worker: TWorker, idleTimeout?: number) {
9+
if (idleTimeout != null) {
10+
this.timeoutTimer = setTimeout(
11+
() => this.teardownLogics.forEach((teardownLogic) => teardownLogic()),
12+
idleTimeout
13+
);
14+
}
15+
}
16+
17+
public addTeardownLogic(teardownLogic: () => void): void {
18+
this.teardownLogics.push(teardownLogic);
19+
}
20+
21+
public cancelIdleTimeout(): void {
22+
if (this.timeoutTimer != null) {
23+
clearTimeout(this.timeoutTimer);
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)