Skip to content

Commit ce85d3a

Browse files
committed
store: Count entities during copying
We used to count entities after all the data had been copied, but that is a very slow operation for large subgraphs, which in turn can lead to bad side effects such as connection timeouts and, ultimately, the copy failing. We now keep track of the number of current entity versions copied in each batch and calculate the entity count incrementally.
1 parent 7d1a076 commit ce85d3a

File tree

4 files changed

+63
-11
lines changed

4 files changed

+63
-11
lines changed

store/postgres/src/copy.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use graph::{
3333
use itertools::Itertools;
3434

3535
use crate::{
36-
advisory_lock, catalog,
36+
advisory_lock, catalog, deployment,
3737
dynds::DataSourcesTable,
3838
primary::{DeploymentId, Site},
3939
relational::index::IndexList,
@@ -475,12 +475,18 @@ impl TableState {
475475
}
476476

477477
fn copy_batch(&mut self, conn: &mut PgConnection) -> Result<Status, StoreError> {
478-
let (duration, _) = self.batcher.step(|start, end| {
479-
rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
480-
.execute(conn)?;
481-
Ok(())
478+
let (duration, count) = self.batcher.step(|start, end| {
479+
let count = rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)?
480+
.count_current()
481+
.get_result::<i64>(conn)
482+
.optional()?;
483+
Ok(count.unwrap_or(0) as i32)
482484
})?;
483485

486+
let count = count.unwrap_or(0);
487+
488+
deployment::update_entity_count(conn, &self.dst_site, count)?;
489+
484490
self.record_progress(conn, duration)?;
485491

486492
if self.finished() {

store/postgres/src/deployment_store.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1592,13 +1592,10 @@ impl DeploymentStore {
15921592
.number
15931593
.checked_add(1)
15941594
.expect("block numbers fit into an i32");
1595-
dst.revert_block(conn, block_to_revert)?;
1596-
info!(logger, "Rewound subgraph to block {}", block.number;
1597-
"time_ms" => start.elapsed().as_millis());
1595+
let (_, count) = dst.revert_block(conn, block_to_revert)?;
1596+
deployment::update_entity_count(conn, &dst.site, count)?;
15981597

1599-
let start = Instant::now();
1600-
deployment::set_entity_count(conn, &dst.site, &dst.count_query)?;
1601-
info!(logger, "Counted the entities";
1598+
info!(logger, "Rewound subgraph to block {}", block.number;
16021599
"time_ms" => start.elapsed().as_millis());
16031600

16041601
deployment::set_history_blocks(

store/postgres/src/relational.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,9 @@ impl Layout {
10371037
/// numbers. After this operation, only entity versions inserted or
10381038
/// updated at blocks with numbers strictly lower than `block` will
10391039
/// remain
1040+
///
1041+
/// The `i32` that is returned is the amount by which the entity count
1042+
/// for the subgraph needs to be adjusted
10401043
pub fn revert_block(
10411044
&self,
10421045
conn: &mut PgConnection,

store/postgres/src/relational_queries.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4799,6 +4799,10 @@ impl<'a> CopyEntityBatchQuery<'a> {
47994799
last_vid,
48004800
})
48014801
}
4802+
4803+
pub fn count_current(self) -> CountCurrentVersionsQuery<'a> {
4804+
CountCurrentVersionsQuery::new(self)
4805+
}
48024806
}
48034807

48044808
impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
@@ -4810,6 +4814,8 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
48104814
// Construct a query
48114815
// insert into {dst}({columns})
48124816
// select {columns} from {src}
4817+
// where vid >= {first_vid} and vid <= {last_vid}
4818+
// returning {upper_inf(block_range)|true}
48134819
out.push_sql("insert into ");
48144820
out.push_sql(self.dst.qualified_name.as_str());
48154821
out.push_sql("(");
@@ -4905,6 +4911,12 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
49054911
out.push_bind_param::<BigInt, _>(&self.first_vid)?;
49064912
out.push_sql(" and vid <= ");
49074913
out.push_bind_param::<BigInt, _>(&self.last_vid)?;
4914+
out.push_sql("\n returning ");
4915+
if self.dst.immutable {
4916+
out.push_sql("true");
4917+
} else {
4918+
out.push_sql(BLOCK_RANGE_CURRENT);
4919+
}
49084920
Ok(())
49094921
}
49104922
}
@@ -4917,6 +4929,40 @@ impl<'a> QueryId for CopyEntityBatchQuery<'a> {
49174929

49184930
impl<'a, Conn> RunQueryDsl<Conn> for CopyEntityBatchQuery<'a> {}
49194931

4932+
#[derive(Debug, Clone)]
4933+
pub struct CountCurrentVersionsQuery<'a> {
4934+
copy: CopyEntityBatchQuery<'a>,
4935+
}
4936+
4937+
impl<'a> CountCurrentVersionsQuery<'a> {
4938+
pub fn new(copy: CopyEntityBatchQuery<'a>) -> Self {
4939+
Self { copy }
4940+
}
4941+
}
4942+
impl<'a> QueryFragment<Pg> for CountCurrentVersionsQuery<'a> {
4943+
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
4944+
// Generate a query
4945+
// with copy_cte(current) as ( {copy} )
4946+
// select count(*) from copy_cte where current
4947+
out.push_sql("with copy_cte(current) as (");
4948+
self.copy.walk_ast(out.reborrow())?;
4949+
out.push_sql(")\nselect count(*) from copy_cte where current");
4950+
Ok(())
4951+
}
4952+
}
4953+
4954+
impl<'a> QueryId for CountCurrentVersionsQuery<'a> {
4955+
type QueryId = ();
4956+
4957+
const HAS_STATIC_QUERY_ID: bool = false;
4958+
}
4959+
4960+
impl<'a> Query for CountCurrentVersionsQuery<'a> {
4961+
type SqlType = BigInt;
4962+
}
4963+
4964+
impl<'a, Conn> RunQueryDsl<Conn> for CountCurrentVersionsQuery<'a> {}
4965+
49204966
/// Helper struct for returning the id's touched by the RevertRemove and
49214967
/// RevertExtend queries
49224968
#[derive(QueryableByName, PartialEq, Eq, Hash)]

0 commit comments

Comments
 (0)