@@ -8,6 +8,7 @@ var FlamebaseDatabase = require("flamebase-database-node");
88var Path = require ( "./model/path.js" ) ;
99var apply = require ( 'rus-diff' ) . apply ;
1010var clone = require ( 'rus-diff' ) . clone ;
11+ var sha1 = require ( 'sha1' ) ;
1112
1213JSON . stringifyAligned = require ( 'json-align' ) ;
1314
@@ -17,16 +18,21 @@ String.prototype.replaceAll = function(search, replacement) {
1718} ;
1819
1920var expectedDBNEnvVar = "DATABASE_NAME" ;
20- var expectedPORTNEnvVar = "DATABASE_PORT" ;
21+ var expectedPORTEnvVar = "DATABASE_PORT" ;
22+ var expectedAPIKeyEnvVar = "API_KEY" ;
2123var dbMaster = null ;
2224var port = null ;
25+ var APIKey = null ;
2326
2427process . argv . forEach ( function ( val , index , array ) {
2528 if ( val . indexOf ( expectedDBNEnvVar ) > - 1 ) {
2629 dbMaster = val . replaceAll ( expectedDBNEnvVar + "=" , "" ) ;
2730 }
28- if ( val . indexOf ( expectedPORTNEnvVar ) > - 1 ) {
29- port = val . replaceAll ( expectedPORTNEnvVar + "=" , "" ) ;
31+ if ( val . indexOf ( expectedPORTEnvVar ) > - 1 ) {
32+ port = val . replaceAll ( expectedPORTEnvVar + "=" , "" ) ;
33+ }
34+ if ( val . indexOf ( expectedAPIKeyEnvVar ) > - 1 ) {
35+ APIKey = val . replaceAll ( expectedAPIKeyEnvVar + "=" , "" ) ;
3036 }
3137} ) ;
3238
@@ -35,7 +41,6 @@ var TAG = "SERVER CLUSTER";
3541var logger = log4js . getLogger ( TAG ) ;
3642
3743var dbPaths = "paths" ;
38- var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
3944
4045if ( cluster . isMaster ) {
4146
@@ -58,7 +63,8 @@ if (cluster.isMaster) {
5863
5964 var action = {
6065 response : function ( connection , data , error , pId ) {
61- var result = { status : ( data === null || error !== null ? "KO" : "OK" ) , data : data , error : error } ;
66+ var result = { status : ( data === null || error !== null ? "KO" : "OK" ) ,
67+ data : ( data === null ? { } : data ) , error : error } ;
6268 logger . info ( "worker: " + pId ) ;
6369 logger . info ( "response: " + JSON . stringify ( result ) ) ;
6470 connection . response . contentType ( 'application/json' ) ;
@@ -68,7 +74,12 @@ if (cluster.isMaster) {
6874 this . addGreatListener ( connection , pId ) ;
6975 } ,
7076 addGreatListener : function ( connection , pId ) {
77+ var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
78+ paths . syncFromDatabase ( ) ;
79+ logger . debug ( JSON . stringifyAligned ( paths . ref ) ) ;
7180 paths . syncFromDatabase ( ) ;
81+ logger . debug ( "getting" ) ;
82+ logger . debug ( JSON . stringifyAligned ( paths . ref ) ) ;
7283
7384 if ( paths . ref === undefined ) {
7485 paths . ref = { }
@@ -92,27 +103,86 @@ if (cluster.isMaster) {
92103 paths . ref [ key ] . tokens [ connection . token ] . os = connection . os ;
93104 }
94105
106+ var keys = Object . keys ( paths . ref [ key ] . tokens ) ;
107+ var lastChangeTime = 0 ;
108+ var lastToken = null ;
109+
110+ for ( var i = keys . length - 1 ; i >= 0 ; i -- ) {
111+ var time = paths . ref [ key ] . tokens [ keys [ i ] ] . time ;
112+ if ( lastChangeTime < time ) {
113+ lastChangeTime = time ;
114+ lastToken = keys [ i ] ;
115+ }
116+ }
117+
95118 paths . ref [ key ] . tokens [ connection . token ] . time = new Date ( ) . getTime ( ) ;
96119
97120 paths . syncToDatabase ( ) ;
98121
122+ var equals = this . verifyLenght ( connection , pId ) ;
123+
99124 var object = this . getReference ( connection , pId ) ;
100125 object . FD . syncFromDatabase ( ) ;
101126 var len = 0 ;
127+
102128 if ( typeof object !== "string" ) {
103129 len = JSON . stringify ( object . FD . ref ) . length ;
130+ } else {
131+ this . response ( connection , null , object , pId ) ;
132+ return ;
104133 }
134+
105135 var data = { } ;
106136 data . len = len ;
107- data . info = "listener_added" ;
137+
138+ if ( lastToken === connection . token && equals ) {
139+ data . info = "listener_up_to_date" ;
140+ } else {
141+ data . info = "listener_ready_for_refresh_client" ;
142+ }
108143
109144 this . response ( connection , data , null , pId ) ;
110145 } else {
111146 this . response ( connection , null , "path_contains_dots" , pId ) ;
112147 }
113148
114149 } ,
150+ verifyLenght : function ( connection , pId ) {
151+ var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
152+ paths . syncFromDatabase ( ) ;
153+ var object = this . getReference ( connection , pId ) ;
154+ // var len = JSON.stringify(object.FD.ref).length;
155+ logger . debug ( sha1 ( JSON . stringify ( object . FD . ref ) ) . toUpperCase ( ) ) ;
156+ logger . debug ( connection . sha1 ) ;
157+
158+ var hash = sha1 ( JSON . stringify ( object . FD . ref ) ) . toUpperCase ( ) ;
159+ return hash === connection . sha1 ;
160+ } ,
161+ getUpdatesFrom : function ( connection , pId ) {
162+ var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
163+ paths . syncFromDatabase ( ) ;
164+ logger . debug ( "getting" ) ;
165+ logger . debug ( JSON . stringifyAligned ( paths . ref ) ) ;
166+ var object = this . getReference ( connection , pId ) ;
167+ if ( typeof object === "string" ) {
168+ this . response ( connection , null , object , pId ) ;
169+ } else {
170+ var device = {
171+ token : connection . token ,
172+ os : connection . os
173+ }
174+ object . sendUpdateFor ( connection . content , device )
175+ var data = { } ;
176+ data . info = "updates_sent" ;
177+ data . len = JSON . stringify ( object . FD . ref ) . length ;
178+ this . response ( connection , data , null , pId ) ;
179+ }
180+ } ,
115181 updateData : function ( connection , pId ) {
182+ var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
183+ paths . syncFromDatabase ( ) ;
184+ logger . debug ( "getting" ) ;
185+ logger . debug ( JSON . stringifyAligned ( paths . ref ) ) ;
116186 var object = this . getReference ( connection , pId ) ;
117187 if ( typeof object === "string" ) {
118188 this . response ( connection , null , object , pId ) ;
@@ -125,40 +195,43 @@ if (cluster.isMaster) {
125195 logger . debug ( JSON . stringify ( differences ) ) ;
126196 apply ( object . FD . ref , JSON . parse ( differences ) ) ;
127197
128- if ( JSON . stringify ( object . FD . ref ) . length < connection . len ) {
129- logger . error ( "##########_inconsistency_length" ) ;
130- this . response ( connection , null , "inconsistency_length" , pId ) ;
131- return ;
198+ if ( JSON . stringify ( object . FD . ref ) . length !== connection . len ) {
199+ object . FD . syncToDatabase ( ) ;
200+ this . response ( connection , null , "data_updated_with_differences" , pId ) ;
201+ } else {
202+ object . FD . syncToDatabase ( ) ;
203+ this . response ( connection , "data_updated" , null , pId ) ;
132204 }
133-
134- object . FD . syncToDatabase ( ) ;
135-
136- this . response ( connection , "data_updated" , null , pId ) ;
137205 } else {
138206 this . response ( connection , "no_diff_updated" , null , pId ) ;
139207 }
140208 }
141209 } ,
142210 getReference : function ( connection , pId ) {
211+ var paths = new FlamebaseDatabase ( dbPaths , "/" ) ;
212+ paths . syncFromDatabase ( ) ;
213+ var error = null ;
143214 if ( connection . path !== undefined ) {
144215 if ( connection . path . indexOf ( "\." ) === - 1 ) {
145216 if ( connection . path . indexOf ( "/" ) === 0 ) {
146217 var key = connection . path . replaceAll ( "/" , "\." ) ;
147218 key = key . substr ( 1 , key . length - 1 ) ;
148219 if ( paths . ref [ key ] !== undefined ) {
149- return new Path ( paths . ref [ key ] , dbMaster , connection . path , pId ) ;
220+ return new Path ( APIKey , paths . ref [ key ] , dbMaster , connection . path , pId ) ;
150221 } else {
151- return "holder_not_found" ;
222+ error = "holder_not_found" ;
152223 }
153224 } else {
154- return "path_not_starts_with_slash" ;
225+ error = "path_not_starts_with_slash" ;
155226 }
156227 } else {
157- return "path_contains_dots" ;
228+ error = "path_contains_dots" ;
158229 }
159230 } else {
160- return "json_path_not_found" ;
231+ error = "json_path_not_found" ;
161232 }
233+ logger . error ( error ) ;
234+ return error ;
162235 } ,
163236 parseRequest : function ( req , res , worker ) {
164237 var response = res ;
@@ -193,6 +266,11 @@ if (cluster.isMaster) {
193266 logger . debug ( "path: " + connection [ key ] ) ;
194267 break ;
195268
269+ case "sha1" :
270+ connection [ key ] = message [ key ] ;
271+ logger . debug ( "sha1: " + connection [ key ] ) ;
272+ break ;
273+
196274 case "token" :
197275 connection [ key ] = message [ key ] ;
198276 logger . debug ( "token: " + connection [ key ] ) ;
@@ -203,6 +281,11 @@ if (cluster.isMaster) {
203281 logger . debug ( "differences: " + connection [ key ] ) ;
204282 break ;
205283
284+ case "content" :
285+ connection [ key ] = message [ key ] ;
286+ logger . debug ( "content: " + connection [ key ] ) ;
287+ break ;
288+
206289 case "len" :
207290 connection [ key ] = message [ key ] ;
208291 logger . debug ( "len: " + connection [ key ] ) ;
@@ -213,6 +296,11 @@ if (cluster.isMaster) {
213296 logger . debug ( "os: " + connection [ key ] ) ;
214297 break ;
215298
299+ case "clean" :
300+ connection [ key ] = message [ key ] ;
301+ logger . debug ( "clean: " + connection [ key ] ) ;
302+ break ;
303+
216304 default :
217305
218306 //
@@ -253,6 +341,15 @@ if (cluster.isMaster) {
253341 }
254342 break ;
255343
344+ case "get_updates" :
345+ try {
346+ this . getUpdatesFrom ( connection , worker ) ;
347+ } catch ( e ) {
348+ logger . error ( "there was an error parsing request from getUpdatesFrom: " + e . toString ( ) ) ;
349+ this . response ( connection , null , "cluster_" + worker + "_error_getting_updates" , worker ) ;
350+ }
351+ break ;
352+
256353 default :
257354 //
258355 break ;
@@ -287,7 +384,6 @@ if (cluster.isMaster) {
287384 } ) ;
288385
289386 app . listen ( port , function ( ) {
290- paths . syncFromDatabase ( ) ;
291387 logger . info ( "server cluster started on port " + port + " on " + cluster . worker . id + " worker" ) ;
292388 } ) ;
293389}
0 commit comments