From dfc630d54fd6f77d794d2bfd560dd8fc2903f501 Mon Sep 17 00:00:00 2001 From: Sebastian Mahr Date: Mon, 8 Apr 2024 12:31:41 +0200 Subject: [PATCH] Feature/Split external event processing into separate processors (#86) * chore: split external event processing into seperate processors * chore: remove circular dependency * chore: add missing file * chore: rename files and folder * chore: refactor switch case * chore: remove export of ExternalEventsProcessor * chore: move map to ExternalEventProcessorRegistry * chore: add missing file * chore: add ExternalEventProcessorRegistry to foder index * chore: unify constructors * chore: move change item seperation * chore: move to getter * chore: remove unused code and rename address to ownAddress * chore: fix generic constructor --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .ci/runChecks.sh | 0 packages/transport/src/modules/index.ts | 1 - .../src/modules/sync/ChangedItems.ts | 12 +- .../modules/sync/ExternalEventsProcessor.ts | 112 ------------------ .../src/modules/sync/SyncController.ts | 60 +++++++--- .../ExternalEventProcessor.ts | 18 +++ .../ExternalEventProcessorRegistry.ts | 35 ++++++ .../MessageDeliveredExternalEventProcessor.ts | 16 +++ .../MessageReceivedExternalEventProcessor.ts | 15 +++ ...ipChangeCompletedExternalEventProcessor.ts | 17 +++ ...shipChangeCreatedExternalEventProcessor.ts | 17 +++ .../sync/externalEventProcessors/index.ts | 5 + 12 files changed, 172 insertions(+), 136 deletions(-) mode change 100644 => 100755 .ci/runChecks.sh delete mode 100644 packages/transport/src/modules/sync/ExternalEventsProcessor.ts create mode 100644 packages/transport/src/modules/sync/externalEventProcessors/ExternalEventProcessor.ts create mode 100644 packages/transport/src/modules/sync/externalEventProcessors/ExternalEventProcessorRegistry.ts create mode 100644 packages/transport/src/modules/sync/externalEventProcessors/MessageDeliveredExternalEventProcessor.ts create mode 100644 packages/transport/src/modules/sync/externalEventProcessors/MessageReceivedExternalEventProcessor.ts create mode 100644 packages/transport/src/modules/sync/externalEventProcessors/RelationshipChangeCompletedExternalEventProcessor.ts create mode 100644 packages/transport/src/modules/sync/externalEventProcessors/RelationshipChangeCreatedExternalEventProcessor.ts create mode 100644 packages/transport/src/modules/sync/externalEventProcessors/index.ts diff --git a/.ci/runChecks.sh b/.ci/runChecks.sh old mode 100644 new mode 100755 diff --git a/packages/transport/src/modules/index.ts b/packages/transport/src/modules/index.ts index 14462b4fe..8ae9cb5e5 100644 --- a/packages/transport/src/modules/index.ts +++ b/packages/transport/src/modules/index.ts @@ -101,7 +101,6 @@ export * from "./sync/backbone/StartSyncRun"; export * from "./sync/backbone/SyncClient"; export * from "./sync/ChangedItems"; export * from "./sync/DatawalletModificationsProcessor"; -export * from "./sync/ExternalEventsProcessor"; export * from "./sync/local/DatawalletModification"; export { SyncProgressCallback as SyncPercentageCallback, SyncStep } from "./sync/SyncCallback"; export * from "./sync/SyncController"; diff --git a/packages/transport/src/modules/sync/ChangedItems.ts b/packages/transport/src/modules/sync/ChangedItems.ts index 5002a0f18..7b3c5be38 100644 --- a/packages/transport/src/modules/sync/ChangedItems.ts +++ b/packages/transport/src/modules/sync/ChangedItems.ts @@ -7,11 +7,11 @@ export class ChangedItems { public readonly messages: Message[] = [] ) {} - public addRelationship(relationship: Relationship): void { - this.relationships.push(relationship); - } - - public addMessage(message: Message): void { - this.messages.push(message); + public addItem(item: Relationship | Message): void { + if (item instanceof Message) { + this.messages.push(item); + } else if (item instanceof Relationship) { + this.relationships.push(item); + } } } diff --git a/packages/transport/src/modules/sync/ExternalEventsProcessor.ts b/packages/transport/src/modules/sync/ExternalEventsProcessor.ts deleted file mode 100644 index 8651ce921..000000000 --- a/packages/transport/src/modules/sync/ExternalEventsProcessor.ts +++ /dev/null @@ -1,112 +0,0 @@ -import { ILogger } from "@js-soft/logging-abstractions"; -import { EventBus } from "@js-soft/ts-utils"; -import { CoreId, TransportError, TransportLoggerFactory } from "../../core"; -import { MessageDeliveredEvent, MessageReceivedEvent, RelationshipChangedEvent } from "../../events"; -import { MessageController } from "../messages/MessageController"; -import { RelationshipsController } from "../relationships/RelationshipsController"; -import { ChangedItems } from "./ChangedItems"; -import { SyncProgressReporter, SyncProgressReporterStep, SyncStep } from "./SyncCallback"; -import { BackboneExternalEvent } from "./backbone/BackboneExternalEvent"; -import { FinalizeSyncRunRequestExternalEventResult } from "./backbone/FinalizeSyncRun"; - -export class ExternalEventsProcessor { - private readonly log: ILogger; - public readonly changedItems: ChangedItems = new ChangedItems(); - public readonly results: FinalizeSyncRunRequestExternalEventResult[] = []; - private readonly syncStep: SyncProgressReporterStep; - - public constructor( - private readonly messagesController: MessageController, - private readonly relationshipsController: RelationshipsController, - private readonly externalEvents: BackboneExternalEvent[], - reporter: SyncProgressReporter, - private readonly eventBus: EventBus, - private readonly ownAddress: string - ) { - this.log = TransportLoggerFactory.getLogger(ExternalEventsProcessor); - this.syncStep = reporter.createStep(SyncStep.ExternalEventsProcessing, externalEvents.length); - } - - public async execute(): Promise { - for (const externalEvent of this.externalEvents) { - try { - switch (externalEvent.type) { - case "MessageReceived": - await this.applyMessageReceivedEvent(externalEvent); - break; - case "MessageDelivered": - await this.applyMessageDeliveredEvent(externalEvent); - break; - case "RelationshipChangeCreated": - await this.applyRelationshipChangeCreatedEvent(externalEvent); - break; - case "RelationshipChangeCompleted": - await this.applyRelationshipChangeCompletedEvent(externalEvent); - break; - default: - throw new TransportError(`'${externalEvent.type}' is not a supported external event type.`); - } - - this.results.push({ - externalEventId: externalEvent.id - }); - } catch (e: any) { - this.log.error("There was an error while trying to apply an external event: ", e); - - let errorCode; - if (e.code) { - errorCode = e.code; - } else if (e.message) { - errorCode = e.message; - } else { - errorCode = JSON.stringify(e); - } - - this.results.push({ - externalEventId: externalEvent.id, - errorCode: errorCode - }); - } finally { - this.syncStep.progress(); - } - } - } - - private async applyRelationshipChangeCompletedEvent(externalEvent: BackboneExternalEvent) { - const payload = externalEvent.payload as { changeId: string }; - const relationship = await this.relationshipsController.applyChangeById(payload.changeId); - - if (relationship) { - this.eventBus.publish(new RelationshipChangedEvent(this.ownAddress, relationship)); - this.changedItems.addRelationship(relationship); - } - } - - private async applyRelationshipChangeCreatedEvent(externalEvent: BackboneExternalEvent) { - const payload = externalEvent.payload as { changeId: string; relationshipId: string }; - const relationship = await this.relationshipsController.applyChangeById(payload.changeId); - - if (relationship) { - this.eventBus.publish(new RelationshipChangedEvent(this.ownAddress, relationship)); - this.changedItems.addRelationship(relationship); - } - } - - private async applyMessageDeliveredEvent(externalEvent: BackboneExternalEvent) { - const messageReceivedPayload = externalEvent.payload as { id: string }; - const updatedMessages = await this.messagesController.updateCache([messageReceivedPayload.id]); - - const deliveredMessage = updatedMessages[0]; - - this.eventBus.publish(new MessageDeliveredEvent(this.ownAddress, deliveredMessage)); - this.changedItems.addMessage(deliveredMessage); - } - - private async applyMessageReceivedEvent(externalEvent: BackboneExternalEvent) { - const newMessagePayload = externalEvent.payload as { id: string }; - const newMessage = await this.messagesController.loadPeerMessage(CoreId.from(newMessagePayload.id)); - - this.eventBus.publish(new MessageReceivedEvent(this.ownAddress, newMessage)); - this.changedItems.addMessage(newMessage); - } -} diff --git a/packages/transport/src/modules/sync/SyncController.ts b/packages/transport/src/modules/sync/SyncController.ts index 1ce9c6c29..ba676ba0d 100644 --- a/packages/transport/src/modules/sync/SyncController.ts +++ b/packages/transport/src/modules/sync/SyncController.ts @@ -3,27 +3,28 @@ import { log } from "@js-soft/ts-utils"; import { ControllerName, CoreDate, CoreError, CoreErrors, CoreId, RequestError, TransportController, TransportError, TransportLoggerFactory } from "../../core"; import { DependencyOverrides } from "../../core/DependencyOverrides"; import { AccountController } from "../accounts/AccountController"; -import { ChangedItems } from "./ChangedItems"; -import { DatawalletModificationMapper } from "./DatawalletModificationMapper"; -import { CacheFetcher, DatawalletModificationsProcessor } from "./DatawalletModificationsProcessor"; -import { ExternalEventsProcessor } from "./ExternalEventsProcessor"; -import { SyncProgressReporter, SyncStep } from "./SyncCallback"; -import { WhatToSync } from "./WhatToSync"; import { BackboneDatawalletModification } from "./backbone/BackboneDatawalletModification"; import { BackboneSyncRun } from "./backbone/BackboneSyncRun"; import { CreateDatawalletModificationsRequestItem } from "./backbone/CreateDatawalletModifications"; import { FinalizeSyncRunRequestExternalEventResult } from "./backbone/FinalizeSyncRun"; import { StartSyncRunStatus, SyncRunType } from "./backbone/StartSyncRun"; import { ISyncClient, SyncClient } from "./backbone/SyncClient"; +import { ChangedItems } from "./ChangedItems"; +import { DatawalletModificationMapper } from "./DatawalletModificationMapper"; +import { CacheFetcher, DatawalletModificationsProcessor } from "./DatawalletModificationsProcessor"; +import { ExternalEventProcessorRegistry } from "./externalEventProcessors"; import { DatawalletModification } from "./local/DatawalletModification"; import { DeviceMigrations } from "./migrations/DeviceMigrations"; import { IdentityMigrations } from "./migrations/IdentityMigrations"; +import { SyncProgressReporter, SyncStep } from "./SyncCallback"; +import { WhatToSync } from "./WhatToSync"; export class SyncController extends TransportController { private syncInfo: IDatabaseMap; private readonly client: ISyncClient; private readonly deviceMigrations: DeviceMigrations; private readonly identityMigrations: IdentityMigrations; + private readonly externalEventRegistry = new ExternalEventProcessorRegistry(); private _cacheFetcher?: CacheFetcher; private get cacheFetcher() { @@ -404,21 +405,46 @@ export class SyncController extends TransportController { const externalEvents = await getExternalEventsResult.value.collect(); - const externalEventProcessor = new ExternalEventsProcessor( - this.parent.messages, - this.parent.relationships, - externalEvents, - reporter, - this.eventBus, - this.parent.identity.address.toString() - ); - await externalEventProcessor.execute(); + const syncStep = reporter.createStep(SyncStep.ExternalEventsProcessing, externalEvents.length); + const results: FinalizeSyncRunRequestExternalEventResult[] = []; + const changedItems = new ChangedItems(); + + for (const externalEvent of externalEvents) { + try { + const externalEventProcessorConstructor = this.externalEventRegistry.getProcessorForItem(externalEvent.type); + const item = await new externalEventProcessorConstructor(this.eventBus, this.parent).execute(externalEvent); + + if (item) changedItems.addItem(item); + + results.push({ + externalEventId: externalEvent.id + }); + } catch (e: any) { + this.log.error("There was an error while trying to apply an external event: ", e); + + let errorCode; + if (e.code) { + errorCode = e.code; + } else if (e.message) { + errorCode = e.message; + } else { + errorCode = JSON.stringify(e); + } + + results.push({ + externalEventId: externalEvent.id, + errorCode: errorCode + }); + } finally { + syncStep.progress(); + } + } externalEventStep.finish(); return { - externalEventResults: externalEventProcessor.results, - changedItems: externalEventProcessor.changedItems + externalEventResults: results, + changedItems: changedItems }; } diff --git a/packages/transport/src/modules/sync/externalEventProcessors/ExternalEventProcessor.ts b/packages/transport/src/modules/sync/externalEventProcessors/ExternalEventProcessor.ts new file mode 100644 index 000000000..9370572f5 --- /dev/null +++ b/packages/transport/src/modules/sync/externalEventProcessors/ExternalEventProcessor.ts @@ -0,0 +1,18 @@ +import { EventBus } from "@js-soft/ts-utils"; +import { AccountController } from "../../accounts/AccountController"; +import { Message } from "../../messages/local/Message"; +import { Relationship } from "../../relationships/local/Relationship"; +import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent"; + +export type ExternalEventProcessorConstructor = new (eventBus: EventBus, accountController: AccountController) => ExternalEventProcessor; + +export abstract class ExternalEventProcessor { + public constructor( + protected readonly eventBus: EventBus, + protected readonly accountController: AccountController + ) {} + public abstract execute(externalEvent: BackboneExternalEvent): Promise; + protected get ownAddress(): string { + return this.accountController.identity.address.toString(); + } +} diff --git a/packages/transport/src/modules/sync/externalEventProcessors/ExternalEventProcessorRegistry.ts b/packages/transport/src/modules/sync/externalEventProcessors/ExternalEventProcessorRegistry.ts new file mode 100644 index 000000000..6d5644b8c --- /dev/null +++ b/packages/transport/src/modules/sync/externalEventProcessors/ExternalEventProcessorRegistry.ts @@ -0,0 +1,35 @@ +import { TransportError } from "../../../core"; +import { ExternalEventProcessorConstructor } from "./ExternalEventProcessor"; +import { MessageDeliveredExternalEventProcessor } from "./MessageDeliveredExternalEventProcessor"; +import { MessageReceivedExternalEventProcessor } from "./MessageReceivedExternalEventProcessor"; +import { RelationshipChangeCompletedExternalEventProcessor } from "./RelationshipChangeCompletedExternalEventProcessor"; +import { RelationshipChangeCreatedExternalEventProcessor } from "./RelationshipChangeCreatedExternalEventProcessor"; + +export class ExternalEventProcessorRegistry { + private readonly processors = new Map(); + public constructor() { + this.registerProcessor("MessageReceived", MessageReceivedExternalEventProcessor); + this.registerProcessor("MessageDelivered", MessageDeliveredExternalEventProcessor); + this.registerProcessor("RelationshipChangeCreated", RelationshipChangeCreatedExternalEventProcessor); + this.registerProcessor("RelationshipChangeCompleted", RelationshipChangeCompletedExternalEventProcessor); + } + + public registerProcessor(externalEventName: string, externalEventProcessor: ExternalEventProcessorConstructor): void { + if (this.processors.has(externalEventName)) { + throw new TransportError(`There is already a externalEventProcessor registered for '${externalEventName}'. Use 'replaceProcessorForType' if you want to replace it.`); + } + this.processors.set(externalEventName, externalEventProcessor); + } + + public registerOrReplaceProcessor(externalEventName: string, externalEventProcessor: ExternalEventProcessorConstructor): void { + this.processors.set(externalEventName, externalEventProcessor); + } + + public getProcessorForItem(externalEventName: string): ExternalEventProcessorConstructor { + const externalEventProcessor = this.processors.get(externalEventName); + if (!externalEventProcessor) { + throw new TransportError(`There was no processor registered for '${externalEventName}'.`); + } + return externalEventProcessor; + } +} diff --git a/packages/transport/src/modules/sync/externalEventProcessors/MessageDeliveredExternalEventProcessor.ts b/packages/transport/src/modules/sync/externalEventProcessors/MessageDeliveredExternalEventProcessor.ts new file mode 100644 index 000000000..bba868f08 --- /dev/null +++ b/packages/transport/src/modules/sync/externalEventProcessors/MessageDeliveredExternalEventProcessor.ts @@ -0,0 +1,16 @@ +import { MessageDeliveredEvent } from "../../../events"; +import { Message } from "../../messages/local/Message"; +import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent"; +import { ExternalEventProcessor } from "./ExternalEventProcessor"; + +export class MessageDeliveredExternalEventProcessor extends ExternalEventProcessor { + public override async execute(externalEvent: BackboneExternalEvent): Promise { + const messageReceivedPayload = externalEvent.payload as { id: string }; + const updatedMessages = await this.accountController.messages.updateCache([messageReceivedPayload.id]); + + const deliveredMessage = updatedMessages[0]; + + this.eventBus.publish(new MessageDeliveredEvent(this.ownAddress, deliveredMessage)); + return deliveredMessage; + } +} diff --git a/packages/transport/src/modules/sync/externalEventProcessors/MessageReceivedExternalEventProcessor.ts b/packages/transport/src/modules/sync/externalEventProcessors/MessageReceivedExternalEventProcessor.ts new file mode 100644 index 000000000..6ba8dcd85 --- /dev/null +++ b/packages/transport/src/modules/sync/externalEventProcessors/MessageReceivedExternalEventProcessor.ts @@ -0,0 +1,15 @@ +import { CoreId } from "../../../core"; +import { MessageReceivedEvent } from "../../../events"; +import { Message } from "../../messages/local/Message"; +import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent"; +import { ExternalEventProcessor } from "./ExternalEventProcessor"; + +export class MessageReceivedExternalEventProcessor extends ExternalEventProcessor { + public override async execute(externalEvent: BackboneExternalEvent): Promise { + const newMessagePayload = externalEvent.payload as { id: string }; + const newMessage = await this.accountController.messages.loadPeerMessage(CoreId.from(newMessagePayload.id)); + + this.eventBus.publish(new MessageReceivedEvent(this.ownAddress, newMessage)); + return newMessage; + } +} diff --git a/packages/transport/src/modules/sync/externalEventProcessors/RelationshipChangeCompletedExternalEventProcessor.ts b/packages/transport/src/modules/sync/externalEventProcessors/RelationshipChangeCompletedExternalEventProcessor.ts new file mode 100644 index 000000000..7b50c595e --- /dev/null +++ b/packages/transport/src/modules/sync/externalEventProcessors/RelationshipChangeCompletedExternalEventProcessor.ts @@ -0,0 +1,17 @@ +import { RelationshipChangedEvent } from "../../../events"; +import { Relationship } from "../../relationships/local/Relationship"; +import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent"; +import { ExternalEventProcessor } from "./ExternalEventProcessor"; + +export class RelationshipChangeCompletedExternalEventProcessor extends ExternalEventProcessor { + public override async execute(externalEvent: BackboneExternalEvent): Promise { + const payload = externalEvent.payload as { changeId: string }; + const relationship = await this.accountController.relationships.applyChangeById(payload.changeId); + + if (relationship) { + this.eventBus.publish(new RelationshipChangedEvent(this.ownAddress, relationship)); + return relationship; + } + return; + } +} diff --git a/packages/transport/src/modules/sync/externalEventProcessors/RelationshipChangeCreatedExternalEventProcessor.ts b/packages/transport/src/modules/sync/externalEventProcessors/RelationshipChangeCreatedExternalEventProcessor.ts new file mode 100644 index 000000000..511a22b92 --- /dev/null +++ b/packages/transport/src/modules/sync/externalEventProcessors/RelationshipChangeCreatedExternalEventProcessor.ts @@ -0,0 +1,17 @@ +import { RelationshipChangedEvent } from "../../../events"; +import { Relationship } from "../../relationships/local/Relationship"; +import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent"; +import { ExternalEventProcessor } from "./ExternalEventProcessor"; + +export class RelationshipChangeCreatedExternalEventProcessor extends ExternalEventProcessor { + public override async execute(externalEvent: BackboneExternalEvent): Promise { + const payload = externalEvent.payload as { changeId: string; relationshipId: string }; + const relationship = await this.accountController.relationships.applyChangeById(payload.changeId); + + if (relationship) { + this.eventBus.publish(new RelationshipChangedEvent(this.ownAddress, relationship)); + return relationship; + } + return; + } +} diff --git a/packages/transport/src/modules/sync/externalEventProcessors/index.ts b/packages/transport/src/modules/sync/externalEventProcessors/index.ts new file mode 100644 index 000000000..9fcb7e88d --- /dev/null +++ b/packages/transport/src/modules/sync/externalEventProcessors/index.ts @@ -0,0 +1,5 @@ +export * from "./ExternalEventProcessorRegistry"; +export * from "./MessageDeliveredExternalEventProcessor"; +export * from "./MessageReceivedExternalEventProcessor"; +export * from "./RelationshipChangeCompletedExternalEventProcessor"; +export * from "./RelationshipChangeCreatedExternalEventProcessor";