Skip to content

Commit aaafb5a

Browse files
authored
feat: add nft_custody pg materialized view to speed up nft event lookup
* feat: add nft_custody materialized view * fix: nft events unit test * fix: perform full query if user wants unanchored results * feat: only refresh materialized views when not in event replay
1 parent 7a11384 commit aaafb5a

File tree

4 files changed

+150
-43
lines changed

4 files changed

+150
-43
lines changed

src/datastore/postgres-store.ts

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -582,11 +582,17 @@ export class PgDataStore
582582
implements DataStore {
583583
readonly pool: Pool;
584584
readonly notifier?: PgNotifier;
585-
private constructor(pool: Pool, notifier: PgNotifier | undefined = undefined) {
585+
readonly eventReplay: boolean;
586+
private constructor(
587+
pool: Pool,
588+
notifier: PgNotifier | undefined = undefined,
589+
eventReplay: boolean = false
590+
) {
586591
// eslint-disable-next-line constructor-super
587592
super();
588593
this.pool = pool;
589594
this.notifier = notifier;
595+
this.eventReplay = eventReplay;
590596
}
591597

592598
/**
@@ -1186,6 +1192,7 @@ export class PgDataStore
11861192

11871193
const blocksUpdated = await this.updateBlock(client, data.block);
11881194
if (blocksUpdated !== 0) {
1195+
let newNftEvents = false;
11891196
for (const minerRewards of data.minerRewards) {
11901197
await this.updateMinerReward(client, minerRewards);
11911198
}
@@ -1200,6 +1207,7 @@ export class PgDataStore
12001207
await this.updateFtEvent(client, entry.tx, ftEvent);
12011208
}
12021209
for (const nftEvent of entry.nftEvents) {
1210+
newNftEvents = true;
12031211
await this.updateNftEvent(client, entry.tx, nftEvent);
12041212
}
12051213
for (const smartContract of entry.smartContracts) {
@@ -1212,6 +1220,9 @@ export class PgDataStore
12121220
await this.updateNamespaces(client, entry.tx, namespace);
12131221
}
12141222
}
1223+
if (newNftEvents && !this.eventReplay) {
1224+
await client.query(`REFRESH MATERIALIZED VIEW nft_custody`);
1225+
}
12151226

12161227
const tokenContractDeployments = data.txs
12171228
.filter(entry => entry.tx.type_id === DbTxTypeId.SmartContract)
@@ -2368,7 +2379,11 @@ export class PgDataStore
23682379
logger.verbose(`Entities marked as non-canonical: ${markedNonCanonical}`);
23692380
}
23702381

2371-
static async connect(skipMigrations = false, withNotifier = true): Promise<PgDataStore> {
2382+
static async connect(
2383+
skipMigrations = false,
2384+
withNotifier = true,
2385+
eventReplay = false
2386+
): Promise<PgDataStore> {
23722387
const clientConfig = getPgClientConfig();
23732388

23742389
const initTimer = stopwatch();
@@ -2424,10 +2439,10 @@ export class PgDataStore
24242439
try {
24252440
poolClient = await pool.connect();
24262441
if (!withNotifier) {
2427-
return new PgDataStore(pool);
2442+
return new PgDataStore(pool, undefined, eventReplay);
24282443
}
24292444
const notifier = new PgNotifier(clientConfig);
2430-
const store = new PgDataStore(pool, notifier);
2445+
const store = new PgDataStore(pool, notifier, eventReplay);
24312446
await store.connectPgNotifier();
24322447
return store;
24332448
} catch (error) {
@@ -5757,6 +5772,7 @@ export class PgDataStore
57575772
includeUnanchored: args.includeUnanchored,
57585773
});
57595774
const result = await client.query<AddressNftEventIdentifier & { count: string }>(
5775+
// Join against `nft_custody` materialized view only if we're looking for canonical results.
57605776
`
57615777
WITH address_transfers AS (
57625778
SELECT asset_identifier, value, sender, recipient, block_height, microblock_sequence, tx_index, event_index, tx_id
@@ -5772,7 +5788,9 @@ export class PgDataStore
57725788
ORDER BY asset_identifier, value, block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
57735789
)
57745790
SELECT sender, recipient, asset_identifier, value, block_height, tx_id, COUNT(*) OVER() AS count
5775-
FROM address_transfers INNER JOIN last_nft_transfers USING (asset_identifier, value, recipient)
5791+
FROM address_transfers
5792+
INNER JOIN ${args.includeUnanchored ? 'last_nft_transfers' : 'nft_custody'}
5793+
USING (asset_identifier, value, recipient)
57765794
ORDER BY block_height DESC, microblock_sequence DESC, tx_index DESC, event_index DESC
57775795
LIMIT $2 OFFSET $3
57785796
`,
@@ -6771,6 +6789,19 @@ export class PgDataStore
67716789
});
67726790
}
67736791

6792+
/**
6793+
* Called when a full event import is complete.
6794+
*/
6795+
async finishEventReplay() {
6796+
if (!this.eventReplay) {
6797+
return;
6798+
}
6799+
await this.queryTx(async client => {
6800+
// Refresh postgres materialized views.
6801+
await client.query(`REFRESH MATERIALIZED VIEW nft_custody`);
6802+
});
6803+
}
6804+
67746805
async close(): Promise<void> {
67756806
await this.notifier?.close();
67766807
await this.pool.end();

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ async function handleProgramArgs() {
285285
// or the `--force` option can be used.
286286
await cycleMigrations({ dangerousAllowDataLoss: true });
287287

288-
const db = await PgDataStore.connect(true, false);
288+
const db = await PgDataStore.connect(true, false, true);
289289
const eventServer = await startEventServer({
290290
datastore: db,
291291
chainId: getConfiguredChainID(),
@@ -315,6 +315,7 @@ async function handleProgramArgs() {
315315
});
316316
}
317317
}
318+
await db.finishEventReplay();
318319
console.log(`Event import and playback successful.`);
319320
await eventServer.closeAsync();
320321
await db.close();
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/* eslint-disable @typescript-eslint/camelcase */
2+
import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate';
3+
4+
export const shorthands: ColumnDefinitions | undefined = undefined;
5+
6+
export async function up(pgm: MigrationBuilder): Promise<void> {
7+
pgm.createMaterializedView('nft_custody', {}, `
8+
SELECT
9+
DISTINCT ON(asset_identifier, value) asset_identifier, value, recipient
10+
FROM
11+
nft_events
12+
WHERE
13+
canonical = true AND microblock_canonical = true
14+
ORDER BY
15+
asset_identifier DESC,
16+
value DESC,
17+
block_height DESC,
18+
microblock_sequence DESC,
19+
tx_index DESC,
20+
event_index DESC
21+
`);
22+
23+
pgm.createIndex('nft_custody', ['asset_identifier', 'value']);
24+
pgm.createIndex('nft_custody', 'recipient');
25+
}

src/tests/api-tests.ts

Lines changed: 87 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4600,7 +4600,11 @@ describe('api tests', () => {
46004600
const searchResult = await supertest(api.server).get(`/extended/v1/tx/0x1234/raw`);
46014601
expect(searchResult.status).toBe(404);
46024602
});
4603+
46034604
test('Success: nft events for address', async () => {
4605+
const addr1 = 'ST3J8EVYHVKH6XXPD61EE8XEHW4Y2K83861225AB1';
4606+
const addr2 = 'ST1HB64MAJ1MBV4CQ80GF01DZS4T1DSMX20ADCRA4';
4607+
46044608
const dbBlock: DbBlock = {
46054609
block_hash: '0xff',
46064610
index_block_hash: '0x1234',
@@ -4620,11 +4624,6 @@ describe('api tests', () => {
46204624
execution_cost_write_count: 0,
46214625
execution_cost_write_length: 0,
46224626
};
4623-
await db.updateBlock(client, dbBlock);
4624-
4625-
const addr1 = 'ST3J8EVYHVKH6XXPD61EE8XEHW4Y2K83861225AB1';
4626-
const addr2 = 'ST1HB64MAJ1MBV4CQ80GF01DZS4T1DSMX20ADCRA4';
4627-
46284627
const stxTx: DbTx = {
46294628
tx_id: '0x1111000000000000000000000000000000000000000000000000000000000000',
46304629
tx_index: 0,
@@ -4661,27 +4660,44 @@ describe('api tests', () => {
46614660
execution_cost_write_count: 0,
46624661
execution_cost_write_length: 0,
46634662
};
4664-
await db.updateTx(client, stxTx);
4665-
4666-
const nftEvent1: DbNftEvent = {
4667-
canonical: true,
4668-
event_type: DbEventTypeId.NonFungibleTokenAsset,
4669-
asset_event_type_id: DbAssetEventTypeId.Transfer,
4670-
event_index: 0,
4671-
tx_id: '0x1111000000000000000000000000000000000000000000000000000000000000',
4672-
tx_index: 1,
4673-
block_height: dbBlock.block_height,
4674-
asset_identifier: 'some-asset',
4675-
value: serializeCV(intCV(0)),
4676-
recipient: addr1,
4677-
sender: 'none',
4678-
};
4663+
const nftEvents: DbNftEvent[] = [];
46794664
for (let i = 0; i < 10; i++) {
4680-
await db.updateNftEvent(client, stxTx, nftEvent1);
4665+
nftEvents.push({
4666+
canonical: true,
4667+
event_type: DbEventTypeId.NonFungibleTokenAsset,
4668+
asset_event_type_id: DbAssetEventTypeId.Transfer,
4669+
event_index: 0,
4670+
tx_id: stxTx.tx_id,
4671+
tx_index: 1,
4672+
block_height: dbBlock.block_height,
4673+
asset_identifier: 'some-asset',
4674+
value: serializeCV(intCV(0)),
4675+
recipient: addr1,
4676+
sender: 'none',
4677+
});
46814678
}
4679+
4680+
await db.update({
4681+
block: dbBlock,
4682+
microblocks: [],
4683+
minerRewards: [],
4684+
txs: [
4685+
{
4686+
tx: stxTx,
4687+
stxLockEvents: [],
4688+
stxEvents: [],
4689+
ftEvents: [],
4690+
nftEvents: nftEvents,
4691+
contractLogEvents: [],
4692+
smartContracts: [],
4693+
names: [],
4694+
namespaces: [],
4695+
},
4696+
],
4697+
});
4698+
46824699
const limit = 2;
46834700
const offset = 0;
4684-
46854701
// test nft for given addresses
46864702
const result = await supertest(api.server).get(
46874703
`/extended/v1/address/${addr1}/nft_events?limit=${limit}&offset=${offset}`
@@ -4697,17 +4713,36 @@ describe('api tests', () => {
46974713
expect(result.body.nft_events[0].block_height).toBe(1);
46984714
expect(result.body.nft_events[0].value.repr).toBe('0');
46994715

4716+
const dbBlock2: DbBlock = {
4717+
block_hash: '0xffff',
4718+
index_block_hash: '0x123466',
4719+
parent_index_block_hash: '0x1234',
4720+
parent_block_hash: '0xff',
4721+
parent_microblock_hash: '',
4722+
parent_microblock_sequence: 0,
4723+
block_height: 2,
4724+
burn_block_time: 1594649995,
4725+
burn_block_hash: '0x123456',
4726+
burn_block_height: 124,
4727+
miner_txid: '0x4321',
4728+
canonical: true,
4729+
execution_cost_read_count: 0,
4730+
execution_cost_read_length: 0,
4731+
execution_cost_runtime: 0,
4732+
execution_cost_write_count: 0,
4733+
execution_cost_write_length: 0,
4734+
};
47004735
const stxTx1: DbTx = {
4701-
tx_id: '0x1111100000000000000000000000000000000000000000000000000000000000',
4736+
tx_id: '0x1111100000000000000000000000000000000000000000000000000000000001',
47024737
tx_index: 0,
47034738
anchor_mode: 3,
47044739
nonce: 0,
47054740
raw_tx: Buffer.alloc(0),
4706-
index_block_hash: dbBlock.index_block_hash,
4707-
block_hash: dbBlock.block_hash,
4708-
block_height: dbBlock.block_height,
4709-
burn_block_time: dbBlock.burn_block_time,
4710-
parent_burn_block_time: 1626122935,
4741+
index_block_hash: dbBlock2.index_block_hash,
4742+
block_hash: dbBlock2.block_hash,
4743+
block_height: dbBlock2.block_height,
4744+
burn_block_time: dbBlock2.burn_block_time,
4745+
parent_burn_block_time: 1626124935,
47114746
type_id: DbTxTypeId.TokenTransfer,
47124747
token_transfer_amount: 1n,
47134748
token_transfer_memo: Buffer.from('hi'),
@@ -4718,8 +4753,8 @@ describe('api tests', () => {
47184753
microblock_canonical: true,
47194754
microblock_sequence: I32_MAX,
47204755
microblock_hash: '',
4721-
parent_index_block_hash: dbBlock.parent_index_block_hash,
4722-
parent_block_hash: dbBlock.parent_block_hash,
4756+
parent_index_block_hash: dbBlock2.parent_index_block_hash,
4757+
parent_block_hash: dbBlock2.parent_block_hash,
47234758
post_conditions: Buffer.from([0x01, 0xf5]),
47244759
fee_rate: 1234n,
47254760
sponsored: false,
@@ -4733,22 +4768,37 @@ describe('api tests', () => {
47334768
execution_cost_write_count: 0,
47344769
execution_cost_write_length: 0,
47354770
};
4736-
await db.updateTx(client, stxTx1);
4737-
47384771
const nftEvent2: DbNftEvent = {
47394772
canonical: true,
47404773
event_type: DbEventTypeId.NonFungibleTokenAsset,
47414774
asset_event_type_id: DbAssetEventTypeId.Transfer,
47424775
event_index: 1,
4743-
tx_id: '0x1111100000000000000000000000000000000000000000000000000000000000',
4776+
tx_id: stxTx1.tx_id,
47444777
tx_index: 2,
4745-
block_height: dbBlock.block_height,
4778+
block_height: dbBlock2.block_height,
47464779
asset_identifier: 'some-asset',
47474780
value: serializeCV(intCV(0)),
47484781
recipient: addr2,
47494782
sender: 'none',
47504783
};
4751-
await db.updateNftEvent(client, stxTx, nftEvent2);
4784+
await db.update({
4785+
block: dbBlock2,
4786+
microblocks: [],
4787+
minerRewards: [],
4788+
txs: [
4789+
{
4790+
tx: stxTx1,
4791+
stxLockEvents: [],
4792+
stxEvents: [],
4793+
ftEvents: [],
4794+
nftEvents: [nftEvent2],
4795+
contractLogEvents: [],
4796+
smartContracts: [],
4797+
names: [],
4798+
namespaces: [],
4799+
},
4800+
],
4801+
});
47524802

47534803
const result1 = await supertest(api.server).get(`/extended/v1/address/${addr2}/nft_events`);
47544804
expect(result1.status).toBe(200);
@@ -4757,9 +4807,9 @@ describe('api tests', () => {
47574807
expect(result1.body.nft_events.length).toEqual(1);
47584808
expect(result1.body.nft_events[0].recipient).toBe(addr2);
47594809
expect(result1.body.nft_events[0].tx_id).toBe(
4760-
'0x1111100000000000000000000000000000000000000000000000000000000000'
4810+
'0x1111100000000000000000000000000000000000000000000000000000000001'
47614811
);
4762-
expect(result1.body.nft_events[0].block_height).toBe(1);
4812+
expect(result1.body.nft_events[0].block_height).toBe(2);
47634813
expect(result.body.nft_events[0].value.repr).toBe('0');
47644814

47654815
//check ownership for addr

0 commit comments

Comments
 (0)