@@ -62,18 +62,22 @@ FROM json_each(?) e",
6262 let supersede_statement = db. prepare_v2 (
6363 "\
6464 DELETE FROM ps_oplog
65- WHERE ps_oplog.superseded = 0
66- AND unlikely(ps_oplog.bucket = ?1)
65+ WHERE unlikely(ps_oplog.bucket = ?1)
6766 AND ps_oplog.key = ?2
6867RETURNING op_id, hash" ,
6968 ) ?;
7069 supersede_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
7170
7271 // language=SQLite
7372 let insert_statement = db. prepare_v2 ( "\
74- INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, superseded ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0 )") ?;
73+ INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)") ?;
7574 insert_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
7675
76+ let updated_row_statement = db. prepare_v2 (
77+ "\
78+ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
79+ ) ?;
80+
7781 // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
7882 // We can consider splitting this into separate SELECT and INSERT statements.
7983 // language=SQLite
@@ -98,7 +102,6 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
98102 let mut last_op: Option < i64 > = None ;
99103 let mut add_checksum: i32 = 0 ;
100104 let mut op_checksum: i32 = 0 ;
101- let mut remove_operations: i32 = 0 ;
102105
103106 while iterate_statement. step ( ) ? == ResultCode :: ROW {
104107 let op_id = iterate_statement. column_int64 ( 0 ) ?;
@@ -140,57 +143,75 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
140143 }
141144 supersede_statement. reset ( ) ?;
142145
143- let should_skip_remove = !superseded && op == "REMOVE" ;
144- if should_skip_remove {
145- // If a REMOVE statement did not replace (supersede) any previous
146- // operations, we do not need to persist it.
147- // The same applies if the bucket was not synced to the local db yet,
148- // even if it did supersede another operation.
149- // Handle the same as MOVE.
146+ if ( op == "REMOVE" ) {
147+ let should_skip_remove = !superseded;
148+
150149 add_checksum = add_checksum. wrapping_add ( checksum) ;
150+
151+ if !should_skip_remove {
152+ if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type, object_id) {
153+ updated_row_statement. bind_text (
154+ 1 ,
155+ object_type,
156+ sqlite:: Destructor :: STATIC ,
157+ ) ?;
158+ updated_row_statement. bind_text (
159+ 2 ,
160+ object_id,
161+ sqlite:: Destructor :: STATIC ,
162+ ) ?;
163+ updated_row_statement. exec ( ) ?;
164+ }
165+ }
166+
151167 continue ;
152168 }
153169
154- let opi = if op == "PUT" { 3 } else { 4 } ;
155170 insert_statement. bind_int64 ( 2 , op_id) ?;
156- insert_statement. bind_int ( 3 , opi) ?;
157171 if key != "" {
158- insert_statement. bind_text ( 4 , & key, sqlite:: Destructor :: STATIC ) ?;
172+ insert_statement. bind_text ( 3 , & key, sqlite:: Destructor :: STATIC ) ?;
159173 } else {
160- insert_statement. bind_null ( 4 ) ?;
174+ insert_statement. bind_null ( 3 ) ?;
161175 }
162176
163177 if let ( Ok ( object_type) , Ok ( object_id) ) = ( object_type, object_id) {
164- insert_statement. bind_text ( 5 , object_type, sqlite:: Destructor :: STATIC ) ?;
165- insert_statement. bind_text ( 6 , object_id, sqlite:: Destructor :: STATIC ) ?;
178+ insert_statement. bind_text ( 4 , object_type, sqlite:: Destructor :: STATIC ) ?;
179+ insert_statement. bind_text ( 5 , object_id, sqlite:: Destructor :: STATIC ) ?;
166180 } else {
181+ insert_statement. bind_null ( 4 ) ?;
167182 insert_statement. bind_null ( 5 ) ?;
168- insert_statement. bind_null ( 6 ) ?;
169183 }
170184 if let Ok ( data) = op_data {
171- insert_statement. bind_text ( 7 , data, sqlite:: Destructor :: STATIC ) ?;
185+ insert_statement. bind_text ( 6 , data, sqlite:: Destructor :: STATIC ) ?;
172186 } else {
173- insert_statement. bind_null ( 7 ) ?;
187+ insert_statement. bind_null ( 6 ) ?;
174188 }
175189
176- insert_statement. bind_int ( 8 , checksum) ?;
190+ insert_statement. bind_int ( 7 , checksum) ?;
177191 insert_statement. exec ( ) ?;
178192
179193 op_checksum = op_checksum. wrapping_add ( checksum) ;
180-
181- if opi == 4 {
182- // We persisted a REMOVE statement, so the bucket needs
183- // to be compacted at some point.
184- remove_operations += 1 ;
185- }
186194 } else if op == "MOVE" {
187195 add_checksum = add_checksum. wrapping_add ( checksum) ;
188196 } else if op == "CLEAR" {
189197 // Any remaining PUT operations should get an implicit REMOVE
190198 // language=SQLite
191- let clear_statement = db. prepare_v2 ( "UPDATE ps_oplog SET op=4, data=NULL, hash=0 WHERE (op=3 OR op=4) AND bucket=?1" ) . into_db_result ( db) ?;
192- clear_statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
193- clear_statement. exec ( ) ?;
199+ let clear_statement1 = db
200+ . prepare_v2 (
201+ "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
202+ SELECT row_type, row_id
203+ FROM ps_oplog
204+ WHERE bucket = ?1" ,
205+ )
206+ . into_db_result ( db) ?;
207+ clear_statement1. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
208+ clear_statement1. exec ( ) ?;
209+
210+ let clear_statement2 = db
211+ . prepare_v2 ( "DELETE FROM ps_oplog WHERE bucket = ?1" )
212+ . into_db_result ( db) ?;
213+ clear_statement2. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
214+ clear_statement2. exec ( ) ?;
194215
195216 // And we need to re-apply all of those.
196217 // We also replace the checksum with the checksum of the CLEAR op.
@@ -214,15 +235,13 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
214235 "UPDATE ps_buckets
215236 SET last_op = ?2,
216237 add_checksum = (add_checksum + ?3) & 0xffffffff,
217- op_checksum = (op_checksum + ?4) & 0xffffffff,
218- remove_operations = (remove_operations + ?5)
238+ op_checksum = (op_checksum + ?4) & 0xffffffff
219239 WHERE name = ?1" ,
220240 ) ?;
221241 statement. bind_text ( 1 , bucket, sqlite:: Destructor :: STATIC ) ?;
222242 statement. bind_int64 ( 2 , * last_op) ?;
223243 statement. bind_int ( 3 , add_checksum) ?;
224244 statement. bind_int ( 4 , op_checksum) ?;
225- statement. bind_int ( 5 , remove_operations) ?;
226245
227246 statement. exec ( ) ?;
228247 }
@@ -231,108 +250,39 @@ INSERT INTO ps_oplog(bucket, op_id, op, key, row_type, row_id, data, hash, super
231250}
232251
233252pub fn clear_remove_ops ( db : * mut sqlite:: sqlite3 , _data : & str ) -> Result < ( ) , SQLiteError > {
234- // language=SQLite
235- let statement = db. prepare_v2 (
236- "
237- SELECT
238- name,
239- last_applied_op,
240- (SELECT IFNULL(SUM(oplog.hash), 0)
241- FROM ps_oplog oplog
242- WHERE oplog.bucket = ps_buckets.name
243- AND oplog.op_id <= ps_buckets.last_applied_op
244- AND (oplog.superseded = 1 OR oplog.op != 3)
245- ) as checksum
246- FROM ps_buckets
247- WHERE ps_buckets.pending_delete = 0 AND
248- ps_buckets.remove_operations >= CASE
249- WHEN ?1 = '' THEN 1
250- ELSE IFNULL(?1 ->> 'threshold', 1)
251- END" ,
252- ) ?;
253- // Compact bucket if there are 50 or more operations
254- statement. bind_text ( 1 , _data, sqlite:: Destructor :: STATIC ) ;
255-
256- // language=SQLite
257- let update_statement = db. prepare_v2 (
258- "
259- UPDATE ps_buckets
260- SET add_checksum = (add_checksum + ?2) & 0xffffffff,
261- op_checksum = (op_checksum - ?2) & 0xffffffff,
262- remove_operations = 0
263- WHERE ps_buckets.name = ?1" ,
264- ) ?;
265-
266- // language=SQLite
267- let delete_statement = db. prepare_v2 (
268- "DELETE
269- FROM ps_oplog
270- WHERE (superseded = 1 OR op != 3)
271- AND bucket = ?1
272- AND op_id <= ?2" ,
273- ) ?;
274-
275- while statement. step ( ) ? == ResultCode :: ROW {
276- // Note: Each iteration here may be run in a separate transaction.
277- let name = statement. column_text ( 0 ) ?;
278- let last_applied_op = statement. column_int64 ( 1 ) ?;
279- let checksum = statement. column_int ( 2 ) ?;
280-
281- update_statement. bind_text ( 1 , name, sqlite:: Destructor :: STATIC ) ?;
282- update_statement. bind_int ( 2 , checksum) ?;
283- update_statement. exec ( ) ?;
284-
285- // Must use the same values as above
286- delete_statement. bind_text ( 1 , name, sqlite:: Destructor :: STATIC ) ?;
287- delete_statement. bind_int64 ( 2 , last_applied_op) ?;
288- delete_statement. exec ( ) ?;
289- }
253+ // No-op
290254
291255 Ok ( ( ) )
292256}
293257
294258pub fn delete_pending_buckets ( db : * mut sqlite:: sqlite3 , _data : & str ) -> Result < ( ) , SQLiteError > {
295- // language=SQLite
296- let statement = db. prepare_v2 (
297- "DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)" ) ?;
298- statement. exec ( ) ?;
299-
300- // language=SQLite
301- let statement = db. prepare_v2 ( "DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op" ) ?;
302- statement. exec ( ) ?;
259+ // No-op
303260
304261 Ok ( ( ) )
305262}
306263
307264pub fn delete_bucket ( db : * mut sqlite:: sqlite3 , name : & str ) -> Result < ( ) , SQLiteError > {
308- let id = gen_uuid ( ) ;
309- let new_name = format ! ( "$delete_{}_{}" , name, id. hyphenated( ) . to_string( ) ) ;
310-
311265 // language=SQLite
312266 let statement = db. prepare_v2 (
313- "UPDATE ps_oplog SET op=4, data=NULL, bucket=?1 WHERE op=3 AND superseded=0 AND bucket=?2" ,
267+ "\
268+ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
269+ SELECT row_type, row_id
270+ FROM ps_oplog
271+ WHERE bucket = ?1" ,
314272 ) ?;
315- statement. bind_text ( 1 , & new_name, sqlite:: Destructor :: STATIC ) ?;
316- statement. bind_text ( 2 , & name, sqlite:: Destructor :: STATIC ) ?;
273+ statement. bind_text ( 1 , & name, sqlite:: Destructor :: STATIC ) ?;
317274 statement. exec ( ) ?;
318275
319276 // Rename bucket
320277 // language=SQLite
321- let statement = db. prepare_v2 ( "UPDATE ps_oplog SET bucket=?1 WHERE bucket=?2" ) ?;
322- statement. bind_text ( 1 , & new_name, sqlite:: Destructor :: STATIC ) ?;
323- statement. bind_text ( 2 , name, sqlite:: Destructor :: STATIC ) ?;
278+ let statement = db. prepare_v2 ( "DELETE FROM ps_oplog WHERE bucket=?1" ) ?;
279+ statement. bind_text ( 1 , name, sqlite:: Destructor :: STATIC ) ?;
324280 statement. exec ( ) ?;
325281
326282 // language=SQLite
327283 let statement = db. prepare_v2 ( "DELETE FROM ps_buckets WHERE name = ?1" ) ?;
328284 statement. bind_text ( 1 , name, sqlite:: Destructor :: STATIC ) ?;
329285 statement. exec ( ) ?;
330286
331- // language=SQLite
332- let statement = db. prepare_v2 (
333- "INSERT INTO ps_buckets(name, pending_delete, last_op) SELECT ?1, 1, IFNULL(MAX(op_id), 0) FROM ps_oplog WHERE bucket = ?1" ) ?;
334- statement. bind_text ( 1 , & new_name, sqlite:: Destructor :: STATIC ) ?;
335- statement. exec ( ) ?;
336-
337287 Ok ( ( ) )
338288}
0 commit comments