Skip to content

Commit d53a96e

Browse files
Merge pull request #1200 from input-output-hk/feat/projector-no-pg-transaction
Feat/projector no pg transaction
2 parents 1426bf1 + 2178cda commit d53a96e

26 files changed

+497
-53
lines changed

packages/cardano-services/src/Projection/createTypeormProjection.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import { Bootstrap, ProjectionEvent, logProjectionProgress, requestNext } from '@cardano-sdk/projection';
44
import { Cardano, ObservableCardanoNode } from '@cardano-sdk/core';
55
import { Logger } from 'ts-log';
6-
import { Observable, concat, defer, take, takeWhile } from 'rxjs';
6+
import { Observable, concat, defer, groupBy, mergeMap, take, takeWhile } from 'rxjs';
77
import {
88
PgConnectionConfig,
99
TypeormDevOptions,
@@ -72,7 +72,7 @@ export const createTypeormProjection = ({
7272
logger.debug(`Creating projection with policyIds ${JSON.stringify(handlePolicyIds)}`);
7373
logger.debug(`Using a ${blocksBufferLength} blocks buffer`);
7474

75-
const { mappers, entities, stores, extensions } = prepareTypeormProjection(
75+
const { mappers, entities, stores, extensions, willStore } = prepareTypeormProjection(
7676
{
7777
options: projectionOptions,
7878
projections
@@ -118,15 +118,23 @@ export const createTypeormProjection = ({
118118
defer(() =>
119119
projectionSource$.pipe(
120120
applyMappers(mappers),
121-
shareRetryBackoff(
122-
(evt$) =>
123-
evt$.pipe(
124-
withTypeormTransaction({ connection$: connect() }),
125-
applyStores(stores),
126-
buffer.storeBlockData(),
127-
typeormTransactionCommit()
128-
),
129-
{ shouldRetry: isRecoverableTypeormError }
121+
// if there are any relevant data to write into db
122+
groupBy((evt) => willStore(evt)),
123+
mergeMap((group$) =>
124+
group$.key
125+
? group$.pipe(
126+
shareRetryBackoff(
127+
(evt$) =>
128+
evt$.pipe(
129+
withTypeormTransaction({ connection$: connect() }),
130+
applyStores(stores),
131+
buffer.storeBlockData(),
132+
typeormTransactionCommit()
133+
),
134+
{ shouldRetry: isRecoverableTypeormError }
135+
)
136+
)
137+
: group$
130138
),
131139
tipTracker.trackProjectedTip(),
132140
requestNext(),

packages/cardano-services/src/Projection/prepareTypeormProjection.ts

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,24 @@ import {
2828
storeStakeKeyRegistrations,
2929
storeStakePoolRewardsJob,
3030
storeStakePools,
31-
storeUtxo
31+
storeUtxo,
32+
willStoreAddresses,
33+
willStoreAssets,
34+
willStoreHandleMetadata,
35+
willStoreHandles,
36+
willStoreNftMetadata,
37+
willStoreStakeKeyRegistrations,
38+
willStoreStakePoolMetadataJob,
39+
willStoreStakePoolRewardsJob,
40+
willStoreStakePools,
41+
willStoreUtxo
3242
} from '@cardano-sdk/projection-typeorm';
33-
import { Cardano } from '@cardano-sdk/core';
34-
import { Mappers as Mapper } from '@cardano-sdk/projection';
43+
import { Cardano, ChainSyncEventType } from '@cardano-sdk/core';
44+
import { Mappers as Mapper, ProjectionEvent } from '@cardano-sdk/projection';
45+
import { ObservableType, passthrough } from '@cardano-sdk/util-rxjs';
3546
import { POOLS_METRICS_INTERVAL_DEFAULT, POOLS_METRICS_OUTDATED_INTERVAL_DEFAULT } from '../Program/programs/types';
3647
import { Sorter } from '@hapi/topo';
37-
import { WithLogger } from '@cardano-sdk/util';
38-
import { passthrough } from '@cardano-sdk/util-rxjs';
48+
import { WithLogger, isNotNil } from '@cardano-sdk/util';
3949

4050
/** Used as mount segments, so must be URL-friendly */
4151
export enum ProjectionName {
@@ -118,6 +128,25 @@ type StoreOperators = typeof storeOperators;
118128
type StoreName = keyof StoreOperators;
119129
type StoreOperator = StoreOperators[StoreName];
120130

131+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
132+
type InferArg0<F extends Function> = F extends (arg0: infer Args) => any ? Args : never;
133+
type WillStore = {
134+
[k in keyof StoreOperators]: (args: ObservableType<InferArg0<StoreOperators[k]>>) => boolean;
135+
};
136+
137+
const willStore: Partial<WillStore> = {
138+
storeAddresses: willStoreAddresses,
139+
storeAssets: willStoreAssets,
140+
storeHandleMetadata: willStoreHandleMetadata,
141+
storeHandles: willStoreHandles,
142+
storeNftMetadata: willStoreNftMetadata,
143+
storeStakeKeyRegistrations: willStoreStakeKeyRegistrations,
144+
storeStakePoolMetadataJob: willStoreStakePoolMetadataJob,
145+
storeStakePoolRewardsJob: willStoreStakePoolRewardsJob,
146+
storeStakePools: willStoreStakePools,
147+
storeUtxo: willStoreUtxo
148+
};
149+
121150
const entities = {
122151
address: AddressEntity,
123152
asset: AssetEntity,
@@ -221,7 +250,7 @@ const storeMapperDependencies: Partial<Record<StoreName, MapperName[]>> = {
221250
};
222251

223252
const storeInterDependencies: Partial<Record<StoreName, StoreName[]>> = {
224-
storeAddresses: ['storeStakeKeyRegistrations'],
253+
storeAddresses: ['storeBlock', 'storeStakeKeyRegistrations'],
225254
storeAssets: ['storeBlock'],
226255
storeHandleMetadata: ['storeUtxo'],
227256
storeHandles: ['storeUtxo', 'storeAddresses', 'storeHandleMetadata'],
@@ -338,16 +367,24 @@ export const prepareTypeormProjection = (
338367
const selectedMappers = mapperSorter.nodes;
339368
const selectedStores = storeSorter.nodes;
340369
const extensions = requiredExtensions(projections);
370+
const willStoreCheckers = selectedStores
371+
.map((store) => Object.entries(storeOperators).find(([_, operator]) => store === operator)!)
372+
.map(([storeName]) => willStore[storeName as keyof StoreOperators])
373+
.filter(isNotNil);
341374
return {
342375
__debug: {
343376
entities: selectedEntities.map((Entity) => keyOf(entities, Entity)),
344377
mappers: selectedMappers.map((mapper) => keyOf(mapperOperators, mapper)),
345-
stores: selectedStores.map((store) => keyOf(storeOperators, store))
378+
stores: selectedStores.map((store) => keyOf(storeOperators, store)),
379+
willStoreCheckers: willStoreCheckers.map((checker) => checker.name)
346380
},
347381
entities: selectedEntities,
348382
extensions,
349383
mappers: selectedMappers,
350-
stores: selectedStores
384+
stores: selectedStores,
385+
willStore: <T extends ProjectionEvent>(evt: T) =>
386+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
387+
evt.eventType === ChainSyncEventType.RollBackward || willStoreCheckers.some((check) => check(evt as any))
351388
};
352389
};
353390

packages/cardano-services/test/Projection/createTypeormProjection.test.ts

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
import { AssetEntity, OutputEntity, TokensEntity, createDataSource } from '@cardano-sdk/projection-typeorm';
1+
/* eslint-disable @typescript-eslint/no-explicit-any */
2+
import {
3+
AssetEntity,
4+
BlockEntity,
5+
OutputEntity,
6+
TokensEntity,
7+
createDataSource
8+
} from '@cardano-sdk/projection-typeorm';
29
import { ChainSyncDataSet, chainSyncData, logger } from '@cardano-sdk/util-dev';
310
import { ProjectionName, createTypeormProjection, prepareTypeormProjection } from '../../src';
411
import { lastValueFrom } from 'rxjs';
@@ -41,5 +48,48 @@ describe('createTypeormProjection', () => {
4148
await dataSource.destroy();
4249
});
4350

51+
it('only store blocks which have relevant information to the configured stores', async () => {
52+
// Setup projector
53+
const projections = [ProjectionName.Asset];
54+
const data = chainSyncData(ChainSyncDataSet.WithMint);
55+
56+
const emptyBlocksHashes = data.allEvents
57+
.filter((evt) => (evt as any).block && (evt as any).block.body.length === 0)
58+
.map((evt) => (evt as any).block.header.hash);
59+
60+
const projection$ = createTypeormProjection({
61+
blocksBufferLength: 10,
62+
cardanoNode: data.cardanoNode,
63+
connectionConfig$: projectorConnectionConfig$,
64+
devOptions: { dropSchema: true },
65+
logger,
66+
projections
67+
});
68+
69+
// Project
70+
await lastValueFrom(projection$);
71+
72+
// Setup query runner for assertions
73+
const { entities } = prepareTypeormProjection({ projections }, { logger });
74+
const dataSource = createDataSource({
75+
connectionConfig: projectorConnectionConfig,
76+
entities,
77+
logger
78+
});
79+
await dataSource.initialize();
80+
const queryRunner = dataSource.createQueryRunner();
81+
await queryRunner.connect();
82+
83+
// Check data in the database
84+
for (const emptyBlockHash of emptyBlocksHashes)
85+
expect(
86+
await queryRunner.manager.getRepository(BlockEntity).findOne({ where: { hash: emptyBlockHash } })
87+
).toBeNull();
88+
89+
expect.hasAssertions();
90+
91+
await queryRunner.release();
92+
await dataSource.destroy();
93+
});
4494
// PostgreSQL transaction retries are tested in projection-typeorm package
4595
});

packages/cardano-services/test/Projection/prepareTypeormProjection.test.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,20 @@ const prepare = (projections: ProjectionName[]) =>
55
prepareTypeormProjection({ projections }, { logger: dummyLogger }).__debug;
66

77
describe('prepareTypeormProjection', () => {
8-
describe('computes required entities, mappers and stores based on selected projections and presence of a buffer', () => {
8+
describe('computes required entities, mappers, predicates, and stores based on selected projections and presence of a buffer', () => {
99
test('utxo', () => {
10-
const { entities, mappers, stores } = prepare([ProjectionName.UTXO]);
10+
const { entities, mappers, stores, willStoreCheckers } = prepare([ProjectionName.UTXO]);
1111
expect(new Set(entities)).toEqual(new Set(['tokens', 'block', 'asset', 'nftMetadata', 'output', 'blockData']));
1212
expect(mappers).toEqual(['withMint', 'withUtxo']);
1313
expect(stores).toEqual(['storeBlock', 'storeAssets', 'storeUtxo']);
14+
expect(willStoreCheckers).toEqual(['willStoreAssets', 'willStoreUtxo']);
1415
});
1516

1617
test('stake-pool,stake-pool-metadata', () => {
17-
const { entities, mappers, stores } = prepare([ProjectionName.StakePool, ProjectionName.StakePoolMetadataJob]);
18+
const { entities, mappers, stores, willStoreCheckers } = prepare([
19+
ProjectionName.StakePool,
20+
ProjectionName.StakePoolMetadataJob
21+
]);
1822
expect(new Set(entities)).toEqual(
1923
new Set([
2024
'block',
@@ -29,6 +33,7 @@ describe('prepareTypeormProjection', () => {
2933
);
3034
expect(mappers).toEqual(['withCertificates', 'withStakePools']);
3135
expect(stores).toEqual(['storeBlock', 'storeStakePools', 'storeStakePoolMetadataJob']);
36+
expect(willStoreCheckers).toEqual(['willStoreStakePools', 'willStoreStakePoolMetadataJob']);
3237
});
3338
});
3439
});

packages/projection-typeorm/src/operators/storeAddresses.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ const lookupStakeKeyRegistration = async (pointer: Cardano.Pointer | undefined,
1616
return Hash28ByteBase16.fromEd25519KeyHashHex(stakeKeyRegistration.stakeKeyHash);
1717
};
1818

19+
export const willStoreAddresses = ({ addresses }: Mappers.WithAddresses) => addresses.length > 0;
20+
1921
export const storeAddresses = typeormOperator<Mappers.WithAddresses>(async (evt) => {
2022
const { addresses, eventType, queryRunner } = evt;
2123
if (addresses.length === 0 || eventType !== ChainSyncEventType.RollForward) return;

packages/projection-typeorm/src/operators/storeAssets.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ const rollBackward = async ({ mint, queryRunner }: StoreAssetEventParams): Promi
5353
return mintedAssetTotalSupplies;
5454
};
5555

56+
export const willStoreAssets = ({ mint }: Mappers.WithMint) => mint.length > 0;
57+
5658
export const storeAssets = typeormOperator<Mappers.WithMint, WithMintedAssetSupplies>(
5759
async ({ mint, block: { header }, eventType, queryRunner }) => {
5860
const storeAssetEventParams: StoreAssetEventParams = { header, mint, queryRunner };

packages/projection-typeorm/src/operators/storeHandleMetadata.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { Mappers } from '@cardano-sdk/projection';
44
import { WithStoredProducedUtxo } from './storeUtxo';
55
import { typeormOperator } from './util';
66

7+
export const willStoreHandleMetadata = ({ handleMetadata }: Mappers.WithHandleMetadata) => handleMetadata.length > 0;
8+
79
export const storeHandleMetadata = typeormOperator<Mappers.WithHandleMetadata & WithStoredProducedUtxo>(
810
async ({
911
eventType,

packages/projection-typeorm/src/operators/storeHandles.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ const withTotalSupplies = (
230230
)
231231
);
232232

233+
export const willStoreHandles = ({ handles }: Mappers.WithHandles) => handles.length > 0;
234+
233235
export const storeHandles = typeormOperator<Mappers.WithHandles & WithMintedAssetSupplies & Mappers.WithHandleMetadata>(
234236
async ({ handles, queryRunner, eventType, block, mintedAssetTotalSupplies, handleMetadata }) => {
235237
const handleEventParams: HandleEventParams = {

packages/projection-typeorm/src/operators/storeNftMetadata.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ const handleRollBackwardEvent = async (
104104
}
105105
};
106106

107+
export const willStoreNftMetadata = ({ nftMetadata, cip67 }: Mappers.WithCIP67 & Mappers.WithNftMetadata) =>
108+
nftMetadata.length > 0 || Object.keys(cip67.byLabel).length > 0;
109+
107110
export const storeNftMetadata = typeormOperator<Mappers.WithCIP67 & Mappers.WithMint & Mappers.WithNftMetadata>(
108111
async (evt) => {
109112
const nftMetadataRepository = evt.queryRunner.manager.getRepository(NftMetadataEntity);

packages/projection-typeorm/src/operators/storePoolMetricsUpdateJob.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import { typeormOperator } from './util';
55

66
export const createStorePoolMetricsUpdateJob = (jobFrequency = 1000, jobOutdatedFrequency?: number) => {
77
// Remember the blockNo of last sent job in order to no resend another job in case of rollback
8-
let lastSentBlock: Cardano.BlockNo | undefined;
8+
let lastAllSentBlock: Cardano.BlockNo | undefined;
9+
let lastOutdatedSentBlock: Cardano.BlockNo | undefined;
910
// Metrics updated before this slot is considered outdated
1011
let outdatedSlot: Cardano.Slot;
1112
let reachedTheTip = false;
1213

14+
// eslint-disable-next-line complexity
1315
return typeormOperator<WithPgBoss>(async ({ eventType, pgBoss, block: { header }, tip }) => {
1416
let insertFirstJob = false;
1517

@@ -20,23 +22,31 @@ export const createStorePoolMetricsUpdateJob = (jobFrequency = 1000, jobOutdated
2022

2123
const sendForAll = async () => {
2224
// run the update for all pools
23-
lastSentBlock = blockNo;
25+
lastAllSentBlock = blockNo;
26+
27+
if (lastOutdatedSentBlock === undefined) lastOutdatedSentBlock = blockNo;
28+
2429
outdatedSlot = slot;
2530
await pgBoss.send(STAKE_POOL_METRICS_UPDATE, { slot }, { slot });
2631
};
2732

2833
const sendForOutdated = async () => {
2934
// run the update for only pools with outdated metrics
30-
lastSentBlock = blockNo;
35+
lastOutdatedSentBlock = blockNo;
3136
await pgBoss.send(STAKE_POOL_METRICS_UPDATE, { outdatedSlot, slot }, { slot });
3237
};
3338

3439
if (insertFirstJob) {
3540
await sendForAll();
36-
} else if (blockNo !== lastSentBlock && reachedTheTip) {
37-
if (blockNo % jobFrequency === 0) {
41+
} else if ((blockNo !== lastAllSentBlock || blockNo !== lastOutdatedSentBlock) && reachedTheTip) {
42+
if (lastAllSentBlock && blockNo - lastAllSentBlock >= jobFrequency) {
3843
await sendForAll();
39-
} else if (jobOutdatedFrequency && outdatedSlot && blockNo % jobOutdatedFrequency === 0) {
44+
} else if (
45+
jobOutdatedFrequency &&
46+
lastOutdatedSentBlock &&
47+
outdatedSlot &&
48+
blockNo - lastOutdatedSentBlock >= jobOutdatedFrequency
49+
) {
4050
await sendForOutdated();
4151
}
4252
}

packages/projection-typeorm/src/operators/storeStakeKeyRegistrations.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { Mappers } from '@cardano-sdk/projection';
33
import { StakeKeyRegistrationEntity } from '../entity';
44
import { certificatePointerToId, typeormOperator } from './util';
55

6+
export const willStoreStakeKeyRegistrations = ({ stakeKeyRegistrations }: Mappers.WithStakeKeyRegistrations) =>
7+
stakeKeyRegistrations.length > 0;
8+
69
export const storeStakeKeyRegistrations = typeormOperator<Mappers.WithStakeKeyRegistrations>(
710
async ({ eventType, queryRunner, block, stakeKeyRegistrations }) => {
811
// Deleted by db with ON DELETE CASCADE

packages/projection-typeorm/src/operators/storeStakePoolMetadataJob.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { STAKE_POOL_METADATA_QUEUE, StakePoolMetadataJob, defaultJobOptions } fr
44
import { WithPgBoss } from './withTypeormTransaction';
55
import { certificatePointerToId, typeormOperator } from './util';
66

7+
export const willStoreStakePoolMetadataJob = ({ stakePools }: Mappers.WithStakePools) => stakePools.updates.length > 0;
8+
79
export const createStoreStakePoolMetadataJob = (retryDelay = defaultJobOptions.retryDelay) =>
810
typeormOperator<Mappers.WithStakePools & WithPgBoss>(async ({ block: { header }, eventType, pgBoss, stakePools }) => {
911
const { slot } = header;

packages/projection-typeorm/src/operators/storeStakePoolRewardsJob.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@ import { STAKE_POOL_REWARDS, defaultJobOptions } from '../pgBoss';
33
import { WithPgBoss } from './withTypeormTransaction';
44
import { typeormOperator } from './util';
55

6+
export const willStoreStakePoolRewardsJob = ({
7+
crossEpochBoundary,
8+
epochNo
9+
}: {
10+
crossEpochBoundary: boolean;
11+
epochNo: Cardano.EpochNo;
12+
}) => crossEpochBoundary && epochNo !== 1;
13+
614
export const storeStakePoolRewardsJob = typeormOperator<WithPgBoss>(
715
async ({ block: { header }, crossEpochBoundary, epochNo, eventType, pgBoss }) => {
816
const { slot } = header;

packages/projection-typeorm/src/operators/storeStakePools.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,9 @@ const rollBackward = async (evt: Event) => {
312312
await undoUpdateLatestCertificatesAndDeletePoolsWithZeroRegistrations(evt);
313313
};
314314

315+
export const willStoreStakePools = ({ stakePools: { updates, retirements } }: Mappers.WithStakePools) =>
316+
updates.length > 0 || retirements.length > 0;
317+
315318
export const storeStakePools = typeormOperator<Mappers.WithStakePools>(async (evt) => {
316319
await (evt.eventType === ChainSyncEventType.RollForward ? rollForward(evt) : rollBackward(evt));
317320
});

packages/projection-typeorm/src/operators/storeUtxo.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ export interface WithStoredProducedUtxo {
1111
storedProducedUtxo: Map<Mappers.ProducedUtxo, ObjectLiteral>;
1212
}
1313

14+
export const willStoreUtxo = ({ utxo: { produced, consumed } }: Mappers.WithUtxo) =>
15+
produced.length > 0 || consumed.length > 0;
16+
1417
export const storeUtxo = typeormOperator<Mappers.WithUtxo, WithStoredProducedUtxo>(
1518
async ({ utxo: { consumed, produced }, block: { header }, eventType, queryRunner }) => {
1619
const utxoRepository = queryRunner.manager.getRepository(OutputEntity);

0 commit comments

Comments
 (0)