Skip to content

Commit db1751b

Browse files
restrict maturity period to retention (#5543)
1 parent d4ad40d commit db1751b

File tree

8 files changed

+142
-9
lines changed

8 files changed

+142
-9
lines changed

quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use quickwit_actors::{
3030
use quickwit_common::pubsub::EventBroker;
3131
use quickwit_common::temp_dir::TempDirectory;
3232
use quickwit_common::KillSwitch;
33-
use quickwit_config::{IndexingSettings, SourceConfig};
33+
use quickwit_config::{IndexingSettings, RetentionPolicy, SourceConfig};
3434
use quickwit_doc_mapper::DocMapper;
3535
use quickwit_ingest::IngesterPool;
3636
use quickwit_proto::indexing::IndexingPipelineId;
@@ -367,6 +367,7 @@ impl IndexingPipeline {
367367
UploaderType::IndexUploader,
368368
self.params.metastore.clone(),
369369
self.params.merge_policy.clone(),
370+
self.params.retention_policy.clone(),
370371
self.params.split_store.clone(),
371372
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
372373
self.params.max_concurrent_split_uploads_index,
@@ -585,6 +586,7 @@ pub struct IndexingPipelineParams {
585586

586587
// Merge-related parameters
587588
pub merge_policy: Arc<dyn MergePolicy>,
589+
pub retention_policy: Option<RetentionPolicy>,
588590
pub merge_planner_mailbox: Mailbox<MergePlanner>,
589591
pub max_concurrent_split_uploads_merge: usize,
590592

@@ -717,6 +719,7 @@ mod tests {
717719
storage,
718720
split_store,
719721
merge_policy: default_merge_policy(),
722+
retention_policy: None,
720723
queues_dir_path: PathBuf::from("./queues"),
721724
max_concurrent_split_uploads_index: 4,
722725
max_concurrent_split_uploads_merge: 5,
@@ -831,6 +834,7 @@ mod tests {
831834
storage,
832835
split_store,
833836
merge_policy: default_merge_policy(),
837+
retention_policy: None,
834838
max_concurrent_split_uploads_index: 4,
835839
max_concurrent_split_uploads_merge: 5,
836840
cooperative_indexing_permits: None,
@@ -908,6 +912,7 @@ mod tests {
908912
metastore: metastore.clone(),
909913
split_store: split_store.clone(),
910914
merge_policy: default_merge_policy(),
915+
retention_policy: None,
911916
max_concurrent_split_uploads: 2,
912917
merge_io_throughput_limiter_opt: None,
913918
merge_scheduler_service: universe.get_or_spawn_one(),
@@ -930,6 +935,7 @@ mod tests {
930935
storage,
931936
split_store,
932937
merge_policy: default_merge_policy(),
938+
retention_policy: None,
933939
max_concurrent_split_uploads_index: 4,
934940
max_concurrent_split_uploads_merge: 5,
935941
cooperative_indexing_permits: None,
@@ -1057,6 +1063,7 @@ mod tests {
10571063
storage,
10581064
split_store,
10591065
merge_policy: default_merge_policy(),
1066+
retention_policy: None,
10601067
max_concurrent_split_uploads_index: 4,
10611068
max_concurrent_split_uploads_merge: 5,
10621069
cooperative_indexing_permits: None,

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ impl IndexingService {
287287
})?;
288288
let merge_policy =
289289
crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings);
290+
let retention_policy = index_config.retention_policy_opt.clone();
290291
let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone());
291292

292293
let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
@@ -301,6 +302,7 @@ impl IndexingService {
301302
split_store: split_store.clone(),
302303
merge_scheduler_service: self.merge_scheduler_service.clone(),
303304
merge_policy: merge_policy.clone(),
305+
retention_policy: retention_policy.clone(),
304306
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
305307
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
306308
event_broker: self.event_broker.clone(),
@@ -329,6 +331,7 @@ impl IndexingService {
329331

330332
// Merge-related parameters
331333
merge_policy,
334+
retention_policy,
332335
max_concurrent_split_uploads_merge,
333336
merge_planner_mailbox,
334337

quickwit/quickwit-indexing/src/actors/merge_pipeline.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use quickwit_common::io::{IoControls, Limiter};
2929
use quickwit_common::pubsub::EventBroker;
3030
use quickwit_common::temp_dir::TempDirectory;
3131
use quickwit_common::KillSwitch;
32+
use quickwit_config::RetentionPolicy;
3233
use quickwit_doc_mapper::DocMapper;
3334
use quickwit_metastore::{
3435
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata,
@@ -286,6 +287,7 @@ impl MergePipeline {
286287
UploaderType::MergeUploader,
287288
self.params.metastore.clone(),
288289
self.params.merge_policy.clone(),
290+
self.params.retention_policy.clone(),
289291
self.params.split_store.clone(),
290292
merge_publisher_mailbox.into(),
291293
self.params.max_concurrent_split_uploads,
@@ -572,6 +574,7 @@ pub struct MergePipelineParams {
572574
pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
573575
pub split_store: IndexingSplitStore,
574576
pub merge_policy: Arc<dyn MergePolicy>,
577+
pub retention_policy: Option<RetentionPolicy>,
575578
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
576579
pub merge_io_throughput_limiter_opt: Option<Limiter>,
577580
pub event_broker: EventBroker,
@@ -635,6 +638,7 @@ mod tests {
635638
merge_scheduler_service: universe.get_or_spawn_one(),
636639
split_store,
637640
merge_policy: default_merge_policy(),
641+
retention_policy: None,
638642
max_concurrent_split_uploads: 2,
639643
merge_io_throughput_limiter_opt: None,
640644
event_broker: Default::default(),

quickwit/quickwit-indexing/src/actors/uploader.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use once_cell::sync::OnceCell;
3131
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
3232
use quickwit_common::pubsub::EventBroker;
3333
use quickwit_common::spawn_named_task;
34+
use quickwit_config::RetentionPolicy;
3435
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
3536
use quickwit_metastore::{SplitMetadata, StageSplitsRequestExt};
3637
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, StageSplitsRequest};
@@ -166,6 +167,7 @@ pub struct Uploader {
166167
uploader_type: UploaderType,
167168
metastore: MetastoreServiceClient,
168169
merge_policy: Arc<dyn MergePolicy>,
170+
retention_policy: Option<RetentionPolicy>,
169171
split_store: IndexingSplitStore,
170172
split_update_mailbox: SplitsUpdateMailbox,
171173
max_concurrent_split_uploads: usize,
@@ -174,10 +176,12 @@ pub struct Uploader {
174176
}
175177

176178
impl Uploader {
179+
#[allow(clippy::too_many_arguments)]
177180
pub fn new(
178181
uploader_type: UploaderType,
179182
metastore: MetastoreServiceClient,
180183
merge_policy: Arc<dyn MergePolicy>,
184+
retention_policy: Option<RetentionPolicy>,
181185
split_store: IndexingSplitStore,
182186
split_update_mailbox: SplitsUpdateMailbox,
183187
max_concurrent_split_uploads: usize,
@@ -187,6 +191,7 @@ impl Uploader {
187191
uploader_type,
188192
metastore,
189193
merge_policy,
194+
retention_policy,
190195
split_store,
191196
split_update_mailbox,
192197
max_concurrent_split_uploads,
@@ -300,6 +305,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
300305
let index_uid = batch.index_uid();
301306
let ctx_clone = ctx.clone();
302307
let merge_policy = self.merge_policy.clone();
308+
let retention_policy = self.retention_policy.clone();
303309
debug!(split_ids=?split_ids, "start-stage-and-store-splits");
304310
let event_broker = self.event_broker.clone();
305311
spawn_named_task(
@@ -324,6 +330,7 @@ impl Handler<PackagedSplitBatch> for Uploader {
324330
)?;
325331
let split_metadata = create_split_metadata(
326332
&merge_policy,
333+
retention_policy.as_ref(),
327334
&packaged_split.split_attrs,
328335
packaged_split.tags.clone(),
329336
split_streamer.footer_range.start..split_streamer.footer_range.end,
@@ -535,6 +542,7 @@ mod tests {
535542
UploaderType::IndexUploader,
536543
MetastoreServiceClient::from_mock(mock_metastore),
537544
merge_policy,
545+
None,
538546
split_store,
539547
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
540548
4,
@@ -650,6 +658,7 @@ mod tests {
650658
UploaderType::IndexUploader,
651659
MetastoreServiceClient::from_mock(mock_metastore),
652660
merge_policy,
661+
None,
653662
split_store,
654663
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
655664
4,
@@ -797,6 +806,7 @@ mod tests {
797806
UploaderType::IndexUploader,
798807
MetastoreServiceClient::from_mock(mock_metastore),
799808
merge_policy,
809+
None,
800810
split_store,
801811
SplitsUpdateMailbox::Publisher(publisher_mailbox),
802812
4,
@@ -870,6 +880,7 @@ mod tests {
870880
UploaderType::IndexUploader,
871881
MetastoreServiceClient::from_mock(mock_metastore),
872882
default_merge_policy(),
883+
None,
873884
split_store,
874885
SplitsUpdateMailbox::Sequencer(sequencer_mailbox),
875886
4,
@@ -974,6 +985,7 @@ mod tests {
974985
UploaderType::IndexUploader,
975986
MetastoreServiceClient::from_mock(mock_metastore),
976987
merge_policy,
988+
None,
977989
split_store,
978990
SplitsUpdateMailbox::Publisher(publisher_mailbox),
979991
4,

quickwit/quickwit-indexing/src/merge_policy/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ pub mod tests {
396396
source_id: "test_source".to_string(),
397397
};
398398
let split_attrs = merge_split_attrs(pipeline_id, merged_split_id, splits).unwrap();
399-
create_split_metadata(merge_policy, &split_attrs, tags, 0..0)
399+
create_split_metadata(merge_policy, None, &split_attrs, tags, 0..0)
400400
}
401401

402402
fn apply_merge(

quickwit/quickwit-indexing/src/models/split_attrs.rs

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ use std::collections::BTreeSet;
2121
use std::fmt;
2222
use std::ops::{Range, RangeInclusive};
2323
use std::sync::Arc;
24+
use std::time::Duration;
2425

25-
use quickwit_metastore::SplitMetadata;
26+
use quickwit_metastore::{SplitMaturity, SplitMetadata};
2627
use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId, SourceId, SplitId};
2728
use tantivy::DateTime;
2829
use time::OffsetDateTime;
@@ -92,13 +93,27 @@ impl fmt::Debug for SplitAttrs {
9293

9394
pub fn create_split_metadata(
9495
merge_policy: &Arc<dyn MergePolicy>,
96+
retention_policy: Option<&quickwit_config::RetentionPolicy>,
9597
split_attrs: &SplitAttrs,
9698
tags: BTreeSet<String>,
9799
footer_offsets: Range<u64>,
98100
) -> SplitMetadata {
99101
let create_timestamp = OffsetDateTime::now_utc().unix_timestamp();
100-
let maturity =
102+
103+
let time_range = split_attrs
104+
.time_range
105+
.as_ref()
106+
.map(|range| range.start().into_timestamp_secs()..=range.end().into_timestamp_secs());
107+
108+
let mut maturity =
101109
merge_policy.split_maturity(split_attrs.num_docs as usize, split_attrs.num_merge_ops);
110+
if let Some(max_maturity) = max_maturity_before_end_of_retention(
111+
retention_policy,
112+
create_timestamp,
113+
time_range.as_ref().map(|time_range| *time_range.end()),
114+
) {
115+
maturity = maturity.min(max_maturity);
116+
}
102117
SplitMetadata {
103118
node_id: split_attrs.node_id.to_string(),
104119
index_uid: split_attrs.index_uid.clone(),
@@ -107,10 +122,7 @@ pub fn create_split_metadata(
107122
split_id: split_attrs.split_id.clone(),
108123
partition_id: split_attrs.partition_id,
109124
num_docs: split_attrs.num_docs as usize,
110-
time_range: split_attrs
111-
.time_range
112-
.as_ref()
113-
.map(|range| range.start().into_timestamp_secs()..=range.end().into_timestamp_secs()),
125+
time_range,
114126
uncompressed_docs_size_in_bytes: split_attrs.uncompressed_docs_size_in_bytes,
115127
create_timestamp,
116128
maturity,
@@ -120,3 +132,80 @@ pub fn create_split_metadata(
120132
num_merge_ops: split_attrs.num_merge_ops,
121133
}
122134
}
135+
136+
/// reduce the maturity period of a split based on retention policy, so that it doesn't get merged
137+
/// after it expires.
138+
fn max_maturity_before_end_of_retention(
139+
retention_policy: Option<&quickwit_config::RetentionPolicy>,
140+
create_timestamp: i64,
141+
time_range_end: Option<i64>,
142+
) -> Option<SplitMaturity> {
143+
let time_range_end = time_range_end? as u64;
144+
let retention_period_s = retention_policy?.retention_period().ok()?.as_secs();
145+
146+
let maturity = if let Some(maturation_period_s) =
147+
(time_range_end + retention_period_s).checked_sub(create_timestamp as u64)
148+
{
149+
SplitMaturity::Immature {
150+
maturation_period: Duration::from_secs(maturation_period_s),
151+
}
152+
} else {
153+
// this split could be deleted as soon as it is created. Ideally we would
154+
// handle that sooner.
155+
SplitMaturity::Mature
156+
};
157+
Some(maturity)
158+
}
159+
160+
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use quickwit_metastore::SplitMaturity;

    use super::max_maturity_before_end_of_retention;

    /// Exercises the retention-based maturity cap with a 300 second retention period and a
    /// split created at t = 1000.
    #[test]
    fn test_max_maturity_before_end_of_retention() {
        let create_timestamp = 1000;
        let policy = quickwit_config::RetentionPolicy {
            evaluation_schedule: "daily".to_string(),
            retention_period: "300 sec".to_string(),
        };

        // Retention ended at 200 + 300 = 500, i.e. before the split was created: it should be
        // deleted asap and must not be subject to merge.
        let already_expired =
            max_maturity_before_end_of_retention(Some(&policy), create_timestamp, Some(200));
        assert_eq!(already_expired, Some(SplitMaturity::Mature));

        // Retention ends at 750 + 300 = 1050, which is 50s after the creation timestamp.
        let expires_soon =
            max_maturity_before_end_of_retention(Some(&policy), create_timestamp, Some(750));
        let expected_cap = SplitMaturity::Immature {
            maturation_period: Duration::from_secs(50),
        };
        assert_eq!(expires_soon, Some(expected_cap));

        // Without a retention policy there is nothing to cap.
        let without_policy = max_maturity_before_end_of_retention(None, create_timestamp, Some(850));
        assert_eq!(without_policy, None);

        // A retention policy but no end of time range is odd: the maturity period must be left
        // untouched.
        let without_time_range =
            max_maturity_before_end_of_retention(Some(&policy), create_timestamp, None);
        assert_eq!(without_time_range, None);
    }
}

quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ impl DeleteTaskPipeline {
181181
UploaderType::DeleteUploader,
182182
self.metastore.clone(),
183183
merge_policy,
184+
index_config.retention_policy_opt.clone(),
184185
split_store.clone(),
185186
SplitsUpdateMailbox::Publisher(publisher_mailbox),
186187
self.max_concurrent_split_uploads,

quickwit/quickwit-metastore/src/split_metadata.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ impl FromStr for SplitState {
344344
/// or `Immature` with a given maturation period.
345345
/// The maturity is determined by the `MergePolicy`.
346346
#[serde_as]
347-
#[derive(Clone, Copy, Debug, Default, Eq, Serialize, Deserialize, PartialEq)]
347+
#[derive(Clone, Copy, Debug, Default, Eq, Serialize, Deserialize, PartialEq, PartialOrd, Ord)]
348348
#[serde(tag = "type")]
349349
#[serde(rename_all = "snake_case")]
350350
pub enum SplitMaturity {
@@ -439,4 +439,21 @@ mod tests {
439439

440440
assert_eq!(format!("{:?}", split_metadata), expected_output);
441441
}
442+
443+
#[test]
fn test_spit_maturity_order() {
    // Checks the ordering used when taking the `min` of two maturities: `Mature` compares below
    // an `Immature` value, and `Immature` values compare by their maturation period.
    let immature_0s = SplitMaturity::Immature {
        maturation_period: Duration::from_secs(0),
    };
    let immature_1s = SplitMaturity::Immature {
        maturation_period: Duration::from_secs(1),
    };

    assert!(SplitMaturity::Mature < immature_0s);
    assert!(immature_0s < immature_1s);
}
442459
}

0 commit comments

Comments
 (0)