diff --git a/packages/transport/src/modules/sync/DatawalletModificationsProcessor.ts b/packages/transport/src/modules/sync/DatawalletModificationsProcessor.ts index 922a25082..3d3df4611 100644 --- a/packages/transport/src/modules/sync/DatawalletModificationsProcessor.ts +++ b/packages/transport/src/modules/sync/DatawalletModificationsProcessor.ts @@ -28,10 +28,9 @@ import { TokenController } from "../tokens/TokenController"; import { DatawalletModification, DatawalletModificationType } from "./local/DatawalletModification"; export class DatawalletModificationsProcessor { - private readonly creates: DatawalletModification[]; - private readonly updates: DatawalletModification[]; - private readonly deletes: DatawalletModification[]; + private readonly modificationsWithoutCacheChanges: DatawalletModification[]; private readonly cacheChanges: DatawalletModification[]; + private readonly deletedObjectIdentifiers: string[] = []; public get log(): ILogger { return this.logger; @@ -43,12 +42,8 @@ export class DatawalletModificationsProcessor { private readonly collectionProvider: IDatabaseCollectionProvider, private readonly logger: ILogger ) { - const modificationsGroupedByType = _.groupBy(modifications, (m) => m.type); - - this.creates = modificationsGroupedByType[DatawalletModificationType.Create] ?? []; - this.updates = modificationsGroupedByType[DatawalletModificationType.Update] ?? []; - this.deletes = modificationsGroupedByType[DatawalletModificationType.Delete] ?? []; - this.cacheChanges = modificationsGroupedByType[DatawalletModificationType.CacheChanged] ?? []; + this.modificationsWithoutCacheChanges = modifications.filter((m) => m.type !== DatawalletModificationType.CacheChanged); + this.cacheChanges = modifications.filter((m) => m.type === DatawalletModificationType.CacheChanged); } private readonly collectionsWithCacheableItems: string[] = [ @@ -61,41 +56,58 @@ export class DatawalletModificationsProcessor { ]; public async execute(): Promise { - await this.applyCreates(); - await this.applyUpdates(); - await this.applyDeletes(); + await this.applyModifications(); await this.applyCacheChanges(); } - private async applyCreates() { - if (this.creates.length === 0) { - return; - } - - const createsGroupedByObjectIdentifier = _.groupBy(this.creates, (c) => c.objectIdentifier); + private async applyModifications() { + const modificationsGroupedByObjectIdentifier = _.groupBy(this.modificationsWithoutCacheChanges, (m) => m.objectIdentifier); - for (const objectIdentifier in createsGroupedByObjectIdentifier) { - const currentCreates = createsGroupedByObjectIdentifier[objectIdentifier]; + for (const objectIdentifier in modificationsGroupedByObjectIdentifier) { + const currentModifications = modificationsGroupedByObjectIdentifier[objectIdentifier]; - const targetCollectionName = currentCreates[0].collection; + const targetCollectionName = currentModifications[0].collection; const targetCollection = await this.collectionProvider.getCollection(targetCollectionName); - let mergedPayload = { id: objectIdentifier }; + const lastModification = currentModifications.at(-1)!; + if (lastModification.type === DatawalletModificationType.Delete) { + await targetCollection.delete({ id: objectIdentifier }); + this.deletedObjectIdentifiers.push(objectIdentifier); - for (const create of currentCreates) { - mergedPayload = { ...mergedPayload, ...create.payload }; + continue; } - const newObject = Serializable.fromUnknown(mergedPayload); + let resultingObject: any = {}; + for (const modification of currentModifications) { + switch (modification.type) { + case DatawalletModificationType.Create: + case DatawalletModificationType.Update: + resultingObject = { ...resultingObject, ...modification.payload }; + break; + case DatawalletModificationType.Delete: + resultingObject = {}; + break; + case DatawalletModificationType.CacheChanged: + throw new TransportError("CacheChanged modifications are not allowed in this context."); + } + } const oldDoc = await targetCollection.read(objectIdentifier); if (oldDoc) { const oldObject = Serializable.fromUnknown(oldDoc); - const updatedObject = { ...oldObject.toJSON(), ...newObject.toJSON() }; - await targetCollection.update(oldDoc, updatedObject); + + const newObject = { + ...oldObject.toJSON(), + ...resultingObject + }; + + await targetCollection.update(oldDoc, newObject); } else { await this.simulateCacheChangeForCreate(targetCollectionName, objectIdentifier); - await targetCollection.create(newObject); + await targetCollection.create({ + id: objectIdentifier, + ...resultingObject + }); } } } @@ -117,26 +129,6 @@ export class DatawalletModificationsProcessor { this.cacheChanges.push(modification); } - private async applyUpdates() { - if (this.updates.length === 0) { - return; - } - - for (const updateModification of this.updates) { - const targetCollection = await this.collectionProvider.getCollection(updateModification.collection); - const oldDoc = await targetCollection.read(updateModification.objectIdentifier.toString()); - - if (!oldDoc) { - throw new TransportError("Document to update was not found."); - } - - const oldObject = Serializable.fromUnknown(oldDoc); - const newObject = { ...oldObject.toJSON(), ...updateModification.payload }; - - await targetCollection.update(oldDoc, newObject); - } - } - private async applyCacheChanges() { if (this.cacheChanges.length === 0) { return; @@ -144,30 +136,28 @@ export class DatawalletModificationsProcessor { this.ensureAllItemsAreCacheable(); - const cacheChangesWithoutDeletes = this.cacheChanges.filter((c) => !this.deletes.some((d) => d.objectIdentifier.equals(c.objectIdentifier))); + const cacheChangesWithoutDeletes = this.cacheChanges.filter((c) => !this.deletedObjectIdentifiers.some((d) => c.objectIdentifier.equals(d))); const cacheChangesGroupedByCollection = this.groupCacheChangesByCollection(cacheChangesWithoutDeletes); const caches = await this.cacheFetcher.fetchCacheFor({ files: cacheChangesGroupedByCollection.fileIds, - messages: cacheChangesGroupedByCollection.messageIds, relationshipTemplates: cacheChangesGroupedByCollection.relationshipTemplateIds, tokens: cacheChangesGroupedByCollection.tokenIds, identityDeletionProcesses: cacheChangesGroupedByCollection.identityDeletionProcessIds }); await this.saveNewCaches(caches.files, DbCollectionName.Files, File); - await this.saveNewCaches(caches.messages, DbCollectionName.Messages, Message); await this.saveNewCaches(caches.relationshipTemplates, DbCollectionName.RelationshipTemplates, RelationshipTemplate); await this.saveNewCaches(caches.tokens, DbCollectionName.Tokens, Token); await this.saveNewCaches(caches.identityDeletionProcesses, DbCollectionName.IdentityDeletionProcess, IdentityDeletionProcess); - // Need to fetch the cache for relationships after the cache for relationship templates was fetched, - // because when building the relationship cache, the cache of thecorresponding relationship template - // is needed - const relationshipCaches = await this.cacheFetcher.fetchCacheFor({ - relationships: cacheChangesGroupedByCollection.relationshipIds - }); + // Need to fetch the cache for relationships after the cache for relationship templates was fetched, because when building the relationship cache, the cache of thecorresponding relationship template is needed + const relationshipCaches = await this.cacheFetcher.fetchCacheFor({ relationships: cacheChangesGroupedByCollection.relationshipIds }); await this.saveNewCaches(relationshipCaches.relationships, DbCollectionName.Relationships, Relationship); + + // Need to fetch the cache for messages after the cache for relationships was fetched, because when building the message cache, the cache of thecorresponding relationship is needed + const messageCaches = await this.cacheFetcher.fetchCacheFor({ messages: cacheChangesGroupedByCollection.messageIds }); + await this.saveNewCaches(messageCaches.messages, DbCollectionName.Messages, Message); } @log() @@ -187,11 +177,11 @@ export class DatawalletModificationsProcessor { const fileIds = (groups[DbCollectionName.Files] ?? []).map((m) => m.objectIdentifier); const messageIds = (groups[DbCollectionName.Messages] ?? []).map((m) => m.objectIdentifier); const relationshipIds = (groups[DbCollectionName.Relationships] ?? []).map((m) => m.objectIdentifier); - const templateIds = (groups[DbCollectionName.RelationshipTemplates] ?? []).map((m) => m.objectIdentifier); + const relationshipTemplateIds = (groups[DbCollectionName.RelationshipTemplates] ?? []).map((m) => m.objectIdentifier); const tokenIds = (groups[DbCollectionName.Tokens] ?? []).map((m) => m.objectIdentifier); const identityDeletionProcessIds = (groups[DbCollectionName.IdentityDeletionProcess] ?? []).map((m) => m.objectIdentifier); - return { fileIds, messageIds, relationshipTemplateIds: templateIds, tokenIds, relationshipIds, identityDeletionProcessIds }; + return { fileIds, messageIds, relationshipTemplateIds, tokenIds, relationshipIds, identityDeletionProcessIds }; } private async saveNewCaches(caches: FetchCacheOutputItem[], collectionName: DbCollectionName, constructorOfT: new () => T) { @@ -208,17 +198,6 @@ export class DatawalletModificationsProcessor { }) ); } - - private async applyDeletes() { - if (this.deletes.length === 0) { - return; - } - - for (const deleteModification of this.deletes) { - const targetCollection = await this.collectionProvider.getCollection(deleteModification.collection); - await targetCollection.delete({ id: deleteModification.objectIdentifier.toString() }); - } - } } export class CacheFetcher { diff --git a/packages/transport/test/modules/sync/SyncController.ordered.test.ts b/packages/transport/test/modules/sync/SyncController.ordered.test.ts new file mode 100644 index 000000000..27aec0d2b --- /dev/null +++ b/packages/transport/test/modules/sync/SyncController.ordered.test.ts @@ -0,0 +1,48 @@ +import { IDatabaseConnection } from "@js-soft/docdb-access-abstractions"; +import { CoreDate } from "@nmshd/core-types"; +import { AccountController, Transport } from "../../../src"; +import { TestUtil } from "../../testHelpers/TestUtil"; + +describe("SyncController.ordered", function () { + let connection: IDatabaseConnection; + let transport: Transport; + + let sender: AccountController | undefined; + let recipient: AccountController | undefined; + let recipientSecondDevice: AccountController | undefined; + + beforeAll(async function () { + connection = await TestUtil.createDatabaseConnection(); + + transport = TestUtil.createTransport(connection, { datawalletEnabled: true }); + await transport.init(); + + sender = await TestUtil.createAccount(transport); + recipient = await TestUtil.createAccount(transport); + }); + + afterAll(async () => { + await sender?.close(); + await recipient?.close(); + await recipientSecondDevice?.close(); + + await connection.close(); + }); + + // eslint-disable-next-line jest/expect-expect -- no assertions are needed because it is sufficient that the onboarding does not throw an error + test("onboarding does not throw an exception because datawallet modifications are executed in the correct order", async function () { + const template = await sender!.relationshipTemplates.sendRelationshipTemplate({ content: {}, expiresAt: CoreDate.utc().add({ days: 1 }) }); + + // create and decompose a relationship + await TestUtil.addRelationshipWithExistingTemplate(sender!, recipient!, template); + await TestUtil.terminateAndDecomposeRelationshipMutually(sender!, recipient!); + + // create a relationship with the same template again + await TestUtil.addRelationshipWithExistingTemplate(sender!, recipient!, template); + + // onboard a second device for the recipient + const newDevice = await recipient!.devices.sendDevice({ name: "Test2", isAdmin: true }); + await recipient!.syncDatawallet(); + recipientSecondDevice = await TestUtil.onboardDevice(transport, await recipient!.devices.getSharedSecret(newDevice.id)); + }); +}); diff --git a/packages/transport/test/testHelpers/TestUtil.ts b/packages/transport/test/testHelpers/TestUtil.ts index e144aae36..a9a251579 100644 --- a/packages/transport/test/testHelpers/TestUtil.ts +++ b/packages/transport/test/testHelpers/TestUtil.ts @@ -332,7 +332,15 @@ export class TestUtil { maxNumberOfAllocations: 1 }); - const templateTo = await to.relationshipTemplates.loadPeerRelationshipTemplate(templateFrom.id, templateFrom.secretKey); + return await this.addRelationshipWithExistingTemplate(from, to, templateFrom); + } + + public static async addRelationshipWithExistingTemplate( + from: AccountController, + to: AccountController, + template: RelationshipTemplate + ): Promise<{ acceptedRelationshipFromSelf: Relationship; acceptedRelationshipPeer: Relationship }> { + const templateTo = await to.relationshipTemplates.loadPeerRelationshipTemplate(template.id, template.secretKey); const relRequest = await to.relationships.sendRelationship({ template: templateTo, @@ -385,6 +393,15 @@ export class TestUtil { return decomposedRelationshipPeer; } + public static async terminateAndDecomposeRelationshipMutually(from: AccountController, to: AccountController): Promise { + await TestUtil.terminateRelationship(from, to); + await TestUtil.decomposeRelationship(from, to); + + const relationship = (await to.relationships.getRelationshipToIdentity(from.identity.address))!; + await to.relationships.decompose(relationship.id); + await to.cleanupDataOfDecomposedRelationship(relationship); + } + public static async generateAddressPseudonym(backboneBaseUrl: string): Promise { const pseudoPublicKey = CoreBuffer.fromUtf8("deleted identity"); const pseudonym = await IdentityUtil.createAddress({ algorithm: 1, publicKey: pseudoPublicKey }, new URL(backboneBaseUrl).hostname);