From 9e1ab352978e8a38cadc7b8f415b3d7c59a2559e Mon Sep 17 00:00:00 2001
From: trinity Pointard
Date: Mon, 23 Dec 2024 15:12:15 +0100
Subject: [PATCH 01/10] track merges across pipelines

---
 quickwit/Cargo.lock                           |   1 +
 quickwit/Cargo.toml                           |   1 +
 quickwit/quickwit-common/Cargo.toml           |   1 +
 quickwit/quickwit-common/src/lib.rs           |   1 +
 quickwit/quickwit-common/src/tracker.rs       | 108 ++++++++++++++++++
 .../src/actors/merge_pipeline.rs              |   3 +-
 .../src/actors/merge_planner.rs               |  72 ++++++++----
 .../src/actors/merge_scheduler_service.rs     |  43 ++++++-
 .../quickwit-indexing/src/merge_policy/mod.rs |  18 +--
 .../src/actors/delete_task_planner.rs         |   7 +-
 10 files changed, 217 insertions(+), 38 deletions(-)
 create mode 100644 quickwit/quickwit-common/src/tracker.rs

diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 1662803ed0b..204d27faf8d 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -6308,6 +6308,7 @@ dependencies = [
  "async-speed-limit",
  "async-trait",
  "bytesize",
+ "census",
  "coarsetime",
  "dyn-clone",
  "env_logger",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index c3e3051470c..bb4d855665e 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -90,6 +90,7 @@ binggan = { version = "0.14" }
 bytes = { version = "1", features = ["serde"] }
 bytesize = { version = "1.3.0", features = ["serde"] }
 bytestring = "1.3.0"
+census = "0.4.2"
 chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "54cbc70" }
 chrono = { version = "0.4", default-features = false, features = [
   "clock",
diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml
index 83170a8ec56..0c28c36cc44 100644
--- a/quickwit/quickwit-common/Cargo.toml
+++ b/quickwit/quickwit-common/Cargo.toml
@@ -15,6 +15,7 @@ anyhow = { workspace = true }
 async-speed-limit = { workspace = true }
 async-trait = { workspace = true }
 bytesize = { workspace = true }
+census = { workspace = true }
 coarsetime = { workspace = true }
 dyn-clone = { workspace = true }
 env_logger = { workspace = true }
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index dff26829584..82fd62d1971 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -45,6 +45,7 @@ pub mod temp_dir;
 pub mod test_utils;
 pub mod thread_pool;
 pub mod tower;
+pub mod tracker;
 pub mod type_map;
 pub mod uri;
diff --git a/quickwit/quickwit-common/src/tracker.rs b/quickwit/quickwit-common/src/tracker.rs
new file mode 100644
index 00000000000..3acdf06bace
--- /dev/null
+++ b/quickwit/quickwit-common/src/tracker.rs
@@ -0,0 +1,108 @@
+use std::mem::MaybeUninit;
+use std::ops::Deref;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::mpsc::{channel, Receiver, Sender};
+use std::sync::{Arc, Mutex};
+
+use census::{Inventory, TrackedObject as InventoredObject};
+
+pub type TrackedObject<T> = InventoredObject<RecordUnacknoledgedDrop<T>>;
+
+#[derive(Clone)]
+pub struct Tracker<T> {
+    inner_inventory: Inventory<RecordUnacknoledgedDrop<T>>,
+    unacknoledged_drop_receiver: Arc<Mutex<Receiver<T>>>,
+    return_channel: Sender<T>,
+}
+
+#[derive(Debug)]
+pub struct RecordUnacknoledgedDrop<T> {
+    // safety: this is always kept initialized except after Self::drop, where we move that
+    // that value away to either send it through the return channel, or drop it manually
+    inner: MaybeUninit<T>,
+    acknoledged: AtomicBool,
+    return_channel: Sender<T>,
+}
+
+impl<T> RecordUnacknoledgedDrop<T> {
+    pub fn acknoledge(&self) {
+        self.acknoledged.store(true, Ordering::Relaxed);
+    }
+
+    pub fn untracked(value: T) -> Self {
+        // the receiver half is dropped immediately, so a drop notification sent
+        // for an untracked object goes nowhere and is silently discarded
+        let (sender, _receiver) = channel();
+        RecordUnacknoledgedDrop {
+            inner: MaybeUninit::new(value),
+            acknoledged: true.into(),
+            return_channel: sender,
+        }
+    }
+}
+
+impl<T> Deref for RecordUnacknoledgedDrop<T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        unsafe {
+            // safety: see struct definition, this operation is valid except after drop.
+            self.inner.assume_init_ref()
+        }
+    }
+}
+
+impl<T> Drop for RecordUnacknoledgedDrop<T> {
+    fn drop(&mut self) {
+        let item = unsafe {
+            // safety: see struct definition. Additionally, we don't touch to self.inner
+            // after this point so there is no risk of making a 2nd copy and cause a
+            // double-free
+            self.inner.assume_init_read()
+        };
+        if !*self.acknoledged.get_mut() {
+            // if send fails, no one cared about getting that notification, it's fine to
+            // drop item
+            let _ = self.return_channel.send(item);
+        }
+    }
+}
+
+impl<T> Default for Tracker<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> Tracker<T> {
+    pub fn new() -> Self {
+        let (sender, receiver) = channel();
+        Tracker {
+            inner_inventory: Inventory::new(),
+            unacknoledged_drop_receiver: Arc::new(Mutex::new(receiver)),
+            return_channel: sender,
+        }
+    }
+
+    pub fn rebuildable_from_the_void(&self) -> bool {
+        Arc::strong_count(&self.unacknoledged_drop_receiver) == 1 && self.inner_inventory.len() == 0
+    }
+
+    pub fn list_ongoing(&self) -> Vec<TrackedObject<T>> {
+        self.inner_inventory.list()
+    }
+
+    pub fn take_dead(&self) -> Vec<T> {
+        let mut res = Vec::new();
+        let receiver = self.unacknoledged_drop_receiver.lock().unwrap();
+        while let Ok(dead_entry) = receiver.try_recv() {
+            res.push(dead_entry);
+        }
+        res
+    }
+
+    pub fn track(&self, value: T) -> TrackedObject<T> {
+        self.inner_inventory.track(RecordUnacknoledgedDrop {
+            inner: MaybeUninit::new(value),
+            acknoledged: false.into(),
+            return_channel: self.return_channel.clone(),
+        })
+    }
+}
diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
index 97c57a79b31..24cf20b65a8 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -355,7 +355,8 @@ impl MergePipeline {
             self.params.merge_policy.clone(),
             merge_split_downloader_mailbox,
             self.params.merge_scheduler_service.clone(),
-        );
+        )
+        .await?;
         let (_, merge_planner_handle) = ctx
             .spawn_actor()
             .set_kill_switch(self.kill_switch.clone())
diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
index 900e2122eb9..362fd3f61cc 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -23,16 +23,16 @@ use std::time::Instant;
 
 use async_trait::async_trait;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_common::tracker::Tracker;
 use quickwit_metastore::SplitMetadata;
 use quickwit_proto::indexing::MergePipelineId;
 use quickwit_proto::types::DocMappingUid;
 use serde::Serialize;
-use tantivy::Inventory;
 use time::OffsetDateTime;
 use tracing::{info, warn};
 
 use super::MergeSchedulerService;
-use crate::actors::merge_scheduler_service::schedule_merge;
+use crate::actors::merge_scheduler_service::{schedule_merge, GetOperationTracker};
 use crate::actors::MergeSplitDownloader;
 use crate::merge_policy::MergeOperation;
 use crate::models::NewSplits;
@@ -85,11 +85,15 @@ pub struct MergePlanner {
     merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
     merge_scheduler_service: Mailbox<MergeSchedulerService>,
 
-    /// Inventory of ongoing merge operations.
If everything goes well, - /// a merge operation is dropped after the publish of the merged split. + /// Track ongoing and failed merge operations for this index /// - /// It is used to GC the known_split_ids set. - ongoing_merge_operations_inventory: Inventory, + /// We don't want to emit a new merge for splits already in the process of + /// being merged, but we want to keep track of failed merges so we can + /// reschedule them. + // TODO currently the MergePlanner is teared down when a merge fails, so this + // mechanism is only useful when there are some merges left from a previous + // pipeline. We could only tear down the rest of the pipeline on error. + ongoing_merge_operations_tracker: Tracker, /// We use the actor start_time as a way to identify incarnations. /// @@ -139,8 +143,10 @@ impl Handler for MergePlanner { _plan_merge: RunFinalizeMergePolicyAndQuit, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - // Note we ignore messages that could be coming from a different incarnation. - // (See comment on `Self::incarnation_start_at`.) + // consume failed merges so that we may try to reschedule them one last time + for failed_merge in self.ongoing_merge_operations_tracker.take_dead() { + self.record_splits_if_necessary(failed_merge.splits); + } self.send_merge_ops(true, ctx).await?; Err(ActorExitStatus::Success) } @@ -188,30 +194,43 @@ impl MergePlanner { QueueCapacity::Bounded(1) } - pub fn new( + pub async fn new( pipeline_id: &MergePipelineId, immature_splits: Vec, merge_policy: Arc, merge_split_downloader_mailbox: Mailbox, merge_scheduler_service: Mailbox, - ) -> MergePlanner { + ) -> anyhow::Result { let immature_splits: Vec = immature_splits .into_iter() .filter(|split_metadata| belongs_to_pipeline(pipeline_id, split_metadata)) .collect(); + // TODO it's unclear to me if we should also segregate by source id + let ongoing_merge_operations_tracker = merge_scheduler_service + .ask(GetOperationTracker(pipeline_id.index_uid.clone())) + .await?; + + let mut known_split_ids: HashSet = HashSet::new(); + let ongoing_merge_operations = ongoing_merge_operations_tracker.list_ongoing(); + for merge_op in ongoing_merge_operations { + for split in &merge_op.splits { + known_split_ids.insert(split.split_id().to_string()); + } + } + let mut merge_planner = MergePlanner { - known_split_ids: Default::default(), + known_split_ids, known_split_ids_recompute_attempt_id: 0, partitioned_young_splits: Default::default(), merge_policy, merge_split_downloader_mailbox, merge_scheduler_service, - ongoing_merge_operations_inventory: Inventory::default(), + ongoing_merge_operations_tracker, incarnation_started_at: Instant::now(), }; merge_planner.record_splits_if_necessary(immature_splits); - merge_planner + Ok(merge_planner) } fn rebuild_known_split_ids(&self) -> HashSet { @@ -222,7 +241,7 @@ impl MergePlanner { known_split_ids.insert(split.split_id().to_string()); } } - let ongoing_merge_operations = self.ongoing_merge_operations_inventory.list(); + let ongoing_merge_operations = self.ongoing_merge_operations_tracker.list_ongoing(); // Add splits that are known as in merge. for merge_op in ongoing_merge_operations { for split in &merge_op.splits { @@ -242,7 +261,7 @@ impl MergePlanner { /// Updates `known_split_ids` and return true if the split was not /// previously known and should be recorded. 
- fn acknownledge_split(&mut self, split_id: &str) -> bool { + fn acknowledge_split(&mut self, split_id: &str) -> bool { if self.known_split_ids.contains(split_id) { return false; } @@ -256,6 +275,10 @@ impl MergePlanner { if self.known_split_ids_recompute_attempt_id % 100 == 0 { self.known_split_ids = self.rebuild_known_split_ids(); self.known_split_ids_recompute_attempt_id = 0; + + for failed_merge in self.ongoing_merge_operations_tracker.take_dead() { + self.record_splits_if_necessary(failed_merge.splits); + } } } @@ -285,7 +308,7 @@ impl MergePlanner { // a split already in store to be received. // // See `known_split_ids`. - if !self.acknownledge_split(new_split.split_id()) { + if !self.acknowledge_split(new_split.split_id()) { continue; } self.record_split(new_split); @@ -328,9 +351,8 @@ impl MergePlanner { let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?; for merge_operation in merge_ops { info!(merge_operation=?merge_operation, "schedule merge operation"); - let tracked_merge_operation = self - .ongoing_merge_operations_inventory - .track(merge_operation); + let tracked_merge_operation = + self.ongoing_merge_operations_tracker.track(merge_operation); schedule_merge( &self.merge_scheduler_service, tracked_merge_operation, @@ -440,7 +462,8 @@ mod tests { merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), - ); + ) + .await?; let (merge_planner_mailbox, merge_planner_handle) = universe.spawn_builder().spawn(merge_planner); { @@ -565,7 +588,8 @@ mod tests { merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), - ); + ) + .await?; let (merge_planner_mailbox, merge_planner_handle) = universe.spawn_builder().spawn(merge_planner); @@ -657,7 +681,8 @@ mod tests { merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), - ); + ) + .await?; let (merge_planner_mailbox, merge_planner_handle) = universe.spawn_builder().spawn(merge_planner); universe.sleep(Duration::from_secs(10)).await; @@ -722,7 +747,8 @@ mod tests { merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), - ); + ) + .await?; // We create a fake old mailbox that contains two new splits and a PlanMerge message from an // old incarnation. This could happen in real life if the merge pipeline failed // right after a `PlanMerge` was pushed to the pipeline. 
Note that #3847 did not diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs index 597b9839f5c..fb1acf84856 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -19,13 +19,14 @@ use std::cmp::Reverse; use std::collections::binary_heap::PeekMut; -use std::collections::BinaryHeap; +use std::collections::{BinaryHeap, HashMap}; use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox}; -use tantivy::TrackedObject; +use quickwit_common::tracker::{TrackedObject, Tracker}; +use quickwit_proto::types::IndexUid; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tracing::error; @@ -123,6 +124,8 @@ pub struct MergeSchedulerService { pending_merge_queue: BinaryHeap, next_merge_id: u64, pending_merge_bytes: u64, + tracked_operations: HashMap>, + gc_sequence_id: usize, } impl Default for MergeSchedulerService { @@ -140,6 +143,8 @@ impl MergeSchedulerService { pending_merge_queue: BinaryHeap::default(), next_merge_id: 0, pending_merge_bytes: 0, + tracked_operations: HashMap::new(), + gc_sequence_id: 0, } } @@ -194,6 +199,14 @@ impl MergeSchedulerService { .ongoing_merge_operations .set(num_merges); } + + fn maybe_gc_trackers(&mut self) { + self.gc_sequence_id += 1; + if self.gc_sequence_id % 100 == 0 { + self.tracked_operations + .retain(|_k, tracker| !tracker.rebuildable_from_the_void()) + } + } } #[async_trait] @@ -294,17 +307,39 @@ impl Handler for MergeSchedulerService { ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { self.schedule_pending_merges(ctx); + self.maybe_gc_trackers(); Ok(()) } } +#[derive(Debug)] +pub(crate) struct GetOperationTracker(pub IndexUid); + +#[async_trait] +impl Handler for MergeSchedulerService { + type Reply = Tracker; + + async fn handle( + &mut self, + get_operation_tracker: GetOperationTracker, + _ctx: &ActorContext, + ) -> Result { + let tracker = self + .tracked_operations + .entry(get_operation_tracker.0) + .or_default() + .clone(); + Ok(tracker) + } +} + #[cfg(test)] mod tests { use std::time::Duration; use quickwit_actors::Universe; + use quickwit_common::tracker::Tracker; use quickwit_metastore::SplitMetadata; - use tantivy::Inventory; use tokio::time::timeout; use super::*; @@ -344,7 +379,7 @@ mod tests { let (merge_scheduler_service, _) = universe .spawn_builder() .spawn(MergeSchedulerService::new(2)); - let inventory = Inventory::new(); + let inventory = Tracker::new(); let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe.create_test_mailbox(); diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 02f2249c5dc..afcfe4fe7a1 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -28,13 +28,13 @@ use std::sync::Arc; pub(crate) use const_write_amplification::ConstWriteAmplificationMergePolicy; use itertools::Itertools; pub use nop_merge_policy::NopMergePolicy; +use quickwit_common::tracker::TrackedObject; use quickwit_config::merge_policy_config::MergePolicyConfig; use quickwit_config::IndexingSettings; use quickwit_metastore::{SplitMaturity, SplitMetadata}; use quickwit_proto::types::SplitId; use serde::Serialize; pub(crate) use stable_log_merge_policy::StableLogMergePolicy; -use tantivy::TrackedObject; use 
tracing::{info_span, Span}; use crate::actors::MergePermit; @@ -60,8 +60,8 @@ pub struct MergeTask { impl MergeTask { #[cfg(any(test, feature = "testsuite"))] pub fn from_merge_operation_for_test(merge_operation: MergeOperation) -> MergeTask { - let inventory = tantivy::Inventory::default(); - let tracked_merge_operation = inventory.track(merge_operation); + let tracker = quickwit_common::tracker::Tracker::new(); + let tracked_merge_operation = tracker.track(merge_operation); MergeTask { merge_operation: tracked_merge_operation, _merge_permit: MergePermit::for_test(), @@ -402,13 +402,14 @@ pub mod tests { fn apply_merge( merge_policy: &Arc, split_index: &mut HashMap, - merge_op: &MergeOperation, + merge_op: &TrackedObject, ) -> SplitMetadata { for split in merge_op.splits_as_slice() { assert!(split_index.remove(split.split_id()).is_some()); } let merged_split = fake_merge(merge_policy, merge_op.splits_as_slice()); split_index.insert(merged_split.split_id().to_string(), merged_split.clone()); + merge_op.acknoledge(); merged_split } @@ -432,7 +433,8 @@ pub mod tests { merge_policy.clone(), merge_task_mailbox, universe.get_or_spawn_one::(), - ); + ) + .await?; let mut split_index: HashMap = HashMap::default(); let (merge_planner_mailbox, merge_planner_handler) = universe.spawn_builder().spawn(merge_planner); @@ -453,7 +455,9 @@ pub mod tests { } let new_splits: Vec = merge_tasks .into_iter() - .map(|merge_op| apply_merge(&merge_policy, &mut split_index, &merge_op)) + .map(|merge_op| { + apply_merge(&merge_policy, &mut split_index, &merge_op.merge_operation) + }) .collect(); merge_planner_mailbox .send_message(NewSplits { new_splits }) @@ -473,7 +477,7 @@ pub mod tests { let merge_tasks = merge_task_inbox.drain_for_test_typed::(); for merge_task in merge_tasks { - apply_merge(&merge_policy, &mut split_index, &merge_task); + apply_merge(&merge_policy, &mut split_index, &merge_task.merge_operation); } let split_metadatas: Vec = split_index.values().cloned().collect(); diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index 4166634c072..ce0efb011f9 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -26,6 +26,7 @@ use async_trait::async_trait; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; use quickwit_common::extract_time_range; +use quickwit_common::tracker::RecordUnacknoledgedDrop; use quickwit_common::uri::Uri; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_indexing::actors::{schedule_merge, MergeSchedulerService, MergeSplitDownloader}; @@ -88,7 +89,7 @@ pub struct DeleteTaskPlanner { /// a merge operation is dropped after the publish of the split that underwent /// the delete operation. /// The inventory is used to avoid sending twice the same delete operation. 
- ongoing_delete_operations_inventory: Inventory, + ongoing_delete_operations_inventory: Inventory>, } #[async_trait] @@ -100,7 +101,7 @@ impl Actor for DeleteTaskPlanner { .ongoing_delete_operations_inventory .list() .iter() - .map(|tracked_operation| tracked_operation.as_ref().clone()) + .map(|tracked_operation| (**tracked_operation).clone()) .collect_vec(); DeleteTaskPlannerState { ongoing_delete_operations, @@ -202,7 +203,7 @@ impl DeleteTaskPlanner { info!(delete_operation=?delete_operation, "planned delete operation"); let tracked_delete_operation = self .ongoing_delete_operations_inventory - .track(delete_operation); + .track(RecordUnacknoledgedDrop::untracked(delete_operation)); schedule_merge( &self.merge_scheduler_service, tracked_delete_operation, From 1299985132078d284f0040614bf315671a34667f Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Mon, 13 Jan 2025 17:21:32 +0100 Subject: [PATCH 02/10] test no double merge after pipeline restart --- quickwit/quickwit-indexing/failpoints/mod.rs | 93 ++++++++++++++++++- .../src/actors/merge_planner.rs | 1 - quickwit/quickwit-indexing/src/test_utils.rs | 74 ++++++++++++++- 3 files changed, 162 insertions(+), 6 deletions(-) diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs index 70d7aeacabd..7daf7b8b8af 100644 --- a/quickwit/quickwit-indexing/failpoints/mod.rs +++ b/quickwit/quickwit-indexing/failpoints/mod.rs @@ -36,6 +36,7 @@ //! Below we test panics at different steps in the indexing pipeline. use std::path::Path; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Barrier, Mutex}; use std::time::Duration; @@ -47,7 +48,7 @@ use quickwit_common::split_file; use quickwit_common::temp_dir::TempDirectory; use quickwit_indexing::actors::MergeExecutor; use quickwit_indexing::merge_policy::{MergeOperation, MergeTask}; -use quickwit_indexing::models::MergeScratch; +use quickwit_indexing::models::{MergeScratch, SpawnPipeline}; use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox}; use quickwit_metastore::{ ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, @@ -55,7 +56,7 @@ use quickwit_metastore::{ }; use quickwit_proto::indexing::MergePipelineId; use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService}; -use quickwit_proto::types::{IndexUid, NodeId}; +use quickwit_proto::types::{IndexUid, NodeId, PipelineUid}; use serde_json::Value as JsonValue; use tantivy::Directory; @@ -351,3 +352,91 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul Ok(()) } + +#[tokio::test] +async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { + quickwit_common::setup_logging_for_tests(); + let doc_mapper_yaml = r#" + field_mappings: + - name: body + type: text + - name: ts + type: datetime + fast: true + timestamp_field: ts + "#; + let indexing_setting_yaml = r#" + split_num_docs_target: 2500 + merge_policy: + type: "limit_merge" + max_merge_ops: 1 + merge_factor: 4 + max_merge_factor: 4 + "#; + let search_fields = ["body"]; + let index_id = "test-index-merge-duplication"; + let mut test_index_builder = TestSandbox::create( + index_id, + doc_mapper_yaml, + indexing_setting_yaml, + &search_fields, + ) + .await?; + + // 0: start + // 1: 1st merge reached the failpoint + // 2: 2nd merge reached the failpoint + let state = Arc::new(AtomicU32::new(0)); + let state_clone = state.clone(); + + fail::cfg_callback("before-merge-split", move || { + use 
std::sync::atomic::Ordering; + state_clone.fetch_add(1, Ordering::Relaxed); + std::thread::sleep(std::time::Duration::from_millis(100)); + }) + .unwrap(); + + let batch: Vec = + std::iter::repeat_with(|| serde_json::json!({"body ": TEST_TEXT, "ts": 1631072713 })) + .take(500) + .collect(); + // this sometime fails because the ingest api isn't aware of the index yet?! + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + for _ in 0..4 { + test_index_builder + .add_documents_through_api(batch.clone()) + .await?; + } + + let (indexing_pipeline, merge_pipeline) = test_index_builder + .take_indexing_and_merge_pipeline() + .await?; + + // stop the pipeline + indexing_pipeline.kill().await; + merge_pipeline + .mailbox() + .ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline) + .await?; + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + test_index_builder + .indexing_service() + .ask_for_res(SpawnPipeline { + index_id: index_id.to_string(), + source_config: quickwit_config::SourceConfig::ingest_api_default(), + pipeline_uid: PipelineUid::for_test(1u128), + }) + .await?; + + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + // we shouldn't have reached state 2 + assert_eq!(state.load(Ordering::Relaxed), 1); + + let universe = test_index_builder.universe(); + universe.kill(); + fail::cfg("before-merge-split", "off").unwrap(); + universe.quit().await; + + Ok(()) +} diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 362fd3f61cc..22850f5c2b3 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -205,7 +205,6 @@ impl MergePlanner { .into_iter() .filter(|split_metadata| belongs_to_pipeline(pipeline_id, split_metadata)) .collect(); - // TODO it's unclear to me if we should also segregate by source id let ongoing_merge_operations_tracker = merge_scheduler_service .ask(GetOperationTracker(pipeline_id.index_uid.clone())) .await?; diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 3905ffde11d..f98b4689772 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -23,7 +23,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use bytes::Bytes; -use quickwit_actors::{Mailbox, Universe}; +use quickwit_actors::{ActorHandle, Mailbox, Universe}; use quickwit_cluster::{create_cluster_for_test, ChannelTransport}; use quickwit_common::pubsub::EventBroker; use quickwit_common::rand::append_random_suffix; @@ -37,13 +37,16 @@ use quickwit_ingest::{init_ingest_api, IngesterPool, QUEUES_DIR_NAME}; use quickwit_metastore::{ CreateIndexRequestExt, MetastoreResolver, Split, SplitMetadata, SplitState, }; +use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::{CreateIndexRequest, MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{IndexUid, NodeId, PipelineUid, SourceId}; use quickwit_storage::{Storage, StorageResolver}; use serde_json::Value as JsonValue; -use crate::actors::IndexingService; -use crate::models::{DetachIndexingPipeline, IndexingStatistics, SpawnPipeline}; +use crate::actors::{IndexingPipeline, IndexingService, MergePipeline}; +use crate::models::{ + DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline, +}; /// Creates a Test environment. 
/// @@ -61,6 +64,7 @@ pub struct TestSandbox { storage: Arc, add_docs_id: AtomicUsize, universe: Universe, + indexing_pipeline_id: Option, _temp_dir: tempfile::TempDir, } @@ -135,6 +139,14 @@ impl TestSandbox { .await?; let (indexing_service, _indexing_service_handle) = universe.spawn_builder().spawn(indexing_service_actor); + + let indexing_pipeline_id = indexing_service + .ask_for_res(SpawnPipeline { + index_id: index_uid.index_id.to_string(), + source_config, + pipeline_uid: PipelineUid::for_test(1u128), + }) + .await?; Ok(TestSandbox { node_id, index_uid, @@ -146,6 +158,7 @@ impl TestSandbox { storage, add_docs_id: AtomicUsize::default(), universe, + indexing_pipeline_id: Some(indexing_pipeline_id), _temp_dir: temp_dir, }) } @@ -194,6 +207,56 @@ impl TestSandbox { Ok(pipeline_statistics) } + /// Adds documents and waits for them to be indexed (creating a separate split). + /// + /// The documents are expected to be `JsonValue`. + /// They can be created using the `serde_json::json!` macro. + pub async fn add_documents_through_api(&self, json_docs: I) -> anyhow::Result<()> + where + I: IntoIterator + 'static, + I::IntoIter: Send, + { + let ingest_api_service_mailbox = self + .universe + .get_one::() + .unwrap(); + + let batch_builder = + quickwit_ingest::DocBatchBuilder::new(self.index_uid.index_id.to_string()); + let mut json_writer = batch_builder.json_writer(); + for doc in json_docs { + json_writer.ingest_doc(doc)?; + } + let batch = json_writer.build(); + let ingest_request = quickwit_ingest::IngestRequest { + doc_batches: vec![batch], + commit: quickwit_ingest::CommitType::WaitFor as i32, + }; + ingest_api_service_mailbox + .ask_for_res(ingest_request) + .await?; + Ok(()) + } + + pub async fn take_indexing_and_merge_pipeline( + &mut self, + ) -> anyhow::Result<(ActorHandle, ActorHandle)> { + let pipeline_id = self.indexing_pipeline_id.take().unwrap(); + let merge_pipeline_id = pipeline_id.merge_pipeline_id(); + let indexing_pipeline = self + .indexing_service + .ask_for_res(DetachIndexingPipeline { pipeline_id }) + .await?; + let merge_pipeline = self + .indexing_service + .ask_for_res(DetachMergePipeline { + pipeline_id: merge_pipeline_id, + }) + .await?; + + Ok((indexing_pipeline, merge_pipeline)) + } + /// Returns the metastore of the TestSandbox. /// /// The metastore is a file-backed metastore. @@ -238,6 +301,11 @@ impl TestSandbox { &self.universe } + /// Returns a Mailbox for the indexing service + pub fn indexing_service(&self) -> Mailbox { + self.indexing_service.clone() + } + /// Gracefully quits all registered actors in the underlying universe and asserts that none of /// them panicked. 
/// From 100d4006489a10c4eca8d986bb5178d5702aa810 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Mon, 13 Jan 2025 19:03:11 +0100 Subject: [PATCH 03/10] improve test --- quickwit/quickwit-indexing/failpoints/mod.rs | 49 ++++++++++++++++--- .../src/actors/merge_planner.rs | 8 ++- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs index 7daf7b8b8af..599194f84d0 100644 --- a/quickwit/quickwit-indexing/failpoints/mod.rs +++ b/quickwit/quickwit-indexing/failpoints/mod.rs @@ -48,7 +48,9 @@ use quickwit_common::split_file; use quickwit_common::temp_dir::TempDirectory; use quickwit_indexing::actors::MergeExecutor; use quickwit_indexing::merge_policy::{MergeOperation, MergeTask}; -use quickwit_indexing::models::{MergeScratch, SpawnPipeline}; +use quickwit_indexing::models::{ + DetachIndexingPipeline, DetachMergePipeline, MergeScratch, SpawnPipeline, +}; use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox}; use quickwit_metastore::{ ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, @@ -372,6 +374,7 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { max_merge_ops: 1 merge_factor: 4 max_merge_factor: 4 + max_finalize_merge_operations: 1 "#; let search_fields = ["body"]; let index_id = "test-index-merge-duplication"; @@ -383,16 +386,20 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { ) .await?; - // 0: start - // 1: 1st merge reached the failpoint - // 2: 2nd merge reached the failpoint + // 0: start + // 1: 1st merge reached the failpoint + // 11: 1st merge failed + // 12: 2nd merge reached the failpoint + // 22: 2nd merge failed (we don't care about this state) let state = Arc::new(AtomicU32::new(0)); let state_clone = state.clone(); fail::cfg_callback("before-merge-split", move || { use std::sync::atomic::Ordering; state_clone.fetch_add(1, Ordering::Relaxed); - std::thread::sleep(std::time::Duration::from_millis(100)); + std::thread::sleep(std::time::Duration::from_millis(300)); + state_clone.fetch_add(10, Ordering::Relaxed); + panic!("kill merge pipeline"); }) .unwrap(); @@ -420,7 +427,7 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { .await?; tokio::time::sleep(std::time::Duration::from_millis(100)).await; - test_index_builder + let pipeline_id = test_index_builder .indexing_service() .ask_for_res(SpawnPipeline { index_id: index_id.to_string(), @@ -429,9 +436,35 @@ async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> { }) .await?; - tokio::time::sleep(std::time::Duration::from_millis(300)).await; - // we shouldn't have reached state 2 + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + // we shouldn't have had a 2nd split run yet (the 1st one hasn't panicked just yet) assert_eq!(state.load(Ordering::Relaxed), 1); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + assert_eq!(state.load(Ordering::Relaxed), 11); + + let merge_pipeline_id = pipeline_id.merge_pipeline_id(); + let indexing_pipeline = test_index_builder + .indexing_service() + .ask_for_res(DetachIndexingPipeline { pipeline_id }) + .await?; + let merge_pipeline = test_index_builder + .indexing_service() + .ask_for_res(DetachMergePipeline { + pipeline_id: merge_pipeline_id, + }) + .await?; + + indexing_pipeline.kill().await; + merge_pipeline + .mailbox() + 
.ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline) + .await?; + + // stoping the merge pipeline makes it recheck for possible dead merge + // (alternatively, it does that sooner when rebuilding the known split list) + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + // timing-wise, we can't have reached 22, but it would be logically correct to get that state + assert_eq!(state.load(Ordering::Relaxed), 12); let universe = test_index_builder.universe(); universe.kill(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 22850f5c2b3..c9eade07a3c 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -145,7 +145,12 @@ impl Handler for MergePlanner { ) -> Result<(), ActorExitStatus> { // consume failed merges so that we may try to reschedule them one last time for failed_merge in self.ongoing_merge_operations_tracker.take_dead() { - self.record_splits_if_necessary(failed_merge.splits); + for split in failed_merge.splits { + // if they were from a dead merge, we always record them, they are likely + // already part of our known splits, and we don't want to rebuild the known + // split list as it's likely to log about not halving its size. + self.record_split(split); + } } self.send_merge_ops(true, ctx).await?; Err(ActorExitStatus::Success) @@ -313,6 +318,7 @@ impl MergePlanner { self.record_split(new_split); } } + async fn compute_merge_ops( &mut self, is_finalize: bool, From f63f24de688593ecf07d1c47da09d53e83d034a8 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Wed, 15 Jan 2025 11:26:54 +0100 Subject: [PATCH 04/10] add test to tracker --- quickwit/quickwit-common/src/tracker.rs | 79 ++++++++++++++++++- .../src/actors/merge_scheduler_service.rs | 2 +- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-common/src/tracker.rs b/quickwit/quickwit-common/src/tracker.rs index 3acdf06bace..be36311d4b0 100644 --- a/quickwit/quickwit-common/src/tracker.rs +++ b/quickwit/quickwit-common/src/tracker.rs @@ -81,7 +81,14 @@ impl Tracker { } } - pub fn rebuildable_from_the_void(&self) -> bool { + /// Return whether it is safe to recreate this tracker. + /// + /// A tracker is considered safe to recreate if this is the only instance left, + /// and it conaints no alive object (it may contain dead objects though). + /// + /// Once this return true, it will stay that way until [Tracker::track] or [Tracker::clone] are + /// called. 
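+    // (a strong count of 1 on the receiver means no other clone of this tracker
+    // exists, and an empty inventory means no tracked object is still alive)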
+ pub fn safe_to_recreate(&self) -> bool { Arc::strong_count(&self.unacknoledged_drop_receiver) == 1 && self.inner_inventory.len() == 0 } @@ -106,3 +113,73 @@ impl Tracker { }) } } + +#[cfg(test)] +mod tests { + use super::{TrackedObject, Tracker}; + + #[track_caller] + fn assert_tracked_eq( + got: Vec>, + expected: Vec, + ) { + assert_eq!( + got.len(), + expected.len(), + "expected vec of same lenght, {} != {}", + got.len(), + expected.len() + ); + for (got_item, expected_item) in got.into_iter().zip(expected) { + assert_eq!(**got_item, expected_item); + } + } + + #[test] + fn test_single_tracker() { + let tracker = Tracker::::new(); + + assert!(tracker.list_ongoing().is_empty()); + assert!(tracker.take_dead().is_empty()); + assert!(tracker.safe_to_recreate()); + + { + let tracked_1 = tracker.track(1); + assert_tracked_eq(tracker.list_ongoing(), vec![1]); + assert!(tracker.take_dead().is_empty()); + assert!(!tracker.safe_to_recreate()); + std::mem::drop(tracked_1); // done for clarity and silence unused var warn + } + + assert!(tracker.list_ongoing().is_empty()); + assert!(tracker.safe_to_recreate()); + assert_eq!(tracker.take_dead(), vec![1]); + assert!(tracker.safe_to_recreate()); + } + + #[test] + fn test_two_tracker() { + let tracker = Tracker::::new(); + let tracker2 = tracker.clone(); + + assert!(tracker.list_ongoing().is_empty()); + assert!(tracker.take_dead().is_empty()); + assert!(!tracker.safe_to_recreate()); + + { + let tracked_1 = tracker.track(1); + assert_tracked_eq(tracker.list_ongoing(), vec![1]); + assert_tracked_eq(tracker2.list_ongoing(), vec![1]); + assert!(tracker.take_dead().is_empty()); + assert!(tracker2.take_dead().is_empty()); + assert!(!tracker.safe_to_recreate()); + std::mem::drop(tracked_1); // done for clarity and silence unused var warn + } + + assert!(tracker.list_ongoing().is_empty()); + assert!(tracker2.list_ongoing().is_empty()); + assert_eq!(tracker2.take_dead(), vec![1]); + // we took awai the dead from tracker2, so they don't show up in tracker + assert!(tracker.take_dead().is_empty()); + } +} diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs index fb1acf84856..3d8d8417d28 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -204,7 +204,7 @@ impl MergeSchedulerService { self.gc_sequence_id += 1; if self.gc_sequence_id % 100 == 0 { self.tracked_operations - .retain(|_k, tracker| !tracker.rebuildable_from_the_void()) + .retain(|_k, tracker| !tracker.safe_to_recreate()) } } } From 074fbc66dbe33c68227acc691764470b49beb72b Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Wed, 5 Feb 2025 10:57:38 +0100 Subject: [PATCH 05/10] increase sleep time, fixing a flaky test --- quickwit/quickwit-indexing/src/actors/indexing_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 0f395d8e8d3..48cb58346b5 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -1601,7 +1601,7 @@ mod tests { let observation = indexing_server_handle.process_pending_and_observe().await; assert_eq!(observation.num_running_pipelines, 0); assert_eq!(observation.num_running_merge_pipelines, 0); - universe.sleep(*HEARTBEAT).await; + universe.sleep(2 * *HEARTBEAT).await; // 
Check that the merge pipeline is also shut down as they are no more indexing pipeilne on // the index. assert!(universe.get_one::().is_none()); From 1826849e76fdd6fd04107a03141c78d1a0b0f6ff Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Wed, 5 Feb 2025 15:16:42 +0100 Subject: [PATCH 06/10] add sleep time, fixing another flaky test --- quickwit/quickwit-indexing/src/actors/merge_executor.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 8f421fb96a5..6d81f5f103f 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -796,6 +796,7 @@ mod tests { delete_task_executor_handle .process_pending_and_observe() .await; + universe.sleep(*quickwit_actors::HEARTBEAT).await; let packager_msgs: Vec = merge_packager_inbox.drain_for_test_typed(); if !result_docs.is_empty() { @@ -852,6 +853,7 @@ mod tests { |split| split.split_state == quickwit_metastore::SplitState::MarkedForDeletion )); } + universe.sleep(*quickwit_actors::HEARTBEAT).await; test_sandbox.assert_quit().await; universe.assert_quit().await; Ok(()) From ae99e044f9fa139e72f05987cc06cd31554b7ef9 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Wed, 5 Feb 2025 15:19:21 +0100 Subject: [PATCH 07/10] typo --- quickwit/quickwit-common/src/tracker.rs | 37 ++++++++++--------- .../quickwit-indexing/src/merge_policy/mod.rs | 2 +- .../src/actors/delete_task_planner.rs | 6 +-- 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/quickwit/quickwit-common/src/tracker.rs b/quickwit/quickwit-common/src/tracker.rs index be36311d4b0..93d2fbe5482 100644 --- a/quickwit/quickwit-common/src/tracker.rs +++ b/quickwit/quickwit-common/src/tracker.rs @@ -6,40 +6,40 @@ use std::sync::{Arc, Mutex}; use census::{Inventory, TrackedObject as InventoredObject}; -pub type TrackedObject = InventoredObject>; +pub type TrackedObject = InventoredObject>; #[derive(Clone)] pub struct Tracker { - inner_inventory: Inventory>, - unacknoledged_drop_receiver: Arc>>, + inner_inventory: Inventory>, + unacknowledged_drop_receiver: Arc>>, return_channel: Sender, } #[derive(Debug)] -pub struct RecordUnacknoledgedDrop { +pub struct RecordUnacknowledgedDrop { // safety: this is always kept initialized except after Self::drop, where we move that // that value away to either send it through the return channel, or drop it manually inner: MaybeUninit, - acknoledged: AtomicBool, + acknowledged: AtomicBool, return_channel: Sender, } -impl RecordUnacknoledgedDrop { - pub fn acknoledge(&self) { - self.acknoledged.store(true, Ordering::Relaxed); +impl RecordUnacknowledgedDrop { + pub fn acknowledge(&self) { + self.acknowledged.store(true, Ordering::Relaxed); } pub fn untracked(value: T) -> Self { let (sender, _receiver) = channel(); - RecordUnacknoledgedDrop { + RecordUnacknowledgedDrop { inner: MaybeUninit::new(value), - acknoledged: true.into(), + acknowledged: true.into(), return_channel: sender, } } } -impl Deref for RecordUnacknoledgedDrop { +impl Deref for RecordUnacknowledgedDrop { type Target = T; fn deref(&self) -> &T { unsafe { @@ -49,7 +49,7 @@ impl Deref for RecordUnacknoledgedDrop { } } -impl Drop for RecordUnacknoledgedDrop { +impl Drop for RecordUnacknowledgedDrop { fn drop(&mut self) { let item = unsafe { // safety: see struct definition. 
Additionally, we don't touch to self.inner @@ -57,7 +57,7 @@ impl Drop for RecordUnacknoledgedDrop { // double-free self.inner.assume_init_read() }; - if !*self.acknoledged.get_mut() { + if !*self.acknowledged.get_mut() { // if send fails, no one cared about getting that notification, it's fine to // drop item let _ = self.return_channel.send(item); @@ -76,7 +76,7 @@ impl Tracker { let (sender, receiver) = channel(); Tracker { inner_inventory: Inventory::new(), - unacknoledged_drop_receiver: Arc::new(Mutex::new(receiver)), + unacknowledged_drop_receiver: Arc::new(Mutex::new(receiver)), return_channel: sender, } } @@ -89,7 +89,8 @@ impl Tracker { /// Once this return true, it will stay that way until [Tracker::track] or [Tracker::clone] are /// called. pub fn safe_to_recreate(&self) -> bool { - Arc::strong_count(&self.unacknoledged_drop_receiver) == 1 && self.inner_inventory.len() == 0 + Arc::strong_count(&self.unacknowledged_drop_receiver) == 1 + && self.inner_inventory.len() == 0 } pub fn list_ongoing(&self) -> Vec> { @@ -98,7 +99,7 @@ impl Tracker { pub fn take_dead(&self) -> Vec { let mut res = Vec::new(); - let receiver = self.unacknoledged_drop_receiver.lock().unwrap(); + let receiver = self.unacknowledged_drop_receiver.lock().unwrap(); while let Ok(dead_entry) = receiver.try_recv() { res.push(dead_entry); } @@ -106,9 +107,9 @@ impl Tracker { } pub fn track(&self, value: T) -> TrackedObject { - self.inner_inventory.track(RecordUnacknoledgedDrop { + self.inner_inventory.track(RecordUnacknowledgedDrop { inner: MaybeUninit::new(value), - acknoledged: false.into(), + acknowledged: false.into(), return_channel: self.return_channel.clone(), }) } diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index febdfd96f97..1d7f8c09603 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -404,7 +404,7 @@ pub mod tests { } let merged_split = fake_merge(merge_policy, merge_op.splits_as_slice()); split_index.insert(merged_split.split_id().to_string(), merged_split.clone()); - merge_op.acknoledge(); + merge_op.acknowledge(); merged_split } diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index f8975934671..c8bc1a53f7a 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; use quickwit_common::extract_time_range; -use quickwit_common::tracker::RecordUnacknoledgedDrop; +use quickwit_common::tracker::RecordUnacknowledgedDrop; use quickwit_common::uri::Uri; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_indexing::actors::{schedule_merge, MergeSchedulerService, MergeSplitDownloader}; @@ -84,7 +84,7 @@ pub struct DeleteTaskPlanner { /// a merge operation is dropped after the publish of the split that underwent /// the delete operation. /// The inventory is used to avoid sending twice the same delete operation. 
-    ongoing_delete_operations_inventory: Inventory<RecordUnacknoledgedDrop<MergeOperation>>,
+    ongoing_delete_operations_inventory: Inventory<RecordUnacknowledgedDrop<MergeOperation>>,
 }
 
 #[async_trait]
@@ -198,7 +198,7 @@ impl DeleteTaskPlanner {
         info!(delete_operation=?delete_operation, "planned delete operation");
         let tracked_delete_operation = self
             .ongoing_delete_operations_inventory
-            .track(RecordUnacknoledgedDrop::untracked(delete_operation));
+            .track(RecordUnacknowledgedDrop::untracked(delete_operation));
         schedule_merge(
             &self.merge_scheduler_service,
             tracked_delete_operation,

From 0200e63cb6e96785151475e4f8499f6efc3fcdbd Mon Sep 17 00:00:00 2001
From: trinity Pointard
Date: Wed, 5 Feb 2025 18:41:02 +0100
Subject: [PATCH 08/10] make RecordUnacknowledgedDrop::acknowledge() consume values

---
 quickwit/quickwit-common/src/tracker.rs       | 96 ++++++++++---------
 .../quickwit-indexing/src/merge_policy/mod.rs |  6 +-
 .../src/actors/delete_task_planner.rs         | 11 ++-
 3 files changed, 61 insertions(+), 52 deletions(-)

diff --git a/quickwit/quickwit-common/src/tracker.rs b/quickwit/quickwit-common/src/tracker.rs
index 93d2fbe5482..78d83459757 100644
--- a/quickwit/quickwit-common/src/tracker.rs
+++ b/quickwit/quickwit-common/src/tracker.rs
@@ -1,77 +1,83 @@
-use std::mem::MaybeUninit;
 use std::ops::Deref;
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::{Arc, Mutex};
 
 use census::{Inventory, TrackedObject as InventoredObject};
 
-pub type TrackedObject<T> = InventoredObject<RecordUnacknowledgedDrop<T>>;
-
+/// A resource tracker
+///
+/// This is used to track whether an object is alive (still in use), or if it's dead (no longer
+/// used, but not acknowledged). It does not keep any traces of objects that were alive, but were
+/// since acknowledged.
 #[derive(Clone)]
-pub struct Tracker<T> {
-    inner_inventory: Inventory<RecordUnacknowledgedDrop<T>>,
+pub struct Tracker<T: Clone> {
+    inner_inventory: Inventory<T>,
     unacknowledged_drop_receiver: Arc<Mutex<Receiver<T>>>,
     return_channel: Sender<T>,
 }
 
+/// A single tracked object
 #[derive(Debug)]
-pub struct RecordUnacknowledgedDrop<T> {
-    // safety: this is always kept initialized except after Self::drop, where we move that
-    // that value away to either send it through the return channel, or drop it manually
-    inner: MaybeUninit<T>,
-    acknowledged: AtomicBool,
+pub struct TrackedObject<T: Clone> {
+    inner: Option<InventoredObject<T>>,
     return_channel: Sender<T>,
 }
 
-impl<T> RecordUnacknowledgedDrop<T> {
-    pub fn acknowledge(&self) {
-        self.acknowledged.store(true, Ordering::Relaxed);
+impl<T: Clone> TrackedObject<T> {
+    /// Acknowledge an object
+    pub fn acknowledge(mut self) {
+        self.inner.take();
     }
 
+    /// Create an untracked object mostly for tests
     pub fn untracked(value: T) -> Self {
-        let (sender, _receiver) = channel();
-        RecordUnacknowledgedDrop {
-            inner: MaybeUninit::new(value),
-            acknowledged: true.into(),
-            return_channel: sender,
-        }
+        Tracker::new().track(value)
+    }
+
+    /// Create an object which is tracked only as long as it's alive,
+    /// but not once it's dead.
+    /// The object is tracked through the provided census inventory
+    pub fn track_alive_in(value: T, inventory: &Inventory<T>) -> Self {
+        TrackedObject {
+            inner: Some(inventory.track(value)),
+            return_channel: channel().0,
+        }
+    }
 }
 
-impl<T> Deref for RecordUnacknowledgedDrop<T> {
+impl<T: Clone> AsRef<T> for TrackedObject<T> {
+    fn as_ref(&self) -> &T {
+        self
+    }
+}
+
+impl<T: Clone> Deref for TrackedObject<T> {
     type Target = T;
     fn deref(&self) -> &T {
-        unsafe {
-            // safety: see struct definition, this operation is valid except after drop.
-            self.inner.assume_init_ref()
-        }
+        self.inner
+            .as_ref()
+            .expect("inner should only be None during drop")
     }
 }
 
-impl<T> Drop for RecordUnacknowledgedDrop<T> {
+impl<T: Clone> Drop for TrackedObject<T> {
     fn drop(&mut self) {
-        let item = unsafe {
-            // safety: see struct definition. Additionally, we don't touch to self.inner
-            // after this point so there is no risk of making a 2nd copy and cause a
-            // double-free
-            self.inner.assume_init_read()
-        };
-        if !*self.acknowledged.get_mut() {
+        if let Some(item) = self.inner.take() {
             // if send fails, no one cared about getting that notification, it's fine to
             // drop item
-            let _ = self.return_channel.send(item);
+            let _ = self.return_channel.send(item.as_ref().clone());
         }
     }
 }
 
-impl<T> Default for Tracker<T> {
+impl<T: Clone> Default for Tracker<T> {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl<T> Tracker<T> {
+impl<T: Clone> Tracker<T> {
+    /// Create a new tracker
     pub fn new() -> Self {
         let (sender, receiver) = channel();
         Tracker {
@@ -84,7 +90,7 @@ impl<T> Tracker<T> {
     /// Return whether it is safe to recreate this tracker.
     ///
     /// A tracker is considered safe to recreate if this is the only instance left,
-    /// and it conaints no alive object (it may contain dead objects though).
+    /// and it contains no alive object (it may contain dead objects though).
     ///
     /// Once this return true, it will stay that way until [Tracker::track] or [Tracker::clone] are
     /// called.
@@ -93,10 +99,12 @@ impl<T> Tracker<T> {
         && self.inner_inventory.len() == 0
     }
 
-    pub fn list_ongoing(&self) -> Vec<TrackedObject<T>> {
+    /// List objects which are considered alive
+    pub fn list_ongoing(&self) -> Vec<InventoredObject<T>> {
         self.inner_inventory.list()
     }
 
+    /// Take away the list of objects considered dead
     pub fn take_dead(&self) -> Vec<T> {
         let mut res = Vec::new();
         let receiver = self.unacknowledged_drop_receiver.lock().unwrap();
@@ -106,22 +114,22 @@ impl<T> Tracker<T> {
         res
     }
 
+    /// Track a new object.
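+    ///
+    /// The returned handle reports its drop back to this tracker unless it is
+    /// consumed with [TrackedObject::acknowledge] first.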
pub fn track(&self, value: T) -> TrackedObject { - self.inner_inventory.track(RecordUnacknowledgedDrop { - inner: MaybeUninit::new(value), - acknowledged: false.into(), + TrackedObject { + inner: Some(self.inner_inventory.track(value)), return_channel: self.return_channel.clone(), - }) + } } } #[cfg(test)] mod tests { - use super::{TrackedObject, Tracker}; + use super::{InventoredObject, Tracker}; #[track_caller] fn assert_tracked_eq( - got: Vec>, + got: Vec>, expected: Vec, ) { assert_eq!( @@ -132,7 +140,7 @@ mod tests { expected.len() ); for (got_item, expected_item) in got.into_iter().zip(expected) { - assert_eq!(**got_item, expected_item); + assert_eq!(*got_item, expected_item); } } diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 1d7f8c09603..936432df1a4 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -397,7 +397,7 @@ pub mod tests { fn apply_merge( merge_policy: &Arc, split_index: &mut HashMap, - merge_op: &TrackedObject, + merge_op: TrackedObject, ) -> SplitMetadata { for split in merge_op.splits_as_slice() { assert!(split_index.remove(split.split_id()).is_some()); @@ -451,7 +451,7 @@ pub mod tests { let new_splits: Vec = merge_tasks .into_iter() .map(|merge_op| { - apply_merge(&merge_policy, &mut split_index, &merge_op.merge_operation) + apply_merge(&merge_policy, &mut split_index, merge_op.merge_operation) }) .collect(); merge_planner_mailbox @@ -472,7 +472,7 @@ pub mod tests { let merge_tasks = merge_task_inbox.drain_for_test_typed::(); for merge_task in merge_tasks { - apply_merge(&merge_policy, &mut split_index, &merge_task.merge_operation); + apply_merge(&merge_policy, &mut split_index, merge_task.merge_operation); } let split_metadatas: Vec = split_index.values().cloned().collect(); diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index c8bc1a53f7a..de5616578b8 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -21,7 +21,7 @@ use async_trait::async_trait; use itertools::Itertools; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; use quickwit_common::extract_time_range; -use quickwit_common::tracker::RecordUnacknowledgedDrop; +use quickwit_common::tracker::TrackedObject; use quickwit_common::uri::Uri; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_indexing::actors::{schedule_merge, MergeSchedulerService, MergeSplitDownloader}; @@ -84,7 +84,7 @@ pub struct DeleteTaskPlanner { /// a merge operation is dropped after the publish of the split that underwent /// the delete operation. /// The inventory is used to avoid sending twice the same delete operation. 
- ongoing_delete_operations_inventory: Inventory>, + ongoing_delete_operations_inventory: Inventory, } #[async_trait] @@ -196,9 +196,10 @@ impl DeleteTaskPlanner { split_with_deletes.split_metadata, ); info!(delete_operation=?delete_operation, "planned delete operation"); - let tracked_delete_operation = self - .ongoing_delete_operations_inventory - .track(RecordUnacknowledgedDrop::untracked(delete_operation)); + let tracked_delete_operation = TrackedObject::track_alive_in( + delete_operation, + &self.ongoing_delete_operations_inventory, + ); schedule_merge( &self.merge_scheduler_service, tracked_delete_operation, From c60607dbfd140c1b89c90524218f8e146f25a0b0 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Wed, 5 Feb 2025 20:08:05 +0100 Subject: [PATCH 09/10] fix some flaky tests --- .../quickwit-indexing/src/actors/indexing_service.rs | 11 ++++++++--- .../quickwit-indexing/src/actors/merge_executor.rs | 2 -- .../src/tests/update_tests/doc_mapping_tests.rs | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 48cb58346b5..895d462e0ac 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -23,7 +23,7 @@ use futures::TryStreamExt; use itertools::Itertools; use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox, - Observation, + Observation, SendError, }; use quickwit_cluster::Cluster; use quickwit_common::fs::get_cache_directory_path; @@ -539,12 +539,17 @@ impl IndexingService { // The queue capacity of the merge pipeline is unbounded, so `.send_message(...)` // should not block. // We avoid using `.quit()` here because it waits for the actor to exit. - merge_pipeline_handle + // In some case the pipeline could already be shutting down, in which case we can + // receive a Disconnected + match merge_pipeline_handle .handle .mailbox() .send_message(FinishPendingMergesAndShutdownPipeline) .await - .expect("merge pipeline mailbox should not be full"); + { + Ok(_) | Err(SendError::Disconnected) => (), + Err(SendError::Full) => panic!("merge pipeline mailbox should not be full"), + } } } // Finally, we remove the completed or failed merge pipelines. 
diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 6d81f5f103f..8f421fb96a5 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -796,7 +796,6 @@ mod tests { delete_task_executor_handle .process_pending_and_observe() .await; - universe.sleep(*quickwit_actors::HEARTBEAT).await; let packager_msgs: Vec = merge_packager_inbox.drain_for_test_typed(); if !result_docs.is_empty() { @@ -853,7 +852,6 @@ mod tests { |split| split.split_state == quickwit_metastore::SplitState::MarkedForDeletion )); } - universe.sleep(*quickwit_actors::HEARTBEAT).await; test_sandbox.assert_quit().await; universe.assert_quit().await; Ok(()) diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs index 5908fed5612..cf7db6ad66e 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs @@ -101,7 +101,7 @@ async fn validate_search_across_doc_mapping_updates( ) .await .unwrap(); - + tokio::time::sleep(Duration::from_millis(50)).await; sandbox .local_ingest(index_id, ingest_after_update) .await From 2a6f742e076e38ce23f38cd1c45dbc976cb516b5 Mon Sep 17 00:00:00 2001 From: trinity Pointard Date: Thu, 6 Feb 2025 10:19:18 +0100 Subject: [PATCH 10/10] add license header --- quickwit/quickwit-common/src/tracker.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/quickwit/quickwit-common/src/tracker.rs b/quickwit/quickwit-common/src/tracker.rs index 78d83459757..051b15ecf17 100644 --- a/quickwit/quickwit-common/src/tracker.rs +++ b/quickwit/quickwit-common/src/tracker.rs @@ -1,3 +1,17 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::ops::Deref; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex};
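A minimal usage sketch of the tracker API as it stands at the end of this series
(the `merge-op` string payloads and the standalone `main` are illustrative only,
not part of the patches):

```rust
use quickwit_common::tracker::Tracker;

fn main() {
    let tracker: Tracker<String> = Tracker::new();

    // a tracked object is listed as ongoing until its handle is dropped
    let op = tracker.track("merge-op-1".to_string());
    assert_eq!(tracker.list_ongoing().len(), 1);

    // dropping the handle without acknowledging it reports the object as dead
    drop(op);
    assert!(tracker.list_ongoing().is_empty());
    assert_eq!(tracker.take_dead(), vec!["merge-op-1".to_string()]);

    // acknowledging consumes the handle, so its drop is never reported
    tracker.track("merge-op-2".to_string()).acknowledge();
    assert!(tracker.take_dead().is_empty());

    // no clones are held and no object is alive: safe to recreate
    assert!(tracker.safe_to_recreate());
}
```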