Skip to content

Commit 8f15f6f

Browse files
committed
refactor(projection)!: replace projectIntoSink 'sinks' prop with 'sinksFactory'
defer creating sinks until projection is subscribed allow projectIntoSink with empty projections
1 parent 0463c9d commit 8f15f6f

File tree

5 files changed

+144
-91
lines changed

5 files changed

+144
-91
lines changed

packages/e2e/test/projection/offline-fork.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ describe('resuming projection when intersection is not local tip', () => {
9494
cardanoNode,
9595
logger,
9696
projections,
97-
sinks: inMemorySinks
97+
sinksFactory: () => inMemorySinks
9898
});
9999

100100
// Project some events until we find at least 1 stake key registration

packages/projection/src/projectIntoSink.ts

+133-87
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import {
2828
tap
2929
} from 'rxjs';
3030
import { Projection } from './projections';
31-
import { Sink, Sinks } from './sinks';
31+
import { Sink, Sinks, SinksFactory } from './sinks';
3232
import { UnifiedProjectorEvent } from './types';
3333
import { WithNetworkInfo, withNetworkInfo, withRolledBackBlock } from './operators';
3434
import { combineProjections } from './combineProjections';
@@ -38,7 +38,7 @@ import uniq from 'lodash/uniq';
3838

3939
export interface ProjectIntoSinkProps<P, PS extends P> {
4040
projections: P;
41-
sinks: Sinks<PS>;
41+
sinksFactory: SinksFactory<PS>;
4242
cardanoNode: ObservableCardanoNode;
4343
logger: Logger;
4444
}
@@ -65,72 +65,91 @@ const blocksToPoints = (blocks: Array<Cardano.Block | 'origin'>) =>
6565
const pointDescription = (point: PointOrOrigin) =>
6666
point === 'origin' ? 'origin' : `slot ${point.slot}, block ${point.hash}`;
6767

68-
// TODO: try to write types that will infer returned observable type from supplied projections.
69-
// Inferring properties added by sinks (e.g. before() and after()) would be nice too, but probably not necessary.
70-
/**
71-
* @throws {@link InvalidIntersectionError} when no intersection with provided {@link sinks.StabilityWindowBuffer} is found.
72-
*/
73-
export const projectIntoSink = <P extends object, PS extends P>(
74-
props: ProjectIntoSinkProps<P, PS>
75-
): Observable<ProjectionsEvent<P>> => {
76-
const logger = contextLogger(props.logger, 'Projector');
77-
78-
const syncFromIntersection = ({ intersection, chainSync$ }: ObservableChainSync) =>
79-
new Observable<UnifiedProjectorEvent<{}>>((observer) => {
80-
logger.info(`Starting ChainSync from ${pointDescription(intersection.point)}`);
81-
return chainSync$.pipe(withRolledBackBlock(props.sinks.buffer)).subscribe(observer);
82-
});
68+
const syncFromIntersection = <PS>({
69+
chainSync: { intersection, chainSync$ },
70+
logger,
71+
sinks
72+
}: {
73+
logger: Logger;
74+
sinks: Sinks<PS>;
75+
chainSync: ObservableChainSync;
76+
}) =>
77+
new Observable<UnifiedProjectorEvent<{}>>((observer) => {
78+
logger.info(`Starting ChainSync from ${pointDescription(intersection.point)}`);
79+
return chainSync$.pipe(withRolledBackBlock(sinks.buffer)).subscribe(observer);
80+
});
8381

84-
const rollbackAndSyncFromIntersection = (initialChainSync: ObservableChainSync, tail: Cardano.Block | 'origin') =>
85-
new Observable<UnifiedProjectorEvent<{}>>((subscriber) => {
86-
logger.warn('Rolling back to find intersection');
87-
let skipFindingNewIntersection = true;
88-
let chainSync = initialChainSync;
89-
const rollback$ = props.sinks.buffer.tip$.pipe(
90-
takeWhile((block): block is Cardano.Block => block !== 'origin'),
91-
mergeMap((block): Observable<Cardano.Block> => {
92-
// we already have an intersection for the 1st tip
93-
if (skipFindingNewIntersection) {
94-
skipFindingNewIntersection = false;
95-
return of(block);
96-
}
97-
// try to find intersection with new tip
98-
return props.cardanoNode.findIntersect(blocksToPoints([block, tail, 'origin'])).pipe(
99-
take(1),
100-
tap((newChainSync) => {
101-
chainSync = newChainSync;
102-
}),
103-
map(() => block)
104-
);
105-
}),
106-
takeWhile((block) => !isIntersectionBlock(block, chainSync.intersection)),
107-
mergeMap(
108-
(block): Observable<UnifiedProjectorEvent<{}>> =>
109-
of({
110-
block,
111-
eventType: ChainSyncEventType.RollBackward,
112-
point: chainSync.intersection.point,
113-
// requestNext is a no-op when rolling back during initialization, because projectIntoSink will
114-
// delete block from the buffer for every RollBackward event via `manageBuffer`,
115-
// which will trigger the buffer to emit the next tip$
116-
requestNext: noop,
117-
tip: chainSync.intersection.tip
118-
})
119-
)
120-
);
121-
return concat(
122-
rollback$,
123-
defer(() => syncFromIntersection(chainSync))
124-
).subscribe(subscriber);
125-
});
82+
const rollbackAndSyncFromIntersection = <PS>({
83+
sinks,
84+
cardanoNode,
85+
initialChainSync,
86+
logger,
87+
tail
88+
}: {
89+
sinks: Sinks<PS>;
90+
cardanoNode: ObservableCardanoNode;
91+
initialChainSync: ObservableChainSync;
92+
logger: Logger;
93+
tail: Cardano.Block | 'origin';
94+
}) =>
95+
new Observable<UnifiedProjectorEvent<{}>>((subscriber) => {
96+
logger.warn('Rolling back to find intersection');
97+
let skipFindingNewIntersection = true;
98+
let chainSync = initialChainSync;
99+
const rollback$ = sinks.buffer.tip$.pipe(
100+
takeWhile((block): block is Cardano.Block => block !== 'origin'),
101+
mergeMap((block): Observable<Cardano.Block> => {
102+
// we already have an intersection for the 1st tip
103+
if (skipFindingNewIntersection) {
104+
skipFindingNewIntersection = false;
105+
return of(block);
106+
}
107+
// try to find intersection with new tip
108+
return cardanoNode.findIntersect(blocksToPoints([block, tail, 'origin'])).pipe(
109+
take(1),
110+
tap((newChainSync) => {
111+
chainSync = newChainSync;
112+
}),
113+
map(() => block)
114+
);
115+
}),
116+
takeWhile((block) => !isIntersectionBlock(block, chainSync.intersection)),
117+
mergeMap(
118+
(block): Observable<UnifiedProjectorEvent<{}>> =>
119+
of({
120+
block,
121+
eventType: ChainSyncEventType.RollBackward,
122+
point: chainSync.intersection.point,
123+
// requestNext is a no-op when rolling back during initialization, because projectIntoSink will
124+
// delete block from the buffer for every RollBackward event via `manageBuffer`,
125+
// which will trigger the buffer to emit the next tip$
126+
requestNext: noop,
127+
tip: chainSync.intersection.tip
128+
})
129+
)
130+
);
131+
return concat(
132+
rollback$,
133+
defer(() => syncFromIntersection({ chainSync, logger, sinks }))
134+
).subscribe(subscriber);
135+
});
126136

127-
const source$ = combineLatest([props.sinks.buffer.tip$, props.sinks.buffer.tail$]).pipe(
137+
const createChainSyncSource = <PS>({
138+
sinks,
139+
logger,
140+
cardanoNode
141+
}: {
142+
cardanoNode: ObservableCardanoNode;
143+
logger: Logger;
144+
sinks: Sinks<PS>;
145+
}) =>
146+
combineLatest([sinks.buffer.tip$, sinks.buffer.tail$]).pipe(
128147
take(1),
129148
mergeMap((blocks) => {
130149
const points = blocksToPoints(blocks);
131150
logger.info(`Starting projector with local tip at ${pointDescription(points[0])}`);
132151

133-
return props.cardanoNode.findIntersect(points).pipe(
152+
return cardanoNode.findIntersect(points).pipe(
134153
take(1),
135154
mergeMap((initialChainSync) => {
136155
if (initialChainSync.intersection.point === 'origin') {
@@ -149,41 +168,68 @@ export const projectIntoSink = <P extends object, PS extends P>(
149168
);
150169
}
151170
// buffer is empty, sync from origin
152-
return syncFromIntersection(initialChainSync);
171+
return syncFromIntersection({ chainSync: initialChainSync, logger, sinks });
153172
}
154173
if (blocks[0] !== 'origin' && initialChainSync.intersection.point.hash !== blocks[0].header.hash) {
155174
// rollback to intersection, then sync from intersection
156-
return rollbackAndSyncFromIntersection(initialChainSync, blocks[1]);
175+
return rollbackAndSyncFromIntersection({
176+
cardanoNode,
177+
initialChainSync,
178+
logger,
179+
sinks,
180+
tail: blocks[1]
181+
});
157182
}
158183
// intersection is at tip$ - no rollback, just sync from intersection
159-
return syncFromIntersection(initialChainSync);
184+
return syncFromIntersection({ chainSync: initialChainSync, logger, sinks });
160185
})
161186
);
162187
}),
163-
withNetworkInfo(props.cardanoNode)
188+
withNetworkInfo(cardanoNode)
164189
);
165-
// eslint-disable-next-line prefer-spread
166-
const projected$ = source$.pipe.apply(source$, combineProjections(props.projections) as any);
167-
const sinks: Sink<any, any>[] = Object.keys(props.sinks.projectionSinks)
168-
.filter((k) => k in props.projections)
169-
.map((k) => (props.sinks.projectionSinks as any)[k]);
170-
return projected$.pipe(
171-
props.sinks.before || passthrough(),
172-
concatMap((evt) => {
173-
const projectionSinks = sinks.map((sink) => sink.sink(evt));
174-
const projectorEvent = evt as UnifiedProjectorEvent<WithNetworkInfo>;
175-
return combineLatest(projectionSinks.map((o$) => o$.pipe(defaultIfEmpty(null)))).pipe(map(() => projectorEvent));
176-
}),
177-
props.sinks.buffer.handleEvents,
178-
props.sinks.after || passthrough(),
179-
tap((evt) => {
180-
logger.debug(
181-
`Processed event ${
182-
evt.eventType === ChainSyncEventType.RollForward ? 'RollForward' : 'RollBackward'
183-
} ${pointDescription(evt.block.header)}`
190+
191+
// TODO: try to write types that will infer returned observable type from supplied projections.
192+
// Inferring properties added by sinks (e.g. before() and after()) would be nice too, but probably not necessary.
193+
/**
194+
* @throws {@link InvalidIntersectionError} when no intersection with provided {@link selectedSinks.StabilityWindowBuffer} is found.
195+
*/
196+
export const projectIntoSink = <P extends object, PS extends P>({
197+
cardanoNode,
198+
logger: baseLogger,
199+
projections,
200+
sinksFactory
201+
}: ProjectIntoSinkProps<P, PS>): Observable<ProjectionsEvent<P>> => {
202+
const logger = contextLogger(baseLogger, 'Projector');
203+
204+
return defer(() => of(sinksFactory())).pipe(
205+
mergeMap((sinks) => {
206+
const source$ = createChainSyncSource({ cardanoNode, logger, sinks });
207+
// eslint-disable-next-line prefer-spread
208+
const projected$ = source$.pipe.apply(source$, combineProjections(projections) as any);
209+
const selectedSinks: Sink<any, any>[] = Object.keys(sinks.projectionSinks)
210+
.filter((k) => k in projections)
211+
.map((k) => (sinks.projectionSinks as any)[k]);
212+
return projected$.pipe(
213+
sinks.before || passthrough(),
214+
concatMap((evt) => {
215+
const projectionSinks = selectedSinks.map((sink) => sink.sink(evt));
216+
const projectorEvent = evt as UnifiedProjectorEvent<WithNetworkInfo>;
217+
return projectionSinks.length > 0
218+
? combineLatest(projectionSinks.map((o$) => o$.pipe(defaultIfEmpty(null)))).pipe(map(() => projectorEvent))
219+
: of(projectorEvent);
220+
}),
221+
sinks.buffer.handleEvents,
222+
sinks.after || passthrough(),
223+
tap((evt) => {
224+
logger.debug(
225+
`Processed event ${
226+
evt.eventType === ChainSyncEventType.RollForward ? 'RollForward' : 'RollBackward'
227+
} ${pointDescription(evt.block.header)}`
228+
);
229+
evt.requestNext();
230+
}),
231+
finalize(() => logger.info('Stopped'))
184232
);
185-
evt.requestNext();
186-
}),
187-
finalize(() => logger.info('Stopped'))
233+
})
188234
);
189235
};

packages/projection/src/sinks/inMemory/create.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { AllProjections } from '../../projections';
22
import { InMemoryStabilityWindowBuffer } from './InMemoryStabilityWindowBuffer';
33
import { InMemoryStore } from './types';
4-
import { Sinks } from '../types';
4+
import { Sinks, SinksFactory } from '../types';
55
import { stakeKeys } from './stakeKeys';
66
import { stakePools } from './stakePools';
77
import { withStaticContext } from '../../operators';
@@ -20,4 +20,9 @@ export const createSinks = (store: InMemoryStore): Sinks<AllProjections> => ({
2020
}
2121
});
2222

23+
export const createSinksFactory =
24+
(store: InMemoryStore): SinksFactory<AllProjections> =>
25+
() =>
26+
createSinks(store);
27+
2328
export type InMemorySinks = ReturnType<typeof createSinks>;

packages/projection/src/sinks/types.ts

+2
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,5 @@ export type Sinks<Projections> = {
5757

5858
type InferArg<T> = T extends (arg: infer Arg) => any ? Arg : never;
5959
export type SinkEventType<S extends { sink: any }> = InferArg<S['sink']>;
60+
61+
export type SinksFactory<P> = () => Sinks<P>;

packages/projection/test/projectIntoSink.test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const projectAll = ({ cardanoNode }: StubChainSyncData, projectionSinks: Sinks<P
1212
cardanoNode,
1313
logger,
1414
projections: Projections.allProjections,
15-
sinks: projectionSinks
15+
sinksFactory: () => projectionSinks
1616
}).pipe(toArray())
1717
);
1818

@@ -167,7 +167,7 @@ describe('projectIntoSink', () => {
167167
// not projecting stakePools
168168
stakeKeys: Projections.stakeKeys
169169
},
170-
sinks: inMemorySinks
170+
sinksFactory: () => inMemorySinks
171171
}).pipe(toArray())
172172
);
173173
expect(store.stakeKeys.size).toBeGreaterThan(0);

0 commit comments

Comments
 (0)