diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 6fca85aa021..01680aaf7df 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -6537,6 +6537,7 @@ dependencies = [
  "async-speed-limit",
  "async-trait",
  "bytesize",
+ "census",
  "coarsetime",
  "dyn-clone",
  "env_logger",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index b3b0da4f933..e5d5f6a4911 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -90,6 +90,7 @@ binggan = { version = "0.14" }
 bytes = { version = "1", features = ["serde"] }
 bytesize = { version = "1.3.0", features = ["serde"] }
 bytestring = "1.3.0"
+census = "0.4.2"
 chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "54cbc70" }
 chrono = { version = "0.4", default-features = false, features = [
   "clock",
diff --git a/quickwit/quickwit-common/Cargo.toml b/quickwit/quickwit-common/Cargo.toml
index 83170a8ec56..0c28c36cc44 100644
--- a/quickwit/quickwit-common/Cargo.toml
+++ b/quickwit/quickwit-common/Cargo.toml
@@ -15,6 +15,7 @@ anyhow = { workspace = true }
 async-speed-limit = { workspace = true }
 async-trait = { workspace = true }
 bytesize = { workspace = true }
+census = { workspace = true }
 coarsetime = { workspace = true }
 dyn-clone = { workspace = true }
 env_logger = { workspace = true }
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index 67ea089f8f1..9c6b593447e 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -40,6 +40,7 @@ pub mod temp_dir;
 pub mod test_utils;
 pub mod thread_pool;
 pub mod tower;
+pub mod tracker;
 pub mod type_map;
 pub mod uri;
diff --git a/quickwit/quickwit-common/src/tracker.rs b/quickwit/quickwit-common/src/tracker.rs
new file mode 100644
index 00000000000..051b15ecf17
--- /dev/null
+++ b/quickwit/quickwit-common/src/tracker.rs
@@ -0,0 +1,208 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::ops::Deref;
+use std::sync::mpsc::{channel, Receiver, Sender};
+use std::sync::{Arc, Mutex};
+
+use census::{Inventory, TrackedObject as InventoredObject};
+
+/// A resource tracker
+///
+/// This is used to track whether an object is alive (still in use), or if it's dead (no longer
+/// used, but not acknowledged). It does not keep any trace of objects that were alive but have
+/// since been acknowledged.
+#[derive(Clone)]
+pub struct Tracker<T: Clone> {
+    inner_inventory: Inventory<T>,
+    unacknowledged_drop_receiver: Arc<Mutex<Receiver<T>>>,
+    return_channel: Sender<T>,
+}
+
+/// A single tracked object
+#[derive(Debug)]
+pub struct TrackedObject<T: Clone> {
+    inner: Option<InventoredObject<T>>,
+    return_channel: Sender<T>,
+}
+
+impl<T: Clone> TrackedObject<T> {
+    /// Acknowledges an object
+    pub fn acknowledge(mut self) {
+        self.inner.take();
+    }
+
+    /// Creates an untracked object, mostly for tests
+    pub fn untracked(value: T) -> Self {
+        Tracker::new().track(value)
+    }
+
+    /// Creates an object which is tracked only as long as it's alive,
+    /// but not once it's dead.
+    ///
+    /// The object is tracked through the provided census inventory.
+    pub fn track_alive_in(value: T, inventory: &Inventory<T>) -> Self {
+        TrackedObject {
+            inner: Some(inventory.track(value)),
+            return_channel: channel().0,
+        }
+    }
+}
+
+impl<T: Clone> AsRef<T> for TrackedObject<T> {
+    fn as_ref(&self) -> &T {
+        self
+    }
+}
+
+impl<T: Clone> Deref for TrackedObject<T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        self.inner
+            .as_ref()
+            .expect("inner should only be None during drop")
+    }
+}
+
+impl<T: Clone> Drop for TrackedObject<T> {
+    fn drop(&mut self) {
+        if let Some(item) = self.inner.take() {
+            // if send fails, no one cared about getting that notification, it's fine to
+            // drop item
+            let _ = self.return_channel.send(item.as_ref().clone());
+        }
+    }
+}
+
+impl<T: Clone> Default for Tracker<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T: Clone> Tracker<T> {
+    /// Creates a new tracker
+    pub fn new() -> Self {
+        let (sender, receiver) = channel();
+        Tracker {
+            inner_inventory: Inventory::new(),
+            unacknowledged_drop_receiver: Arc::new(Mutex::new(receiver)),
+            return_channel: sender,
+        }
+    }
+
+    /// Returns whether it is safe to recreate this tracker.
+    ///
+    /// A tracker is considered safe to recreate if this is the only instance left,
+    /// and it contains no alive object (it may contain dead objects though).
+    ///
+    /// Once this returns true, it will stay that way until [Tracker::track] or [Tracker::clone]
+    /// are called.
+    pub fn safe_to_recreate(&self) -> bool {
+        Arc::strong_count(&self.unacknowledged_drop_receiver) == 1
+            && self.inner_inventory.len() == 0
+    }
+
+    /// Lists objects which are considered alive
+    pub fn list_ongoing(&self) -> Vec<InventoredObject<T>> {
+        self.inner_inventory.list()
+    }
+
+    /// Takes away the list of objects considered dead
+    pub fn take_dead(&self) -> Vec<T> {
+        let mut res = Vec::new();
+        let receiver = self.unacknowledged_drop_receiver.lock().unwrap();
+        while let Ok(dead_entry) = receiver.try_recv() {
+            res.push(dead_entry);
+        }
+        res
+    }
+
+    /// Tracks a new object.
+    pub fn track(&self, value: T) -> TrackedObject<T> {
+        TrackedObject {
+            inner: Some(self.inner_inventory.track(value)),
+            return_channel: self.return_channel.clone(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{InventoredObject, Tracker};
+
+    #[track_caller]
+    fn assert_tracked_eq<T: PartialEq + std::fmt::Debug>(
+        got: Vec<InventoredObject<T>>,
+        expected: Vec<T>,
+    ) {
+        assert_eq!(
+            got.len(),
+            expected.len(),
+            "expected vecs of same length, {} != {}",
+            got.len(),
+            expected.len()
+        );
+        for (got_item, expected_item) in got.into_iter().zip(expected) {
+            assert_eq!(*got_item, expected_item);
+        }
+    }
+
+    #[test]
+    fn test_single_tracker() {
+        let tracker = Tracker::<u32>::new();
+
+        assert!(tracker.list_ongoing().is_empty());
+        assert!(tracker.take_dead().is_empty());
+        assert!(tracker.safe_to_recreate());
+
+        {
+            let tracked_1 = tracker.track(1);
+            assert_tracked_eq(tracker.list_ongoing(), vec![1]);
+            assert!(tracker.take_dead().is_empty());
+            assert!(!tracker.safe_to_recreate());
+            std::mem::drop(tracked_1); // done for clarity and to silence the unused variable warning
+        }
+
+        assert!(tracker.list_ongoing().is_empty());
+        assert!(tracker.safe_to_recreate());
+        assert_eq!(tracker.take_dead(), vec![1]);
+        assert!(tracker.safe_to_recreate());
+    }
+
+    #[test]
+    fn test_two_trackers() {
+        let tracker = Tracker::<u32>::new();
+        let tracker2 = tracker.clone();
+
+        assert!(tracker.list_ongoing().is_empty());
+        assert!(tracker.take_dead().is_empty());
+        assert!(!tracker.safe_to_recreate());
+
+        {
+            let tracked_1 = tracker.track(1);
+            assert_tracked_eq(tracker.list_ongoing(), vec![1]);
+            assert_tracked_eq(tracker2.list_ongoing(), vec![1]);
+            assert!(tracker.take_dead().is_empty());
+            assert!(tracker2.take_dead().is_empty());
+            assert!(!tracker.safe_to_recreate());
+            std::mem::drop(tracked_1); // done for clarity and to silence the unused variable warning
+        }
+
+        assert!(tracker.list_ongoing().is_empty());
+        assert!(tracker2.list_ongoing().is_empty());
+        assert_eq!(tracker2.take_dead(), vec![1]);
+        // we took away the dead from tracker2, so they don't show up in tracker
+        assert!(tracker.take_dead().is_empty());
+    }
+}
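Beyond the unit tests above, here is a minimal standalone sketch of the intended lifecycle of this new API — track, drop-without-acknowledge, reap via `take_dead` — assuming `quickwit-common` is available as a dependency:

```rust
use quickwit_common::tracker::Tracker;

fn main() {
    let tracker: Tracker<String> = Tracker::new();

    // A tracked object is "alive" while it is held somewhere.
    let op = tracker.track("merge-op-1".to_string());
    assert_eq!(tracker.list_ongoing().len(), 1);

    // Dropping it without acknowledging marks it as "dead": it leaves the
    // alive list and becomes retrievable exactly once via take_dead().
    drop(op);
    assert!(tracker.list_ongoing().is_empty());
    assert_eq!(tracker.take_dead(), vec!["merge-op-1".to_string()]);

    // Acknowledging instead consumes the object without a death notice.
    let op = tracker.track("merge-op-2".to_string());
    op.acknowledge();
    assert!(tracker.take_dead().is_empty());
}
```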
diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs
index b94e5406d89..92e5ec7e47a 100644
--- a/quickwit/quickwit-indexing/failpoints/mod.rs
+++ b/quickwit/quickwit-indexing/failpoints/mod.rs
@@ -31,6 +31,7 @@ //! Below we test panics at different steps in the indexing pipeline.
 use std::path::Path;
+use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, Barrier, Mutex};
 use std::time::Duration;
@@ -42,7 +43,9 @@ use quickwit_common::split_file;
 use quickwit_common::temp_dir::TempDirectory;
 use quickwit_indexing::actors::MergeExecutor;
 use quickwit_indexing::merge_policy::{MergeOperation, MergeTask};
-use quickwit_indexing::models::MergeScratch;
+use quickwit_indexing::models::{
+    DetachIndexingPipeline, DetachMergePipeline, MergeScratch, SpawnPipeline,
+};
 use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox};
 use quickwit_metastore::{
     ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata,
@@ -50,7 +53,7 @@ use quickwit_metastore::{
 };
 use quickwit_proto::indexing::MergePipelineId;
 use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService};
-use quickwit_proto::types::{IndexUid, NodeId};
+use quickwit_proto::types::{IndexUid, NodeId, PipelineUid};
 use serde_json::Value as JsonValue;
 use tantivy::Directory;
@@ -346,3 +349,122 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul
     Ok(())
 }
+
+#[tokio::test]
+async fn test_no_duplicate_merge_on_pipeline_restart() -> anyhow::Result<()> {
+    quickwit_common::setup_logging_for_tests();
+    let doc_mapper_yaml = r#"
+        field_mappings:
+          - name: body
+            type: text
+          - name: ts
+            type: datetime
+            fast: true
+        timestamp_field: ts
+    "#;
+    let indexing_setting_yaml = r#"
+        split_num_docs_target: 2500
+        merge_policy:
+            type: "limit_merge"
+            max_merge_ops: 1
+            merge_factor: 4
+            max_merge_factor: 4
+            max_finalize_merge_operations: 1
+    "#;
+    let search_fields = ["body"];
+    let index_id = "test-index-merge-duplication";
+    let mut test_index_builder = TestSandbox::create(
+        index_id,
+        doc_mapper_yaml,
+        indexing_setting_yaml,
+        &search_fields,
+    )
+    .await?;
+
+    // 0: start
+    // 1: 1st merge reached the failpoint
+    // 11: 1st merge failed
+    // 12: 2nd merge reached the failpoint
+    // 22: 2nd merge failed (we don't care about this state)
+    let state = Arc::new(AtomicU32::new(0));
+    let state_clone = state.clone();
+
+    fail::cfg_callback("before-merge-split", move || {
+        use std::sync::atomic::Ordering;
+        state_clone.fetch_add(1, Ordering::Relaxed);
+        std::thread::sleep(std::time::Duration::from_millis(300));
+        state_clone.fetch_add(10, Ordering::Relaxed);
+        panic!("kill merge pipeline");
+    })
+    .unwrap();
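For readers unfamiliar with failpoints: `fail::cfg_callback` only installs the closure; the trigger site lives in the merge code itself. A minimal sketch of the mechanism (the `merge_split` function is a hypothetical stand-in for the real trigger site in quickwit-indexing, and the `fail` crate must be built with its `failpoints` feature for the macro to be active):

```rust
use fail::fail_point;

// Hypothetical stand-in for the merge executor code path that hosts the
// real "before-merge-split" failpoint.
fn merge_split() {
    // The callback configured by the test runs every time this line is hit.
    fail_point!("before-merge-split");
    println!("merging splits...");
}

fn main() {
    fail::cfg_callback("before-merge-split", || println!("failpoint hit")).unwrap();
    merge_split();
}
```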
+
+    let batch: Vec<JsonValue> =
+        std::iter::repeat_with(|| serde_json::json!({"body ": TEST_TEXT, "ts": 1631072713 }))
+            .take(500)
+            .collect();
+    // this sometimes fails because the ingest api isn't aware of the index yet?!
+    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+    for _ in 0..4 {
+        test_index_builder
+            .add_documents_through_api(batch.clone())
+            .await?;
+    }
+
+    let (indexing_pipeline, merge_pipeline) = test_index_builder
+        .take_indexing_and_merge_pipeline()
+        .await?;
+
+    // stop the pipeline
+    indexing_pipeline.kill().await;
+    merge_pipeline
+        .mailbox()
+        .ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
+        .await?;
+
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    let pipeline_id = test_index_builder
+        .indexing_service()
+        .ask_for_res(SpawnPipeline {
+            index_id: index_id.to_string(),
+            source_config: quickwit_config::SourceConfig::ingest_api_default(),
+            pipeline_uid: PipelineUid::for_test(1u128),
+        })
+        .await?;
+
+    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+    // we shouldn't have had a 2nd split run yet (the 1st one hasn't panicked just yet)
+    assert_eq!(state.load(Ordering::Relaxed), 1);
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    assert_eq!(state.load(Ordering::Relaxed), 11);
+
+    let merge_pipeline_id = pipeline_id.merge_pipeline_id();
+    let indexing_pipeline = test_index_builder
+        .indexing_service()
+        .ask_for_res(DetachIndexingPipeline { pipeline_id })
+        .await?;
+    let merge_pipeline = test_index_builder
+        .indexing_service()
+        .ask_for_res(DetachMergePipeline {
+            pipeline_id: merge_pipeline_id,
+        })
+        .await?;
+
+    indexing_pipeline.kill().await;
+    merge_pipeline
+        .mailbox()
+        .ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
+        .await?;
+
+    // stopping the merge pipeline makes it recheck for possible dead merges
+    // (alternatively, it does that sooner when rebuilding the known split list)
+    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+    // timing-wise, we can't have reached 22, but it would be logically correct to get that state
+    assert_eq!(state.load(Ordering::Relaxed), 12);
+
+    let universe = test_index_builder.universe();
+    universe.kill();
+    fail::cfg("before-merge-split", "off").unwrap();
+    universe.quit().await;
+
+    Ok(())
+}
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index 0f395d8e8d3..895d462e0ac 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -23,7 +23,7 @@ use futures::TryStreamExt;
 use itertools::Itertools;
 use quickwit_actors::{
     Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox,
-    Observation,
+    Observation, SendError,
 };
 use quickwit_cluster::Cluster;
 use quickwit_common::fs::get_cache_directory_path;
@@ -539,12 +539,17 @@ impl IndexingService {
             // The queue capacity of the merge pipeline is unbounded, so `.send_message(...)`
             // should not block.
             // We avoid using `.quit()` here because it waits for the actor to exit.
-            merge_pipeline_handle
+            // In some cases, the pipeline could already be shutting down, in which case we may
+            // receive a `Disconnected` error.
+            match merge_pipeline_handle
                 .handle
                 .mailbox()
                 .send_message(FinishPendingMergesAndShutdownPipeline)
                 .await
-                .expect("merge pipeline mailbox should not be full");
+            {
+                Ok(_) | Err(SendError::Disconnected) => (),
+                Err(SendError::Full) => panic!("merge pipeline mailbox should not be full"),
+            }
         }
     }
     // Finally, we remove the completed or failed merge pipelines.
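The race this `match` guards against, in miniature with std channels (illustrative only; `quickwit_actors` mailboxes are a different type, but the disconnect semantics are analogous):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel::<&'static str>();
    // The "pipeline" decides to exit on its own and drops its mailbox.
    thread::spawn(move || drop(rx)).join().unwrap();
    // The shutdown request now fails with a disconnect error. Since the goal
    // (pipeline stopped) is already achieved, the error is safe to ignore.
    if tx.send("FinishPendingMergesAndShutdownPipeline").is_err() {
        println!("pipeline already shut down, nothing to do");
    }
}
```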
@@ -1601,7 +1606,7 @@ mod tests {
         let observation = indexing_server_handle.process_pending_and_observe().await;
         assert_eq!(observation.num_running_pipelines, 0);
         assert_eq!(observation.num_running_merge_pipelines, 0);
-        universe.sleep(*HEARTBEAT).await;
+        universe.sleep(2 * *HEARTBEAT).await;
         // Check that the merge pipeline is also shut down as they are no more indexing pipeilne on
         // the index.
         assert!(universe.get_one::<MergePipeline>().is_none());
diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
index 37fca4687ab..1cb22878672 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -350,7 +350,8 @@ impl MergePipeline {
             self.params.merge_policy.clone(),
             merge_split_downloader_mailbox,
             self.params.merge_scheduler_service.clone(),
-        );
+        )
+        .await?;
         let (_, merge_planner_handle) = ctx
             .spawn_actor()
             .set_kill_switch(self.kill_switch.clone())
diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
index bd5dcfd7393..7846e5d8cdb 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -18,16 +18,16 @@ use std::time::Instant;
 use async_trait::async_trait;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_common::tracker::Tracker;
 use quickwit_metastore::SplitMetadata;
 use quickwit_proto::indexing::MergePipelineId;
 use quickwit_proto::types::DocMappingUid;
 use serde::Serialize;
-use tantivy::Inventory;
 use time::OffsetDateTime;
 use tracing::{info, warn};
 
 use super::MergeSchedulerService;
-use crate::actors::merge_scheduler_service::schedule_merge;
+use crate::actors::merge_scheduler_service::{schedule_merge, GetOperationTracker};
 use crate::actors::MergeSplitDownloader;
 use crate::merge_policy::MergeOperation;
 use crate::models::NewSplits;
@@ -80,11 +80,15 @@ pub struct MergePlanner {
     merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
     merge_scheduler_service: Mailbox<MergeSchedulerService>,
 
-    /// Inventory of ongoing merge operations. If everything goes well,
-    /// a merge operation is dropped after the publish of the merged split.
+    /// Tracks ongoing and failed merge operations for this index.
     ///
-    /// It is used to GC the known_split_ids set.
-    ongoing_merge_operations_inventory: Inventory<MergeOperation>,
+    /// We don't want to emit a new merge for splits already in the process of
+    /// being merged, but we want to keep track of failed merges so we can
+    /// reschedule them.
+    // TODO currently the MergePlanner is torn down when a merge fails, so this
+    // mechanism is only useful when there are some merges left from a previous
+    // pipeline. We could only tear down the rest of the pipeline on error.
+    ongoing_merge_operations_tracker: Tracker<MergeOperation>,
 
     /// We use the actor start_time as a way to identify incarnations.
     ///
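A self-contained sketch of the reschedule flow this field enables, using a hypothetical `FakeMergeOp` as a stand-in for the real `MergeOperation`:

```rust
use quickwit_common::tracker::Tracker;

#[derive(Clone, Debug)]
struct FakeMergeOp {
    split_ids: Vec<String>,
}

fn main() {
    let tracker: Tracker<FakeMergeOp> = Tracker::new();

    // The planner tracks the operation while the merge is in flight.
    let op = tracker.track(FakeMergeOp {
        split_ids: vec!["split-1".into(), "split-2".into()],
    });

    // The pipeline dies before the merged split is published, so the
    // operation is dropped without being acknowledged.
    drop(op);

    // A later MergePlanner incarnation, holding a clone of the same tracker,
    // finds the dead operation and records its splits for a new merge plan.
    for dead_op in tracker.take_dead() {
        for split_id in dead_op.split_ids {
            println!("re-recording {split_id} for merge");
        }
    }
}
```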
@@ -134,8 +138,15 @@ impl Handler<RunFinalizeMergePolicyAndQuit> for MergePlanner {
         _plan_merge: RunFinalizeMergePolicyAndQuit,
         ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
-        // Note we ignore messages that could be coming from a different incarnation.
-        // (See comment on `Self::incarnation_start_at`.)
+        // Consume failed merges so that we may try to reschedule them one last time.
+        for failed_merge in self.ongoing_merge_operations_tracker.take_dead() {
+            for split in failed_merge.splits {
+                // Splits from a dead merge are always recorded: they are likely already
+                // part of our known splits, and we don't want to trigger a rebuild of the
+                // known split list, which would likely log about it not halving in size.
+                self.record_split(split);
+            }
+        }
         self.send_merge_ops(true, ctx).await?;
         Err(ActorExitStatus::Success)
     }
@@ -183,30 +194,42 @@ impl MergePlanner {
         QueueCapacity::Bounded(1)
     }
 
-    pub fn new(
+    pub async fn new(
         pipeline_id: &MergePipelineId,
         immature_splits: Vec<SplitMetadata>,
         merge_policy: Arc<dyn MergePolicy>,
         merge_split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
         merge_scheduler_service: Mailbox<MergeSchedulerService>,
-    ) -> MergePlanner {
+    ) -> anyhow::Result<MergePlanner> {
         let immature_splits: Vec<SplitMetadata> = immature_splits
             .into_iter()
             .filter(|split_metadata| belongs_to_pipeline(pipeline_id, split_metadata))
             .collect();
+        let ongoing_merge_operations_tracker = merge_scheduler_service
+            .ask(GetOperationTracker(pipeline_id.index_uid.clone()))
+            .await?;
+
+        let mut known_split_ids: HashSet<String> = HashSet::new();
+        let ongoing_merge_operations = ongoing_merge_operations_tracker.list_ongoing();
+        for merge_op in ongoing_merge_operations {
+            for split in &merge_op.splits {
+                known_split_ids.insert(split.split_id().to_string());
+            }
+        }
+
         let mut merge_planner = MergePlanner {
-            known_split_ids: Default::default(),
+            known_split_ids,
             known_split_ids_recompute_attempt_id: 0,
             partitioned_young_splits: Default::default(),
             merge_policy,
             merge_split_downloader_mailbox,
             merge_scheduler_service,
-            ongoing_merge_operations_inventory: Inventory::default(),
+            ongoing_merge_operations_tracker,
            incarnation_started_at: Instant::now(),
         };
         merge_planner.record_splits_if_necessary(immature_splits);
-        merge_planner
+        Ok(merge_planner)
     }
 
     fn rebuild_known_split_ids(&self) -> HashSet<String> {
@@ -217,7 +240,7 @@ impl MergePlanner {
                 known_split_ids.insert(split.split_id().to_string());
             }
         }
-        let ongoing_merge_operations = self.ongoing_merge_operations_inventory.list();
+        let ongoing_merge_operations = self.ongoing_merge_operations_tracker.list_ongoing();
         // Add splits that are known as in merge.
         for merge_op in ongoing_merge_operations {
             for split in &merge_op.splits {
@@ -237,7 +260,7 @@ impl MergePlanner {
 
     /// Updates `known_split_ids` and return true if the split was not
     /// previously known and should be recorded.
-    fn acknownledge_split(&mut self, split_id: &str) -> bool {
+    fn acknowledge_split(&mut self, split_id: &str) -> bool {
         if self.known_split_ids.contains(split_id) {
             return false;
         }
@@ -251,6 +274,10 @@ impl MergePlanner {
         if self.known_split_ids_recompute_attempt_id % 100 == 0 {
             self.known_split_ids = self.rebuild_known_split_ids();
             self.known_split_ids_recompute_attempt_id = 0;
+
+            for failed_merge in self.ongoing_merge_operations_tracker.take_dead() {
+                self.record_splits_if_necessary(failed_merge.splits);
+            }
         }
     }
@@ -280,12 +307,13 @@
             // a split already in store to be received.
             //
             // See `known_split_ids`.
-            if !self.acknowledge_split(new_split.split_id()) {
+            if !self.acknowledge_split(new_split.split_id()) {
                 continue;
             }
             self.record_split(new_split);
         }
     }
+
     async fn compute_merge_ops(
         &mut self,
         is_finalize: bool,
@@ -323,9 +351,8 @@
         let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?;
         for merge_operation in merge_ops {
             info!(merge_operation=?merge_operation, "schedule merge operation");
-            let tracked_merge_operation = self
-                .ongoing_merge_operations_inventory
-                .track(merge_operation);
+            let tracked_merge_operation =
+                self.ongoing_merge_operations_tracker.track(merge_operation);
             schedule_merge(
                 &self.merge_scheduler_service,
                 tracked_merge_operation,
@@ -435,7 +462,8 @@ mod tests {
             merge_policy,
             merge_split_downloader_mailbox,
             universe.get_or_spawn_one(),
-        );
+        )
+        .await?;
         let (merge_planner_mailbox, merge_planner_handle) =
             universe.spawn_builder().spawn(merge_planner);
         {
@@ -560,7 +588,8 @@ mod tests {
             merge_policy,
             merge_split_downloader_mailbox,
             universe.get_or_spawn_one(),
-        );
+        )
+        .await?;
         let (merge_planner_mailbox, merge_planner_handle) =
             universe.spawn_builder().spawn(merge_planner);
 
@@ -652,7 +681,8 @@ mod tests {
             merge_policy,
             merge_split_downloader_mailbox,
             universe.get_or_spawn_one(),
-        );
+        )
+        .await?;
         let (merge_planner_mailbox, merge_planner_handle) =
             universe.spawn_builder().spawn(merge_planner);
         universe.sleep(Duration::from_secs(10)).await;
@@ -717,7 +747,8 @@ mod tests {
             merge_policy,
             merge_split_downloader_mailbox,
             universe.get_or_spawn_one(),
-        );
+        )
+        .await?;
         // We create a fake old mailbox that contains two new splits and a PlanMerge message from an
         // old incarnation. This could happen in real life if the merge pipeline failed
         // right after a `PlanMerge` was pushed to the pipeline. Note that #3847 did not
diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
index 7e09e8a4246..46376b6b820 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
@@ -14,13 +14,14 @@
 use std::cmp::Reverse;
 use std::collections::binary_heap::PeekMut;
-use std::collections::BinaryHeap;
+use std::collections::{BinaryHeap, HashMap};
 use std::sync::Arc;
 
 use anyhow::Context;
 use async_trait::async_trait;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
-use tantivy::TrackedObject;
+use quickwit_common::tracker::{TrackedObject, Tracker};
+use quickwit_proto::types::IndexUid;
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tracing::error;
@@ -118,6 +119,8 @@ pub struct MergeSchedulerService {
     pending_merge_queue: BinaryHeap<ScheduledMerge>,
     next_merge_id: u64,
     pending_merge_bytes: u64,
+    tracked_operations: HashMap<IndexUid, Tracker<MergeOperation>>,
+    gc_sequence_id: usize,
 }
 
 impl Default for MergeSchedulerService {
@@ -135,6 +138,8 @@ impl MergeSchedulerService {
             pending_merge_queue: BinaryHeap::default(),
             next_merge_id: 0,
             pending_merge_bytes: 0,
+            tracked_operations: HashMap::new(),
+            gc_sequence_id: 0,
         }
     }
 
@@ -189,6 +194,14 @@ impl MergeSchedulerService {
             .ongoing_merge_operations
             .set(num_merges);
     }
+
+    fn maybe_gc_trackers(&mut self) {
+        self.gc_sequence_id += 1;
+        if self.gc_sequence_id % 100 == 0 {
+            self.tracked_operations
+                .retain(|_k, tracker| !tracker.safe_to_recreate())
+        }
+    }
 }
 
 #[async_trait]
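The GC rule above is subtle: a tracker entry is reaped only once no pipeline holds a clone of it and no merge it tracks is still alive; dead (unacknowledged) operations do not keep it around. A standalone sketch of that behavior, with string keys standing in for `IndexUid`:

```rust
use std::collections::HashMap;

use quickwit_common::tracker::Tracker;

fn main() {
    let mut trackers: HashMap<&'static str, Tracker<u32>> = HashMap::new();

    // A merge planner asks for the tracker of its index and keeps a clone.
    let planner_tracker = trackers.entry("index-1").or_default().clone();
    let tracked_op = planner_tracker.track(42);
    drop(planner_tracker);

    // The tracked operation is still alive, so the entry must be retained.
    trackers.retain(|_, tracker| !tracker.safe_to_recreate());
    assert_eq!(trackers.len(), 1);

    // Once the operation is dead (dropped unacknowledged) and no clone is
    // held anymore, the tracker is safe to recreate and gets collected,
    // discarding its dead entries along with it.
    drop(tracked_op);
    trackers.retain(|_, tracker| !tracker.safe_to_recreate());
    assert!(trackers.is_empty());
}
```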
@@ -289,17 +302,39 @@ impl Handler for MergeSchedulerService {
         ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
         self.schedule_pending_merges(ctx);
+        self.maybe_gc_trackers();
         Ok(())
     }
 }
 
+#[derive(Debug)]
+pub(crate) struct GetOperationTracker(pub IndexUid);
+
+#[async_trait]
+impl Handler<GetOperationTracker> for MergeSchedulerService {
+    type Reply = Tracker<MergeOperation>;
+
+    async fn handle(
+        &mut self,
+        get_operation_tracker: GetOperationTracker,
+        _ctx: &ActorContext<Self>,
+    ) -> Result<Self::Reply, ActorExitStatus> {
+        let tracker = self
+            .tracked_operations
+            .entry(get_operation_tracker.0)
+            .or_default()
+            .clone();
+        Ok(tracker)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::time::Duration;
 
     use quickwit_actors::Universe;
+    use quickwit_common::tracker::Tracker;
     use quickwit_metastore::SplitMetadata;
-    use tantivy::Inventory;
     use tokio::time::timeout;
 
     use super::*;
@@ -339,7 +374,7 @@ mod tests {
         let (merge_scheduler_service, _) = universe
             .spawn_builder()
             .spawn(MergeSchedulerService::new(2));
-        let inventory = Inventory::new();
+        let inventory = Tracker::new();
         let (merge_split_downloader_mailbox, merge_split_downloader_inbox) =
             universe.create_test_mailbox();
diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs
index 029c0c5b0f6..936432df1a4 100644
--- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs
+++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs
@@ -23,13 +23,13 @@ use std::sync::Arc;
 pub(crate) use const_write_amplification::ConstWriteAmplificationMergePolicy;
 use itertools::Itertools;
 pub use nop_merge_policy::NopMergePolicy;
+use quickwit_common::tracker::TrackedObject;
 use quickwit_config::merge_policy_config::MergePolicyConfig;
 use quickwit_config::IndexingSettings;
 use quickwit_metastore::{SplitMaturity, SplitMetadata};
 use quickwit_proto::types::SplitId;
 use serde::Serialize;
 pub(crate) use stable_log_merge_policy::StableLogMergePolicy;
-use tantivy::TrackedObject;
 use tracing::{info_span, Span};
 
 use crate::actors::MergePermit;
@@ -55,8 +55,8 @@ pub struct MergeTask {
 impl MergeTask {
     #[cfg(any(test, feature = "testsuite"))]
     pub fn from_merge_operation_for_test(merge_operation: MergeOperation) -> MergeTask {
-        let inventory = tantivy::Inventory::default();
-        let tracked_merge_operation = inventory.track(merge_operation);
+        let tracker = quickwit_common::tracker::Tracker::new();
+        let tracked_merge_operation = tracker.track(merge_operation);
         MergeTask {
             merge_operation: tracked_merge_operation,
             _merge_permit: MergePermit::for_test(),
@@ -397,13 +397,14 @@ pub mod tests {
     fn apply_merge(
         merge_policy: &Arc<dyn MergePolicy>,
         split_index: &mut HashMap<String, SplitMetadata>,
-        merge_op: &MergeOperation,
+        merge_op: TrackedObject<MergeOperation>,
     ) -> SplitMetadata {
         for split in merge_op.splits_as_slice() {
             assert!(split_index.remove(split.split_id()).is_some());
         }
         let merged_split = fake_merge(merge_policy, merge_op.splits_as_slice());
         split_index.insert(merged_split.split_id().to_string(), merged_split.clone());
+        merge_op.acknowledge();
         merged_split
     }
 
@@ -427,7 +428,8 @@ pub mod tests {
             merge_policy.clone(),
             merge_task_mailbox,
             universe.get_or_spawn_one::<MergeSchedulerService>(),
-        );
+        )
+        .await?;
         let mut split_index: HashMap<String, SplitMetadata> = HashMap::default();
         let (merge_planner_mailbox, merge_planner_handler) =
             universe.spawn_builder().spawn(merge_planner);
@@ -448,7 +450,9 @@ pub mod tests {
         }
         let new_splits: Vec<SplitMetadata> = merge_tasks
             .into_iter()
-            .map(|merge_op| apply_merge(&merge_policy, &mut split_index, &merge_op))
+            .map(|merge_op| {
+                apply_merge(&merge_policy, &mut split_index, merge_op.merge_operation)
+            })
             .collect();
         merge_planner_mailbox
             .send_message(NewSplits { new_splits })
@@ -468,7 +472,7 @@ pub mod tests {
         let merge_tasks = merge_task_inbox.drain_for_test_typed::<MergeTask>();
         for merge_task in merge_tasks {
-            apply_merge(&merge_policy, &mut split_index, &merge_task);
+            apply_merge(&merge_policy, &mut split_index, merge_task.merge_operation);
         }
 
         let split_metadatas: Vec<SplitMetadata> = split_index.values().cloned().collect();
diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs
index 8756e6fdc5c..616040282d1 100644
--- a/quickwit/quickwit-indexing/src/test_utils.rs
+++ b/quickwit/quickwit-indexing/src/test_utils.rs
@@ -18,7 +18,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use bytes::Bytes;
-use quickwit_actors::{Mailbox, Universe};
+use quickwit_actors::{ActorHandle, Mailbox, Universe};
 use quickwit_cluster::{create_cluster_for_test, ChannelTransport};
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::rand::append_random_suffix;
@@ -32,13 +32,16 @@ use quickwit_ingest::{init_ingest_api, IngesterPool, QUEUES_DIR_NAME};
 use quickwit_metastore::{
     CreateIndexRequestExt, MetastoreResolver, Split, SplitMetadata, SplitState,
 };
+use quickwit_proto::indexing::IndexingPipelineId;
 use quickwit_proto::metastore::{CreateIndexRequest, MetastoreService, MetastoreServiceClient};
 use quickwit_proto::types::{IndexUid, NodeId, PipelineUid, SourceId};
 use quickwit_storage::{Storage, StorageResolver};
 use serde_json::Value as JsonValue;
 
-use crate::actors::IndexingService;
-use crate::models::{DetachIndexingPipeline, IndexingStatistics, SpawnPipeline};
+use crate::actors::{IndexingPipeline, IndexingService, MergePipeline};
+use crate::models::{
+    DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
+};
 
 /// Creates a Test environment.
 ///
@@ -56,6 +59,7 @@ pub struct TestSandbox {
     storage: Arc<dyn Storage>,
     add_docs_id: AtomicUsize,
     universe: Universe,
+    indexing_pipeline_id: Option<IndexingPipelineId>,
     _temp_dir: tempfile::TempDir,
 }
@@ -130,6 +134,14 @@ impl TestSandbox {
             .await?;
         let (indexing_service, _indexing_service_handle) =
             universe.spawn_builder().spawn(indexing_service_actor);
+
+        let indexing_pipeline_id = indexing_service
+            .ask_for_res(SpawnPipeline {
+                index_id: index_uid.index_id.to_string(),
+                source_config,
+                pipeline_uid: PipelineUid::for_test(1u128),
+            })
+            .await?;
         Ok(TestSandbox {
             node_id,
             index_uid,
@@ -141,6 +153,7 @@ impl TestSandbox {
             storage,
             add_docs_id: AtomicUsize::default(),
             universe,
+            indexing_pipeline_id: Some(indexing_pipeline_id),
             _temp_dir: temp_dir,
         })
     }
@@ -189,6 +202,56 @@ impl TestSandbox {
         Ok(pipeline_statistics)
     }
 
+    /// Adds documents and waits for them to be indexed (creating a separate split).
+    ///
+    /// The documents are expected to be `JsonValue`.
+    /// They can be created using the `serde_json::json!` macro.
+    pub async fn add_documents_through_api<I>(&self, json_docs: I) -> anyhow::Result<()>
+    where
+        I: IntoIterator<Item = JsonValue> + 'static,
+        I::IntoIter: Send,
+    {
+        let ingest_api_service_mailbox = self
+            .universe
+            .get_one::<quickwit_ingest::IngestApiService>()
+            .unwrap();
+
+        let batch_builder =
+            quickwit_ingest::DocBatchBuilder::new(self.index_uid.index_id.to_string());
+        let mut json_writer = batch_builder.json_writer();
+        for doc in json_docs {
+            json_writer.ingest_doc(doc)?;
+        }
+        let batch = json_writer.build();
+        let ingest_request = quickwit_ingest::IngestRequest {
+            doc_batches: vec![batch],
+            commit: quickwit_ingest::CommitType::WaitFor as i32,
+        };
+        ingest_api_service_mailbox
+            .ask_for_res(ingest_request)
+            .await?;
+        Ok(())
+    }
+
+    pub async fn take_indexing_and_merge_pipeline(
+        &mut self,
+    ) -> anyhow::Result<(ActorHandle<IndexingPipeline>, ActorHandle<MergePipeline>)> {
+        let pipeline_id = self.indexing_pipeline_id.take().unwrap();
+        let merge_pipeline_id = pipeline_id.merge_pipeline_id();
+        let indexing_pipeline = self
+            .indexing_service
+            .ask_for_res(DetachIndexingPipeline { pipeline_id })
+            .await?;
+        let merge_pipeline = self
+            .indexing_service
+            .ask_for_res(DetachMergePipeline {
+                pipeline_id: merge_pipeline_id,
+            })
+            .await?;
+
+        Ok((indexing_pipeline, merge_pipeline))
+    }
+
     /// Returns the metastore of the TestSandbox.
     ///
     /// The metastore is a file-backed metastore.
@@ -233,6 +296,11 @@ impl TestSandbox {
         &self.universe
     }
 
+    /// Returns a mailbox to the indexing service.
+    pub fn indexing_service(&self) -> Mailbox<IndexingService> {
+        self.indexing_service.clone()
+    }
+
     /// Gracefully quits all registered actors in the underlying universe and asserts that none of
     /// them panicked.
     ///
diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs
index 5908fed5612..cf7db6ad66e 100644
--- a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs
+++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs
@@ -101,7 +101,8 @@ async fn validate_search_across_doc_mapping_updates(
     )
     .await
     .unwrap();
-
+    // leave some time for the ingest API to learn about the doc mapping update
+    tokio::time::sleep(Duration::from_millis(50)).await;
     sandbox
         .local_ingest(index_id, ingest_after_update)
         .await
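Taken together, the new `TestSandbox` helpers above support tests like this sketch (hypothetical test; `DOC_MAPPER_YAML` and `INDEXING_SETTINGS_YAML` are assumed constants — see the failpoints test earlier in this diff for realistic values):

```rust
use serde_json::json;

#[tokio::test]
async fn example_pipeline_handover() -> anyhow::Result<()> {
    // DOC_MAPPER_YAML / INDEXING_SETTINGS_YAML are hypothetical constants.
    let mut sandbox =
        TestSandbox::create("my-index", DOC_MAPPER_YAML, INDEXING_SETTINGS_YAML, &["body"])
            .await?;

    // Feed documents through the ingest API of the already-running pipeline
    // instead of spawning a fresh pipeline per batch.
    sandbox
        .add_documents_through_api(vec![json!({"body": "hello"})])
        .await?;

    // Detach the auto-spawned pipelines to control their shutdown explicitly.
    let (indexing_pipeline, merge_pipeline) =
        sandbox.take_indexing_and_merge_pipeline().await?;
    indexing_pipeline.kill().await;
    merge_pipeline
        .mailbox()
        .ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
        .await?;
    Ok(())
}
```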
diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
index 1e2aa97f1b6..de5616578b8 100644
--- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
+++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
@@ -21,6 +21,7 @@ use async_trait::async_trait;
 use itertools::Itertools;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
 use quickwit_common::extract_time_range;
+use quickwit_common::tracker::TrackedObject;
 use quickwit_common::uri::Uri;
 use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
 use quickwit_indexing::actors::{schedule_merge, MergeSchedulerService, MergeSplitDownloader};
@@ -95,7 +96,7 @@ impl Actor for DeleteTaskPlanner {
             .ongoing_delete_operations_inventory
             .list()
             .iter()
-            .map(|tracked_operation| tracked_operation.as_ref().clone())
+            .map(|tracked_operation| (**tracked_operation).clone())
             .collect_vec();
         DeleteTaskPlannerState {
             ongoing_delete_operations,
@@ -195,9 +196,10 @@ impl DeleteTaskPlanner {
                 split_with_deletes.split_metadata,
             );
             info!(delete_operation=?delete_operation, "planned delete operation");
-            let tracked_delete_operation = self
-                .ongoing_delete_operations_inventory
-                .track(delete_operation);
+            let tracked_delete_operation = TrackedObject::track_alive_in(
+                delete_operation,
+                &self.ongoing_delete_operations_inventory,
+            );
             schedule_merge(
                 &self.merge_scheduler_service,
                 tracked_delete_operation,