Skip to content

Commit a93c2e5

Browse files
committed
New protocol format
1 parent 2765848 commit a93c2e5

File tree

5 files changed

+69
-45
lines changed

5 files changed

+69
-45
lines changed

crates/core/src/sync/checkpoint.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use alloc::{string::String, vec::Vec};
1+
use alloc::{rc::Rc, string::String, vec::Vec};
22
use num_traits::Zero;
33

44
use crate::sync::{
@@ -15,7 +15,7 @@ pub struct OwnedBucketChecksum {
1515
pub checksum: Checksum,
1616
pub priority: BucketPriority,
1717
pub count: Option<i64>,
18-
pub subscriptions: BucketSubscriptionReason,
18+
pub subscriptions: Rc<Vec<BucketSubscriptionReason>>,
1919
}
2020

2121
impl OwnedBucketChecksum {

crates/core/src/sync/line.rs

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use alloc::borrow::Cow;
2+
use alloc::rc::Rc;
23
use alloc::string::{String, ToString};
34
use alloc::vec::Vec;
45
use serde::de::{Error, IgnoredAny, VariantAccess, Visitor};
@@ -130,23 +131,19 @@ pub struct BucketChecksum<'a> {
130131
#[serde(default)]
131132
pub count: Option<i64>,
132133
#[serde(default)]
133-
pub subscriptions: BucketSubscriptionReason,
134+
pub subscriptions: Rc<Vec<BucketSubscriptionReason>>,
134135
// #[serde(default)]
135136
// #[serde(deserialize_with = "deserialize_optional_string_to_i64")]
136137
// pub last_op_id: Option<i64>,
137138
}
138139

139140
/// The reason for why a bucket was included in a checkpoint.
140-
#[derive(Debug, Default, Clone)]
141+
#[derive(Debug)]
141142
pub enum BucketSubscriptionReason {
142-
/// A bucket was created for all of the subscription ids we've explicitly requested in the sync
143-
/// request.
144-
ExplicitlySubscribed { subscriptions: Vec<i64> },
145143
/// A bucket was created from a default stream.
146-
IsDefault { stream_name: String },
147-
/// We're talking to an older sync service not sending the reason.
148-
#[default]
149-
Unknown,
144+
DerivedFromDefaultStream(String),
145+
/// A bucket was created for a subscription id we've explicitly requested in the sync request.
146+
DerivedFromExplicitSubscription(i64),
150147
}
151148

152149
impl<'de> Deserialize<'de> for BucketSubscriptionReason {
@@ -156,37 +153,38 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason {
156153
{
157154
struct MyVisitor;
158155

156+
const VARIANTS: &'static [&'static str] = &["def", "sub"];
157+
159158
impl<'de> Visitor<'de> for MyVisitor {
160159
type Value = BucketSubscriptionReason;
161160

162161
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
163162
write!(formatter, "a subscription reason")
164163
}
165164

166-
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
165+
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
167166
where
168-
A: serde::de::SeqAccess<'de>,
167+
A: serde::de::EnumAccess<'de>,
169168
{
170-
let mut subscriptions = Vec::<i64>::new();
171-
172-
while let Some(item) = seq.next_element::<&'de str>()? {
173-
subscriptions.push(item.parse().map_err(|_| A::Error::custom("not an int"))?);
174-
}
175-
176-
Ok(BucketSubscriptionReason::ExplicitlySubscribed { subscriptions })
177-
}
169+
let (key, variant) = data.variant::<&'de str>()?;
170+
Ok(match key {
171+
"def" => BucketSubscriptionReason::DerivedFromDefaultStream(
172+
variant.newtype_variant()?,
173+
),
174+
"sub" => {
175+
let textual_id = variant.newtype_variant::<&'de str>()?;
176+
let id = textual_id
177+
.parse()
178+
.map_err(|_| A::Error::custom("not an int"))?;
178179

179-
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
180-
where
181-
E: serde::de::Error,
182-
{
183-
Ok(BucketSubscriptionReason::IsDefault {
184-
stream_name: v.to_string(),
180+
BucketSubscriptionReason::DerivedFromExplicitSubscription(id)
181+
}
182+
other => return Err(A::Error::unknown_variant(other, VARIANTS)),
185183
})
186184
}
187185
}
188186

189-
deserializer.deserialize_any(MyVisitor)
187+
deserializer.deserialize_enum("BucketSubscriptionReason", VARIANTS, MyVisitor)
190188
}
191189
}
192190

crates/core/src/sync/streaming_sync.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -635,22 +635,23 @@ impl StreamingSyncIteration {
635635

636636
// Iterate over buckets to associate them with subscriptions
637637
for bucket in tracked.checkpoint.buckets.values() {
638-
match &bucket.subscriptions {
639-
BucketSubscriptionReason::ExplicitlySubscribed { subscriptions } => {
640-
for subscription_id in subscriptions {
641-
if let Ok(index) =
642-
tracked_subscriptions.binary_search_by_key(subscription_id, |s| s.id)
643-
{
644-
resolved[index].mark_associated_with_bucket(&bucket);
645-
}
638+
for reason in &*bucket.subscriptions {
639+
let subscription_index = match reason {
640+
BucketSubscriptionReason::DerivedFromDefaultStream(stream_name) => {
641+
default_stream_subscriptions
642+
.get(stream_name.as_str())
643+
.cloned()
646644
}
647-
}
648-
BucketSubscriptionReason::IsDefault { stream_name } => {
649-
if let Some(index) = default_stream_subscriptions.get(stream_name.as_str()) {
650-
resolved[*index].mark_associated_with_bucket(&bucket);
645+
BucketSubscriptionReason::DerivedFromExplicitSubscription(subscription_id) => {
646+
tracked_subscriptions
647+
.binary_search_by_key(subscription_id, |s| s.id)
648+
.ok()
651649
}
650+
};
651+
652+
if let Some(index) = subscription_index {
653+
resolved[index].mark_associated_with_bucket(&bucket);
652654
}
653-
BucketSubscriptionReason::Unknown => {}
654655
}
655656
}
656657

crates/core/src/sync/sync_status.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ pub struct ActiveStreamSubscription {
278278
pub priority: Option<BucketPriority>,
279279
pub active: bool,
280280
pub is_default: bool,
281+
pub has_explicit_subscription: bool,
281282
pub expires_at: Option<Timestamp>,
282283
pub last_synced_at: Option<Timestamp>,
283284
}
@@ -292,13 +293,25 @@ impl ActiveStreamSubscription {
292293
priority: None,
293294
associated_buckets: Vec::new(),
294295
active: local.active,
296+
has_explicit_subscription: local.has_subscribed_manually(),
295297
expires_at: local.expires_at.clone().map(|e| Timestamp(e)),
296298
last_synced_at: local.last_synced_at.map(|e| Timestamp(e)),
297299
}
298300
}
299301

300302
pub fn mark_associated_with_bucket(&mut self, bucket: &OwnedBucketChecksum) {
301-
self.associated_buckets.push(bucket.bucket.clone());
303+
match self.associated_buckets.binary_search(&bucket.bucket) {
304+
Ok(_) => {
305+
// The bucket is already part of the list
306+
return;
307+
}
308+
Err(position) => {
309+
// Insert here to keep vec sorted
310+
self.associated_buckets
311+
.insert(position, bucket.bucket.clone());
312+
}
313+
};
314+
302315
self.priority = Some(match self.priority {
303316
None => bucket.priority,
304317
Some(prio) => min(prio, bucket.priority),

dart/test/sync_stream_test.dart

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ void main() {
7272
lastOpId: 1,
7373
buckets: [
7474
bucketDescription('a',
75-
subscriptions: 'my_default_stream', priority: 1),
75+
subscriptions: [
76+
{'def': 'my_default_stream'}
77+
],
78+
priority: 1),
7679
],
7780
streams: [('my_default_stream', true)],
7881
),
@@ -90,6 +93,7 @@ void main() {
9093
'associated_buckets': ['a'],
9194
'active': true,
9295
'is_default': true,
96+
'has_explicit_subscription': false,
9397
'expires_at': null,
9498
'last_synced_at': null,
9599
'priority': 1,
@@ -125,7 +129,11 @@ void main() {
125129
checkpoint(
126130
lastOpId: 1,
127131
buckets: [
128-
bucketDescription('a', subscriptions: stream, priority: 1),
132+
bucketDescription('a',
133+
subscriptions: [
134+
{'def': stream}
135+
],
136+
priority: 1),
129137
],
130138
streams: [(stream, true)],
131139
),
@@ -154,7 +162,11 @@ void main() {
154162
checkpoint(
155163
lastOpId: 1,
156164
buckets: [
157-
bucketDescription('a', subscriptions: 'a', priority: 1),
165+
bucketDescription('a',
166+
subscriptions: [
167+
{'def': 'a'}
168+
],
169+
priority: 1),
158170
],
159171
streams: [('a', true)],
160172
),

0 commit comments

Comments
 (0)