Skip to content

Commit b969d92

Browse files
committed
Handle sync lines with escapes
1 parent 8f91f28 commit b969d92

File tree

4 files changed

+41
-22
lines changed

4 files changed

+41
-22
lines changed

crates/core/src/sync/line.rs

+27-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}
1515
use super::bucket_priority::BucketPriority;
1616
use super::Checksum;
1717

18+
/// While we would like to always borrow strings for efficiency, that's not consistently possible.
19+
/// With the JSON decoder, borrowing from input data is only possible when the string contains no
20+
/// escape sequences (otherwise, the string is not a direct view of input data and we need an
21+
/// internal copy).
22+
type SyncLineStr<'a> = Cow<'a, str>;
23+
1824
#[derive(Deserialize, Debug)]
1925

2026
pub enum SyncLine<'a> {
@@ -53,7 +59,7 @@ pub struct CheckpointDiff<'a> {
5359
#[serde(borrow)]
5460
pub updated_buckets: Vec<BucketChecksum<'a>>,
5561
#[serde(borrow)]
56-
pub removed_buckets: Vec<&'a str>,
62+
pub removed_buckets: Vec<SyncLineStr<'a>>,
5763
#[serde(default)]
5864
#[serde(deserialize_with = "deserialize_optional_string_to_i64")]
5965
pub write_checkpoint: Option<i64>,
@@ -74,7 +80,8 @@ pub struct CheckpointPartiallyComplete {
7480

7581
#[derive(Deserialize, Debug)]
7682
pub struct BucketChecksum<'a> {
77-
pub bucket: &'a str,
83+
#[serde(borrow)]
84+
pub bucket: SyncLineStr<'a>,
7885
pub checksum: Checksum,
7986
#[serde(default)]
8087
pub priority: Option<BucketPriority>,
@@ -87,14 +94,15 @@ pub struct BucketChecksum<'a> {
8794

8895
#[derive(Deserialize, Debug)]
8996
pub struct DataLine<'a> {
90-
pub bucket: &'a str,
97+
#[serde(borrow)]
98+
pub bucket: SyncLineStr<'a>,
9199
pub data: Vec<OplogEntry<'a>>,
92100
// #[serde(default)]
93101
// pub has_more: bool,
94102
// #[serde(default, borrow)]
95-
// pub after: Option<&'a str>,
103+
// pub after: Option<SyncLineStr<'a>>,
96104
// #[serde(default, borrow)]
97-
// pub next_after: Option<&'a str>,
105+
// pub next_after: Option<SyncLineStr<'a>>,
98106
}
99107

100108
#[derive(Deserialize, Debug)]
@@ -104,11 +112,11 @@ pub struct OplogEntry<'a> {
104112
pub op_id: i64,
105113
pub op: OpType,
106114
#[serde(default, borrow)]
107-
pub object_id: Option<&'a str>,
115+
pub object_id: Option<SyncLineStr<'a>>,
108116
#[serde(default, borrow)]
109-
pub object_type: Option<&'a str>,
117+
pub object_type: Option<SyncLineStr<'a>>,
110118
#[serde(default, borrow)]
111-
pub subkey: Option<&'a str>,
119+
pub subkey: Option<SyncLineStr<'a>>,
112120
#[serde(default, borrow)]
113121
pub data: Option<OplogData<'a>>,
114122
}
@@ -367,6 +375,17 @@ mod tests {
367375
assert_eq!(diff.removed_buckets.len(), 0);
368376
}
369377

378+
#[test]
379+
fn parse_checkpoint_diff_escape() {
380+
let SyncLine::CheckpointDiff(diff) = deserialize(
381+
r#"{"checkpoint_diff": {"last_op_id": "10", "buckets": [], "updated_buckets": [], "removed_buckets": ["foo\""], "write_checkpoint": null}}"#,
382+
) else {
383+
panic!("Expected checkpoint diff")
384+
};
385+
386+
assert_eq!(diff.removed_buckets[0], "foo\"");
387+
}
388+
370389
#[test]
371390
fn parse_checkpoint_diff_no_write_checkpoint() {
372391
let SyncLine::CheckpointDiff(diff) = deserialize(

crates/core/src/sync/operations.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub fn insert_bucket_operations(
2323
let BucketInfo {
2424
id: bucket_id,
2525
last_applied_op,
26-
} = adapter.lookup_bucket(line.bucket)?;
26+
} = adapter.lookup_bucket(&*line.bucket)?;
2727

2828
// This is an optimization for initial sync - we can avoid persisting individual REMOVE
2929
// operations when last_applied_op = 0.
@@ -65,10 +65,10 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
6565
match data.op {
6666
OpType::PUT | OpType::REMOVE => {
6767
let key: Cow<'static, str> = if let (Some(object_type), Some(object_id)) =
68-
(data.object_type, data.object_id)
68+
(&data.object_type, &data.object_id)
6969
{
70-
let subkey = data.subkey.unwrap_or("null");
71-
Cow::Owned(format!("{}/{}/{}", &object_type, &object_id, subkey))
70+
let subkey = data.subkey.as_ref().map(|i| &**i).unwrap_or("null");
71+
Cow::Owned(format!("{}/{}/{}", &*object_type, &*object_id, subkey))
7272
} else {
7373
Cow::Borrowed("")
7474
};
@@ -101,16 +101,16 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
101101

102102
if !should_skip_remove {
103103
if let (Some(object_type), Some(object_id)) =
104-
(data.object_type, data.object_id)
104+
(&data.object_type, &data.object_id)
105105
{
106106
updated_row_statement.bind_text(
107107
1,
108-
object_type,
108+
&*object_type,
109109
sqlite::Destructor::STATIC,
110110
)?;
111111
updated_row_statement.bind_text(
112112
2,
113-
object_id,
113+
&*object_id,
114114
sqlite::Destructor::STATIC,
115115
)?;
116116
updated_row_statement.exec()?;
@@ -127,9 +127,9 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
127127
insert_statement.bind_null(3)?;
128128
}
129129

130-
if let (Some(object_type), Some(object_id)) = (data.object_type, data.object_id) {
131-
insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?;
132-
insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?;
130+
if let (Some(object_type), Some(object_id)) = (&data.object_type, &data.object_id) {
131+
insert_statement.bind_text(4, &*object_type, sqlite::Destructor::STATIC)?;
132+
insert_statement.bind_text(5, &*object_id, sqlite::Destructor::STATIC)?;
133133
} else {
134134
insert_statement.bind_null(4)?;
135135
insert_statement.bind_null(5)?;

crates/core/src/sync/streaming_sync.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl StreamingSyncIteration {
290290
target.apply_diff(&diff);
291291
validated_but_not_applied = None;
292292
self.adapter
293-
.delete_buckets(diff.removed_buckets.iter().copied())?;
293+
.delete_buckets(diff.removed_buckets.iter().map(|i| &**i))?;
294294

295295
let progress = self.load_progress(target)?;
296296
self.status.update(
@@ -493,7 +493,7 @@ impl SyncTarget {
493493
let mut buckets = BTreeMap::<String, OwnedBucketChecksum>::new();
494494
for bucket in &checkpoint.buckets {
495495
buckets.insert(bucket.bucket.to_string(), OwnedBucketChecksum::from(bucket));
496-
to_delete.remove(bucket.bucket);
496+
to_delete.remove(&*bucket.bucket);
497497
}
498498

499499
*self = SyncTarget::Tracking(OwnedCheckpoint::from_checkpoint(checkpoint, buckets));
@@ -522,7 +522,7 @@ impl OwnedCheckpoint {
522522

523523
fn apply_diff<'a>(&mut self, diff: &CheckpointDiff<'a>) {
524524
for removed in &diff.removed_buckets {
525-
self.buckets.remove(*removed);
525+
self.buckets.remove(&**removed);
526526
}
527527

528528
for updated in &diff.updated_buckets {

crates/core/src/sync/sync_status.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ impl SyncDownloadProgress {
206206
}
207207

208208
pub fn increment_download_count(&mut self, line: &DataLine) {
209-
if let Some(info) = self.buckets.get_mut(line.bucket) {
209+
if let Some(info) = self.buckets.get_mut(&*line.bucket) {
210210
info.since_last += line.data.len() as i64
211211
}
212212
}

0 commit comments

Comments
 (0)