Skip to content

Commit 3841886

Browse files
committed
Refactored code base
1 parent 8289da3 commit 3841886

File tree

4 files changed

+96
-42
lines changed

4 files changed

+96
-42
lines changed

lib/server/balancer/route.js

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,11 @@
11
var Cookies = Npm.require('cookies');
22

3-
var workers = null;
4-
5-
// We need to start the worker pool after the server binded
6-
// This allow use to play nicely with tools like userdown
7-
WebApp.onListening(function() {
8-
var workersCount = Balancer._getWorkersCount();
9-
workers = new WorkerPool(workersCount);
10-
});
11-
123
Balancer.handleHttp = function handleHttp(req, res) {
13-
if(!Cluster.discovery) return processHereHTTP();
4+
if(!Cluster.discovery) return Balancer._processHereHTTP(req, res);
145

156
// if this is from a balance, we don't need to proxy it
167
if(req.headers['from-balancer']) {
17-
return processHereHTTP();
8+
return Balancer._processHereHTTP(req, res);
189
}
1910

2011
var cookies = new Cookies(req, res);
@@ -47,11 +38,11 @@ Balancer.handleHttp = function handleHttp(req, res) {
4738
// we can get the endpointHash from the cookie
4839
var endpointHash = cookies.get('cluster-endpoint');
4940
var endpoint = Balancer._pickEndpoint(endpointHash, cookies);
50-
if(!endpoint) return processHereHTTP();
41+
if(!endpoint) return Balancer._processHereHTTP(req, res);
5142
}
5243

5344
if(endpoint === Cluster._endpoint) {
54-
return processHereHTTP();
45+
return Balancer._processHereHTTP(req, res);
5546
}
5647

5748
Balancer._setFromBalanceUrlHeader(req);
@@ -60,11 +51,11 @@ Balancer.handleHttp = function handleHttp(req, res) {
6051
};
6152

6253
Balancer.handleWs = function handleWs(req, socket, head) {
63-
if(!Cluster.discovery) return processHereWS(req, socket, head);
54+
if(!Cluster.discovery) return Balancer._processHereWS(req, socket, head);
6455

6556
if(req.headers['from-balancer']) {
6657
// if this is from a balance, we don't need to proxy it
67-
return processHereWS(req, socket, head)
58+
return Balancer._processHereWS(req, socket, head)
6859
}
6960

7061
// try to get endpointHash from the our cluster-ddp url
@@ -86,16 +77,16 @@ Balancer.handleWs = function handleWs(req, socket, head) {
8677
// just process here. We don't need to route it to a random web service
8778
// because, it is possible that this endpoint is for some other service
8879
// than web.
89-
return processHereWS(req, socket, head);
80+
return Balancer._processHereWS(req, socket, head);
9081
}
9182
}
9283

9384
if(!endpoint) {
94-
return processHereWS(req, socket, head);
85+
return Balancer._processHereWS(req, socket, head);
9586
}
9687

9788
if(endpoint === Cluster._endpoint) {
98-
return processHereWS(req, socket, head);
89+
return Balancer._processHereWS(req, socket, head);
9990
}
10091

10192
Balancer._setFromBalanceUrlHeader(req);
@@ -104,25 +95,4 @@ Balancer.handleWs = function handleWs(req, socket, head) {
10495
};
10596

10697
OverShadowServerEvent('request', Balancer.handleHttp);
107-
OverShadowServerEvent('upgrade', Balancer.handleWs);
108-
109-
// Process locally. If there are any workers running, proxy DDP traffic to them
110-
111-
function processHereHTTP() {
112-
return false;
113-
}
114-
115-
function processHereWS(req, socket, head) {
116-
if(process.env['CLUSTER_WORKER_ID']) return;
117-
118-
var worker = workers && workers.pickWorker();
119-
// No worker, can't proxy. So process here.
120-
if(!worker) return false;
121-
122-
var target = {host: "127.0.0.1", port: worker.port};
123-
Balancer.proxy.ws(req, socket, head, {target: target}, function(error) {
124-
// not sure we can re-try websockets, simply log it
125-
console.error("Cluster: WS proxying to the worker:", worker, error.message);
126-
});
127-
return true;
128-
}
98+
OverShadowServerEvent('upgrade', Balancer.handleWs);

lib/server/balancer/workers.js

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
var workerMapping = {};
2+
var workers = null;
3+
4+
// We need to start the worker pool after the server binded
5+
// This allow use to play nicely with tools like userdown
6+
WebApp.onListening(function() {
7+
var workersCount = Balancer._getWorkersCount();
8+
workers = new WorkerPool(workersCount);
9+
});
10+
11+
Balancer._pickWorker = function() {
12+
return workers && workers.pickWorker();
13+
};
14+
15+
Balancer._processHereWS = function _processHereWS(req, socket, head) {
16+
if(process.env['CLUSTER_WORKER_ID']) return;
17+
18+
var worker = Balancer._pickWorker();
19+
// No worker, can't proxy. So process here.
20+
if(!worker) return false;
21+
22+
var target = {host: "127.0.0.1", port: worker.port};
23+
Balancer.proxy.ws(req, socket, head, {target: target}, function(error) {
24+
// not sure we can re-try websockets, simply log it
25+
console.error("Cluster: WS proxying to the worker:", worker, error.message);
26+
});
27+
return true;
28+
};
29+
30+
31+
Balancer._processHereHTTP = function _processHereHTTP(req, res) {
32+
if(process.env['CLUSTER_WORKER_ID']) return;
33+
34+
var longPollingMatcher = /^\/sockjs\/([0-9]+)\/(\w+)\/xhr/;
35+
var match = req.url.match(longPollingMatcher);
36+
if(match) {
37+
var id = match[1] + match[2];
38+
if(!workerMapping[id]) {
39+
var worker = Balancer._pickWorker();
40+
if(worker) {
41+
workerMapping[id] = {worker: worker, lastUpdate: Date.now()}
42+
}
43+
}
44+
45+
if(workerMapping[id]) {
46+
workerMapping[id].lastUpdate = Date.now();
47+
var target = {host: "127.0.0.1", port: workerMapping[id].worker.port}
48+
// Make sure we support long polling
49+
res.setTimeout(2 * 60 * 1000);
50+
Balancer.proxy.web(req, res, {target: target}, function(err) {
51+
res.writeHead(500);
52+
res.end();
53+
// Since this is long polling, error can happen even if user close the
54+
// session. That's why we don't print the message.
55+
//
56+
// Now we also delete the mapping. Then we don't have a leak on
57+
// workerMapping.
58+
// XXX come up with a better plan to clean up workerMapping
59+
delete workerMapping[id];
60+
});
61+
return true;
62+
} else {
63+
// Do not proxy, if we can't find out a worker
64+
return false;
65+
}
66+
} else {
67+
// Do not proxy for other files
68+
return false;
69+
}
70+
}

package.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ function configurePackage(api) {
5252
'lib/server/worker_pool.js',
5353
'lib/server/balancer/namespace.js',
5454
'lib/server/balancer/utils.js',
55+
'lib/server/balancer/workers.js',
5556
'lib/server/balancer/route.js',
5657
'lib/server/auto_connect.js'
5758
], ['server']);

todo.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ test new changes to the balancer
1010
test store
1111
backend registrations
1212
auto config
13-
- make cookie reset time to be around 1hour
14-
- remove the mongo-livedata dependency and use mongodb node driver
13+
make cookie reset time to be around 1hour
14+
remove the mongo-livedata dependency and use mongodb node driver
1515
- add the support to query expired entries locally - to handle when the MongoDB goes down
1616
- verbose mode for logging
1717
- so we can assume better distribution
@@ -20,3 +20,16 @@ auto config
2020
- support websocket URL based service delivery
2121
- load balance direct websocket connections (both web and discovery)
2222
- add an special api to reblance requests
23+
24+
# Multi Core Support
25+
26+
Add proxy support for static files
27+
28+
* If the file has a .xxx extension then don't proxy
29+
* For other, if there is fast-render then proxy
30+
* Othewise don't
31+
32+
Add proxy support for SockJS
33+
Add better worker management
34+
* If exit okay, then simply simply do a messgae and don't start the worker
35+
* Add a retry logic to restart the worker

0 commit comments

Comments
 (0)