@@ -7,8 +7,10 @@ use crate::agent::http_transport::dynamic_routing::{
    health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
};

-// Some big value implying that node is unhealthy, should be much bigger than node's latency.
-const MAX_LATENCY: Duration = Duration::from_secs(500);
+// When a node is detected as unhealthy, we take the following actions:
+// - Remove the node entirely from the routing.
+// - Penalize its moving average by adding a specified value to the stored latency window. This ensures that any node exhibiting intermittent outages is appropriately penalized.
+const PUNISH_LATENCY: Duration = Duration::from_secs(2);

const WINDOW_SIZE: usize = 15;

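A quick illustration of what the penalty means in practice. This is a standalone sketch, not part of the diff; it assumes the `simple_moving_average` crate this file already uses for `SumTreeSMA`, and reuses the `WINDOW_SIZE` and `PUNISH_LATENCY` values from above. A failed health check now contributes a fixed 2 s sample to the latency window, so the average of a flapping node degrades smoothly instead of being dominated by a 500 s outlier.

```rust
// Illustration only: how a single failed health check shifts the moving average.
use simple_moving_average::{SumTreeSMA, SMA};
use std::time::Duration;

const WINDOW_SIZE: usize = 15;
const PUNISH_LATENCY: Duration = Duration::from_secs(2);

fn main() {
    let mut avg = SumTreeSMA::<Duration, u32, WINDOW_SIZE>::from_zero(Duration::ZERO);

    // Three healthy checks at ~150 ms each.
    for _ in 0..3 {
        avg.add_sample(Duration::from_millis(150));
    }
    // One failed health check: the penalty sample is recorded instead of a real latency.
    avg.add_sample(PUNISH_LATENCY);

    // Average is (3 * 0.15 + 2.0) / 4 = 0.6125 s, so the node's weight (1 / avg)
    // drops but stays finite and recovers as healthy samples come back.
    println!("penalized average: {:?}", avg.get_average());
}
```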
@@ -20,6 +22,8 @@ type LatencyMovAvg = SumTreeSMA<Duration, u32, WINDOW_SIZE>;
#[derive(Clone, Debug)]
struct WeightedNode {
    node: Node,
+    /// Reflects the status of the most recent health check.
+    is_healthy: bool,
    /// Moving mean of latencies measurements.
    latency_mov_avg: LatencyMovAvg,
    /// Weight of the node (invers of the average latency), used for stochastic weighted random sampling.
@@ -49,14 +53,14 @@ impl LatencyRoutingSnapshot {
/// Helper function to sample nodes based on their weights.
/// Here weight index is selected based on the input number in range [0, 1]
#[inline(always)]
-fn weighted_sample(weights: &[f64], number: f64) -> Option<usize> {
+fn weighted_sample(weighted_nodes: &[(f64, &Node)], number: f64) -> Option<usize> {
    if !(0.0..=1.0).contains(&number) {
        return None;
    }
-    let sum: f64 = weights.iter().sum();
+    let sum: f64 = weighted_nodes.iter().map(|n| n.0).sum();
    let mut weighted_number = number * sum;
-    for (idx, weight) in weights.iter().enumerate() {
-        weighted_number -= weight;
+    for (idx, weighted_node) in weighted_nodes.iter().enumerate() {
+        weighted_number -= weighted_node.0;
        if weighted_number <= 0.0 {
            return Some(idx);
        }
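For reviewers unfamiliar with the helper: the input number in [0, 1] is scaled by the total weight and walked against a running prefix sum, so index `i` is returned with probability `weight_i / sum`. A minimal standalone sketch of the same technique over plain `f64` weights (illustration only; the real helper now takes `&[(f64, &Node)]`):

```rust
// Standalone sketch of the prefix-sum selection used by weighted_sample.
fn weighted_index(weights: &[f64], number: f64) -> Option<usize> {
    if !(0.0..=1.0).contains(&number) {
        return None;
    }
    let sum: f64 = weights.iter().sum();
    // Scale the input to the total weight, then subtract weights until it is used up.
    let mut remaining = number * sum;
    for (idx, w) in weights.iter().enumerate() {
        remaining -= w;
        if remaining <= 0.0 {
            return Some(idx);
        }
    }
    None
}

fn main() {
    // With weights [1.0, 2.0], inputs up to 1/3 of the mass land on index 0,
    // the rest on index 1, i.e. selection probability equals weight / sum.
    assert_eq!(weighted_index(&[1.0, 2.0], 0.2), Some(0));
    assert_eq!(weighted_index(&[1.0, 2.0], 0.5), Some(1));
    assert_eq!(weighted_index(&[], 0.5), None);
}
```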
@@ -70,18 +74,21 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
    }

    fn next(&self) -> Option<Node> {
-        // We select a node based on it's weight, using a stochastic weighted random sampling approach.
-        let weights = self
-            .weighted_nodes
-            .iter()
-            .map(|n| n.weight)
-            .collect::<Vec<_>>();
+        // We select a healthy node based on its weight, using a stochastic weighted random sampling approach.
+
+        // Preallocate the vector for better efficiency.
+        let mut healthy_weighted_nodes = Vec::with_capacity(self.weighted_nodes.len());
+        for n in &self.weighted_nodes {
+            if n.is_healthy {
+                healthy_weighted_nodes.push((n.weight, &n.node));
+            }
+        }
        // Generate a random float in the range [0, 1)
        let mut rng = rand::thread_rng();
        let rand_num = rng.gen::<f64>();
        // Using this random float and an array of weights we get an index of the node.
-        let idx = weighted_sample(weights.as_slice(), rand_num);
-        idx.map(|idx| self.weighted_nodes[idx].node.clone())
+        let idx = weighted_sample(healthy_weighted_nodes.as_slice(), rand_num);
+        idx.map(|idx| healthy_weighted_nodes[idx].1.clone())
    }

    fn sync_nodes(&mut self, nodes: &[Node]) -> bool {
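The behavioural change in `next()` is the filter step: only nodes whose latest health check passed are handed to `weighted_sample`, so an unhealthy node is excluded outright, and when nothing is healthy `next()` returns `None`. A tiny sketch with hypothetical stand-in types (not the crate's API) to make the filter explicit:

```rust
// Hypothetical stand-in for WeightedNode, for illustration only.
struct Weighted {
    weight: f64,
    is_healthy: bool,
}

// Mirrors the filter in next(): only healthy nodes become sampling candidates.
fn healthy_candidates(nodes: &[Weighted]) -> Vec<(f64, usize)> {
    nodes
        .iter()
        .enumerate()
        .filter(|(_, n)| n.is_healthy)
        .map(|(idx, n)| (n.weight, idx))
        .collect()
}

fn main() {
    let nodes = [
        Weighted { weight: 1.0, is_healthy: false },
        Weighted { weight: 0.5, is_healthy: true },
    ];
    // Only the healthy node survives; if this list came back empty,
    // weighted_sample would receive an empty slice and next() would return None.
    assert_eq!(healthy_candidates(&nodes), vec![(0.5, 1_usize)]);
}
```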
@@ -116,11 +123,12 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
            return false;
        }

-        // If latency is None (meaning Node is unhealthy), we assign some big value
-        let latency = health.latency().unwrap_or(MAX_LATENCY);
+        // If the node is unhealthy, we penalize its moving average.
+        let latency = health.latency().unwrap_or(PUNISH_LATENCY);

        if let Some(idx) = self.weighted_nodes.iter().position(|x| &x.node == node) {
            // Node is already in the array (it is not the first update_node() call).
+            self.weighted_nodes[idx].is_healthy = health.is_healthy();
            self.weighted_nodes[idx].latency_mov_avg.add_sample(latency);
            let latency_avg = self.weighted_nodes[idx].latency_mov_avg.get_average();
            // As nodes with smaller average latencies are preferred for routing, we use inverted values for weights.
@@ -131,6 +139,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
            latency_mov_avg.add_sample(latency);
            let weight = 1.0 / latency_mov_avg.get_average().as_secs_f64();
            self.weighted_nodes.push(WeightedNode {
+                is_healthy: health.is_healthy(),
                latency_mov_avg,
                node: node.clone(),
                weight,
@@ -152,7 +161,8 @@ mod tests {
        node::Node,
        snapshot::{
            latency_based_routing::{
-                weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode, MAX_LATENCY,
+                weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode,
+                PUNISH_LATENCY,
            },
            routing_snapshot::RoutingSnapshot,
        },
@@ -212,6 +222,7 @@ mod tests {
            Duration::from_millis(1500)
        );
        assert_eq!(weighted_node.weight, 1.0 / 1.5);
+        assert_eq!(snapshot.next().unwrap(), node);
        // Check third update
        let health = HealthCheckStatus::new(Some(Duration::from_secs(3)));
        let is_updated = snapshot.update_node(&node, health);
@@ -222,12 +233,25 @@ mod tests {
            Duration::from_millis(2000)
        );
        assert_eq!(weighted_node.weight, 0.5);
+        assert_eq!(snapshot.next().unwrap(), node);
        // Check forth update with none
        let health = HealthCheckStatus::new(None);
        let is_updated = snapshot.update_node(&node, health);
        assert!(is_updated);
        let weighted_node = snapshot.weighted_nodes.first().unwrap();
-        let avg_latency = Duration::from_secs_f64((MAX_LATENCY.as_secs() as f64 + 6.0) / 4.0);
+        let avg_latency = Duration::from_secs_f64((PUNISH_LATENCY.as_secs() as f64 + 6.0) / 4.0);
+        assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency);
+        assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64());
+        assert_eq!(snapshot.weighted_nodes.len(), 1);
+        assert_eq!(snapshot.existing_nodes.len(), 1);
+        // No nodes returned, as the node is unhealthy.
+        assert!(snapshot.next().is_none());
+        // Check fifth update
+        let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
+        let is_updated = snapshot.update_node(&node, health);
+        assert!(is_updated);
+        let weighted_node = snapshot.weighted_nodes.first().unwrap();
+        let avg_latency = Duration::from_secs_f64((PUNISH_LATENCY.as_secs() as f64 + 7.0) / 5.0);
        assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency);
        assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64());
        assert_eq!(snapshot.weighted_nodes.len(), 1);
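The fifth-update expectation can be checked by hand (illustration only): the window now holds the three original samples, the 2 s penalty, and the new healthy 1 s sample, so the average is 1.8 s and the weight starts recovering while the penalty ages out of the window.

```rust
fn main() {
    // Seconds in the window: 1, 2, 3, the 2 s penalty, then the new healthy 1 s sample.
    let samples = [1.0_f64, 2.0, 3.0, 2.0, 1.0];
    let avg = samples.iter().sum::<f64>() / samples.len() as f64;
    // Matches (PUNISH_LATENCY.as_secs() as f64 + 7.0) / 5.0 asserted above.
    assert_eq!(avg, 1.8);
    // The corresponding weight is 1.0 / 1.8 ≈ 0.56.
}
```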
@@ -250,6 +274,7 @@ mod tests {
        );
        // Add node_1 to weighted_nodes manually
        snapshot.weighted_nodes.push(WeightedNode {
+            is_healthy: true,
            node: node_1.clone(),
            latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
            weight: 0.0,
@@ -274,6 +299,7 @@ mod tests {
        assert!(snapshot.weighted_nodes.is_empty());
        // Add node_2 to weighted_nodes manually
        snapshot.weighted_nodes.push(WeightedNode {
+            is_healthy: true,
            node: node_2.clone(),
            latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
            weight: 0.0,
@@ -289,6 +315,7 @@ mod tests {
        assert_eq!(snapshot.weighted_nodes[0].node, node_2);
        // Add node_3 to weighted_nodes manually
        snapshot.weighted_nodes.push(WeightedNode {
+            is_healthy: true,
            node: node_3,
            latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO),
            weight: 0.0,
@@ -308,11 +335,12 @@ mod tests {
    #[test]
    fn test_weighted_sample() {
        // Case 1: empty array
-        let arr: &[f64] = &[];
+        let node = Node::new("ic0.com").unwrap();
+        let arr: &[(f64, &Node)] = &[];
        let idx = weighted_sample(arr, 0.5);
        assert_eq!(idx, None);
        // Case 2: single element in array
-        let arr: &[f64] = &[1.0];
+        let arr = &[(1.0, &node)];
        let idx = weighted_sample(arr, 0.0);
        assert_eq!(idx, Some(0));
        let idx = weighted_sample(arr, 1.0);
@@ -323,7 +351,7 @@ mod tests {
        let idx = weighted_sample(arr, 1.1);
        assert_eq!(idx, None);
        // Case 3: two elements in array (second element has twice the weight of the first)
-        let arr: &[f64] = &[1.0, 2.0]; // prefixed_sum = [1.0, 3.0]
+        let arr = &[(1.0, &node), (2.0, &node)]; // prefixed_sum = [1.0, 3.0]
        let idx = weighted_sample(arr, 0.0); // 0.0 * 3.0 < 1.0
        assert_eq!(idx, Some(0));
        let idx = weighted_sample(arr, 0.33); // 0.33 * 3.0 < 1.0
@@ -338,7 +366,7 @@ mod tests {
        let idx = weighted_sample(arr, 1.1);
        assert_eq!(idx, None);
        // Case 4: four elements in array
-        let arr: &[f64] = &[1.0, 2.0, 1.5, 2.5]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0]
+        let arr = &[(1.0, &node), (2.0, &node), (1.5, &node), (2.5, &node)]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0]
        let idx = weighted_sample(arr, 0.14); // 0.14 * 7 < 1.0
        assert_eq!(idx, Some(0)); // probability ~0.14
        let idx = weighted_sample(arr, 0.15); // 0.15 * 7 > 1.0