Skip to content

Commit 7d1a076

Browse files
committed
store: Fold BatchCopy into TableState
With recent changes, BatchCopy has become unnecessary
1 parent 6d46576 commit 7d1a076

File tree

1 file changed

+39
-78
lines changed

1 file changed

+39
-78
lines changed

store/postgres/src/copy.rs

Lines changed: 39 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -208,17 +208,17 @@ impl CopyState {
208208
})
209209
})
210210
.collect::<Result<_, _>>()?;
211-
tables.sort_by_key(|table| table.batch.dst.object.to_string());
211+
tables.sort_by_key(|table| table.dst.object.to_string());
212212

213213
let values = tables
214214
.iter()
215215
.map(|table| {
216216
(
217-
cts::entity_type.eq(table.batch.dst.object.as_str()),
217+
cts::entity_type.eq(table.dst.object.as_str()),
218218
cts::dst.eq(dst.site.id),
219-
cts::next_vid.eq(table.batch.next_vid()),
220-
cts::target_vid.eq(table.batch.target_vid()),
221-
cts::batch_size.eq(table.batch.batch_size()),
219+
cts::next_vid.eq(table.batcher.next_vid()),
220+
cts::target_vid.eq(table.batcher.target_vid()),
221+
cts::batch_size.eq(table.batcher.batch_size() as i64),
222222
)
223223
})
224224
.collect::<Vec<_>>();
@@ -294,51 +294,11 @@ pub(crate) fn source(
294294
/// so that we can copy rows from one to the other with very little
295295
/// transformation. See `CopyEntityBatchQuery` for the details of what
296296
/// exactly that means
297-
pub(crate) struct BatchCopy {
297+
struct TableState {
298298
src: Arc<Table>,
299299
dst: Arc<Table>,
300-
batcher: VidBatcher,
301-
}
302-
303-
impl BatchCopy {
304-
pub fn new(batcher: VidBatcher, src: Arc<Table>, dst: Arc<Table>) -> Self {
305-
Self { src, dst, batcher }
306-
}
307-
308-
/// Copy one batch of entities and update internal state so that the
309-
/// next call to `run` will copy the next batch
310-
pub fn run(&mut self, conn: &mut PgConnection) -> Result<Duration, StoreError> {
311-
let (duration, _) = self.batcher.step(|start, end| {
312-
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
313-
.execute(conn)?;
314-
Ok(())
315-
})?;
316-
317-
Ok(duration)
318-
}
319-
320-
pub fn finished(&self) -> bool {
321-
self.batcher.finished()
322-
}
323-
324-
/// The first `vid` that has not been copied yet
325-
pub fn next_vid(&self) -> i64 {
326-
self.batcher.next_vid()
327-
}
328-
329-
/// The last `vid` that should be copied
330-
pub fn target_vid(&self) -> i64 {
331-
self.batcher.target_vid()
332-
}
333-
334-
pub fn batch_size(&self) -> i64 {
335-
self.batcher.batch_size() as i64
336-
}
337-
}
338-
339-
struct TableState {
340-
batch: BatchCopy,
341300
dst_site: Arc<Site>,
301+
batcher: VidBatcher,
342302
duration_ms: i64,
343303
}
344304

@@ -354,14 +314,16 @@ impl TableState {
354314
let vid_range = VidRange::for_copy(conn, &src, target_block)?;
355315
let batcher = VidBatcher::load(conn, &src_layout.site.namespace, src.as_ref(), vid_range)?;
356316
Ok(Self {
357-
batch: BatchCopy::new(batcher, src, dst),
317+
src,
318+
dst,
358319
dst_site,
320+
batcher,
359321
duration_ms: 0,
360322
})
361323
}
362324

363325
fn finished(&self) -> bool {
364-
self.batch.finished()
326+
self.batcher.finished()
365327
}
366328

367329
fn load(
@@ -427,11 +389,12 @@ impl TableState {
427389
VidRange::new(current_vid, target_vid),
428390
)?
429391
.with_batch_size(size as usize);
430-
let batch = BatchCopy::new(batcher, src, dst);
431392

432393
Ok(TableState {
433-
batch,
394+
src,
395+
dst,
434396
dst_site: dst_layout.site.clone(),
397+
batcher,
435398
duration_ms,
436399
})
437400
}
@@ -460,20 +423,20 @@ impl TableState {
460423
update(
461424
cts::table
462425
.filter(cts::dst.eq(self.dst_site.id))
463-
.filter(cts::entity_type.eq(self.batch.dst.object.as_str()))
426+
.filter(cts::entity_type.eq(self.dst.object.as_str()))
464427
.filter(cts::duration_ms.eq(0)),
465428
)
466429
.set(cts::started_at.eq(sql("now()")))
467430
.execute(conn)?;
468431
let values = (
469-
cts::next_vid.eq(self.batch.next_vid()),
470-
cts::batch_size.eq(self.batch.batch_size()),
432+
cts::next_vid.eq(self.batcher.next_vid()),
433+
cts::batch_size.eq(self.batcher.batch_size() as i64),
471434
cts::duration_ms.eq(self.duration_ms),
472435
);
473436
update(
474437
cts::table
475438
.filter(cts::dst.eq(self.dst_site.id))
476-
.filter(cts::entity_type.eq(self.batch.dst.object.as_str())),
439+
.filter(cts::entity_type.eq(self.dst.object.as_str())),
477440
)
478441
.set(values)
479442
.execute(conn)?;
@@ -486,7 +449,7 @@ impl TableState {
486449
update(
487450
cts::table
488451
.filter(cts::dst.eq(self.dst_site.id))
489-
.filter(cts::entity_type.eq(self.batch.dst.object.as_str())),
452+
.filter(cts::entity_type.eq(self.dst.object.as_str())),
490453
)
491454
.set(cts::finished_at.eq(sql("now()")))
492455
.execute(conn)?;
@@ -512,7 +475,11 @@ impl TableState {
512475
}
513476

514477
fn copy_batch(&mut self, conn: &mut PgConnection) -> Result<Status, StoreError> {
515-
let duration = self.batch.run(conn)?;
478+
let (duration, _) = self.batcher.step(|start, end| {
479+
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
480+
.execute(conn)?;
481+
Ok(())
482+
})?;
516483

517484
self.record_progress(conn, duration)?;
518485

@@ -539,12 +506,12 @@ impl<'a> CopyProgress<'a> {
539506
let target_vid: i64 = state
540507
.tables
541508
.iter()
542-
.map(|table| table.batch.target_vid())
509+
.map(|table| table.batcher.target_vid())
543510
.sum();
544511
let current_vid = state
545512
.tables
546513
.iter()
547-
.map(|table| table.batch.next_vid())
514+
.map(|table| table.batcher.next_vid())
548515
.sum();
549516
Self {
550517
logger,
@@ -577,23 +544,23 @@ impl<'a> CopyProgress<'a> {
577544
}
578545
}
579546

580-
fn update(&mut self, batch: &BatchCopy) {
547+
fn update(&mut self, entity_type: &EntityType, batcher: &VidBatcher) {
581548
if self.last_log.elapsed() > LOG_INTERVAL {
582549
info!(
583550
self.logger,
584551
"Copied {:.2}% of `{}` entities ({}/{} entity versions), {:.2}% of overall data",
585-
Self::progress_pct(batch.next_vid(), batch.target_vid()),
586-
batch.dst.object,
587-
batch.next_vid(),
588-
batch.target_vid(),
589-
Self::progress_pct(self.current_vid + batch.next_vid(), self.target_vid)
552+
Self::progress_pct(batcher.next_vid(), batcher.target_vid()),
553+
entity_type,
554+
batcher.next_vid(),
555+
batcher.target_vid(),
556+
Self::progress_pct(self.current_vid + batcher.next_vid(), self.target_vid)
590557
);
591558
self.last_log = Instant::now();
592559
}
593560
}
594561

595-
fn table_finished(&mut self, batch: &BatchCopy) {
596-
self.current_vid += batch.next_vid();
562+
fn table_finished(&mut self, batcher: &VidBatcher) {
563+
self.current_vid += batcher.next_vid();
597564
}
598565

599566
fn finished(&self) {
@@ -728,9 +695,9 @@ impl Connection {
728695
if status == Status::Cancelled {
729696
return Ok(status);
730697
}
731-
progress.update(&table.batch);
698+
progress.update(&table.dst.object, &table.batcher);
732699
}
733-
progress.table_finished(&table.batch);
700+
progress.table_finished(&table.batcher);
734701
}
735702

736703
// Create indexes for all the attributes that were postponed at the start of
@@ -740,8 +707,8 @@ impl Connection {
740707
for table in state.tables.iter() {
741708
let arr = index_list.indexes_for_table(
742709
&self.dst.site.namespace,
743-
&table.batch.src.name.to_string(),
744-
&table.batch.dst,
710+
&table.src.name.to_string(),
711+
&table.dst,
745712
true,
746713
true,
747714
)?;
@@ -756,18 +723,12 @@ impl Connection {
756723
// Here we need to skip those created in the first step for the old fields.
757724
for table in state.tables.iter() {
758725
let orig_colums = table
759-
.batch
760726
.src
761727
.columns
762728
.iter()
763729
.map(|c| c.name.to_string())
764730
.collect_vec();
765-
for sql in table
766-
.batch
767-
.dst
768-
.create_postponed_indexes(orig_colums)
769-
.into_iter()
770-
{
731+
for sql in table.dst.create_postponed_indexes(orig_colums).into_iter() {
771732
let query = sql_query(sql);
772733
query.execute(conn)?;
773734
}

0 commit comments

Comments
 (0)