@@ -28,7 +28,7 @@ use async_trait::async_trait;
28
28
use quickwit_common:: pubsub:: EventSubscriber ;
29
29
use quickwit_common:: rendezvous_hasher:: { node_affinity, sort_by_rendez_vous_hash} ;
30
30
use quickwit_proto:: search:: { ReportSplit , ReportSplitsRequest } ;
31
- use tracing:: warn;
31
+ use tracing:: { info , warn} ;
32
32
33
33
use crate :: { SearchJob , SearchServiceClient , SearcherPool , SEARCH_METRICS } ;
34
34
@@ -80,7 +80,7 @@ impl EventSubscriber<ReportSplitsRequest> for SearchJobPlacer {
80
80
. max_by_key ( |node_addr| node_affinity ( * node_addr, & report_split. split_id ) )
81
81
// This actually never happens thanks to the if-condition at the
82
82
// top of this function.
83
- . expect ( "`nodes` should not be empty. " ) ;
83
+ . expect ( "`nodes` should not be empty" ) ;
84
84
splits_per_node
85
85
. entry ( * node_addr)
86
86
. or_default ( )
@@ -150,39 +150,51 @@ impl SearchJobPlacer {
150
150
mut jobs : Vec < J > ,
151
151
excluded_addrs : & HashSet < SocketAddr > ,
152
152
) -> anyhow:: Result < impl Iterator < Item = ( SearchServiceClient , Vec < J > ) > > {
153
- let num_nodes = self . searcher_pool . len ( ) ;
153
+ let mut all_nodes = self . searcher_pool . pairs ( ) ;
154
154
155
- let mut candidate_nodes: Vec < CandidateNodes > = self
156
- . searcher_pool
157
- . pairs ( )
155
+ if all_nodes. is_empty ( ) {
156
+ bail ! (
157
+ "failed to assign search jobs: there are no available searcher nodes in the \
158
+ cluster"
159
+ ) ;
160
+ }
161
+ if !excluded_addrs. is_empty ( ) && excluded_addrs. len ( ) < all_nodes. len ( ) {
162
+ all_nodes. retain ( |( grpc_addr, _) | !excluded_addrs. contains ( grpc_addr) ) ;
163
+
164
+ // This should never happen, but... belt and suspenders policy.
165
+ if all_nodes. is_empty ( ) {
166
+ bail ! (
167
+ "failed to assign search jobs: there are no searcher nodes candidates for \
168
+ these jobs"
169
+ ) ;
170
+ }
171
+ info ! (
172
+ "excluded {} nodes from search job placement, {} remaining" ,
173
+ excluded_addrs. len( ) ,
174
+ all_nodes. len( )
175
+ ) ;
176
+ }
177
+ let mut candidate_nodes: Vec < _ > = all_nodes
158
178
. into_iter ( )
159
- . filter ( |( grpc_addr, _) | {
160
- excluded_addrs. is_empty ( )
161
- || excluded_addrs. len ( ) == num_nodes
162
- || !excluded_addrs. contains ( grpc_addr)
163
- } )
164
- . map ( |( grpc_addr, client) | CandidateNodes {
179
+ . map ( |( grpc_addr, client) | CandidateNode {
165
180
grpc_addr,
166
181
client,
167
182
load : 0 ,
168
183
} )
169
184
. collect ( ) ;
170
185
171
- if candidate_nodes. is_empty ( ) {
172
- bail ! (
173
- "failed to assign search jobs. there are no available searcher nodes in the pool"
174
- ) ;
175
- }
176
186
jobs. sort_unstable_by ( Job :: compare_cost) ;
177
187
188
+ let num_nodes = candidate_nodes. len ( ) ;
189
+
178
190
let mut job_assignments: HashMap < SocketAddr , ( SearchServiceClient , Vec < J > ) > =
179
191
HashMap :: with_capacity ( num_nodes) ;
180
192
181
193
let total_load: usize = jobs. iter ( ) . map ( |job| job. cost ( ) ) . sum ( ) ;
182
194
183
195
// allow around 5% disparity. Round up so we never end up in a case where
184
196
// target_load * num_nodes < total_load
185
- // some of our tests needs 2 splits to be put on 2 different searchers. It makes sens for
197
+ // some of our tests need 2 splits to be put on 2 different searchers. It makes sense for
186
198
// these tests to keep doing so (testing root merge). Either we can make the allowed
187
199
// difference stricter, find the right split names ("split6" instead of "split2" works),
188
200
// or modify mock_split_meta() so that not all splits have the same job cost
@@ -239,25 +251,25 @@ impl SearchJobPlacer {
239
251
}
240
252
241
253
/// A searcher node that is a candidate for receiving search jobs.
///
/// Built from the `(grpc_addr, client)` pairs of the searcher pool, with `load` starting at 0.
/// Identity (hashing and equality) is based on `grpc_addr` only.
#[ derive( Debug , Clone ) ]
242
- struct CandidateNodes {
254
+ struct CandidateNode {
243
255
/// Socket address of the node; the only field used by the `Hash` and `PartialEq` impls.
pub grpc_addr : SocketAddr ,
244
256
/// Client handle paired with `grpc_addr` in the searcher pool.
pub client : SearchServiceClient ,
245
257
/// NOTE(review): presumably the accumulated cost of the jobs assigned to this node so far;
/// the code that updates it is not visible here, so confirm.
pub load : usize ,
246
258
}
247
259
248
// Hashing feeds only `grpc_addr` into the hasher, which keeps it consistent with the
// `PartialEq` impl that also compares `grpc_addr` alone (`client` and `load` are ignored).
- impl Hash for CandidateNodes {
260
+ impl Hash for CandidateNode {
249
261
fn hash < H : Hasher > ( & self , state : & mut H ) {
250
262
self . grpc_addr . hash ( state) ;
251
263
}
252
264
}
253
265
254
// Two candidates are equal when their `grpc_addr` values are equal; `client` and `load`
// do not take part in the comparison.
- impl PartialEq for CandidateNodes {
266
+ impl PartialEq for CandidateNode {
255
267
fn eq ( & self , other : & Self ) -> bool {
256
268
self . grpc_addr == other. grpc_addr
257
269
}
258
270
}
259
271
260
// Marker impl with an empty body: it adds no methods, equality itself is defined by the
// `PartialEq` impl on `grpc_addr`.
- impl Eq for CandidateNodes { }
272
+ impl Eq for CandidateNode { }
261
273
262
274
/// Groups jobs by index id and returns a list of `SearchJob` per index
263
275
pub fn group_jobs_by_index_id (
0 commit comments