diff --git a/migrations/1729684505755_block_proposals.ts b/migrations/1729684505755_block_proposals.ts index 9c8e492..1d6b0c6 100644 --- a/migrations/1729684505755_block_proposals.ts +++ b/migrations/1729684505755_block_proposals.ts @@ -50,4 +50,8 @@ export function up(pgm: MigrationBuilder): void { pgm.createIndex('block_proposals', ['block_hash']); pgm.createIndex('block_proposals', ['index_block_hash']); pgm.createIndex('block_proposals', ['reward_cycle']); + + pgm.createConstraint('block_proposals', 'block_proposals_block_hash_unique', { + unique: ['block_hash'], + }); } diff --git a/migrations/1729684505756_block_responses.ts b/migrations/1729684505756_block_responses.ts index 58ee5f8..ce0c446 100644 --- a/migrations/1729684505756_block_responses.ts +++ b/migrations/1729684505756_block_responses.ts @@ -53,4 +53,8 @@ export function up(pgm: MigrationBuilder): void { pgm.createIndex('block_responses', ['received_at']); pgm.createIndex('block_responses', ['signer_sighash']); pgm.createIndex('block_responses', ['accepted']); + + pgm.createConstraint('block_responses', 'block_responses_signer_key_sighash_unique', { + unique: ['signer_key', 'signer_sighash'], + }); } diff --git a/migrations/1729684505758_mock_proposals.ts b/migrations/1729684505758_mock_proposals.ts index a354180..683b857 100644 --- a/migrations/1729684505758_mock_proposals.ts +++ b/migrations/1729684505758_mock_proposals.ts @@ -58,4 +58,8 @@ export function up(pgm: MigrationBuilder): void { pgm.createIndex('mock_proposals', ['stacks_tip']); pgm.createIndex('mock_proposals', ['index_block_hash']); pgm.createIndex('mock_proposals', ['burn_block_height']); + + pgm.createConstraint('mock_proposals', 'mock_proposals_idb_unique', { + unique: ['index_block_hash'], + }); } diff --git a/migrations/1729684505759_mock_signature.ts b/migrations/1729684505759_mock_signature.ts index 68d4062..dcc1ab2 100644 --- a/migrations/1729684505759_mock_signature.ts +++ b/migrations/1729684505759_mock_signature.ts @@ -71,4 +71,8 @@ export function up(pgm: MigrationBuilder): void { pgm.createIndex('mock_signatures', ['stacks_tip']); pgm.createIndex('mock_signatures', ['index_block_hash']); pgm.createIndex('mock_signatures', ['burn_block_height']); + + pgm.createConstraint('mock_signatures', 'mock_signatures_signer_key_idb_unique', { + unique: ['signer_key', 'index_block_hash'], + }); } diff --git a/migrations/1729684505760_mock_blocks.ts b/migrations/1729684505760_mock_blocks.ts index a316192..d330187 100644 --- a/migrations/1729684505760_mock_blocks.ts +++ b/migrations/1729684505760_mock_blocks.ts @@ -65,6 +65,10 @@ export function up(pgm: MigrationBuilder): void { pgm.createIndex('mock_blocks', ['index_block_hash']); pgm.createIndex('mock_blocks', ['burn_block_height']); + pgm.createConstraint('mock_blocks', 'mock_blocks_idb_unique', { + unique: ['index_block_hash'], + }); + // Mock block signer signatures pgm.createTable('mock_block_signer_signatures', { id: { @@ -99,4 +103,8 @@ export function up(pgm: MigrationBuilder): void { pgm.createIndex('mock_block_signer_signatures', ['stacks_tip']); pgm.createIndex('mock_block_signer_signatures', ['stacks_tip_height']); pgm.createIndex('mock_block_signer_signatures', ['index_block_hash']); + + pgm.createConstraint('mock_block_signer_signatures', 'mock_block_signers_idb_unique', { + unique: ['index_block_hash', 'signer_key'], + }); } diff --git a/src/pg/chainhook/chainhook-pg-store.ts b/src/pg/chainhook/chainhook-pg-store.ts index 4bf2036..e31612f 100644 --- a/src/pg/chainhook/chainhook-pg-store.ts +++ b/src/pg/chainhook/chainhook-pg-store.ts @@ -188,10 +188,18 @@ export class ChainhookPgStore extends BasePgStoreModule { network_id: messageData.mock_proposal.peer_info.network_id, index_block_hash: normalizeHexString(messageData.mock_proposal.peer_info.index_block_hash), }; - await sql` + const result = await sql` INSERT INTO mock_blocks ${sql(dbMockBlock)} + ON CONFLICT ON CONSTRAINT mock_blocks_idb_unique DO NOTHING `; + if (result.count === 0) { + logger.info( + `Skipped inserting duplicate mock block height=${dbMockBlock.stacks_tip_height}, hash=${dbMockBlock.stacks_tip}` + ); + return; + } + for (const batch of batchIterate(messageData.mock_signatures, 500)) { const sigs = batch.map(sig => { const dbSig: DbMockBlockSignerSignature = { @@ -235,9 +243,15 @@ export class ChainhookPgStore extends BasePgStoreModule { // Metadata fields metadata_server_version: messageData.metadata.server_version, }; - await sql` + const result = await sql` INSERT INTO mock_signatures ${sql(dbMockSignature)} + ON CONFLICT ON CONSTRAINT mock_signatures_signer_key_idb_unique DO NOTHING `; + if (result.count === 0) { + logger.info( + `Skipped inserting duplicate mock signature height=${dbMockSignature.stacks_tip_height}, hash=${dbMockSignature.stacks_tip}, signer=${dbMockSignature.signer_key}` + ); + } } private async applyMockProposal( @@ -258,9 +272,15 @@ export class ChainhookPgStore extends BasePgStoreModule { network_id: messageData.network_id, index_block_hash: normalizeHexString(messageData.index_block_hash), }; - await sql` + const result = await sql` INSERT INTO mock_proposals ${sql(dbMockProposal)} + ON CONFLICT ON CONSTRAINT mock_proposals_idb_unique DO NOTHING `; + if (result.count === 0) { + logger.info( + `Skipped inserting duplicate mock proposal height=${dbMockProposal.stacks_tip_height}, hash=${dbMockProposal.stacks_tip}` + ); + } } private async applyBlockProposal( @@ -279,9 +299,15 @@ export class ChainhookPgStore extends BasePgStoreModule { reward_cycle: messageData.reward_cycle, burn_block_height: messageData.burn_height, }; - await sql` + const result = await sql` INSERT INTO block_proposals ${sql(dbBlockProposal)} + ON CONFLICT ON CONSTRAINT block_proposals_block_hash_unique DO NOTHING `; + if (result.count === 0) { + logger.info( + `Skipped inserting duplicate block proposal height=${dbBlockProposal.block_height}, hash=${dbBlockProposal.block_hash}` + ); + } } private async applyBlockResponse( @@ -307,7 +333,7 @@ export class ChainhookPgStore extends BasePgStoreModule { } } - const dbBlockProposal: DbBlockResponse = { + const dbBlockResponse: DbBlockResponse = { received_at: unixTimeMillisecondsToISO(receivedAt), signer_key: normalizeHexString(signerPubkey), accepted: accepted, @@ -319,9 +345,16 @@ export class ChainhookPgStore extends BasePgStoreModule { reject_code: rejectCode, chain_id: accepted ? null : messageData.data.chain_id, }; - await sql` - INSERT INTO block_responses ${sql(dbBlockProposal)} + const result = await sql` + INSERT INTO block_responses ${sql(dbBlockResponse)} + ON CONFLICT ON CONSTRAINT block_responses_signer_key_sighash_unique DO NOTHING `; + + if (result.count === 0) { + logger.info( + `Skipped inserting duplicate block response signer=${dbBlockResponse.signer_key}, hash=${dbBlockResponse.signer_sighash}` + ); + } } private async updateStacksBlock( diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts index 1592b8d..cfadf22 100644 --- a/src/pg/pg-store.ts +++ b/src/pg/pg-store.ts @@ -78,11 +78,6 @@ export class PgStore extends BasePgStore { // The `blocks` table (and its associated block_signer_signatures table) is the source of truth that is // never missing blocks and does not contain duplicate rows per block. // - // The block_proposals and block_responses tables can have duplicate rows. Duplicates can be detected in - // block_proposals using the block_hash column. Duplicates can be detected in block_responses by looking - // at (signer_key, signer_sighash). For both tables filter duplicates by using only the first row (the - // oldest id column). - // // Each block has a known set of signer_keys which can be determined by first looking up the block's // cycle_number from the `block_proposals` table matching on block_hash, then using cycle_number to look // up the set of signer_keys from the reward_set_signers table (matching cycle_number with reward_cycle). @@ -159,16 +154,6 @@ export class PgStore extends BasePgStore { LIMIT ${limit} OFFSET ${offset} ), - filtered_block_proposals AS ( - SELECT DISTINCT ON (block_hash) id, block_hash, received_at, reward_cycle AS cycle_number - FROM block_proposals - ORDER BY block_hash, id - ), - filtered_block_responses AS ( - SELECT DISTINCT ON (signer_key, signer_sighash) * - FROM block_responses - ORDER BY signer_key, signer_sighash, id - ), block_signers AS ( SELECT lb.id AS block_id, @@ -190,10 +175,10 @@ export class PgStore extends BasePgStore { END AS signer_status, EXTRACT(MILLISECOND FROM (fbr.received_at - bp.received_at)) AS response_time_ms FROM latest_blocks lb - LEFT JOIN filtered_block_proposals bp ON lb.block_hash = bp.block_hash + LEFT JOIN block_proposals bp ON lb.block_hash = bp.block_hash LEFT JOIN reward_set_signers rs ON lb.cycle_number = rs.cycle_number LEFT JOIN block_signer_signatures bss ON lb.block_height = bss.block_height AND rs.signer_key = bss.signer_key - LEFT JOIN filtered_block_responses fbr ON fbr.signer_key = rs.signer_key AND fbr.signer_sighash = lb.block_hash + LEFT JOIN block_responses fbr ON fbr.signer_key = rs.signer_key AND fbr.signer_sighash = lb.block_hash ), signer_state_aggregation AS ( SELECT @@ -286,36 +271,24 @@ export class PgStore extends BasePgStore { WHERE rss.cycle_number = ${cycleNumber} ), proposal_data AS ( - -- Fetch the first (oldest) proposal for each block_hash for the given cycle + -- Select all proposals for the given cycle SELECT bp.block_hash, bp.block_height, bp.received_at AS proposal_received_at FROM block_proposals bp WHERE bp.reward_cycle = ${cycleNumber} - AND bp.id = ( - -- Select the earliest proposal for each block_hash - SELECT MIN(sub_bp.id) - FROM block_proposals sub_bp - WHERE sub_bp.block_hash = bp.block_hash - ) ), response_data AS ( - -- Fetch the first (oldest) response for each (signer_key, signer_sighash) pair - SELECT DISTINCT ON (br.signer_key, br.signer_sighash) + -- Select responses associated with the proposals from the given cycle + SELECT br.signer_key, br.signer_sighash, br.accepted, br.received_at, br.id FROM block_responses br - WHERE br.id = ( - -- Select the earliest response for each signer_sighash and signer_key - SELECT MIN(sub_br.id) - FROM block_responses sub_br - WHERE sub_br.signer_key = br.signer_key - AND sub_br.signer_sighash = br.signer_sighash - ) + JOIN proposal_data pd ON br.signer_sighash = pd.block_hash -- Only responses linked to selected proposals ), signer_proposal_data AS ( -- Cross join signers with proposals and left join filtered responses @@ -377,7 +350,7 @@ export class PgStore extends BasePgStore { }[] >` WITH signer_data AS ( - -- Fetch the signer for the given cycle + -- Fetch the specific signer for the given cycle SELECT rss.signer_key, rss.signer_weight, @@ -387,39 +360,28 @@ export class PgStore extends BasePgStore { AND rss.signer_key = ${normalizeHexString(signerId)} ), proposal_data AS ( - -- Fetch the first (oldest) proposal for each block_hash for the given cycle + -- Select all proposals for the given cycle SELECT bp.block_hash, bp.block_height, bp.received_at AS proposal_received_at FROM block_proposals bp WHERE bp.reward_cycle = ${cycleNumber} - AND bp.id = ( - -- Select the earliest proposal for each block_hash - SELECT MIN(sub_bp.id) - FROM block_proposals sub_bp - WHERE sub_bp.block_hash = bp.block_hash - ) ), response_data AS ( - -- Fetch the first (oldest) response for each (signer_key, signer_sighash) pair - SELECT DISTINCT ON (br.signer_key, br.signer_sighash) + -- Select all responses for the proposals in the given cycle + SELECT br.signer_key, br.signer_sighash, br.accepted, br.received_at, br.id FROM block_responses br - WHERE br.id = ( - -- Select the earliest response for each signer_sighash and signer_key - SELECT MIN(sub_br.id) - FROM block_responses sub_br - WHERE sub_br.signer_key = br.signer_key - AND sub_br.signer_sighash = br.signer_sighash - ) + JOIN proposal_data pd ON br.signer_sighash = pd.block_hash + WHERE br.signer_key = ${normalizeHexString(signerId)} -- Filter for the specific signer ), signer_proposal_data AS ( - -- Cross join signers with proposals and left join filtered responses + -- Cross join the specific signer with proposals and left join filtered responses SELECT sd.signer_key, pd.block_hash, @@ -428,13 +390,13 @@ export class PgStore extends BasePgStore { rd.received_at AS response_received_at, EXTRACT(MILLISECOND FROM (rd.received_at - pd.proposal_received_at)) AS response_time_ms FROM signer_data sd - CROSS JOIN proposal_data pd -- Cross join to associate all signers with all proposals + CROSS JOIN proposal_data pd LEFT JOIN response_data rd ON pd.block_hash = rd.signer_sighash AND sd.signer_key = rd.signer_key -- Match signers with their corresponding responses ), aggregated_data AS ( - -- Aggregate the proposal and response data by signer + -- Aggregate the proposal and response data for the specific signer SELECT spd.signer_key, COUNT(CASE WHEN spd.accepted = true THEN 1 END)::integer AS proposals_accepted_count,