Skip to content

Commit e46273a

Browse files
authored
serialize event count in store function call (#1065)
if multiple events are processed concurrently, all event process request might read the initial store size and write to the store, potentially exceeding the store size limit. Serializing the store size read should fix this. Once the size is loaded in memory, further event process read should just read the in memory value
1 parent 49f19a6 commit e46273a

File tree

1 file changed

+15
-7
lines changed

1 file changed

+15
-7
lines changed

lib/event_processor/batch_event_processor.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
7474
private batchSize: number;
7575
private eventStore?: Store<EventWithId>;
7676
private eventCountInStore: Maybe<number> = undefined;
77+
private eventCountWaitPromise: Promise<unknown> = Promise.resolve();
7778
private maxEventsInStore: number = MAX_EVENTS_IN_STORE;
7879
private dispatchRepeater: Repeater;
7980
private failedEventRepeater?: Repeater;
@@ -264,15 +265,22 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
264265
}
265266
}
266267

267-
private async findEventCountInStore(): Promise<void> {
268+
private async readEventCountInStore(store: Store<EventWithId>): Promise<void> {
269+
try {
270+
const keys = await store.getKeys();
271+
this.eventCountInStore = keys.length;
272+
} catch (e) {
273+
this.logger?.error(e);
274+
}
275+
}
276+
277+
private async findEventCountInStore(): Promise<unknown> {
268278
if (this.eventStore && this.eventCountInStore === undefined) {
269-
try {
270-
const keys = await this.eventStore.getKeys();
271-
this.eventCountInStore = keys.length;
272-
} catch (e) {
273-
this.logger?.error(e);
274-
}
279+
const store = this.eventStore;
280+
this.eventCountWaitPromise = this.eventCountWaitPromise.then(() => this.readEventCountInStore(store));
281+
return this.eventCountWaitPromise;
275282
}
283+
return Promise.resolve();
276284
}
277285

278286
private async storeEvent(eventWithId: EventWithId): Promise<void> {

0 commit comments

Comments
 (0)