Skip to content

Commit

Permalink
Feature/Split external event processing into separate processors (#86)
Browse files Browse the repository at this point in the history
* chore: split external event processing into seperate processors

* chore: remove circular dependency

* chore: add missing file

* chore: rename files and folder

* chore: refactor switch case

* chore: remove export of ExternalEventsProcessor

* chore: move map to ExternalEventProcessorRegistry

* chore: add missing file

* chore: add ExternalEventProcessorRegistry to foder index

* chore: unify constructors

* chore: move change item seperation

* chore: move to getter

* chore: remove unused code and rename address to ownAddress

* chore: fix generic constructor

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
sebbi08 and mergify[bot] authored Apr 8, 2024
1 parent 36daefa commit dfc630d
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 136 deletions.
Empty file modified .ci/runChecks.sh
100644 → 100755
Empty file.
1 change: 0 additions & 1 deletion packages/transport/src/modules/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ export * from "./sync/backbone/StartSyncRun";
export * from "./sync/backbone/SyncClient";
export * from "./sync/ChangedItems";
export * from "./sync/DatawalletModificationsProcessor";
export * from "./sync/ExternalEventsProcessor";
export * from "./sync/local/DatawalletModification";
export { SyncProgressCallback as SyncPercentageCallback, SyncStep } from "./sync/SyncCallback";
export * from "./sync/SyncController";
Expand Down
12 changes: 6 additions & 6 deletions packages/transport/src/modules/sync/ChangedItems.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ export class ChangedItems {
public readonly messages: Message[] = []
) {}

public addRelationship(relationship: Relationship): void {
this.relationships.push(relationship);
}

public addMessage(message: Message): void {
this.messages.push(message);
public addItem(item: Relationship | Message): void {
if (item instanceof Message) {
this.messages.push(item);
} else if (item instanceof Relationship) {
this.relationships.push(item);
}
}
}
112 changes: 0 additions & 112 deletions packages/transport/src/modules/sync/ExternalEventsProcessor.ts

This file was deleted.

60 changes: 43 additions & 17 deletions packages/transport/src/modules/sync/SyncController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,28 @@ import { log } from "@js-soft/ts-utils";
import { ControllerName, CoreDate, CoreError, CoreErrors, CoreId, RequestError, TransportController, TransportError, TransportLoggerFactory } from "../../core";
import { DependencyOverrides } from "../../core/DependencyOverrides";
import { AccountController } from "../accounts/AccountController";
import { ChangedItems } from "./ChangedItems";
import { DatawalletModificationMapper } from "./DatawalletModificationMapper";
import { CacheFetcher, DatawalletModificationsProcessor } from "./DatawalletModificationsProcessor";
import { ExternalEventsProcessor } from "./ExternalEventsProcessor";
import { SyncProgressReporter, SyncStep } from "./SyncCallback";
import { WhatToSync } from "./WhatToSync";
import { BackboneDatawalletModification } from "./backbone/BackboneDatawalletModification";
import { BackboneSyncRun } from "./backbone/BackboneSyncRun";
import { CreateDatawalletModificationsRequestItem } from "./backbone/CreateDatawalletModifications";
import { FinalizeSyncRunRequestExternalEventResult } from "./backbone/FinalizeSyncRun";
import { StartSyncRunStatus, SyncRunType } from "./backbone/StartSyncRun";
import { ISyncClient, SyncClient } from "./backbone/SyncClient";
import { ChangedItems } from "./ChangedItems";
import { DatawalletModificationMapper } from "./DatawalletModificationMapper";
import { CacheFetcher, DatawalletModificationsProcessor } from "./DatawalletModificationsProcessor";
import { ExternalEventProcessorRegistry } from "./externalEventProcessors";
import { DatawalletModification } from "./local/DatawalletModification";
import { DeviceMigrations } from "./migrations/DeviceMigrations";
import { IdentityMigrations } from "./migrations/IdentityMigrations";
import { SyncProgressReporter, SyncStep } from "./SyncCallback";
import { WhatToSync } from "./WhatToSync";

export class SyncController extends TransportController {
private syncInfo: IDatabaseMap;
private readonly client: ISyncClient;
private readonly deviceMigrations: DeviceMigrations;
private readonly identityMigrations: IdentityMigrations;
private readonly externalEventRegistry = new ExternalEventProcessorRegistry();

private _cacheFetcher?: CacheFetcher;
private get cacheFetcher() {
Expand Down Expand Up @@ -404,21 +405,46 @@ export class SyncController extends TransportController {

const externalEvents = await getExternalEventsResult.value.collect();

const externalEventProcessor = new ExternalEventsProcessor(
this.parent.messages,
this.parent.relationships,
externalEvents,
reporter,
this.eventBus,
this.parent.identity.address.toString()
);
await externalEventProcessor.execute();
const syncStep = reporter.createStep(SyncStep.ExternalEventsProcessing, externalEvents.length);
const results: FinalizeSyncRunRequestExternalEventResult[] = [];
const changedItems = new ChangedItems();

for (const externalEvent of externalEvents) {
try {
const externalEventProcessorConstructor = this.externalEventRegistry.getProcessorForItem(externalEvent.type);
const item = await new externalEventProcessorConstructor(this.eventBus, this.parent).execute(externalEvent);

if (item) changedItems.addItem(item);

results.push({
externalEventId: externalEvent.id
});
} catch (e: any) {
this.log.error("There was an error while trying to apply an external event: ", e);

let errorCode;
if (e.code) {
errorCode = e.code;
} else if (e.message) {
errorCode = e.message;
} else {
errorCode = JSON.stringify(e);
}

results.push({
externalEventId: externalEvent.id,
errorCode: errorCode
});
} finally {
syncStep.progress();
}
}

externalEventStep.finish();

return {
externalEventResults: externalEventProcessor.results,
changedItems: externalEventProcessor.changedItems
externalEventResults: results,
changedItems: changedItems
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { EventBus } from "@js-soft/ts-utils";
import { AccountController } from "../../accounts/AccountController";
import { Message } from "../../messages/local/Message";
import { Relationship } from "../../relationships/local/Relationship";
import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent";

export type ExternalEventProcessorConstructor = new (eventBus: EventBus, accountController: AccountController) => ExternalEventProcessor;

export abstract class ExternalEventProcessor {
public constructor(
protected readonly eventBus: EventBus,
protected readonly accountController: AccountController
) {}
public abstract execute(externalEvent: BackboneExternalEvent): Promise<Message | Relationship | undefined>;
protected get ownAddress(): string {
return this.accountController.identity.address.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { TransportError } from "../../../core";
import { ExternalEventProcessorConstructor } from "./ExternalEventProcessor";
import { MessageDeliveredExternalEventProcessor } from "./MessageDeliveredExternalEventProcessor";
import { MessageReceivedExternalEventProcessor } from "./MessageReceivedExternalEventProcessor";
import { RelationshipChangeCompletedExternalEventProcessor } from "./RelationshipChangeCompletedExternalEventProcessor";
import { RelationshipChangeCreatedExternalEventProcessor } from "./RelationshipChangeCreatedExternalEventProcessor";

export class ExternalEventProcessorRegistry {
private readonly processors = new Map<string, ExternalEventProcessorConstructor>();
public constructor() {
this.registerProcessor("MessageReceived", MessageReceivedExternalEventProcessor);
this.registerProcessor("MessageDelivered", MessageDeliveredExternalEventProcessor);
this.registerProcessor("RelationshipChangeCreated", RelationshipChangeCreatedExternalEventProcessor);
this.registerProcessor("RelationshipChangeCompleted", RelationshipChangeCompletedExternalEventProcessor);
}

public registerProcessor(externalEventName: string, externalEventProcessor: ExternalEventProcessorConstructor): void {
if (this.processors.has(externalEventName)) {
throw new TransportError(`There is already a externalEventProcessor registered for '${externalEventName}'. Use 'replaceProcessorForType' if you want to replace it.`);
}
this.processors.set(externalEventName, externalEventProcessor);
}

public registerOrReplaceProcessor(externalEventName: string, externalEventProcessor: ExternalEventProcessorConstructor): void {
this.processors.set(externalEventName, externalEventProcessor);
}

public getProcessorForItem(externalEventName: string): ExternalEventProcessorConstructor {
const externalEventProcessor = this.processors.get(externalEventName);
if (!externalEventProcessor) {
throw new TransportError(`There was no processor registered for '${externalEventName}'.`);
}
return externalEventProcessor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { MessageDeliveredEvent } from "../../../events";
import { Message } from "../../messages/local/Message";
import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent";
import { ExternalEventProcessor } from "./ExternalEventProcessor";

export class MessageDeliveredExternalEventProcessor extends ExternalEventProcessor {
public override async execute(externalEvent: BackboneExternalEvent): Promise<Message> {
const messageReceivedPayload = externalEvent.payload as { id: string };
const updatedMessages = await this.accountController.messages.updateCache([messageReceivedPayload.id]);

const deliveredMessage = updatedMessages[0];

this.eventBus.publish(new MessageDeliveredEvent(this.ownAddress, deliveredMessage));
return deliveredMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { CoreId } from "../../../core";
import { MessageReceivedEvent } from "../../../events";
import { Message } from "../../messages/local/Message";
import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent";
import { ExternalEventProcessor } from "./ExternalEventProcessor";

export class MessageReceivedExternalEventProcessor extends ExternalEventProcessor {
public override async execute(externalEvent: BackboneExternalEvent): Promise<Message> {
const newMessagePayload = externalEvent.payload as { id: string };
const newMessage = await this.accountController.messages.loadPeerMessage(CoreId.from(newMessagePayload.id));

this.eventBus.publish(new MessageReceivedEvent(this.ownAddress, newMessage));
return newMessage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { RelationshipChangedEvent } from "../../../events";
import { Relationship } from "../../relationships/local/Relationship";
import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent";
import { ExternalEventProcessor } from "./ExternalEventProcessor";

export class RelationshipChangeCompletedExternalEventProcessor extends ExternalEventProcessor {
public override async execute(externalEvent: BackboneExternalEvent): Promise<Relationship | undefined> {
const payload = externalEvent.payload as { changeId: string };
const relationship = await this.accountController.relationships.applyChangeById(payload.changeId);

if (relationship) {
this.eventBus.publish(new RelationshipChangedEvent(this.ownAddress, relationship));
return relationship;
}
return;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { RelationshipChangedEvent } from "../../../events";
import { Relationship } from "../../relationships/local/Relationship";
import { BackboneExternalEvent } from "../backbone/BackboneExternalEvent";
import { ExternalEventProcessor } from "./ExternalEventProcessor";

export class RelationshipChangeCreatedExternalEventProcessor extends ExternalEventProcessor {
public override async execute(externalEvent: BackboneExternalEvent): Promise<Relationship | undefined> {
const payload = externalEvent.payload as { changeId: string; relationshipId: string };
const relationship = await this.accountController.relationships.applyChangeById(payload.changeId);

if (relationship) {
this.eventBus.publish(new RelationshipChangedEvent(this.ownAddress, relationship));
return relationship;
}
return;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export * from "./ExternalEventProcessorRegistry";
export * from "./MessageDeliveredExternalEventProcessor";
export * from "./MessageReceivedExternalEventProcessor";
export * from "./RelationshipChangeCompletedExternalEventProcessor";
export * from "./RelationshipChangeCreatedExternalEventProcessor";

0 comments on commit dfc630d

Please sign in to comment.