Skip to content

Commit 35404c1

Browse files
authored
Merge pull request #55 from powersync-ja/feat/bucket-priorities
Support bucket with different priorities
2 parents 30e4ad2 + 4248310 commit 35404c1

14 files changed

+776
-215
lines changed

Diff for: crates/core/src/bucket_priority.rs

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use serde::{de::Visitor, Deserialize};
2+
use sqlite_nostd::ResultCode;
3+
4+
use crate::error::SQLiteError;
5+
6+
#[repr(transparent)]
7+
#[derive(Clone, Copy, PartialEq, Eq)]
8+
pub struct BucketPriority {
9+
pub number: i32,
10+
}
11+
12+
impl BucketPriority {
13+
pub fn may_publish_with_outstanding_uploads(self) -> bool {
14+
self == BucketPriority::HIGHEST
15+
}
16+
17+
pub const HIGHEST: BucketPriority = BucketPriority { number: 0 };
18+
19+
/// A low priority used to represent fully-completed sync operations across all priorities.
20+
pub const SENTINEL: BucketPriority = BucketPriority { number: i32::MAX };
21+
}
22+
23+
impl TryFrom<i32> for BucketPriority {
24+
type Error = SQLiteError;
25+
26+
fn try_from(value: i32) -> Result<Self, Self::Error> {
27+
if value < BucketPriority::HIGHEST.number || value == Self::SENTINEL.number {
28+
return Err(SQLiteError(
29+
ResultCode::MISUSE,
30+
Some("Invalid bucket priority".into()),
31+
));
32+
}
33+
34+
return Ok(BucketPriority { number: value });
35+
}
36+
}
37+
38+
impl Into<i32> for BucketPriority {
39+
fn into(self) -> i32 {
40+
self.number
41+
}
42+
}
43+
44+
impl PartialOrd<BucketPriority> for BucketPriority {
45+
fn partial_cmp(&self, other: &BucketPriority) -> Option<core::cmp::Ordering> {
46+
Some(self.number.partial_cmp(&other.number)?.reverse())
47+
}
48+
}
49+
50+
impl<'de> Deserialize<'de> for BucketPriority {
51+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
52+
where
53+
D: serde::Deserializer<'de>,
54+
{
55+
struct PriorityVisitor;
56+
impl<'de> Visitor<'de> for PriorityVisitor {
57+
type Value = BucketPriority;
58+
59+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
60+
formatter.write_str("a priority as an integer between 0 and 3 (inclusive)")
61+
}
62+
63+
fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
64+
where
65+
E: serde::de::Error,
66+
{
67+
BucketPriority::try_from(v).map_err(|e| E::custom(e.1.unwrap_or_default()))
68+
}
69+
70+
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
71+
where
72+
E: serde::de::Error,
73+
{
74+
let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
75+
Self::visit_i32(self, i)
76+
}
77+
78+
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
79+
where
80+
E: serde::de::Error,
81+
{
82+
let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
83+
Self::visit_i32(self, i)
84+
}
85+
}
86+
87+
deserializer.deserialize_i32(PriorityVisitor)
88+
}
89+
}

Diff for: crates/core/src/kv.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use sqlite::ResultCode;
99
use sqlite_nostd as sqlite;
1010
use sqlite_nostd::{Connection, Context};
1111

12+
use crate::bucket_priority::BucketPriority;
1213
use crate::create_sqlite_optional_text_fn;
1314
use crate::create_sqlite_text_fn;
1415
use crate::error::SQLiteError;
@@ -46,13 +47,14 @@ fn powersync_last_synced_at_impl(
4647
let db = ctx.db_handle();
4748

4849
// language=SQLite
49-
let statement = db.prepare_v2("select value from ps_kv where key = 'last_synced_at'")?;
50+
let statement = db.prepare_v2("select last_synced_at from ps_sync_state where priority = ?")?;
51+
statement.bind_int(1, BucketPriority::SENTINEL.into())?;
5052

5153
if statement.step()? == ResultCode::ROW {
5254
let client_id = statement.column_text(0)?;
53-
return Ok(Some(client_id.to_string()));
55+
Ok(Some(client_id.to_string()))
5456
} else {
55-
return Ok(None);
57+
Ok(None)
5658
}
5759
}
5860

Diff for: crates/core/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use core::ffi::{c_char, c_int};
1212
use sqlite::ResultCode;
1313
use sqlite_nostd as sqlite;
1414

15+
mod bucket_priority;
1516
mod checkpoint;
1617
mod crud_vtab;
1718
mod diff;

Diff for: crates/core/src/migrations.rs

+23
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use sqlite::ResultCode;
88
use sqlite_nostd as sqlite;
99
use sqlite_nostd::{Connection, Context};
1010

11+
use crate::bucket_priority::BucketPriority;
1112
use crate::error::{PSResult, SQLiteError};
1213
use crate::fix035::apply_v035_fix;
1314

@@ -310,5 +311,27 @@ json_array(
310311
.into_db_result(local_db)?;
311312
}
312313

314+
if current_version < 7 && target_version >= 7 {
315+
const SENTINEL_PRIORITY: i32 = BucketPriority::SENTINEL.number;
316+
let stmt = format!("\
317+
CREATE TABLE ps_sync_state (
318+
priority INTEGER NOT NULL,
319+
last_synced_at TEXT NOT NULL
320+
) STRICT;
321+
INSERT OR IGNORE INTO ps_sync_state (priority, last_synced_at)
322+
SELECT {}, value from ps_kv where key = 'last_synced_at';
323+
324+
INSERT INTO ps_migration(id, down_migrations)
325+
VALUES(7,
326+
json_array(
327+
json_object('sql', 'INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = {}'),
328+
json_object('sql', 'DROP TABLE ps_sync_state'),
329+
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7')
330+
));
331+
", SENTINEL_PRIORITY, SENTINEL_PRIORITY);
332+
333+
local_db.exec_safe(&stmt).into_db_result(local_db)?;
334+
}
335+
313336
Ok(())
314337
}

Diff for: crates/core/src/operations.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ SELECT
1818
json_extract(e.value, '$.has_more') as has_more,
1919
json_extract(e.value, '$.after') as after,
2020
json_extract(e.value, '$.next_after') as next_after
21-
FROM json_each(json_extract(?, '$.buckets')) e",
21+
FROM json_each(json_extract(?1, '$.buckets')) e",
2222
)?;
2323
statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
2424

Diff for: crates/core/src/operations_vtab.rs

+5-6
Original file line numberDiff line numberDiff line change
@@ -76,30 +76,29 @@ extern "C" fn update(
7676
} else if rowid.value_type() == sqlite::ColumnType::Null {
7777
// INSERT
7878
let op = args[2].text();
79-
let data = args[3].text();
8079

8180
let tab = unsafe { &mut *vtab.cast::<VirtualTable>() };
8281
let db = tab.db;
8382

8483
if op == "save" {
85-
let result = insert_operation(db, data);
84+
let result = insert_operation(db, args[3].text());
8685
vtab_result(vtab, result)
8786
} else if op == "sync_local" {
88-
let result = sync_local(db, data);
87+
let result = sync_local(db, &args[3]);
8988
if let Ok(result_row) = result {
9089
unsafe {
9190
*p_row_id = result_row;
9291
}
9392
}
9493
vtab_result(vtab, result)
9594
} else if op == "clear_remove_ops" {
96-
let result = clear_remove_ops(db, data);
95+
let result = clear_remove_ops(db, args[3].text());
9796
vtab_result(vtab, result)
9897
} else if op == "delete_pending_buckets" {
99-
let result = delete_pending_buckets(db, data);
98+
let result = delete_pending_buckets(db, args[3].text());
10099
vtab_result(vtab, result)
101100
} else if op == "delete_bucket" {
102-
let result = delete_bucket(db, data);
101+
let result = delete_bucket(db, args[3].text());
103102
vtab_result(vtab, result)
104103
} else {
105104
ResultCode::MISUSE as c_int

0 commit comments

Comments
 (0)