@@ -42,6 +42,12 @@ const debug = Debug("cluster");
42
42
43
43
const REJECT_OVERWRITTEN_COMMANDS = new WeakSet < Command > ( ) ;
44
44
45
+ type OfflineQueueItem = {
46
+ command : Command ;
47
+ stream : WriteableStream ;
48
+ node : unknown ;
49
+ } ;
50
+
45
51
type ClusterStatus =
46
52
| "end"
47
53
| "close"
@@ -80,7 +86,7 @@ class Cluster extends Commander {
80
86
private manuallyClosing : boolean ;
81
87
private retryAttempts = 0 ;
82
88
private delayQueue : DelayQueue = new DelayQueue ( ) ;
83
- private offlineQueue = new Deque ( ) ;
89
+ private offlineQueue = new Deque < OfflineQueueItem > ( ) ;
84
90
private subscriber : ClusterSubscriber ;
85
91
private slotsTimer : NodeJS . Timer ;
86
92
private reconnectTimeout : NodeJS . Timer ;
@@ -204,13 +210,13 @@ class Cluster extends Commander {
204
210
}
205
211
this . connectionPool . reset ( nodes ) ;
206
212
207
- function readyHandler ( ) {
213
+ const readyHandler = ( ) => {
208
214
this . setStatus ( "ready" ) ;
209
215
this . retryAttempts = 0 ;
210
216
this . executeOfflineCommands ( ) ;
211
217
this . resetNodesRefreshInterval ( ) ;
212
218
resolve ( ) ;
213
- }
219
+ } ;
214
220
215
221
let closeListener : ( ) => void = undefined ;
216
222
const refreshListener = ( ) => {
@@ -229,15 +235,15 @@ class Cluster extends Commander {
229
235
this . disconnect ( true ) ;
230
236
}
231
237
} else {
232
- readyHandler . call ( this ) ;
238
+ readyHandler ( ) ;
233
239
}
234
240
} ) ;
235
241
} else {
236
- readyHandler . call ( this ) ;
242
+ readyHandler ( ) ;
237
243
}
238
244
} ;
239
245
240
- closeListener = function ( ) {
246
+ closeListener = ( ) => {
241
247
const error = new Error ( "None of startup nodes is available" ) ;
242
248
243
249
this . removeListener ( "refresh" , refreshListener ) ;
@@ -411,8 +417,8 @@ class Cluster extends Commander {
411
417
this . isRefreshing = true ;
412
418
413
419
const _this = this ;
414
- const wrapper = function ( error ?: Error ) {
415
- _this . isRefreshing = false ;
420
+ const wrapper = ( error ?: Error ) => {
421
+ this . isRefreshing = false ;
416
422
if ( callback ) {
417
423
callback ( error ) ;
418
424
}
@@ -729,7 +735,7 @@ class Cluster extends Commander {
729
735
debug ( "closed because %s" , reason ) ;
730
736
}
731
737
732
- let retryDelay ;
738
+ let retryDelay : unknown ;
733
739
if (
734
740
! this . manuallyClosing &&
735
741
typeof this . options . clusterRetryStrategy === "function"
@@ -743,13 +749,13 @@ class Cluster extends Commander {
743
749
if ( typeof retryDelay === "number" ) {
744
750
this . setStatus ( "reconnecting" ) ;
745
751
this . reconnectTimeout = setTimeout (
746
- function ( ) {
752
+ ( ) => {
747
753
this . reconnectTimeout = null ;
748
754
debug ( "Cluster is disconnected. Retrying after %dms" , retryDelay ) ;
749
755
this . connect ( ) . catch ( function ( err ) {
750
756
debug ( "Got error %s when reconnecting. Ignoring..." , err ) ;
751
757
} ) ;
752
- } . bind ( this ) ,
758
+ } ,
753
759
retryDelay
754
760
) ;
755
761
} else {
@@ -762,7 +768,7 @@ class Cluster extends Commander {
762
768
* Flush offline queue with error.
763
769
*/
764
770
private flushQueue ( error : Error ) {
765
- let item ;
771
+ let item : OfflineQueueItem ;
766
772
while ( ( item = this . offlineQueue . shift ( ) ) ) {
767
773
item . command . reject ( error ) ;
768
774
}
@@ -773,7 +779,7 @@ class Cluster extends Commander {
773
779
debug ( "send %d commands in offline queue" , this . offlineQueue . length ) ;
774
780
const offlineQueue = this . offlineQueue ;
775
781
this . resetOfflineQueue ( ) ;
776
- let item ;
782
+ let item : OfflineQueueItem ;
777
783
while ( ( item = offlineQueue . shift ( ) ) ) {
778
784
this . sendCommand ( item . command , item . stream , item . node ) ;
779
785
}
@@ -899,7 +905,7 @@ class Cluster extends Commander {
899
905
) ;
900
906
}
901
907
902
- private invokeReadyDelayedCallbacks ( err ) {
908
+ private invokeReadyDelayedCallbacks ( err ?: Error ) {
903
909
for ( const c of this . _readyDelayedCallbacks ) {
904
910
process . nextTick ( c , err ) ;
905
911
}
@@ -911,7 +917,7 @@ class Cluster extends Commander {
911
917
* Check whether Cluster is able to process commands
912
918
*/
913
919
private readyCheck ( callback : Callback < void | "fail" > ) : void {
914
- this . cluster ( "INFO" , function ( err , res ) {
920
+ this . cluster ( "INFO" , ( err , res ) => {
915
921
if ( err ) {
916
922
return callback ( err ) ;
917
923
}
@@ -1003,38 +1009,35 @@ class Cluster extends Commander {
1003
1009
* This process happens every time when #connect() is called since
1004
1010
* #startupNodes and DNS records may chanage.
1005
1011
*/
1006
- private resolveStartupNodeHostnames ( ) : Promise < RedisOptions [ ] > {
1012
+ private async resolveStartupNodeHostnames ( ) : Promise < RedisOptions [ ] > {
1007
1013
if ( ! Array . isArray ( this . startupNodes ) || this . startupNodes . length === 0 ) {
1008
- return Promise . reject (
1009
- new Error ( "`startupNodes` should contain at least one node." )
1010
- ) ;
1014
+ throw new Error ( "`startupNodes` should contain at least one node." ) ;
1011
1015
}
1012
1016
const startupNodes = normalizeNodeOptions ( this . startupNodes ) ;
1013
1017
1014
1018
const hostnames = getUniqueHostnamesFromOptions ( startupNodes ) ;
1015
1019
if ( hostnames . length === 0 ) {
1016
- return Promise . resolve ( startupNodes ) ;
1020
+ return startupNodes ;
1017
1021
}
1018
1022
1019
- return Promise . all (
1023
+ const configs = await Promise . all (
1020
1024
hostnames . map (
1021
1025
( this . options . useSRVRecords ? this . resolveSrv : this . dnsLookup ) . bind (
1022
1026
this
1023
1027
)
1024
1028
)
1025
- ) . then ( ( configs ) => {
1026
- const hostnameToConfig = zipMap ( hostnames , configs ) ;
1027
-
1028
- return startupNodes . map ( ( node ) => {
1029
- const config = hostnameToConfig . get ( node . host ) ;
1030
- if ( ! config ) {
1031
- return node ;
1032
- } else if ( this . options . useSRVRecords ) {
1033
- return Object . assign ( { } , node , config ) ;
1034
- } else {
1035
- return Object . assign ( { } , node , { host : config } ) ;
1036
- }
1037
- } ) ;
1029
+ ) ;
1030
+ const hostnameToConfig = zipMap ( hostnames , configs ) ;
1031
+
1032
+ return startupNodes . map ( ( node ) => {
1033
+ const config = hostnameToConfig . get ( node . host ) ;
1034
+ if ( ! config ) {
1035
+ return node ;
1036
+ }
1037
+ if ( this . options . useSRVRecords ) {
1038
+ return Object . assign ( { } , node , config ) ;
1039
+ }
1040
+ return Object . assign ( { } , node , { host : config } ) ;
1038
1041
} ) ;
1039
1042
}
1040
1043
0 commit comments