@@ -27,17 +27,19 @@ use futures_util::{future, Future};
2727use itertools:: Itertools ;
2828use quickwit_actors:: ActorExitStatus ;
2929use quickwit_common:: new_coolid;
30- use quickwit_common:: test_utils:: wait_for_server_ready;
30+ use quickwit_common:: test_utils:: { wait_for_server_ready, wait_until_predicate } ;
3131use quickwit_common:: uri:: Uri as QuickwitUri ;
3232use quickwit_config:: service:: QuickwitService ;
3333use quickwit_config:: QuickwitConfig ;
3434use quickwit_metastore:: SplitState ;
35- use quickwit_rest_client:: rest_client:: { QuickwitClient , Transport , DEFAULT_BASE_URL } ;
35+ use quickwit_rest_client:: models:: IngestSource ;
36+ use quickwit_rest_client:: rest_client:: { CommitType , QuickwitClient , Transport , DEFAULT_BASE_URL } ;
3637use quickwit_serve:: { serve_quickwit, ListSplitsQueryParams } ;
3738use reqwest:: Url ;
3839use tempfile:: TempDir ;
3940use tokio:: sync:: watch:: { self , Receiver , Sender } ;
4041use tokio:: task:: JoinHandle ;
42+ use tracing:: debug;
4143
4244/// Configuration of a node made of a [`QuickwitConfig`] and a
4345/// set of services.
@@ -94,6 +96,43 @@ fn transport_url(addr: SocketAddr) -> Url {
9496 url
9597}
9698
99+ #[ macro_export]
100+ macro_rules! ingest_json {
101+ ( $( $json: tt) +) => {
102+ quickwit_rest_client:: models:: IngestSource :: Bytes ( json!( $( $json) +) . to_string( ) . into( ) )
103+ } ;
104+ }
105+
106+ pub async fn ingest_with_retry (
107+ client : & QuickwitClient ,
108+ index_id : & str ,
109+ ingest_source : IngestSource ,
110+ commit_type : CommitType ,
111+ ) -> anyhow:: Result < ( ) > {
112+ wait_until_predicate (
113+ || {
114+ let commit_type_clone = commit_type. clone ( ) ;
115+ let ingest_source_clone = ingest_source. clone ( ) ;
116+ async move {
117+ // Index one record.
118+ if let Err ( err) = client
119+ . ingest ( index_id, ingest_source_clone, None , commit_type_clone)
120+ . await
121+ {
122+ debug ! ( "Failed to index into {} due to error: {}" , index_id, err) ;
123+ false
124+ } else {
125+ true
126+ }
127+ }
128+ } ,
129+ Duration :: from_secs ( 10 ) ,
130+ Duration :: from_millis ( 100 ) ,
131+ )
132+ . await ?;
133+ Ok ( ( ) )
134+ }
135+
97136impl ClusterSandbox {
98137 // Starts one node that runs all the services.
99138 pub async fn start_standalone_node ( ) -> anyhow:: Result < Self > {
@@ -172,19 +211,31 @@ impl ClusterSandbox {
172211 & self ,
173212 expected_num_alive_nodes : usize ,
174213 ) -> anyhow:: Result < ( ) > {
175- let mut num_attempts = 0 ;
176- let max_num_attempts = 3 ;
177- while num_attempts < max_num_attempts {
178- tokio:: time:: sleep ( Duration :: from_millis ( 100 * ( num_attempts + 1 ) ) ) . await ;
179- let cluster_snapshot = self . indexer_rest_client . cluster ( ) . snapshot ( ) . await ?;
180- if cluster_snapshot. ready_nodes . len ( ) == expected_num_alive_nodes {
181- return Ok ( ( ) ) ;
182- }
183- num_attempts += 1 ;
184- }
185- if num_attempts == max_num_attempts {
186- anyhow:: bail!( "Too many attempts to get expected num members." ) ;
187- }
214+ wait_until_predicate (
215+ || async move {
216+ match self . indexer_rest_client . cluster ( ) . snapshot ( ) . await {
217+ Ok ( result) => {
218+ if result. live_nodes . len ( ) != expected_num_alive_nodes {
219+ debug ! (
220+ "wait_for_cluster_num_ready_nodes expected {} alive nodes, got {}" ,
221+ expected_num_alive_nodes,
222+ result. live_nodes. len( )
223+ ) ;
224+ false
225+ } else {
226+ true
227+ }
228+ }
229+ Err ( err) => {
230+ debug ! ( "wait_for_cluster_num_ready_nodes error {err}" ) ;
231+ false
232+ }
233+ }
234+ } ,
235+ Duration :: from_secs ( 10 ) ,
236+ Duration :: from_millis ( 100 ) ,
237+ )
238+ . await ?;
188239 Ok ( ( ) )
189240 }
190241
@@ -193,28 +244,30 @@ impl ClusterSandbox {
193244 & self ,
194245 required_pipeline_num : usize ,
195246 ) -> anyhow:: Result < ( ) > {
196- let mut num_attempts = 0 ;
197- let max_num_attempts = 3 ;
198- while num_attempts < max_num_attempts {
199- if num_attempts > 0 {
200- tokio:: time:: sleep ( Duration :: from_millis ( 100 * ( num_attempts) ) ) . await ;
201- }
202- if self
203- . indexer_rest_client
204- . node_stats ( )
205- . indexing ( )
206- . await
207- . unwrap ( )
208- . num_running_pipelines
209- == required_pipeline_num
210- {
211- return Ok ( ( ) ) ;
212- }
213- num_attempts += 1 ;
214- }
215- if num_attempts == max_num_attempts {
216- anyhow:: bail!( "Too many attempts to get expected number of pipelines." ) ;
217- }
247+ wait_until_predicate (
248+ || async move {
249+ match self . indexer_rest_client . node_stats ( ) . indexing ( ) . await {
250+ Ok ( result) => {
251+ if result. num_running_pipelines != required_pipeline_num {
252+ debug ! (
253+ "wait_for_indexing_pipelines expected {} pipelines, got {}" ,
254+ required_pipeline_num, result. num_running_pipelines
255+ ) ;
256+ false
257+ } else {
258+ true
259+ }
260+ }
261+ Err ( err) => {
262+ debug ! ( "wait_for_cluster_num_ready_nodes error {err}" ) ;
263+ false
264+ }
265+ }
266+ } ,
267+ Duration :: from_secs ( 10 ) ,
268+ Duration :: from_millis ( 100 ) ,
269+ )
270+ . await ?;
218271 Ok ( ( ) )
219272 }
220273
@@ -225,29 +278,43 @@ impl ClusterSandbox {
225278 split_states : Option < Vec < SplitState > > ,
226279 required_splits_num : usize ,
227280 ) -> anyhow:: Result < ( ) > {
228- let mut num_attempts = 0 ;
229- let max_num_attempts = 3 ;
230- while num_attempts < max_num_attempts {
231- if num_attempts > 0 {
232- tokio:: time:: sleep ( Duration :: from_millis ( 100 * ( num_attempts) ) ) . await ;
233- }
234- if self
235- . indexer_rest_client
236- . splits ( index_id)
237- . list ( ListSplitsQueryParams {
281+ wait_until_predicate (
282+ || {
283+ let splits_query_params = ListSplitsQueryParams {
238284 split_states : split_states. clone ( ) ,
239285 ..Default :: default ( )
240- } )
241- . await
242- . unwrap ( )
243- . len ( )
244- == required_splits_num
245- {
246- return Ok ( ( ) ) ;
247- }
248- num_attempts += 1 ;
249- }
250- anyhow:: bail!( "Too many attempts to get expected number of published splits." ) ;
286+ } ;
287+ async move {
288+ match self
289+ . indexer_rest_client
290+ . splits ( index_id)
291+ . list ( splits_query_params)
292+ . await
293+ {
294+ Ok ( result) => {
295+ if result. len ( ) != required_splits_num {
296+ debug ! (
297+ "wait_for_published_splits expected {} splits, got {}" ,
298+ required_splits_num,
299+ result. len( )
300+ ) ;
301+ false
302+ } else {
303+ true
304+ }
305+ }
306+ Err ( err) => {
307+ debug ! ( "wait_for_cluster_num_ready_nodes error {err}" ) ;
308+ false
309+ }
310+ }
311+ }
312+ } ,
313+ Duration :: from_secs ( 10 ) ,
314+ Duration :: from_millis ( 100 ) ,
315+ )
316+ . await ?;
317+ Ok ( ( ) )
251318 }
252319
253320 pub async fn shutdown ( self ) -> Result < Vec < HashMap < String , ActorExitStatus > > , anyhow:: Error > {
0 commit comments