Skip to content

Commit 3e1196c

Browse files
committed
More stream management tests
1 parent f915d6b commit 3e1196c

File tree

4 files changed

+79
-3
lines changed

4 files changed

+79
-3
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ debug = true
2323

2424
[profile.wasm]
2525
inherits = "release"
26-
debug = true
2726

2827
[profile.wasm_asyncify]
2928
inherits = "wasm"

crates/core/src/sync/streaming_sync.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ impl StreamingSyncIteration {
578578
// We will mark it as active again if it's part of the streams included in the
579579
// checkpoint.
580580
sub.active = false;
581+
sub.is_default = false;
581582

582583
tracked_subscriptions.push(sub);
583584
})?;
@@ -614,7 +615,7 @@ impl StreamingSyncIteration {
614615
}
615616
}
616617
tracked_subscriptions
617-
.retain(|subscription| !subscription.is_default || subscription.active);
618+
.retain(|subscription| subscription.has_subscribed_manually() || subscription.active);
618619

619620
debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id));
620621

crates/core/src/sync/subscriptions.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ pub struct SubscribeToStream {
7373
#[derive(Deserialize)]
7474
pub struct UnsubscribeFromStream {
7575
pub stream: String,
76+
#[serde(default)]
7677
pub params: Option<Box<serde_json::value::RawValue>>,
7778
pub immediate: bool,
7879
}
@@ -84,7 +85,7 @@ pub fn apply_subscriptions(
8485
match subscription {
8586
SubscriptionChangeRequest::Subscribe(subscription) => {
8687
let stmt = db
87-
.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4, is_default = FALSE")
88+
.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4")
8889
.into_db_result(db)?;
8990

9091
stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?;

dart/test/sync_stream_test.dart

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ void main() {
175175

176176
var [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
177177
expect(stored, containsPair('is_default', 1));
178+
expect(stored, containsPair('ttl', isNull));
178179

179180
control(
180181
'subscriptions',
@@ -185,7 +186,29 @@ void main() {
185186

186187
[stored] = db.select('SELECT * FROM ps_stream_subscriptions');
187188
expect(stored, containsPair('active', 1));
189+
// It's still a default stream, but it now has a TTL to indicate the
190+
// explicit subscription.
191+
expect(stored, containsPair('is_default', 1));
192+
expect(stored, containsPair('ttl', isNotNull));
193+
194+
// Remove the stream from the checkpoint, should still be included due to
195+
// the explicit subscription.
196+
control(
197+
'line_text',
198+
json.encode(
199+
checkpoint(
200+
lastOpId: 1,
201+
buckets: [
202+
bucketDescription('a', priority: 1),
203+
],
204+
),
205+
),
206+
);
207+
208+
[stored] = db.select('SELECT * FROM ps_stream_subscriptions');
209+
expect(stored, containsPair('active', 0));
188210
expect(stored, containsPair('is_default', 0));
211+
expect(stored, containsPair('ttl', isNotNull));
189212
});
190213
});
191214

@@ -318,5 +341,57 @@ void main() {
318341
),
319342
);
320343
});
344+
345+
syncTest('can be made implicit', (_) {
346+
control(
347+
'subscriptions',
348+
json.encode({
349+
'subscribe': {'stream': 'a'}
350+
}));
351+
control('start', null);
352+
control(
353+
'line_text',
354+
json.encode(
355+
checkpoint(
356+
lastOpId: 1,
357+
buckets: [],
358+
streams: [('a', true)],
359+
),
360+
),
361+
);
362+
363+
var [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
364+
expect(stored, containsPair('is_default', 1));
365+
expect(stored, containsPair('ttl', isNotNull));
366+
367+
control(
368+
'subscriptions',
369+
json.encode({
370+
'unsubscribe': {'stream': 'a', 'immediate': false}
371+
}),
372+
);
373+
control('stop', null);
374+
375+
// The stream should no longer be requested
376+
var startInstructions = control('start', null);
377+
expect(
378+
startInstructions,
379+
contains(
380+
containsPair(
381+
'EstablishSyncStream',
382+
containsPair(
383+
'request',
384+
containsPair(
385+
'streams',
386+
{
387+
'include_defaults': true,
388+
'subscriptions': isEmpty,
389+
},
390+
),
391+
),
392+
),
393+
),
394+
);
395+
});
321396
});
322397
}

0 commit comments

Comments
 (0)