1
1
var express = require ( 'express' ) ;
2
2
var bodyParser = require ( 'body-parser' ) ;
3
+ var timeout = require ( 'connect-timeout' )
3
4
var log4js = require ( 'log4js' ) ;
4
5
var cluster = require ( 'cluster' ) ;
5
6
var http = require ( 'http' ) ;
@@ -20,9 +21,11 @@ String.prototype.replaceAll = function(search, replacement) {
20
21
var expectedDBNEnvVar = "DATABASE_NAME" ;
21
22
var expectedPORTEnvVar = "DATABASE_PORT" ;
22
23
var expectedAPIKeyEnvVar = "API_KEY" ;
24
+ var expectedDebugKeyEnvVar = "DEBUG" ;
23
25
var dbMaster = null ;
24
26
var port = null ;
25
27
var APIKey = null ;
28
+ var debug = null ;
26
29
27
30
process . argv . forEach ( function ( val , index , array ) {
28
31
if ( val . indexOf ( expectedDBNEnvVar ) > - 1 ) {
@@ -31,6 +34,9 @@ process.argv.forEach(function (val, index, array) {
31
34
if ( val . indexOf ( expectedPORTEnvVar ) > - 1 ) {
32
35
port = val . replaceAll ( expectedPORTEnvVar + "=" , "" ) ;
33
36
}
37
+ if ( val . indexOf ( expectedDebugKeyEnvVar ) > - 1 ) {
38
+ debug = val . replaceAll ( expectedDebugKeyEnvVar + "=" , "" ) === "true" ? true : false ;
39
+ }
34
40
if ( val . indexOf ( expectedAPIKeyEnvVar ) > - 1 ) {
35
41
APIKey = val . replaceAll ( expectedAPIKeyEnvVar + "=" , "" ) ;
36
42
}
@@ -62,24 +68,17 @@ if (cluster.isMaster) {
62
68
} else {
63
69
64
70
var action = {
65
- response : function ( connection , data , error , pId ) {
71
+ response : function ( connection , data , error , pId ) {
66
72
var result = { status : ( data === null || error !== null ? "KO" : "OK" ) ,
67
73
data : ( data === null ? { } : data ) , error : error } ;
68
74
logger . info ( "worker: " + pId ) ;
69
75
logger . info ( "response: " + JSON . stringify ( result ) ) ;
70
76
connection . response . contentType ( 'application/json' ) ;
71
77
connection . response . send ( result ) ;
72
78
} ,
73
- addSingleListener : function ( connection , pId ) {
74
- this . addGreatListener ( connection , pId ) ;
75
- } ,
76
- addGreatListener : function ( connection , pId ) {
79
+ addListener : function ( connection , pId ) {
77
80
var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
78
81
paths . syncFromDatabase ( ) ;
79
- logger . debug ( JSON . stringifyAligned ( paths . ref ) ) ;
80
- paths . syncFromDatabase ( ) ;
81
- logger . debug ( "getting" ) ;
82
- logger . debug ( JSON . stringifyAligned ( paths . ref ) ) ;
83
82
84
83
if ( paths . ref === undefined ) {
85
84
paths . ref = { }
@@ -115,10 +114,10 @@ if (cluster.isMaster) {
115
114
}
116
115
}
117
116
118
- paths . ref [ key ] . tokens [ connection . token ] . time = new Date ( ) . getTime ( ) ;
119
-
120
117
paths . syncToDatabase ( ) ;
121
118
119
+ this . updateTime ( connection ) ;
120
+
122
121
var equals = this . verifyLenght ( connection , pId ) ;
123
122
124
123
var object = this . getReference ( connection , pId ) ;
@@ -135,7 +134,7 @@ if (cluster.isMaster) {
135
134
var data = { } ;
136
135
data . len = len ;
137
136
138
- if ( lastToken === connection . token && equals ) {
137
+ if ( lastToken === connection . token ) {
139
138
data . info = "listener_up_to_date" ;
140
139
} else {
141
140
data . info = "listener_ready_for_refresh_client" ;
@@ -147,22 +146,46 @@ if (cluster.isMaster) {
147
146
}
148
147
149
148
} ,
150
- verifyLenght : function ( connection , pId ) {
149
+ removeListener : function ( connection , pId ) {
151
150
var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
152
151
paths . syncFromDatabase ( ) ;
152
+
153
+ if ( connection . path . indexOf ( "\." ) === - 1 && connection . path . indexOf ( "/" ) === 0 ) {
154
+ var key = connection . path . replaceAll ( "/" , "\." ) ;
155
+ key = key . substr ( 1 , key . length - 1 ) ;
156
+
157
+ if ( paths . ref [ key ] !== undefined && paths . ref [ key ] . tokens !== undefined && paths . ref [ key ] . tokens [ connection . token ] !== undefined ) {
158
+ delete paths . ref [ key ] . tokens [ connection . token ] ;
159
+
160
+ paths . syncToDatabase ( ) ;
161
+
162
+ var data = { } ;
163
+ data . info = "listener_removed" ;
164
+
165
+ this . response ( connection , data , null , pId ) ;
166
+ } else {
167
+ if ( paths . ref [ key ] === undefined ) {
168
+ this . response ( connection , null , "path_not_found" , pId ) ;
169
+ } else {
170
+ this . response ( connection , null , "token_not_found" , pId ) ;
171
+ }
172
+ }
173
+ } else {
174
+ this . response ( connection , null , "path_contains_dots" , pId ) ;
175
+ }
176
+
177
+ } ,
178
+ verifyLenght : function ( connection , pId ) {
153
179
var object = this . getReference ( connection , pId ) ;
154
- // var len = JSON.stringify(object.FD.ref).length;
155
180
logger . debug ( sha1 ( JSON . stringify ( object . FD . ref ) ) . toUpperCase ( ) ) ;
156
181
logger . debug ( connection . sha1 ) ;
157
182
158
183
var hash = sha1 ( JSON . stringify ( object . FD . ref ) ) . toUpperCase ( ) ;
159
184
return hash === connection . sha1 ;
160
185
} ,
161
- getUpdatesFrom : function ( connection , pId ) {
186
+ getUpdatesFrom : function ( connection , pId ) {
162
187
var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
163
188
paths . syncFromDatabase ( ) ;
164
- logger . debug ( "getting" ) ;
165
- logger . debug ( JSON . stringifyAligned ( paths . ref ) ) ;
166
189
var object = this . getReference ( connection , pId ) ;
167
190
if ( typeof object === "string" ) {
168
191
this . response ( connection , null , object , pId ) ;
@@ -171,18 +194,35 @@ if (cluster.isMaster) {
171
194
token : connection . token ,
172
195
os : connection . os
173
196
}
174
- object . sendUpdateFor ( connection . content , device )
175
- var data = { } ;
176
- data . info = "updates_sent" ;
177
- data . len = JSON . stringify ( object . FD . ref ) . length ;
178
- this . response ( connection , data , null , pId ) ;
197
+ object . sendUpdateFor ( connection . content , device , function ( ) {
198
+ var data = { } ;
199
+ data . info = "updates_sent" ;
200
+ data . len = JSON . stringify ( object . FD . ref ) . length ;
201
+ action . response ( connection , data , null , pId ) ;
202
+ } ) ;
179
203
}
180
204
} ,
181
- updateData : function ( connection , pId ) {
182
- var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
183
- paths . syncFromDatabase ( ) ;
184
- logger . debug ( "getting" ) ;
185
- logger . debug ( JSON . stringifyAligned ( paths . ref ) ) ;
205
+ updateTime : function ( connection ) {
206
+ if ( connection . path . indexOf ( "\." ) === - 1 && connection . path . indexOf ( "/" ) === 0 ) {
207
+ var key = connection . path . replaceAll ( "/" , "\." ) ;
208
+ key = key . substr ( 1 , key . length - 1 ) ;
209
+
210
+ var paths = new FlamebaseDatabase ( dbPaths , "/" + key ) ;
211
+ paths . syncFromDatabase ( ) ;
212
+
213
+ if ( paths . ref === undefined ) {
214
+ paths . ref = { } ;
215
+ paths . ref . path = connection . path ;
216
+ }
217
+
218
+ if ( paths . ref . tokens === undefined ) {
219
+ paths . ref . tokens = { } ;
220
+ }
221
+ paths . ref . tokens [ connection . token ] . time = new Date ( ) . getTime ( ) ;
222
+ paths . syncToDatabase ( ) ;
223
+ }
224
+ } ,
225
+ updateData : function ( connection , pId ) {
186
226
var object = this . getReference ( connection , pId ) ;
187
227
if ( typeof object === "string" ) {
188
228
this . response ( connection , null , object , pId ) ;
@@ -192,22 +232,23 @@ if (cluster.isMaster) {
192
232
var differences = connection . differences ;
193
233
194
234
if ( differences !== undefined ) {
195
- logger . debug ( JSON . stringify ( differences ) ) ;
196
235
apply ( object . FD . ref , JSON . parse ( differences ) ) ;
236
+ this . updateTime ( connection ) ;
237
+
238
+ object . FD . syncToDatabase ( false , function ( ) {
239
+ if ( JSON . stringify ( object . FD . ref ) . length !== connection . len ) {
240
+ action . response ( connection , null , "data_updated_with_differences" , pId ) ;
241
+ } else {
242
+ action . response ( connection , "data_updated" , null , pId ) ;
243
+ }
244
+ } ) ;
197
245
198
- if ( JSON . stringify ( object . FD . ref ) . length !== connection . len ) {
199
- object . FD . syncToDatabase ( ) ;
200
- this . response ( connection , null , "data_updated_with_differences" , pId ) ;
201
- } else {
202
- object . FD . syncToDatabase ( ) ;
203
- this . response ( connection , "data_updated" , null , pId ) ;
204
- }
205
246
} else {
206
247
this . response ( connection , "no_diff_updated" , null , pId ) ;
207
248
}
208
249
}
209
250
} ,
210
- getReference : function ( connection , pId ) {
251
+ getReference : function ( connection , pId ) {
211
252
var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
212
253
paths . syncFromDatabase ( ) ;
213
254
var error = null ;
@@ -217,7 +258,7 @@ if (cluster.isMaster) {
217
258
var key = connection . path . replaceAll ( "/" , "\." ) ;
218
259
key = key . substr ( 1 , key . length - 1 ) ;
219
260
if ( paths . ref [ key ] !== undefined ) {
220
- return new Path ( APIKey , paths . ref [ key ] , dbMaster , connection . path , pId ) ;
261
+ return new Path ( APIKey , paths . ref [ key ] , dbMaster , connection . path , pId , debug . toString ( ) ) ;
221
262
} else {
222
263
error = "holder_not_found" ;
223
264
}
@@ -233,7 +274,7 @@ if (cluster.isMaster) {
233
274
logger . error ( error ) ;
234
275
return error ;
235
276
} ,
236
- parseRequest : function ( req , res , worker ) {
277
+ parseRequest : function ( req , res , worker ) {
237
278
var response = res ;
238
279
239
280
try {
@@ -314,21 +355,23 @@ if (cluster.isMaster) {
314
355
connection . response = response ;
315
356
316
357
switch ( connection . method ) {
317
- case "single_listener" :
358
+
359
+ case "create_listener" :
318
360
try {
319
- this . addSingleListener ( connection , worker ) ;
361
+ this . addListener ( connection , worker ) ;
320
362
} catch ( e ) {
321
- logger . error ( "there was an error parsing request from addSingleListener : " + e . toString ( ) ) ;
322
- this . response ( connection , null , "cluster_" + worker + "_error_adding_single " , worker ) ;
363
+ logger . error ( "there was an error parsing request from addGreatListener : " + e . toString ( ) ) ;
364
+ this . response ( connection , null , "cluster_" + worker + "_error_creating " , worker ) ;
323
365
}
324
366
break ;
325
367
326
- case "great_listener" :
368
+
369
+ case "remove_listener" :
327
370
try {
328
- this . addGreatListener ( connection , worker ) ;
371
+ this . removeListener ( connection , worker ) ;
329
372
} catch ( e ) {
330
373
logger . error ( "there was an error parsing request from addGreatListener: " + e . toString ( ) ) ;
331
- this . response ( connection , null , "cluster_" + worker + "_error_adding_great " , worker ) ;
374
+ this . response ( connection , null , "cluster_" + worker + "_error_removing_listener " , worker ) ;
332
375
}
333
376
break ;
334
377
@@ -374,6 +417,7 @@ if (cluster.isMaster) {
374
417
} ) ) ;
375
418
376
419
app . use ( bodyParser . json ( { limit : '50mb' } ) ) ;
420
+ app . use ( timeout ( '60s' ) )
377
421
378
422
app . route ( '/' )
379
423
. get ( function ( req , res ) {
0 commit comments