Skip to content

Commit fb903e6

Browse files
authored
Removing needless generics of event subscription handle. (#4126)
The dispatch is made using a function pointer.
1 parent 69e92f3 commit fb903e6

File tree

2 files changed

+17
-21
lines changed

2 files changed

+17
-21
lines changed

quickwit/quickwit-common/src/pubsub.rs

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
use std::collections::HashMap;
2121
use std::fmt;
22-
use std::marker::PhantomData;
2322
use std::sync::atomic::{AtomicUsize, Ordering};
2423
use std::sync::{Arc, Mutex, Weak};
2524
use std::time::Duration;
@@ -60,7 +59,7 @@ struct InnerEventBroker {
6059

6160
impl EventBroker {
6261
/// Subscribes to an event type.
63-
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle<E>
62+
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
6463
where E: Event {
6564
let mut subscriptions = self
6665
.inner
@@ -84,11 +83,19 @@ impl EventBroker {
8483
.get_mut::<EventSubscriptions<E>>()
8584
.expect("The subscription map should exist.");
8685
typed_subscriptions.insert(subscription_id, subscription);
87-
8886
EventSubscriptionHandle {
8987
subscription_id,
9088
broker: Arc::downgrade(&self.inner),
91-
_phantom: PhantomData,
89+
drop_me: |subscription_id, broker| {
90+
let mut subscriptions = broker
91+
.subscriptions
92+
.lock()
93+
.expect("the lock should not be poisoned");
94+
if let Some(typed_subscriptions) = subscriptions.get_mut::<EventSubscriptions<E>>()
95+
{
96+
typed_subscriptions.remove(&subscription_id);
97+
}
98+
},
9299
}
93100
}
94101

@@ -120,31 +127,20 @@ struct EventSubscription<E> {
120127
subscriber: Box<dyn EventSubscriber<E>>,
121128
}
122129

123-
#[derive(Debug)]
124-
pub struct EventSubscriptionHandle<E: Event> {
130+
pub struct EventSubscriptionHandle {
125131
subscription_id: usize,
126132
broker: Weak<InnerEventBroker>,
127-
_phantom: PhantomData<E>,
133+
drop_me: fn(usize, &InnerEventBroker),
128134
}
129135

130-
impl<E> EventSubscriptionHandle<E>
131-
where E: Event
132-
{
136+
impl EventSubscriptionHandle {
133137
pub fn cancel(self) {}
134138
}
135139

136-
impl<E> Drop for EventSubscriptionHandle<E>
137-
where E: Event
138-
{
140+
impl Drop for EventSubscriptionHandle {
139141
fn drop(&mut self) {
140142
if let Some(broker) = self.broker.upgrade() {
141-
let mut subscriptions = broker
142-
.subscriptions
143-
.lock()
144-
.expect("the lock should not be poisoned");
145-
if let Some(typed_subscriptions) = subscriptions.get_mut::<EventSubscriptions<E>>() {
146-
typed_subscriptions.remove(&self.subscription_id);
147-
}
143+
(self.drop_me)(self.subscription_id, &broker);
148144
}
149145
}
150146
}

quickwit/quickwit-serve/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ struct QuickwitServices {
133133
/// The control plane listens to metastore events.
134134
/// We must maintain a reference to the subscription handles to continue receiving
135135
/// notifications. Otherwise, the subscriptions are dropped.
136-
_report_splits_subscription_handle_opt: Option<EventSubscriptionHandle<ReportSplitsRequest>>,
136+
_report_splits_subscription_handle_opt: Option<EventSubscriptionHandle>,
137137
}
138138

139139
fn has_node_with_metastore_service(members: &[ClusterMember]) -> bool {

0 commit comments

Comments
 (0)