Skip to content

Commit

Permalink
Push up to 128 frames in sync
Browse files Browse the repository at this point in the history
In my tests, this speeds up push 4-5 times.
Making push of 300 frames take 4-5s instead of 20s.

Signed-off-by: Piotr Jastrzebski <[email protected]>
  • Loading branch information
Piotr Jastrzebski committed Feb 8, 2025
1 parent c6e4e09 commit 74ff81b
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions libsql/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,25 +134,26 @@ impl SyncContext {
self.pull_with_retry(uri, self.max_retries).await
}

#[tracing::instrument(skip(self, frame))]
#[tracing::instrument(skip(self, frames))]
pub(crate) async fn push_one_frame(
&mut self,
frame: Bytes,
frames: Bytes,
generation: u32,
frame_no: u32,
batch_size: u32,
) -> Result<u32> {
let uri = format!(
"{}/sync/{}/{}/{}",
self.sync_url,
generation,
frame_no,
frame_no + 1
frame_no + batch_size
);
tracing::debug!("pushing frame");

let (generation, durable_frame_num) = self.push_with_retry(uri, frame, self.max_retries).await?;
let (generation, durable_frame_num) = self.push_with_retry(uri, frames, self.max_retries).await?;

if durable_frame_num > frame_no {
if durable_frame_num > frame_no + batch_size - 1 {
tracing::error!(
"server returned durable_frame_num larger than what we sent: sent={}, got={}",
frame_no,
Expand All @@ -162,7 +163,7 @@ impl SyncContext {
return Err(SyncError::InvalidPushFrameNoHigh(frame_no, durable_frame_num).into());
}

if durable_frame_num < frame_no {
if durable_frame_num < frame_no + batch_size - 1 {
// Update our knowledge of where the server is at frame wise.
self.durable_frame_num = durable_frame_num;

Expand All @@ -186,7 +187,7 @@ impl SyncContext {
Ok(durable_frame_num)
}

async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<(u32, u32)> {
async fn push_with_retry(&self, uri: String, body: Bytes, max_retries: usize) -> Result<(u32, u32)> {
let mut nr_retries = 0;
loop {
let mut req = http::Request::post(uri.clone());
Expand All @@ -200,7 +201,7 @@ impl SyncContext {
None => {}
}

let req = req.body(frame.clone().into()).expect("valid body");
let req = req.body(body.clone().into()).expect("valid body");

let res = self
.client
Expand Down Expand Up @@ -537,19 +538,25 @@ async fn try_push(

let mut frame_no = start_frame_no;
while frame_no <= end_frame_no {
let frame = conn.wal_get_frame(frame_no, page_size)?;
let batch_size = 128.min(end_frame_no - frame_no + 1);
let mut frames = conn.wal_get_frame(frame_no, page_size)?;
for idx in 1..batch_size {
let frame = conn.wal_get_frame(frame_no + idx, page_size)?;
frames.extend_from_slice(frame.as_ref())
}

// The server returns its maximum frame number. To avoid resending
// frames the server already knows about, we need to update the
// frame number to the one returned by the server.
let max_frame_no = sync_ctx
.push_one_frame(frame.freeze(), generation, frame_no)
.push_one_frame(frames.freeze(), generation, frame_no, batch_size)
.await?;

if max_frame_no > frame_no {
frame_no = max_frame_no;
frame_no = max_frame_no + 1;
} else {
frame_no += batch_size;
}
frame_no += 1;
}

sync_ctx.write_metadata().await?;
Expand Down

0 comments on commit 74ff81b

Please sign in to comment.