Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit fa309f0

Browse files
author
Chris Raynor
authored
[Feature] Add/Remove workers dynamically. (#72)
1 parent f9915c9 commit fa309f0

File tree

6 files changed

+162
-11
lines changed

6 files changed

+162
-11
lines changed

changelog.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
feature - Added ability to dynamically manage workers using `q.getWorkerCount()`, `q.addWorker()`, and `q.shutdownWorker()` (thanks to @startswithaj)

docs/guide.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* [Queue Security](#queue-security)
1111
* [Defining Specs (Optional)](#defining-specs-optional)
1212
* [Graceful Shutdown](#graceful-shutdown)
13+
* [Dynamic Worker Count](#dynamic-worker-count)
1314
* [Message Sanitization, Revisited](#message-sanitization-revisited)
1415
* [Custom references to tasks and specs](#custom-references-to-tasks-and-specs)
1516
* [Wrap Up](#wrap-up)
@@ -87,7 +88,7 @@ Multiple queue workers can be initialized on multiple machines and Firebase-Queu
8788

8889
Queue workers can take an optional options object to specify:
8990
- `specId` - specifies the spec type for this worker. This is important when creating multiple specs. Defaults to `null` which uses the default spec.
90-
- `numWorkers` - specifies the number of workers to run simultaneously for this node.js thread. Defaults to 1 worker.
91+
- `numWorkers` - specifies the number of initial workers to run simultaneously for this node.js thread. Defaults to 1 worker, and can be updated once the queue has been initialized (see the [Dynamic Worker Count](#dynamic-worker-count) section).
9192
- `sanitize` - specifies whether the `data` object passed to the processing function is sanitized of internal keys reserved for use by the queue. Defaults to `true`.
9293
- `suppressStack` - specifies whether the queue will suppress error stack traces from being placed in the `_error_details` of the task if it's rejected with an Error.
9394

@@ -368,6 +369,15 @@ process.on('SIGINT', function() {
368369
```
369370

370371

372+
## Dynamic Worker Count
373+
374+
The number of workers running simultaneously in the same node.js thread can be managed dynamically using the following three methods on the instantiated Queue object:
375+
376+
- `getWorkerCount()` - This method returns the current number of workers on a queue.
377+
- `addWorker()` - This method instantiates a new worker with the queue's current specs.
378+
- `shutdownWorker()` - This method gracefully shuts down a worker and returns a promise fulfilled when shutdown. If there are no more workers to shutdown, the promise will be rejected.
379+
380+
371381
## Message Sanitization, Revisited
372382

373383
In our example at the beginning, you wanted to perform several actions on your chat system:

src/lib/queue_worker.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,10 @@ QueueWorker.prototype.setTaskSpec = function(taskSpec) {
750750
QueueWorker.prototype.shutdown = function() {
751751
var self = this;
752752

753+
if (!_.isNull(self.shutdownDeferred)) {
754+
return self.shutdownDeferred.promise;
755+
}
756+
753757
logger.debug(self._getLogEntry('shutting down'));
754758

755759
// Set the global shutdown deferred promise, which signals we're shutting down

src/queue.js

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ function Queue() {
6363
self.sanitize = DEFAULT_SANITIZE;
6464
self.suppressStack = DEFAULT_SUPPRESS_STACK;
6565
self.initialized = false;
66+
self.shuttingDown = false;
6667

6768
self.specChangeListener = null;
6869

@@ -173,6 +174,7 @@ function Queue() {
173174
for (var k = 0; k < self.numWorkers; k++) {
174175
self.workers[k].setTaskSpec(taskSpec);
175176
}
177+
self.currentTaskSpec = taskSpec;
176178
self.initialized = true;
177179
}, /* istanbul ignore next */ function(err) {
178180
logger.debug('Queue(): Error connecting to Firebase reference',
@@ -183,25 +185,81 @@ function Queue() {
183185
return self;
184186
}
185187

186-
187188
/**
188189
* Gracefully shuts down a queue.
189190
* @returns {RSVP.Promise} A promise fulfilled when all the worker processes
190191
* have finished their current tasks and are no longer listening for new ones.
191192
*/
192193
Queue.prototype.shutdown = function() {
193-
var self = this;
194-
194+
this.shuttingDown = true;
195195
logger.debug('Queue: Shutting down');
196-
if (!_.isNull(self.specChangeListener)) {
197-
self.specsRef.child(self.specId).off('value',
198-
self.specChangeListener);
199-
self.specChangeListener = null;
196+
if (!_.isNull(this.specChangeListener)) {
197+
this.specsRef.child(this.specId).off('value',
198+
this.specChangeListener);
199+
this.specChangeListener = null;
200200
}
201201

202-
return RSVP.all(_.map(self.workers, function(worker) {
202+
return RSVP.all(_.map(this.workers, function(worker) {
203203
return worker.shutdown();
204204
}));
205205
};
206206

207+
/**
208+
* Gets queue worker count.
209+
* @returns {Number} Total number of workers for this queue.
210+
*/
211+
Queue.prototype.getWorkerCount = function() {
212+
return this.workers.length;
213+
};
214+
215+
/**
216+
* Adds a queue worker.
217+
* @returns {QueueWorker} the worker created.
218+
*/
219+
Queue.prototype.addWorker = function() {
220+
if (this.shuttingDown) {
221+
throw new Error('Cannot add worker while queue is shutting down');
222+
}
223+
224+
logger.debug('Queue: adding worker');
225+
var processId = (this.specId ? this.specId + ':' : '') + this.workers.length;
226+
var worker = new QueueWorker(
227+
this.tasksRef,
228+
processId,
229+
this.sanitize,
230+
this.suppressStack,
231+
this.processingFunction
232+
);
233+
this.workers.push(worker);
234+
235+
if (_.isUndefined(this.specId)) {
236+
worker.setTaskSpec(DEFAULT_TASK_SPEC);
237+
// if the currentTaskSpec is not yet set it will be called once it's fetched
238+
} else if (!_.isUndefined(this.currentTaskSpec)) {
239+
worker.setTaskSpec(this.currentTaskSpec);
240+
}
241+
242+
return worker;
243+
};
244+
245+
/**
246+
* Shutdowns a queue worker if one exists.
247+
* @returns {RSVP.Promise} A promise fulfilled once the worker is shutdown
248+
* or rejected if there are no workers left to shutdown.
249+
*/
250+
Queue.prototype.shutdownWorker = function() {
251+
var worker = this.workers.pop();
252+
253+
var promise;
254+
if (_.isUndefined(worker)) {
255+
promise = RSVP.reject(new Error('No workers to shutdown'));
256+
} else {
257+
logger.debug('Queue: shutting down worker');
258+
promise = worker.shutdown();
259+
}
260+
261+
return promise;
262+
};
263+
264+
207265
module.exports = Queue;

test/lib/queue_worker.spec.js

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2117,7 +2117,7 @@ describe('QueueWorker', function() {
21172117
setTimeout(function() {
21182118
callbackComplete = true;
21192119
resolve();
2120-
}, 250);
2120+
}, 500);
21212121
});
21222122
});
21232123

@@ -2147,7 +2147,26 @@ describe('QueueWorker', function() {
21472147
} catch (errorB) {
21482148
done(errorB);
21492149
}
2150-
}, 100);
2150+
}, 500);
2151+
});
2152+
});
2153+
2154+
it('should return the same shutdown promise if shutdown is called twice', function(done) {
2155+
qw.setTaskSpec(th.validBasicTaskSpec);
2156+
tasksRef.push({
2157+
foo: 'bar'
2158+
}, function(errorA) {
2159+
if (errorA) {
2160+
return done(errorA);
2161+
}
2162+
try {
2163+
var firstPromise = qw.shutdown();
2164+
var secondPromise = qw.shutdown();
2165+
expect(firstPromise).to.deep.equal(secondPromise);
2166+
return done();
2167+
} catch (errorB) {
2168+
return done(errorB);
2169+
}
21512170
});
21522171
});
21532172
});

test/queue.spec.js

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,65 @@ describe('Queue', function() {
144144
});
145145
});
146146

147+
describe('#getWorkerCount', function() {
148+
it('should return worker count with options.numWorkers', function() {
149+
var numWorkers = 10;
150+
var q = new th.Queue(th.testRef, { numWorkers: numWorkers }, _.noop);
151+
expect(q.getWorkerCount()).to.equal(numWorkers);
152+
});
153+
});
154+
155+
describe('#addWorker', function() {
156+
it('should add worker', function() {
157+
var q = new th.Queue(th.testRef, _.noop);
158+
expect(q.getWorkerCount()).to.equal(1);
159+
q.addWorker();
160+
expect(q.getWorkerCount()).to.equal(2);
161+
});
162+
163+
it('should add worker with correct process id', function() {
164+
var specId = 'test_task';
165+
var q = new th.Queue(th.testRef, { specId: specId }, _.noop);
166+
var worker = q.addWorker();
167+
var specRegex = new RegExp('^' + specId + ':1:[a-f0-9\\-]{36}$');
168+
expect(worker.processId).to.match(specRegex);
169+
});
170+
171+
it('should not allow a worker to be added if the queue is shutting down', function() {
172+
var q = new th.Queue(th.testRef, _.noop);
173+
expect(q.getWorkerCount()).to.equal(1);
174+
q.shutdown();
175+
expect(function() {
176+
q.addWorker();
177+
}).to.throw('Cannot add worker while queue is shutting down');
178+
});
179+
});
180+
181+
describe('#shutdownWorker', function() {
182+
it('should remove worker', function() {
183+
var q = new th.Queue(th.testRef, _.noop);
184+
expect(q.getWorkerCount()).to.equal(1);
185+
q.shutdownWorker();
186+
expect(q.getWorkerCount()).to.equal(0);
187+
});
188+
189+
it('should shutdown worker', function() {
190+
var q = new th.Queue(th.testRef, _.noop);
191+
expect(q.getWorkerCount()).to.equal(1);
192+
var workerShutdownPromise = q.shutdownWorker();
193+
return workerShutdownPromise;
194+
});
195+
196+
it('should reject when no workers remaining', function() {
197+
var q = new th.Queue(th.testRef, _.noop);
198+
expect(q.getWorkerCount()).to.equal(1);
199+
q.shutdownWorker();
200+
return q.shutdownWorker().catch(function(error) {
201+
expect(error.message).to.equal('No workers to shutdown');
202+
});
203+
});
204+
});
205+
147206
describe('#shutdown', function() {
148207
var q;
149208

0 commit comments

Comments
 (0)