Skip to content

Commit 625b25f

Browse files
imotovfulmicoton
andauthored
Add an internal incarnation id to the index (#3212)
Adds an internal incarnation id to the index in order to differentiate a newly created index from a recently deleted index with the same name. Fixes #3041 Co-authored-by: Paul Masurel <[email protected]>
1 parent 5318eee commit 625b25f

File tree

58 files changed

+1357
-271
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1357
-271
lines changed

quickwit/Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-cluster/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ thiserror = { workspace = true }
2020
tokio = { workspace = true }
2121
tokio-stream = { workspace = true }
2222
tracing = { workspace = true }
23+
ulid = { workspace = true }
2324
utoipa = { workspace = true }
2425

2526
quickwit-common = { workspace = true }

quickwit/quickwit-cluster/src/cluster.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use tracing::{debug, error, info};
4242
use crate::error::{ClusterError, ClusterResult};
4343
use crate::member::{
4444
build_cluster_members, ClusterMember, ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY,
45-
INDEXING_TASK_PREFIX, INDEXING_TASK_SEPARATOR,
45+
INDEXING_TASK_PREFIX,
4646
};
4747
use crate::QuickwitService;
4848

@@ -342,10 +342,7 @@ impl Cluster {
342342
for (indexing_task, indexing_tasks_group) in
343343
indexing_tasks.iter().group_by(|&task| task).into_iter()
344344
{
345-
let key = format!(
346-
"{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}{}{INDEXING_TASK_SEPARATOR}{}",
347-
indexing_task.index_id, indexing_task.source_id
348-
);
345+
let key = format!("{INDEXING_TASK_PREFIX}:{}", indexing_task.to_string());
349346
current_indexing_tasks_keys.remove(&key);
350347
chitchat_guard
351348
.self_node_state()
@@ -495,6 +492,7 @@ mod tests {
495492
use quickwit_common::test_utils::wait_until_predicate;
496493
use quickwit_proto::indexing_api::IndexingTask;
497494
use rand::Rng;
495+
use ulid::Ulid;
498496

499497
use super::*;
500498

@@ -559,6 +557,7 @@ mod tests {
559557
let indexing_task = IndexingTask {
560558
index_id: "index-1".to_string(),
561559
source_id: "source-1".to_string(),
560+
incarnation_id: Ulid::new().to_string(),
562561
};
563562
cluster2
564563
.set_key_value(GRPC_ADVERTISE_ADDR_KEY, "127.0.0.1:1001")
@@ -626,13 +625,15 @@ mod tests {
626625
.unwrap(),
627626
);
628627
let mut random_generator = rand::thread_rng();
629-
let indexing_tasks = (0..1_000)
628+
// TODO: increase it back to 1000 when https://github.com/quickwit-oss/chitchat/issues/81 is fixed
629+
let indexing_tasks = (0..500)
630630
.map(|_| {
631631
let index_id = random_generator.gen_range(0..=10_000);
632632
let source_id = random_generator.gen_range(0..=100);
633633
IndexingTask {
634634
index_id: format!("index-{index_id}"),
635635
source_id: format!("source-{source_id}"),
636+
incarnation_id: "11111111111111111111111111".to_string(),
636637
}
637638
})
638639
.collect_vec();
@@ -739,11 +740,13 @@ mod tests {
739740
let chitchat_handle = cluster1.chitchat_handle.chitchat();
740741
let mut chitchat_guard = chitchat_handle.lock().await;
741742
chitchat_guard.self_node_state().set(
742-
format!("{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}my_good_index{INDEXING_TASK_SEPARATOR}my_source"),
743+
format!(
744+
"{INDEXING_TASK_PREFIX}:my_good_index:my_source:11111111111111111111111111"
745+
),
743746
"2".to_string(),
744747
);
745748
chitchat_guard.self_node_state().set(
746-
format!("{INDEXING_TASK_PREFIX}{INDEXING_TASK_SEPARATOR}my_bad_index{INDEXING_TASK_SEPARATOR}my_source"),
749+
format!("{INDEXING_TASK_PREFIX}:my_bad_index:my_source:11111111111111111111111111"),
747750
"malformatted value".to_string(),
748751
);
749752
}

quickwit/quickwit-cluster/src/member.rs

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -162,38 +162,27 @@ pub(crate) fn build_cluster_member(
162162
))
163163
}
164164

165-
// Parses indexing task key into the pair (index_id, source_id).
166-
fn parse_indexing_task_key(key: &str) -> anyhow::Result<(&str, &str)> {
167-
let (_prefix, key_without_prefix) =
168-
key.split_once(INDEXING_TASK_SEPARATOR).ok_or_else(|| {
169-
anyhow!(
170-
"Indexing task must contain the delimiter character `:`: `{}`",
171-
key
172-
)
173-
})?;
174-
let (index_id, source_id) = key_without_prefix
175-
.split_once(INDEXING_TASK_SEPARATOR)
176-
.ok_or_else(|| {
177-
anyhow!(
178-
"Indexing task index ID and source ID must be separated by character `:`: `{}`",
179-
key_without_prefix
180-
)
181-
})?;
182-
Ok((index_id, source_id))
165+
// Parses indexing task key into the IndexingTask.
166+
fn parse_indexing_task_key(key: &str) -> anyhow::Result<IndexingTask> {
167+
let (_prefix, reminder) = key.split_once(INDEXING_TASK_SEPARATOR).ok_or_else(|| {
168+
anyhow!(
169+
"Indexing task must contain the delimiter character `:`: `{}`",
170+
key
171+
)
172+
})?;
173+
IndexingTask::try_from(reminder)
183174
}
184175

185-
/// Parses indexing tasks serialized in keys formatted as `INDEXING_TASK_PREFIX:index_id:source_id`.
186-
/// Malformed keys and values are ignored, just warnings are emitted.
176+
/// Parses indexing tasks serialized in keys formatted as
177+
/// `INDEXING_TASK_PREFIX:index_id:index_incarnation:source_id`. Malformed keys and values are
178+
/// ignored, just warnings are emitted.
187179
pub(crate) fn parse_indexing_tasks(node_state: &NodeState, node_id: &str) -> Vec<IndexingTask> {
188180
node_state
189181
.iter_key_values(|key, _| key.starts_with(INDEXING_TASK_PREFIX))
190182
.map(|(key, versioned_value)| {
191-
let (index_id, source_id) = parse_indexing_task_key(key)?;
183+
let indexing_task = parse_indexing_task_key(key)?;
192184
let num_tasks: usize = versioned_value.value.parse()?;
193-
Ok((0..num_tasks).map(|_| IndexingTask {
194-
index_id: index_id.to_string(),
195-
source_id: source_id.to_string(),
196-
}))
185+
Ok((0..num_tasks).map(move |_| indexing_task.clone()))
197186
})
198187
.flatten_ok()
199188
.filter_map(

quickwit/quickwit-control-plane/src/indexing_plan.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,15 @@ fn compute_node_score(node_id: &str, physical_plan: &PhysicalIndexingPlan) -> f3
245245
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
246246
pub(crate) struct IndexSourceId {
247247
pub index_id: String,
248+
pub incarnation_id: String,
248249
pub source_id: String,
249250
}
250251

251252
impl From<IndexingTask> for IndexSourceId {
252253
fn from(indexing_task: IndexingTask) -> Self {
253254
Self {
254255
index_id: indexing_task.index_id,
256+
incarnation_id: indexing_task.incarnation_id,
255257
source_id: indexing_task.source_id,
256258
}
257259
}
@@ -303,6 +305,7 @@ pub(crate) fn build_indexing_plan(
303305
indexing_tasks.push(IndexingTask {
304306
index_id: index_source_id.index_id.clone(),
305307
source_id: index_source_id.source_id.clone(),
308+
incarnation_id: index_source_id.incarnation_id.clone(),
306309
});
307310
}
308311
}
@@ -383,6 +386,7 @@ mod tests {
383386
let index_source_id = IndexSourceId {
384387
index_id: "one-source-index".to_string(),
385388
source_id: "source-0".to_string(),
389+
incarnation_id: "11111111111111111111111111".to_string(),
386390
};
387391
source_configs_map.insert(
388392
index_source_id.clone(),
@@ -405,6 +409,7 @@ mod tests {
405409
IndexingTask {
406410
index_id: index_source_id.index_id.to_string(),
407411
source_id: index_source_id.source_id.to_string(),
412+
incarnation_id: "11111111111111111111111111".to_string(),
408413
}
409414
);
410415
}
@@ -417,6 +422,7 @@ mod tests {
417422
let index_source_id = IndexSourceId {
418423
index_id: "ingest-api-index".to_string(),
419424
source_id: INGEST_API_SOURCE_ID.to_string(),
425+
incarnation_id: "11111111111111111111111111".to_string(),
420426
};
421427
source_configs_map.insert(
422428
index_source_id.clone(),
@@ -439,6 +445,7 @@ mod tests {
439445
IndexingTask {
440446
index_id: index_source_id.index_id.to_string(),
441447
source_id: index_source_id.source_id.to_string(),
448+
incarnation_id: "11111111111111111111111111".to_string(),
442449
}
443450
);
444451
}
@@ -451,14 +458,17 @@ mod tests {
451458
let file_index_source_id = IndexSourceId {
452459
index_id: "one-source-index".to_string(),
453460
source_id: "file-source".to_string(),
461+
incarnation_id: "11111111111111111111111111".to_string(),
454462
};
455463
let cli_ingest_index_source_id = IndexSourceId {
456464
index_id: "second-source-index".to_string(),
457465
source_id: CLI_INGEST_SOURCE_ID.to_string(),
466+
incarnation_id: "11111111111111111111111111".to_string(),
458467
};
459468
let kafka_index_source_id = IndexSourceId {
460469
index_id: "third-source-index".to_string(),
461470
source_id: "kafka-source".to_string(),
471+
incarnation_id: "11111111111111111111111111".to_string(),
462472
};
463473
source_configs_map.insert(
464474
file_index_source_id.clone(),
@@ -511,10 +521,12 @@ mod tests {
511521
let kafka_index_source_id_1 = IndexSourceId {
512522
index_id: index_1.to_string(),
513523
source_id: source_1.to_string(),
524+
incarnation_id: "11111111111111111111111111".to_string(),
514525
};
515526
let kafka_index_source_id_2 = IndexSourceId {
516527
index_id: index_2.to_string(),
517528
source_id: source_2.to_string(),
529+
incarnation_id: "11111111111111111111111111".to_string(),
518530
};
519531
source_configs_map.insert(
520532
kafka_index_source_id_1.clone(),
@@ -543,12 +555,14 @@ mod tests {
543555
indexing_tasks.push(IndexingTask {
544556
index_id: index_1.to_string(),
545557
source_id: source_1.to_string(),
558+
incarnation_id: "11111111111111111111111111".to_string(),
546559
});
547560
}
548561
for _ in 0..2 {
549562
indexing_tasks.push(IndexingTask {
550563
index_id: index_2.to_string(),
551564
source_id: source_2.to_string(),
565+
incarnation_id: "11111111111111111111111111".to_string(),
552566
});
553567
}
554568

@@ -593,6 +607,7 @@ mod tests {
593607
let kafka_index_source_id_1 = IndexSourceId {
594608
index_id: index_1.to_string(),
595609
source_id: source_1.to_string(),
610+
incarnation_id: "11111111111111111111111111".to_string(),
596611
};
597612
source_configs_map.insert(
598613
kafka_index_source_id_1.clone(),
@@ -609,10 +624,12 @@ mod tests {
609624
IndexingTask {
610625
index_id: index_1.to_string(),
611626
source_id: source_1.to_string(),
627+
incarnation_id: "11111111111111111111111111".to_string(),
612628
},
613629
IndexingTask {
614630
index_id: index_1.to_string(),
615631
source_id: source_1.to_string(),
632+
incarnation_id: "11111111111111111111111111".to_string(),
616633
},
617634
];
618635

@@ -631,7 +648,7 @@ mod tests {
631648
let source_configs: HashMap<IndexSourceId, SourceConfig> = index_id_sources
632649
.into_iter()
633650
.map(|(index_id, source_config)| {
634-
(IndexSourceId { index_id, source_id: source_config.source_id.to_string() }, source_config)
651+
(IndexSourceId { index_id, source_id: source_config.source_id.to_string(), incarnation_id: "11111111111111111111111111".to_string(), }, source_config)
635652
})
636653
.collect();
637654
let mut indexing_tasks = build_indexing_plan(&indexers, &source_configs);

quickwit/quickwit-control-plane/src/scheduler.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ impl IndexingScheduler {
189189
(
190190
IndexSourceId {
191191
index_id: index_metadata.index_config.index_id.to_string(),
192+
incarnation_id: index_metadata.incarnation_id.to_string(),
192193
source_id,
193194
},
194195
source_config,
@@ -826,10 +827,12 @@ mod tests {
826827
let task_1 = IndexingTask {
827828
index_id: "index-1".to_string(),
828829
source_id: "source-1".to_string(),
830+
incarnation_id: "11111111111111111111111111".to_string(),
829831
};
830832
let task_2 = IndexingTask {
831833
index_id: "index-1".to_string(),
832834
source_id: "source-2".to_string(),
835+
incarnation_id: "11111111111111111111111111".to_string(),
833836
};
834837
running_plan.insert(
835838
"indexer-1".to_string(),
@@ -848,10 +851,12 @@ mod tests {
848851
let task_1 = IndexingTask {
849852
index_id: "index-1".to_string(),
850853
source_id: "source-1".to_string(),
854+
incarnation_id: "11111111111111111111111111".to_string(),
851855
};
852856
let task_2 = IndexingTask {
853857
index_id: "index-1".to_string(),
854858
source_id: "source-2".to_string(),
859+
incarnation_id: "11111111111111111111111111".to_string(),
855860
};
856861
running_plan.insert("indexer-1".to_string(), vec![task_1.clone()]);
857862
desired_plan.insert("indexer-1".to_string(), vec![task_2.clone()]);
@@ -876,10 +881,12 @@ mod tests {
876881
let task_1 = IndexingTask {
877882
index_id: "index-1".to_string(),
878883
source_id: "source-1".to_string(),
884+
incarnation_id: "11111111111111111111111111".to_string(),
879885
};
880886
let task_2 = IndexingTask {
881887
index_id: "index-2".to_string(),
882888
source_id: "source-2".to_string(),
889+
incarnation_id: "11111111111111111111111111".to_string(),
883890
};
884891
running_plan.insert("indexer-2".to_string(), vec![task_2.clone()]);
885892
desired_plan.insert("indexer-1".to_string(), vec![task_1.clone()]);
@@ -912,6 +919,7 @@ mod tests {
912919
let task_1 = IndexingTask {
913920
index_id: "index-1".to_string(),
914921
source_id: "source-1".to_string(),
922+
incarnation_id: "11111111111111111111111111".to_string(),
915923
};
916924
running_plan.insert("indexer-1".to_string(), vec![task_1.clone()]);
917925
desired_plan.insert(
@@ -935,6 +943,7 @@ mod tests {
935943
let task_1 = IndexingTask {
936944
index_id: "index-1".to_string(),
937945
source_id: "source-1".to_string(),
946+
incarnation_id: "11111111111111111111111111".to_string(),
938947
};
939948
running_plan.insert(
940949
"indexer-1".to_string(),

quickwit/quickwit-indexing/failpoints/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use quickwit_indexing::actors::MergeExecutor;
4848
use quickwit_indexing::merge_policy::MergeOperation;
4949
use quickwit_indexing::models::{IndexingPipelineId, MergeScratch, ScratchDirectory};
5050
use quickwit_indexing::{get_tantivy_directory_from_split_bundle, TestSandbox};
51-
use quickwit_metastore::{ListSplitsQuery, Split, SplitMetadata, SplitState};
51+
use quickwit_metastore::{IndexConfigId, ListSplitsQuery, Split, SplitMetadata, SplitState};
5252
use serde_json::Value as JsonValue;
5353
use tantivy::{Directory, Inventory};
5454

@@ -283,7 +283,7 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul
283283
tantivy_dirs,
284284
};
285285
let pipeline_id = IndexingPipelineId {
286-
index_id: index_id.to_string(),
286+
index_config_id: IndexConfigId::for_test(index_id.to_string()),
287287
source_id: "test-source".to_string(),
288288
node_id: "test-node".to_string(),
289289
pipeline_ord: 0,

0 commit comments

Comments
 (0)