Skip to content

Commit 7a29318

Browse files
authored
Exposing published position thru chitchat (#4110)
* Exposing published positions through chitchat. The info is transmitted via an EventSubscriber at the same time as a regular truncate: upon fetching the starting offset when assigning a shard to an IngestSource, and upon suggest truncate. This PR does not include any code to remove sources yet. Refactoring: Moving SourceUid to quickwit-proto. Making it more difficult to misuse pubsub: - Added #[must_use] for methods returning the handle - Internalizing the `Clone` mechanics. A "normal" Clone could yield unexpected results. * Apply suggestions from code review Co-authored-by: Adrien Guillo <[email protected]> * Improved doc in unit test
1 parent a65997a commit 7a29318

File tree

18 files changed

+758
-90
lines changed

18 files changed

+758
-90
lines changed

quickwit/quickwit-cluster/src/cluster.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,16 @@ impl Cluster {
238238
.set(key, value);
239239
}
240240

241+
pub async fn get_self_key_value(&self, key: &str) -> Option<String> {
242+
self.chitchat()
243+
.await
244+
.lock()
245+
.await
246+
.self_node_state()
247+
.get_versioned(key)
248+
.map(|versioned_value| versioned_value.value.clone())
249+
}
250+
241251
/// Waits until the predicate holds true for the set of ready members.
242252
pub async fn wait_for_ready_members<F>(
243253
&self,

quickwit/quickwit-common/src/pubsub.rs

Lines changed: 100 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,54 @@
1919

2020
use std::collections::HashMap;
2121
use std::fmt;
22+
use std::fmt::Formatter;
23+
use std::marker::PhantomData;
2224
use std::sync::atomic::{AtomicUsize, Ordering};
2325
use std::sync::{Arc, Mutex, Weak};
2426
use std::time::Duration;
2527

2628
use async_trait::async_trait;
29+
use tokio::sync::Mutex as TokioMutex;
2730

2831
use crate::type_map::TypeMap;
2932

3033
pub trait Event: fmt::Debug + Clone + Send + Sync + 'static {}
3134

3235
#[async_trait]
33-
pub trait EventSubscriber<E>: fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static {
36+
pub trait EventSubscriber<E>: Send + Sync + 'static {
3437
async fn handle_event(&mut self, event: E);
3538
}
3639

37-
dyn_clone::clone_trait_object!(<E> EventSubscriber<E>);
40+
struct ClosureSubscriber<E, F> {
41+
callback: Arc<F>,
42+
_phantom: PhantomData<E>,
43+
}
44+
45+
impl<E, F> Clone for ClosureSubscriber<E, F> {
46+
fn clone(&self) -> Self {
47+
ClosureSubscriber {
48+
callback: self.callback.clone(),
49+
_phantom: self._phantom,
50+
}
51+
}
52+
}
53+
54+
impl<E, F> fmt::Debug for ClosureSubscriber<E, F> {
55+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
56+
f.debug_struct("ClosureSubscriber")
57+
.field("callback", &std::any::type_name::<F>())
58+
.finish()
59+
}
60+
}
61+
62+
#[async_trait]
63+
impl<E: Sync + Send + 'static, F: Fn(E) + Sync + Send + 'static> EventSubscriber<E>
64+
for ClosureSubscriber<E, F>
65+
{
66+
async fn handle_event(&mut self, event: E) {
67+
(self.callback)(event);
68+
}
69+
}
3870

3971
type EventSubscriptions<E> = HashMap<usize, EventSubscription<E>>;
4072

@@ -59,6 +91,7 @@ struct InnerEventBroker {
5991

6092
impl EventBroker {
6193
/// Subscribes to an event type.
94+
#[must_use]
6295
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
6396
where E: Event {
6497
let mut subscriptions = self
@@ -76,8 +109,7 @@ impl EventBroker {
76109
.fetch_add(1, Ordering::Relaxed);
77110

78111
let subscription = EventSubscription {
79-
subscription_id,
80-
subscriber: Box::new(subscriber),
112+
subscriber: Arc::new(TokioMutex::new(Box::new(subscriber))),
81113
};
82114
let typed_subscriptions = subscriptions
83115
.get_mut::<EventSubscriptions<E>>()
@@ -99,6 +131,21 @@ impl EventBroker {
99131
}
100132
}
101133

134+
/// Subscribes to an event with a callback function.
135+
#[must_use]
136+
pub fn subscribe_fn<E>(
137+
&self,
138+
callback_fn: impl Fn(E) + Sync + Send + 'static,
139+
) -> EventSubscriptionHandle
140+
where
141+
E: Event,
142+
{
143+
self.subscribe(ClosureSubscriber {
144+
callback: Arc::new(callback_fn),
145+
_phantom: Default::default(),
146+
})
147+
}
148+
102149
/// Publishes an event.
103150
pub fn publish<E>(&self, event: E)
104151
where E: Event {
@@ -111,20 +158,18 @@ impl EventBroker {
111158
if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
112159
for subscription in typed_subscriptions.values() {
113160
let event = event.clone();
114-
let mut subscriber = subscription.subscriber.clone();
161+
let subscriber_clone = subscription.subscriber.clone();
115162
tokio::spawn(tokio::time::timeout(Duration::from_secs(600), async move {
116-
subscriber.handle_event(event).await;
163+
let mut subscriber_lock = subscriber_clone.lock().await;
164+
subscriber_lock.handle_event(event).await;
117165
}));
118166
}
119167
}
120168
}
121169
}
122170

123-
#[derive(Debug)]
124171
struct EventSubscription<E> {
125-
#[allow(dead_code)]
126-
subscription_id: usize, // Used for the `Debug` implementation.
127-
subscriber: Box<dyn EventSubscriber<E>>,
172+
subscriber: Arc<TokioMutex<Box<dyn EventSubscriber<E>>>>,
128173
}
129174

130175
pub struct EventSubscriptionHandle {
@@ -135,6 +180,12 @@ pub struct EventSubscriptionHandle {
135180

136181
impl EventSubscriptionHandle {
137182
pub fn cancel(self) {}
183+
184+
/// By default, dropping the subscription handle cancels the subscription.
185+
/// `forever` consumes the handle and prevents that cancellation on drop.
186+
pub fn forever(mut self) {
187+
self.broker = Weak::new();
188+
}
138189
}
139190

140191
impl Drop for EventSubscriptionHandle {
@@ -184,15 +235,52 @@ mod tests {
184235
let event = MyEvent { value: 42 };
185236
event_broker.publish(event);
186237

187-
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
238+
tokio::time::sleep(Duration::from_millis(1)).await;
188239
assert_eq!(counter.load(Ordering::Relaxed), 42);
189240

190241
subscription_handle.cancel();
191242

192243
let event = MyEvent { value: 1337 };
193244
event_broker.publish(event);
194245

195-
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
246+
tokio::time::sleep(Duration::from_millis(1)).await;
196247
assert_eq!(counter.load(Ordering::Relaxed), 42);
197248
}
249+
250+
#[tokio::test]
251+
async fn test_event_broker_handle_drop() {
252+
let event_broker = EventBroker::default();
253+
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
254+
drop(event_broker.subscribe_fn::<MyEvent>(move |event| {
255+
tx.send(event.value).unwrap();
256+
}));
257+
event_broker.publish(MyEvent { value: 42 });
258+
assert!(rx.recv().await.is_none());
259+
}
260+
261+
#[tokio::test]
262+
async fn test_event_broker_handle_cancel() {
263+
let event_broker = EventBroker::default();
264+
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
265+
event_broker
266+
.subscribe_fn::<MyEvent>(move |event| {
267+
tx.send(event.value).unwrap();
268+
})
269+
.cancel();
270+
event_broker.publish(MyEvent { value: 42 });
271+
assert!(rx.recv().await.is_none());
272+
}
273+
274+
#[tokio::test]
275+
async fn test_event_broker_handle_forever() {
276+
let event_broker = EventBroker::default();
277+
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
278+
event_broker
279+
.subscribe_fn::<MyEvent>(move |event| {
280+
tx.send(event.value).unwrap();
281+
})
282+
.forever();
283+
event_broker.publish(MyEvent { value: 42 });
284+
assert_eq!(rx.recv().await, Some(42));
285+
}
198286
}

quickwit/quickwit-common/src/shared_consts.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,6 @@ pub const DELETION_GRACE_PERIOD: Duration = Duration::from_secs(60 * 32); // 32
3535
/// In order to amortize search with scroll, we fetch more documents than are
3636
/// being requested.
3737
pub const SCROLL_BATCH_LEN: usize = 1_000;
38+
39+
/// Prefix used in chitchat to publish the shard positions.
40+
pub const SHARD_POSITIONS_PREFIX: &str = "shard_positions:";

quickwit/quickwit-common/src/tower/event_listener.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ mod tests {
129129

130130
impl Event for MyEvent {}
131131

132-
#[derive(Debug, Clone)]
133132
struct MySubscriber {
134133
counter: Arc<AtomicUsize>,
135134
}

quickwit/quickwit-control-plane/src/control_plane_model.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,10 @@ use quickwit_proto::metastore::{
3333
self, EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError,
3434
MetastoreService, MetastoreServiceClient, SourceType,
3535
};
36-
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
36+
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId, SourceUid};
3737
use serde::Serialize;
3838
use tracing::{error, info, warn};
3939

40-
use crate::SourceUid;
41-
4240
type NextShardId = ShardId;
4341
#[derive(Debug, Eq, PartialEq)]
4442
struct ShardTableEntry {

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,11 +470,9 @@ mod tests {
470470
use proptest::{prop_compose, proptest};
471471
use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams};
472472
use quickwit_metastore::IndexMetadata;
473-
use quickwit_proto::types::IndexUid;
473+
use quickwit_proto::types::{IndexUid, SourceUid};
474474

475475
use super::*;
476-
use crate::SourceUid;
477-
478476
#[test]
479477
fn test_indexing_plans_diff() {
480478
{

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::num::NonZeroU32;
2525

2626
use fnv::FnvHashMap;
2727
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
28-
use quickwit_proto::types::{IndexUid, ShardId};
28+
use quickwit_proto::types::{IndexUid, ShardId, SourceUid};
2929
use scheduling_logic_model::{IndexerOrd, SourceOrd};
3030
use tracing::error;
3131
use tracing::log::warn;
@@ -34,7 +34,6 @@ use crate::indexing_plan::PhysicalIndexingPlan;
3434
use crate::indexing_scheduler::scheduling::scheduling_logic_model::{
3535
SchedulingProblem, SchedulingSolution,
3636
};
37-
use crate::SourceUid;
3837

3938
/// If we have several pipelines below this threshold we
4039
/// reduce the number of pipelines.
@@ -530,13 +529,12 @@ mod tests {
530529

531530
use fnv::FnvHashMap;
532531
use quickwit_proto::indexing::{mcpu, IndexingTask};
533-
use quickwit_proto::types::{IndexUid, ShardId};
532+
use quickwit_proto::types::{IndexUid, ShardId, SourceUid};
534533

535534
use super::{
536535
build_physical_indexing_plan, group_shards_into_pipelines, indexing_task,
537536
spread_shards_optimally, SourceToSchedule, SourceToScheduleType,
538537
};
539-
use crate::SourceUid;
540538

541539
#[test]
542540
fn test_spread_shard_optimally() {

quickwit/quickwit-control-plane/src/lib.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,6 @@ pub(crate) mod metrics;
2626

2727
use quickwit_common::tower::Pool;
2828
use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, IndexingTask};
29-
use quickwit_proto::types::{IndexUid, SourceId};
30-
31-
/// It can however appear only once in a given index.
32-
/// In itself, `SourceId` is not unique, but the pair `(IndexUid, SourceId)` is.
33-
#[derive(PartialEq, Eq, Debug, PartialOrd, Ord, Hash, Clone)]
34-
pub struct SourceUid {
35-
pub index_uid: IndexUid,
36-
pub source_id: SourceId,
37-
}
3829

3930
/// Indexer-node specific information stored in the pool of available indexer nodes
4031
#[derive(Debug, Clone)]

quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ impl IndexingPipeline {
439439
ingester_pool: self.params.ingester_pool.clone(),
440440
queues_dir_path: self.params.queues_dir_path.clone(),
441441
storage_resolver: self.params.source_storage_resolver.clone(),
442+
event_broker: self.params.event_broker.clone(),
442443
}),
443444
source_checkpoint,
444445
))

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use quickwit_actors::{
3232
};
3333
use quickwit_cluster::Cluster;
3434
use quickwit_common::fs::get_cache_directory_path;
35-
use quickwit_common::pubsub::EventBroker;
35+
use quickwit_common::pubsub::{EventBroker, EventSubscriptionHandle};
3636
use quickwit_common::temp_dir;
3737
use quickwit_config::{
3838
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
@@ -56,7 +56,10 @@ use tracing::{debug, error, info, warn};
5656

5757
use super::merge_pipeline::{MergePipeline, MergePipelineParams};
5858
use super::MergePlanner;
59-
use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline};
59+
use crate::models::{
60+
DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, PublishedShardPositions,
61+
SpawnPipeline,
62+
};
6063
use crate::source::{AssignShards, Assignment};
6164
use crate::split_store::{LocalSplitStore, SplitStoreQuota};
6265
use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics};
@@ -117,6 +120,7 @@ pub struct IndexingService {
117120
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
118121
cooperative_indexing_permits: Option<Arc<Semaphore>>,
119122
event_broker: EventBroker,
123+
_event_subscription_handle: EventSubscriptionHandle,
120124
}
121125

122126
impl Debug for IndexingService {
@@ -144,6 +148,8 @@ impl IndexingService {
144148
storage_resolver: StorageResolver,
145149
event_broker: EventBroker,
146150
) -> anyhow::Result<IndexingService> {
151+
let published_shard_positions = PublishedShardPositions::new(cluster.clone());
152+
let event_subscription_handle = event_broker.subscribe(published_shard_positions);
147153
let split_store_space_quota = SplitStoreQuota::new(
148154
indexer_config.split_store_max_num_splits,
149155
indexer_config.split_store_max_num_bytes,
@@ -175,6 +181,7 @@ impl IndexingService {
175181
merge_pipeline_handles: HashMap::new(),
176182
cooperative_indexing_permits,
177183
event_broker,
184+
_event_subscription_handle: event_subscription_handle,
178185
})
179186
}
180187

@@ -418,10 +425,10 @@ impl IndexingService {
418425
let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self
419426
.indexing_pipelines
420427
.iter()
421-
.flat_map(|(pipeline, (_, pipeline_handle))| {
422-
let indexer_metrics = pipeline_handle.last_observation();
423-
let pipeline_metrics = indexer_metrics.pipeline_metrics_opt?;
424-
Some((pipeline, pipeline_metrics))
428+
.filter_map(|(pipeline_id, (_, pipeline_handle))| {
429+
let indexing_statistics = pipeline_handle.last_observation();
430+
let pipeline_metrics = indexing_statistics.pipeline_metrics_opt?;
431+
Some((pipeline_id, pipeline_metrics))
425432
})
426433
.collect();
427434
self.cluster

quickwit/quickwit-indexing/src/actors/uploader.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,6 @@ mod tests {
912912
Ok(())
913913
}
914914

915-
#[derive(Clone)]
916915
struct ReportSplitListener {
917916
report_splits_tx: flume::Sender<ReportSplitsRequest>,
918917
}

quickwit/quickwit-indexing/src/models/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod processed_doc;
3030
mod publish_lock;
3131
mod publisher_message;
3232
mod raw_doc_batch;
33+
mod shard_positions;
3334
mod split_attrs;
3435

3536
pub use indexed_split::{
@@ -49,6 +50,7 @@ pub use publish_lock::{NewPublishLock, PublishLock};
4950
pub use publisher_message::SplitsUpdate;
5051
use quickwit_proto::types::PublishToken;
5152
pub use raw_doc_batch::RawDocBatch;
53+
pub use shard_positions::{PublishedShardPositions, PublishedShardPositionsUpdate};
5254
pub use split_attrs::{create_split_metadata, SplitAttrs};
5355

5456
#[derive(Debug)]

0 commit comments

Comments
 (0)