diff --git a/WireAPI/Sources/WireAPI/Network/NetworkService/NetworkService.swift b/WireAPI/Sources/WireAPI/Network/NetworkService/NetworkService.swift index 688cda5c1ac..e7b497f67d0 100644 --- a/WireAPI/Sources/WireAPI/Network/NetworkService/NetworkService.swift +++ b/WireAPI/Sources/WireAPI/Network/NetworkService/NetworkService.swift @@ -111,8 +111,10 @@ extension NetworkService: URLSessionWebSocketDelegate { didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data? ) { - webSocketsByTask[webSocketTask]?.close() - webSocketsByTask[webSocketTask] = nil + Task { + await webSocketsByTask[webSocketTask]?.close() + webSocketsByTask[webSocketTask] = nil + } } } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift index 3fc55920206..0760adb01d6 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift @@ -19,20 +19,20 @@ import Foundation import WireFoundation -final class PushChannel: PushChannelProtocol { +public final class PushChannel: PushChannelProtocol { - typealias Stream = AsyncThrowingStream + public typealias Stream = AsyncThrowingStream private let webSocket: any WebSocketProtocol private let decoder = JSONDecoder() - init(webSocket: any WebSocketProtocol) { + public init(webSocket: any WebSocketProtocol) { self.webSocket = webSocket } - func open() throws -> Stream { + public func open() async throws -> Stream { print("opening new push channel") - return try webSocket.open().map { [weak self, decoder] message in + return try await webSocket.open().map { [weak self, decoder] message in do { switch message { case let .data(data): @@ -50,15 +50,15 @@ final class PushChannel: PushChannelProtocol { } } catch { print("failed to get next web socket message: \(error)") - self?.close() + await self?.close() throw error } }.toStream() } - func close() { + public func close() async { print("closing push channel") - webSocket.close() + await webSocket.close() } } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelProtocol.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelProtocol.swift index aa8ce989005..15c4bdd016c 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelProtocol.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelProtocol.swift @@ -20,16 +20,16 @@ import Foundation // sourcery: AutoMockable /// Make a direct connection to a server to receive update events. -public protocol PushChannelProtocol { +public protocol PushChannelProtocol: Sendable { /// Open the push channel and start receiving update events. /// /// - Returns: An async stream of live update event envelopes. - func open() throws -> AsyncThrowingStream + func open() async throws -> AsyncThrowingStream /// Close the push channel and stop receiving update events. - func close() + func close() async } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelService.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelService.swift index e1fac36b02b..842e629f9fc 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelService.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/PushChannelService.swift @@ -37,7 +37,7 @@ public final class PushChannelService: PushChannelServiceProtocol { private let networkService: NetworkService private let authenticationManager: any AuthenticationManagerProtocol - init( + public init( networkService: NetworkService, authenticationManager: any AuthenticationManagerProtocol ) { diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/URLSessionWebSocketTaskProtocol.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/URLSessionWebSocketTaskProtocol.swift index 16b087959b2..f1a3bf66a01 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/URLSessionWebSocketTaskProtocol.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/URLSessionWebSocketTaskProtocol.swift @@ -19,7 +19,7 @@ import Foundation // sourcery: AutoMockable -protocol URLSessionWebSocketTaskProtocol { +public protocol URLSessionWebSocketTaskProtocol: Sendable { var isOpen: Bool { get } @@ -30,7 +30,7 @@ protocol URLSessionWebSocketTaskProtocol { reason: Data? ) - func receive(completionHandler: @escaping (Result) -> Void) + func receive(completionHandler: @escaping @Sendable (Result) -> Void) func receive() async throws -> URLSessionWebSocketTask.Message @@ -38,7 +38,7 @@ protocol URLSessionWebSocketTaskProtocol { extension URLSessionWebSocketTask: URLSessionWebSocketTaskProtocol { - var isOpen: Bool { + public var isOpen: Bool { closeCode == .invalid } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift index 7f5924b3859..23d75798df3 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift @@ -18,22 +18,18 @@ import Foundation -final class WebSocket: WebSocketProtocol { +public actor WebSocket: WebSocketProtocol { - typealias Stream = AsyncThrowingStream + public typealias Stream = AsyncThrowingStream private let connection: any URLSessionWebSocketTaskProtocol private var continuation: Stream.Continuation? - init(connection: any URLSessionWebSocketTaskProtocol) { + public init(connection: any URLSessionWebSocketTaskProtocol) { self.connection = connection } - deinit { - close() - } - - func open() throws -> Stream { + public func open() async throws -> Stream { connection.resume() if #available(iOS 17, *) { @@ -62,7 +58,7 @@ final class WebSocket: WebSocketProtocol { return Stream { continuation in self.continuation = continuation - func yieldNextMessage() { + @Sendable func yieldNextMessage() { guard connection.isOpen else { continuation.finish() return @@ -85,7 +81,7 @@ final class WebSocket: WebSocketProtocol { } } - func close() { + public func close() async { connection.cancel(with: .goingAway, reason: nil) continuation?.finish() } diff --git a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketProtocol.swift b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketProtocol.swift index 92e0f18913a..5e885f119ee 100644 --- a/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketProtocol.swift +++ b/WireAPI/Sources/WireAPI/Network/PushChannel/WebSocketProtocol.swift @@ -19,10 +19,10 @@ import Foundation // sourcery: AutoMockable -protocol WebSocketProtocol { +public protocol WebSocketProtocol: Sendable { - func open() throws -> AsyncThrowingStream + func open() async throws -> AsyncThrowingStream - func close() + func close() async } diff --git a/WireAPI/Tests/WireAPITests/Network/PushChannel/PushChannelTests.swift b/WireAPI/Tests/WireAPITests/Network/PushChannel/PushChannelTests.swift index af9111de799..349994dfbbd 100644 --- a/WireAPI/Tests/WireAPITests/Network/PushChannel/PushChannelTests.swift +++ b/WireAPI/Tests/WireAPITests/Network/PushChannel/PushChannelTests.swift @@ -57,7 +57,7 @@ final class PushChannelTests: XCTestCase { } // When the push channel is open and the stream is iterated - let liveEventEnvelopes = try sut.open() + let liveEventEnvelopes = try await sut.open() var receivedEnvelopes = [UpdateEventEnvelope]() for try await envelope in liveEventEnvelopes { @@ -74,10 +74,10 @@ final class PushChannelTests: XCTestCase { func testClosingPushChannel() async throws { // Given an open push channel webSocket.open_MockValue = AsyncThrowingStream { _ in } - _ = try sut.open() + _ = try await sut.open() // When the push channel is closed - sut.close() + await sut.close() // Then the web socket was closed XCTAssertEqual(webSocket.close_Invocations.count, 1) @@ -91,7 +91,7 @@ final class PushChannelTests: XCTestCase { // Don't call finish, so the stream stays open. } - let liveEventEnvelopes = try sut.open() + let liveEventEnvelopes = try await sut.open() do { for try await _ in liveEventEnvelopes { @@ -115,7 +115,7 @@ final class PushChannelTests: XCTestCase { // Don't call finish, so the stream stays open. } - let liveEventEnvelopes = try sut.open() + let liveEventEnvelopes = try await sut.open() do { for try await _ in liveEventEnvelopes { diff --git a/WireAPI/Tests/WireAPITests/Network/PushChannel/WebSocketTests.swift b/WireAPI/Tests/WireAPITests/Network/PushChannel/WebSocketTests.swift index 652fdc16670..4252417ba91 100644 --- a/WireAPI/Tests/WireAPITests/Network/PushChannel/WebSocketTests.swift +++ b/WireAPI/Tests/WireAPITests/Network/PushChannel/WebSocketTests.swift @@ -37,19 +37,6 @@ final class WebSocketTests: XCTestCase { try await super.tearDown() } - func testWebSocketCancelsWhenItDeinitializes() async throws { - // When - { - _ = WebSocket(connection: self.connection) - }() - - // Then - let invocations = connection.cancelWithReason_Invocations - try XCTAssertCount(invocations, count: 1) - XCTAssertEqual(invocations[0].closeCode, .goingAway) - XCTAssertNil(invocations[0].reason) - } - func testWebSocketCloses() async throws { // Given we're iterating over the web socket let sut = WebSocket(connection: connection) @@ -69,7 +56,7 @@ final class WebSocketTests: XCTestCase { Task { do { - for try await _ in try sut.open() { + for try await _ in try await sut.open() { didReceiveMessage.fulfill() } } catch { @@ -83,7 +70,7 @@ final class WebSocketTests: XCTestCase { await fulfillment(of: [didReceiveMessage], timeout: 1) // When - sut.close() + await sut.close() // Then the stream finished successfully await fulfillment(of: [didFinishIterating], timeout: 1) @@ -113,7 +100,7 @@ final class WebSocketTests: XCTestCase { Task { do { - for try await _ in try sut.open() { + for try await _ in try await sut.open() { didReceiveMessage.fulfill() } } catch { @@ -157,7 +144,7 @@ final class WebSocketTests: XCTestCase { Task { do { - for try await _ in try sut.open() { + for try await _ in try await sut.open() { didReceiveMessage.fulfill() } } catch { @@ -199,7 +186,7 @@ final class WebSocketTests: XCTestCase { // When do { - for try await message in try sut.open() { + for try await message in try await sut.open() { if case let .data(data) = message { receivedMessageData.append(data) } diff --git a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsRepositoryError.swift b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+IncrementalSyncProvider.swift similarity index 81% rename from WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsRepositoryError.swift rename to WireDomain/Sources/WireDomain/Components/ClientSessionComponent+IncrementalSyncProvider.swift index 909dc214c6f..b7f515d5047 100644 --- a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsRepositoryError.swift +++ b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+IncrementalSyncProvider.swift @@ -18,9 +18,10 @@ import Foundation -enum UpdateEventsRepositoryError: Error { +extension ClientSessionComponent: IncrementalSyncProvider { - case lastEventIDMissing - case failedToDecodeStoredEvent(Error) + public func provideIncrementalSync() throws -> IncrementalSync { + incrementalSync + } } diff --git a/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+InitialSyncBuilder.swift b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+InitialSyncProvider.swift similarity index 84% rename from WireDomain/Sources/WireDomain/Components/ClientSessionComponent+InitialSyncBuilder.swift rename to WireDomain/Sources/WireDomain/Components/ClientSessionComponent+InitialSyncProvider.swift index 943a7b7043c..41bf5f66420 100644 --- a/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+InitialSyncBuilder.swift +++ b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent+InitialSyncProvider.swift @@ -18,9 +18,9 @@ import Foundation -extension ClientSessionComponent: InitialSyncBuilderProtocol { +extension ClientSessionComponent: InitialSyncProvider { - public func buildInitialSync() throws -> any InitialSyncProtocol { + public func provideInitialSync() throws -> any InitialSyncProtocol { initialSync } diff --git a/WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift index baccf0eb37e..b38fad113e1 100644 --- a/WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift +++ b/WireDomain/Sources/WireDomain/Components/ClientSessionComponent.swift @@ -26,6 +26,7 @@ public final class ClientSessionComponent { private let selfClientID: String private let networkService: NetworkService + private let pushChannelNetworkService: NetworkService private let apiVersion: WireAPI.APIVersion private let localDomain: String @@ -45,6 +46,7 @@ public final class ClientSessionComponent { selfUserID: UUID, selfClientID: String, networkService: NetworkService, + pushChannelNetworkService: NetworkService, apiVersion: WireAPI.APIVersion, localDomain: String, isFederationEnabled: Bool, @@ -61,6 +63,7 @@ public final class ClientSessionComponent { self.selfClientID = selfClientID self.cookieStorage = cookieStorage self.networkService = networkService + self.pushChannelNetworkService = pushChannelNetworkService self.apiVersion = apiVersion self.sharedUserDefaults = sharedUserDefaults self.syncContext = syncContext @@ -95,6 +98,8 @@ public final class ClientSessionComponent { private lazy var featureConfigsAPI: any FeatureConfigsAPI = FeatureConfigsAPIBuilder(apiService: apiService) .makeAPI(for: apiVersion) + private lazy var pushChannelAPI: any PushChannelAPI = PushChannelAPIBuilder(pushChannelService: pushChannelService).makeAPI() + private lazy var selfUserAPI: any SelfUserAPI = SelfUserAPIBuilder(apiService: apiService).makeAPI(for: apiVersion) private lazy var teamsAPI: any TeamsAPI = TeamsAPIBuilder(apiService: apiService).makeAPI(for: apiVersion) @@ -207,6 +212,13 @@ public final class ClientSessionComponent { store: backendConfigLocalStore ) + private lazy var pullPendingUpdateEventsSync: some PullPendingUpdateEventsSyncProtocol = PullPendingUpdateEventsSync( + selfClientID: selfClientID, + api: updateEventsAPI, + store: updateEventsLocalStore, + decryptor: updateEventDecryptor + ) + private lazy var pullSelfLegalholdInfoSync: some PullSelfLegalholdInfoSyncProtocol = PullSelfLegalholdInfoSync( selfUserID: selfUserID, api: teamsAPI, @@ -287,8 +299,301 @@ public final class ClientSessionComponent { ) }() + private lazy var pushChannelService = PushChannelService( + networkService: pushChannelNetworkService, + authenticationManager: authenticationManager + ) + + public lazy var incrementalSync = IncrementalSync( + selfClientID: selfClientID, + pushChannelAPI: pushChannelAPI, + updateEventsSync: pullPendingUpdateEventsSync, + decryptor: updateEventDecryptor, + store: updateEventsLocalStore, + processor: updateEventProcessor + ) + + // MARK: - Repositories + + private lazy var conversationLabelsRepository: some ConversationLabelsRepositoryProtocol = ConversationLabelsRepository( + userPropertiesAPI: userPropertiesAPI, + conversationLabelsLocalStore: conversationLabelsLocalStore + ) + + private lazy var conversationRepository: some ConversationRepositoryProtocol = ConversationRepository( + conversationsAPI: conversationsAPI, + conversationsLocalStore: conversationLocalStore, + userLocalStore: userLocalStore, + teamRepository: teamRepository, + messageRepository: messageRepository, + backendInfo: .init( + domain: localDomain, + isFederationEnabled: isFederationEnabled, + isMLSEnabled: isMLSEnabled + ), + mlsProvider: mlsProvider + ) + + private lazy var featureConfigRepository: some FeatureConfigRepositoryProtocol = FeatureConfigRepository( + featureConfigsAPI: featureConfigsAPI, + featureConfigLocalStore: featureConfigsLocalStore + ) + + private lazy var messageRepository: some MessageRepositoryProtocol = MessageRepository( + localStore: messageLocalStore + ) + + private lazy var teamRepository: some TeamRepositoryProtocol = TeamRepository( + userRepository: userRepository, + teamLocalStore: teamLocalStore, + teamsAPI: teamsAPI + ) + + private lazy var userClientsRepository: some UserClientsRepositoryProtocol = UserClientsRepository( + userClientsAPI: userClientsAPI, + userRepository: userRepository, + userClientsLocalStore: userClientsLocalStore + ) + + private lazy var userConnectionsRepository: some ConnectionsRepositoryProtocol = ConnectionsRepository( + connectionsAPI: userConnectionsAPI, + connectionsLocalStore: userConnectionsStore + ) + + private lazy var userRepository: some UserRepositoryProtocol = UserRepository( + usersAPI: usersAPI, + selfUserAPI: selfUserAPI, + conversationLabelsRepository: conversationLabelsRepository, + conversationLocalStore: conversationLocalStore, + userLocalStore: userLocalStore + ) + + // MARK: - Update events + + private lazy var updateEventDecryptor: some UpdateEventDecryptorProtocol = { + let messageRepository = MessageRepository(localStore: messageLocalStore) + return UpdateEventDecryptor( + proteusService: proteusService, + mlsService: mlsService, + mlsDecryptionService: mlsDecryptionService, + userClientsLocalStore: userClientsLocalStore, + messageRepository: messageRepository, + userLocalStore: userLocalStore, + conversationLocalStore: conversationLocalStore + ) + }() + + private lazy var conversationAccessUpdateEventProcessor: some ConversationAccessUpdateEventProcessorProtocol = ConversationAccessUpdateEventProcessor( + repository: conversationRepository, + localStore: conversationLocalStore + ) + + private lazy var conversationCreateEventProcessor: some ConversationCreateEventProcessorProtocol = ConversationCreateEventProcessor( + repository: conversationRepository + ) + + private lazy var conversationDeleteEventProcessor: some ConversationDeleteEventProcessorProtocol = ConversationDeleteEventProcessor( + repository: conversationRepository + ) + + private lazy var conversationMemberJoinEventProcessor: some ConversationMemberJoinEventProcessorProtocol = ConversationMemberJoinEventProcessor( + conversationRepository: conversationRepository, + conversationLocalStore: conversationLocalStore, + userRepository: userRepository + ) + + private lazy var conversationMemberLeaveEventProcessor: some ConversationMemberLeaveEventProcessorProtocol = ConversationMemberLeaveEventProcessor( + repository: conversationRepository + ) + + private lazy var conversationMemberUpdateEventProcessor: some ConversationMemberUpdateEventProcessorProtocol = ConversationMemberUpdateEventProcessor( + conversationRepository: conversationRepository, + userRepository: userRepository, + localStore: conversationLocalStore + ) + + private lazy var conversationMessageTimerUpdateEventProcessor: some ConversationMessageTimerUpdateEventProcessorProtocol = ConversationMessageTimerUpdateEventProcessor( + conversationLocalStore: conversationLocalStore, + messageLocalStore: messageLocalStore + ) + + private lazy var conversationMLSMessageAddEventProcessor: some ConversationMLSMessageAddEventProcessorProtocol = ConversationMLSMessageAddEventProcessor( + conversationLocalStore: conversationLocalStore, + messageLocalStore: messageLocalStore, + userLocalStore: userLocalStore, + protobufMessageProcessor: conversationProtobufMessageProcessor + ) + + private lazy var conversationMLSWelcomeEventProcessor: some ConversationMLSWelcomeEventProcessorProtocol = ConversationMLSWelcomeEventProcessor( + conversationRepository: conversationRepository, + conversationLocalStore: conversationLocalStore, + mlsService: mlsService, + mlsDecryptionService: mlsDecryptionService, + oneOnOneResolver: oneOnOneResolver + ) + + private lazy var conversationProteusMessageAddEventProcessor: some ConversationProteusMessageAddEventProcessorProtocol = ConversationProteusMessageAddEventProcessor( + conversationLocalStore: conversationLocalStore, + messageLocalStore: messageLocalStore, + userLocalStore: userLocalStore, + protobufMessageProcessor: conversationProtobufMessageProcessor + ) + + private lazy var conversationProtocolUpdateEventProcessor: some ConversationProtocolUpdateEventProcessorProtocol = ConversationProtocolUpdateEventProcessor( + repository: conversationRepository + ) + + private lazy var conversationReceiptModeUpdateEventProcessor: some ConversationReceiptModeUpdateEventProcessorProtocol = ConversationReceiptModeUpdateEventProcessor( + userRepository: userRepository, + conversationRepository: conversationRepository, + conversationLocalStore: conversationLocalStore, + messageRepository: messageRepository + ) + + private lazy var conversationRenameEventProcessor: some ConversationRenameEventProcessorProtocol = ConversationRenameEventProcessor( + repository: conversationRepository + ) + + private lazy var conversationTypingEventProcessor: some ConversationTypingEventProcessorProtocol = ConversationTypingEventProcessor( + conversationRepository: conversationRepository, + conversationLocalStore: conversationLocalStore, + userRepository: userRepository + ) + + private lazy var featureConfigUpdateEventProcessor: some FeatureConfigUpdateEventProcessorProtocol = FeatureConfigUpdateEventProcessor( + repository: featureConfigRepository + ) + + private lazy var federationConnectionRemovedEventProcessor: some FederationConnectionRemovedEventProcessorProtocol = FederationConnectionRemovedEventProcessor( + context: syncContext + ) + + private lazy var federationDeleteEventProcessor: some FederationDeleteEventProcessorProtocol = FederationDeleteEventProcessor( + context: syncContext + ) + + private lazy var userClientAddEventProcessor: some UserClientAddEventProcessorProtocol = UserClientAddEventProcessor( + repository: userClientsRepository + ) + + private lazy var userClientRemoveEventProcessor: some UserClientRemoveEventProcessorProtocol = UserClientRemoveEventProcessor() + + private lazy var userConnectionEventProcessor: some UserConnectionEventProcessorProtocol = UserConnectionEventProcessor( + connectionsRepository: userConnectionsRepository, + oneOnOneResolver: oneOnOneResolver + ) + + private lazy var userDeleteEventProcessor: some UserDeleteEventProcessorProtocol = UserDeleteEventProcessor( + repository: userRepository + ) + + private lazy var userLegalholdDisableEventProcessor: some UserLegalholdDisableEventProcessorProtocol = UserLegalholdDisableEventProcessor( + repository: userRepository + ) + + private lazy var userLegalholdEnableEventProcessor: some UserLegalholdEnableEventProcessorProtocol = UserLegalholdEnableEventProcessor( + context: syncContext, + userRepository: userRepository, + userClientsRepository: userClientsRepository + ) + + private lazy var userLegalholdRequestEventProcessor: some UserLegalholdRequestEventProcessorProtocol = UserLegalholdRequestEventProcessor( + repository: userRepository + ) + + private lazy var userPropertiesSetEventProcessor: some UserPropertiesSetEventProcessorProtocol = UserPropertiesSetEventProcessor( + repository: userRepository + ) + + private lazy var userPropertiesDeleteEventProcessor: some UserPropertiesDeleteEventProcessorProtocol = UserPropertiesDeleteEventProcessor( + repository: userRepository + ) + + private lazy var userPushRemoveEventProcessor: some UserPushRemoveEventProcessorProtocol = UserPushRemoveEventProcessor( + repository: userRepository + ) + + private lazy var userUpdateEventProcessor: some UserUpdateEventProcessorProtocol = UserUpdateEventProcessor( + repository: userRepository + ) + + private lazy var teamDeleteEventProcessor: some TeamDeleteEventProcessorProtocol = TeamDeleteEventProcessor( + context: syncContext + ) + + private lazy var teamMemberLeaveEventProcessor: some TeamMemberLeaveEventProcessorProtocol = TeamMemberLeaveEventProcessor( + repository: teamRepository + ) + + private lazy var teamMemberUpdateEventProcessor: some TeamMemberUpdateEventProcessorProtocol = TeamMemberUpdateEventProcessor( + repository: teamRepository + ) + + private lazy var updateEventProcessor: some UpdateEventProcessorProtocol = { + let conversationEventProcessor = ConversationEventProcessor( + accessUpdateEventProcessor: conversationAccessUpdateEventProcessor, + createEventProcessor: conversationCreateEventProcessor, + deleteEventProcessor: conversationDeleteEventProcessor, + memberJoinEventProcessor: conversationMemberJoinEventProcessor, + memberLeaveEventProcessor: conversationMemberLeaveEventProcessor, + memberUpdateEventProcessor: conversationMemberUpdateEventProcessor, + messageTimerUpdateEventProcessor: conversationMessageTimerUpdateEventProcessor, + mlsMessageAddEventProcessor: conversationMLSMessageAddEventProcessor, + mlsWelcomeEventProcessor: conversationMLSWelcomeEventProcessor, + proteusMessageAddEventProcessor: conversationProteusMessageAddEventProcessor, + protocolUpdateEventProcessor: conversationProtocolUpdateEventProcessor, + receiptModeUpdateEventProcessor: conversationReceiptModeUpdateEventProcessor, + renameEventProcessor: conversationRenameEventProcessor, + typingEventProcessor: conversationTypingEventProcessor + ) + + let featureConfigEventProcessor = FeatureConfigEventProcessor( + updateEventProcessor: featureConfigUpdateEventProcessor + ) + + let federationEventProcessor = FederationEventProcessor( + connectionRemovedEventProcessor: federationConnectionRemovedEventProcessor, + deleteEventProcessor: federationDeleteEventProcessor + ) + + let userEventProcessor = UserEventProcessor( + clientAddEventProcessor: userClientAddEventProcessor, + clientRemoveEventProcessor: userClientRemoveEventProcessor, + connectionEventProcessor: userConnectionEventProcessor, + deleteEventProcessor: userDeleteEventProcessor, + legalholdDisableEventProcessor: userLegalholdDisableEventProcessor, + legalholdEnableEventProcessor: userLegalholdEnableEventProcessor, + legalholdRequestEventProcessor: userLegalholdRequestEventProcessor, + propertiesSetEventProcessor: userPropertiesSetEventProcessor, + propertiesDeleteEventProcessor: userPropertiesDeleteEventProcessor, + pushRemoveEventProcessor: userPushRemoveEventProcessor, + updateEventProcessor: userUpdateEventProcessor + ) + + let teamEventProcessor = TeamEventProcessor( + deleteEventProcessor: teamDeleteEventProcessor, + memberLeaveEventProcessor: teamMemberLeaveEventProcessor, + memberUpdateEventProcessor: teamMemberUpdateEventProcessor + ) + + // TODO: fix featureConfig typo + return UpdateEventProcessor( + conversationEventProcessor: conversationEventProcessor, + featureconfigEventProcessor: featureConfigEventProcessor, + federationEventProcessor: federationEventProcessor, + userEventProcessor: userEventProcessor, + teamEventProcessor: teamEventProcessor + ) + }() + // MARK: - Other + private lazy var conversationProtobufMessageProcessor: some ConversationProtobufMessageProcessorProtocol = ConversationProtobufMessageProcessor( + messageLocalStore: messageLocalStore, + conversationLocalStore: conversationLocalStore, + userLocalStore: userLocalStore + ) + private lazy var oneOnOneResolver: some OneOnOneResolverProtocol = OneOnOneResolver( context: syncContext, userLocalStore: userLocalStore, diff --git a/WireDomain/Sources/WireDomain/Components/UserSessionComponent.swift b/WireDomain/Sources/WireDomain/Components/UserSessionComponent.swift index 68c831fc36b..4e3c2217a6f 100644 --- a/WireDomain/Sources/WireDomain/Components/UserSessionComponent.swift +++ b/WireDomain/Sources/WireDomain/Components/UserSessionComponent.swift @@ -103,6 +103,21 @@ public final class UserSessionComponent { return networkService }() + private lazy var pushChannelNetworkService: NetworkService = { + let networkService = NetworkService( + baseURL: backendEnvironment.webSocketURL, + serverTrustValidator: serverTrustValidator + ) + let config = urlSessionConfigurationFactory.makeWebSocketSessionConfiguration() + let session = URLSession( + configuration: config, + delegate: networkService, + delegateQueue: nil + ) + networkService.configure(with: session) + return networkService + }() + // MARK: - Children public func clientSessionComponent(clientID: String) -> ClientSessionComponent { @@ -110,6 +125,7 @@ public final class UserSessionComponent { selfUserID: selfUserID, selfClientID: clientID, networkService: networkService, + pushChannelNetworkService: pushChannelNetworkService, apiVersion: apiVersion, localDomain: localDomain, isFederationEnabled: isFederationEnabled, diff --git a/WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationEventProcessor.swift b/WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationEventProcessor.swift index 05d916bb83e..286cebd2d02 100644 --- a/WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationEventProcessor.swift +++ b/WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationEventProcessor.swift @@ -18,7 +18,7 @@ import WireAPI -struct ConversationEventProcessor { +struct ConversationEventProcessor: ConversationEventProcessorProtocol { let accessUpdateEventProcessor: any ConversationAccessUpdateEventProcessorProtocol let createEventProcessor: any ConversationCreateEventProcessorProtocol diff --git a/WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationProtobufMessageProcessor.swift b/WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationProtobufMessageProcessor.swift index 1d1779de73c..d3c188ff0b7 100644 --- a/WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationProtobufMessageProcessor.swift +++ b/WireDomain/Sources/WireDomain/Event Processing/ConversationEventProcessor/ConversationProtobufMessageProcessor.swift @@ -210,12 +210,20 @@ struct ConversationProtobufMessageProcessor: ConversationProtobufMessageProcesso date: Date, logAttributes: LogAttributes ) async throws { - let (assetClientMessage, isNew) = try await messageLocalStore.fetchOrCreateAssetClientMessage( - id: message.messageID, - conversation: conversation, - sender: (sender.id, sender.domain, sender.clientID), - date: date - ) + let (assetClientMessage, isNew): (ZMAssetClientMessage, Bool) + do { + (assetClientMessage, isNew) = try await messageLocalStore.fetchOrCreateAssetClientMessage( + id: message.messageID, + conversation: conversation, + sender: (sender.id, sender.domain, sender.clientID), + date: date + ) + } catch MessageLocalStore.Failure.invalidInsertion(reason: let reason) { + return WireLogger.eventProcessing.warn( + "failed to process asset message, dropping. Reason: \(reason)", + attributes: logAttributes + ) + } await messageLocalStore.addAssetClientMessage( assetClientMessage, @@ -234,12 +242,20 @@ struct ConversationProtobufMessageProcessor: ConversationProtobufMessageProcesso date: Date, logAttributes: LogAttributes ) async throws { - let (clientMessage, isNew) = try await messageLocalStore.fetchOrCreateClientMessage( - id: message.messageID, - conversation: conversation, - sender: (sender.id, sender.domain, sender.clientID), - date: date - ) + let (clientMessage, isNew): (ZMClientMessage, isNew: Bool) + do { + (clientMessage, isNew) = try await messageLocalStore.fetchOrCreateClientMessage( + id: message.messageID, + conversation: conversation, + sender: (sender.id, sender.domain, sender.clientID), + date: date + ) + } catch MessageLocalStore.Failure.invalidInsertion(reason: let reason) { + return WireLogger.eventProcessing.warn( + "failed to process message, dropping. Reason: \(reason)", + attributes: logAttributes + ) + } await messageLocalStore.addClientMessage( clientMessage, diff --git a/WireDomain/Sources/WireDomain/Event Processing/FeatureConfigEventProcessor/FeatureConfigEventProcessor.swift b/WireDomain/Sources/WireDomain/Event Processing/FeatureConfigEventProcessor/FeatureConfigEventProcessor.swift index 95339b1591d..d2aa4f24221 100644 --- a/WireDomain/Sources/WireDomain/Event Processing/FeatureConfigEventProcessor/FeatureConfigEventProcessor.swift +++ b/WireDomain/Sources/WireDomain/Event Processing/FeatureConfigEventProcessor/FeatureConfigEventProcessor.swift @@ -19,7 +19,7 @@ import Foundation import WireAPI -struct FeatureConfigEventProcessor { +struct FeatureConfigEventProcessor: FeatureConfigEventProcessorProtocol { let updateEventProcessor: any FeatureConfigUpdateEventProcessorProtocol diff --git a/WireDomain/Sources/WireDomain/Event Processing/FederationEventProcessor/FederationEventProcessor.swift b/WireDomain/Sources/WireDomain/Event Processing/FederationEventProcessor/FederationEventProcessor.swift index 54fa20a5ba2..024577b098e 100644 --- a/WireDomain/Sources/WireDomain/Event Processing/FederationEventProcessor/FederationEventProcessor.swift +++ b/WireDomain/Sources/WireDomain/Event Processing/FederationEventProcessor/FederationEventProcessor.swift @@ -19,7 +19,7 @@ import Foundation import WireAPI -struct FederationEventProcessor { +struct FederationEventProcessor: FederationEventProcessorProtocol { let connectionRemovedEventProcessor: any FederationConnectionRemovedEventProcessorProtocol let deleteEventProcessor: any FederationDeleteEventProcessorProtocol diff --git a/WireDomain/Sources/WireDomain/Event Processing/TeamEventProcessor/TeamEventProcessor.swift b/WireDomain/Sources/WireDomain/Event Processing/TeamEventProcessor/TeamEventProcessor.swift index 809cd1bfc54..864dc45c51d 100644 --- a/WireDomain/Sources/WireDomain/Event Processing/TeamEventProcessor/TeamEventProcessor.swift +++ b/WireDomain/Sources/WireDomain/Event Processing/TeamEventProcessor/TeamEventProcessor.swift @@ -19,7 +19,7 @@ import Foundation import WireAPI -struct TeamEventProcessor { +struct TeamEventProcessor: TeamEventProcessorProtocol { let deleteEventProcessor: any TeamDeleteEventProcessorProtocol let memberLeaveEventProcessor: any TeamMemberLeaveEventProcessorProtocol diff --git a/WireDomain/Sources/WireDomain/Event Processing/UpdateEventProcessor.swift b/WireDomain/Sources/WireDomain/Event Processing/UpdateEventProcessor.swift index 8d4de5aea05..64f67ebf3d3 100644 --- a/WireDomain/Sources/WireDomain/Event Processing/UpdateEventProcessor.swift +++ b/WireDomain/Sources/WireDomain/Event Processing/UpdateEventProcessor.swift @@ -19,7 +19,7 @@ import Foundation import WireAPI -struct UpdateEventProcessor { +struct UpdateEventProcessor: UpdateEventProcessorProtocol { let conversationEventProcessor: any ConversationEventProcessorProtocol let featureconfigEventProcessor: any FeatureConfigEventProcessorProtocol diff --git a/WireDomain/Sources/WireDomain/Event Processing/UpdateEventProcessorProtocol.swift b/WireDomain/Sources/WireDomain/Event Processing/UpdateEventProcessorProtocol.swift index bebe1a74d65..742324a147d 100644 --- a/WireDomain/Sources/WireDomain/Event Processing/UpdateEventProcessorProtocol.swift +++ b/WireDomain/Sources/WireDomain/Event Processing/UpdateEventProcessorProtocol.swift @@ -20,7 +20,7 @@ import WireAPI // sourcery: AutoMockable /// Process update events. -protocol UpdateEventProcessorProtocol { +public protocol UpdateEventProcessorProtocol { /// Process an update event. /// diff --git a/WireDomain/Sources/WireDomain/Event Processing/UserEventProcessor/UserEventProcessor.swift b/WireDomain/Sources/WireDomain/Event Processing/UserEventProcessor/UserEventProcessor.swift index cb8fe0122a4..d5bbc12625d 100644 --- a/WireDomain/Sources/WireDomain/Event Processing/UserEventProcessor/UserEventProcessor.swift +++ b/WireDomain/Sources/WireDomain/Event Processing/UserEventProcessor/UserEventProcessor.swift @@ -19,7 +19,7 @@ import Foundation import WireAPI -struct UserEventProcessor { +struct UserEventProcessor: UserEventProcessorProtocol { let clientAddEventProcessor: any UserClientAddEventProcessorProtocol let clientRemoveEventProcessor: any UserClientRemoveEventProcessorProtocol diff --git a/WireDomain/Sources/WireDomain/Synchronization/Protocols/SyncManagerProtocol.swift b/WireDomain/Sources/WireDomain/Providers/IncrementalSyncProvider.swift similarity index 75% rename from WireDomain/Sources/WireDomain/Synchronization/Protocols/SyncManagerProtocol.swift rename to WireDomain/Sources/WireDomain/Providers/IncrementalSyncProvider.swift index 7368ddf139c..75b555f7c3c 100644 --- a/WireDomain/Sources/WireDomain/Synchronization/Protocols/SyncManagerProtocol.swift +++ b/WireDomain/Sources/WireDomain/Providers/IncrementalSyncProvider.swift @@ -16,14 +16,12 @@ // along with this program. If not, see http://www.gnu.org/licenses/. // -protocol SyncManagerProtocol { +import Foundation - /// Fetch events from the server and process all pending events. +// sourcery: AutoMockable +/// An object that provides an instance of `IncrementalSync`. +public protocol IncrementalSyncProvider { - func performQuickSync() async throws - - /// Stop all syncing activities and prepare to idle. - - func suspend() async throws + func provideIncrementalSync() throws -> IncrementalSync } diff --git a/WireDomain/Sources/WireDomain/Providers/InitialSyncBuilderProtocol.swift b/WireDomain/Sources/WireDomain/Providers/InitialSyncProvider.swift similarity index 80% rename from WireDomain/Sources/WireDomain/Providers/InitialSyncBuilderProtocol.swift rename to WireDomain/Sources/WireDomain/Providers/InitialSyncProvider.swift index 51fb89fe024..807ea92a828 100644 --- a/WireDomain/Sources/WireDomain/Providers/InitialSyncBuilderProtocol.swift +++ b/WireDomain/Sources/WireDomain/Providers/InitialSyncProvider.swift @@ -19,13 +19,13 @@ import Foundation // sourcery: AutoMockable -/// An object that builds `InitialSync`. -public protocol InitialSyncBuilderProtocol { +/// An object that provides an instance of `InitialSync`. +public protocol InitialSyncProvider { // Workaround for Sourcery unable to generate compilable mock // due to `any InitialSyncProtocol?` - typealias Sync = any InitialSyncProtocol + typealias AnyInitialSync = any InitialSyncProtocol - func buildInitialSync() throws -> Sync + func provideInitialSync() throws -> AnyInitialSync } diff --git a/WireDomain/Sources/WireDomain/Repositories/Message/MessageLocalStore.swift b/WireDomain/Sources/WireDomain/Repositories/Message/MessageLocalStore.swift index fb97554576e..c5a91ff3eb2 100644 --- a/WireDomain/Sources/WireDomain/Repositories/Message/MessageLocalStore.swift +++ b/WireDomain/Sources/WireDomain/Repositories/Message/MessageLocalStore.swift @@ -24,7 +24,9 @@ import WireLogging public final class MessageLocalStore: MessageLocalStoreProtocol { enum Failure: Error { - case failedToAddConversation + + case invalidInsertion(reason: String) + } /// When receiving a MLS/Proteus add message event, we treat them either as an `asset` client message or a `default` @@ -317,11 +319,20 @@ public final class MessageLocalStore: MessageLocalStoreProtocol { date: Date ) async throws -> (ZMOTRMessage, isNew: Bool) { try await context.perform { [self] in - guard let clearedTime = conversation.clearedTimeStamp, - clearedTime.compare(date) != .orderedAscending, - conversation.conversationType != .self, - let nonce = UUID(uuidString: id) else { - throw Failure.failedToAddConversation + + if + let clearedTime = conversation.clearedTimeStamp, + clearedTime.compare(date) != .orderedAscending + { + throw Failure.invalidInsertion(reason: "message is older than cleared time") + } + + guard conversation.conversationType != .`self` else { + throw Failure.invalidInsertion(reason: "message cannot be sent to self") + } + + guard let nonce = UUID(uuidString: id) else { + throw Failure.invalidInsertion(reason: "invalid nonce") } let clientMessage = messageType == .asset ? diff --git a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/Protocols/UpdateEventsLocalStoreProtocol.swift b/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/Protocols/UpdateEventsLocalStoreProtocol.swift index 00f1c389272..7d1cf62722e 100644 --- a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/Protocols/UpdateEventsLocalStoreProtocol.swift +++ b/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/Protocols/UpdateEventsLocalStoreProtocol.swift @@ -17,6 +17,7 @@ // import Foundation +import WireAPI // sourcery: AutoMockable public protocol UpdateEventsLocalStoreProtocol { @@ -38,21 +39,21 @@ public protocol UpdateEventsLocalStoreProtocol { /// Persists an event envelope locally. /// - Parameters: - /// - data: The event envelope payload data. + /// - eventEnvelope: The event envelope to persist. /// - index: The event envelope index. func persistEventEnvelope( - _ data: Data, + _ eventEnvelope: UpdateEventEnvelope, index: Int64 ) async throws - /// Fetches stored event envelope payloads. + /// Fetches stored event envelopes. /// - parameter limit: A fetch limit. - /// - returns: A list of event payloads. + /// - returns: A list of event envelopes. - func fetchStoredEventEnvelopePayloads( + func fetchStoredEventEnvelopes( limit: UInt - ) async throws -> [Data] + ) async throws -> [UpdateEventEnvelope] /// Deletes next pending events locally. /// - parameter limit: A fetch limit. @@ -60,4 +61,12 @@ public protocol UpdateEventsLocalStoreProtocol { func deleteNextPendingEvents( limit: UInt ) async throws + + /// Delete the event envelope with the given index. + /// - parameter index: The index of the envelope to delete + + func deleteEventEnvelope( + atIndex index: Int64 + ) async throws + } diff --git a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/Protocols/UpdateEventsRepositoryProtocol.swift b/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/Protocols/UpdateEventsRepositoryProtocol.swift deleted file mode 100644 index 2bdc4bea76e..00000000000 --- a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/Protocols/UpdateEventsRepositoryProtocol.swift +++ /dev/null @@ -1,82 +0,0 @@ -// -// Wire -// Copyright (C) 2025 Wire Swiss GmbH -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see http://www.gnu.org/licenses/. -// - -import Foundation -import WireAPI - -// sourcery: AutoMockable -/// Access update events. -protocol UpdateEventsRepositoryProtocol { - - /// Pull pending events from the server, decrypt if needed, and store locally. - /// - /// Pending events are events that have been buffered by the server while - /// the self client has not had an active push channel. - - func pullPendingEvents() async throws - - /// Fetch the next batch pending events from the database. - /// - /// The batch is already sorted, such that the first element is the oldest - /// stored event. This method does not delete any events - /// (see `deleteNextPendingEvents(limit:)`), so invoking this method again - /// will return the same batch. - /// - /// - Parameter limit: The maximum number of events to fetch. - /// - Returns: Decrypted update event envelopes ready for processing. - - func fetchNextPendingEvents(limit: UInt) async throws -> [UpdateEventEnvelope] - - /// Delete the next batch of pending events from the database. - /// - /// Use this method to delete stored events that have been processed and - /// can now be discarded. - /// - /// - Parameter limit: The maximum number of events to delete. - - func deleteNextPendingEvents(limit: UInt) async throws - - /// Open the push channel and deliver update event envelopes through - /// an asynchronous stream. - /// - /// The envelopes are bufferred until a consumer starts to iterate though - /// the stream. - /// - /// - Returns: An asynchronous stream of `UpdateEventEnvelope`s. - - func startBufferingLiveEvents() async throws -> AsyncThrowingStream - - /// Close the push channel and stop the asynchronous stream of - /// `UpdateEventEnvelope`s returned in `startBufferingLiveEvents`. - - func stopReceivingLiveEvents() async - - /// Store the last event envelope id. - /// - /// Future pulls of pending events will only include event envelopes - /// since this id. - /// - /// - Parameter id: The id to store. - - func storeLastEventEnvelopeID(_ id: UUID) - - /// Pulls the last event envelope id and stores it locally. - - func pullLastEventID() async throws - -} diff --git a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsLocalStore.swift b/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsLocalStore.swift index b853b8fef2e..7d98ecb21d5 100644 --- a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsLocalStore.swift +++ b/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsLocalStore.swift @@ -16,6 +16,7 @@ // along with this program. If not, see http://www.gnu.org/licenses/. // +import WireAPI import WireDataModel import WireFoundation import WireLogging @@ -37,6 +38,8 @@ final class UpdateEventsLocalStore: UpdateEventsLocalStoreProtocol { private let context: NSManagedObjectContext private let storage: PrivateUserDefaults + private let encoder = JSONEncoder() + private let decoder = JSONDecoder() // MARK: - Object lifecycle @@ -74,27 +77,29 @@ final class UpdateEventsLocalStore: UpdateEventsLocalStoreProtocol { } public func persistEventEnvelope( - _ data: Data, + _ eventEnvelope: UpdateEventEnvelope, index: Int64 ) async throws { - try await context.perform { [context] in + try await context.perform { [context, encoder] in let storedEventEnvelope = StoredUpdateEventEnvelope(context: context) - storedEventEnvelope.data = data + storedEventEnvelope.data = try encoder.encode(eventEnvelope) storedEventEnvelope.sortIndex = index try context.save() } } - public func fetchStoredEventEnvelopePayloads( + public func fetchStoredEventEnvelopes( limit: UInt - ) async throws -> [Data] { - try await context.perform { [context] in + ) async throws -> [UpdateEventEnvelope] { + try await context.perform { [context, decoder] in do { let request = StoredUpdateEventEnvelope.sortedFetchRequest(asending: true) request.fetchLimit = Int(limit) request.returnsObjectsAsFaults = false let storedEventEnvelopes = try context.fetch(request) - return storedEventEnvelopes.map(\.data) + return try storedEventEnvelopes.map { + try decoder.decode(UpdateEventEnvelope.self, from: $0.data) + } } catch { throw Error.failedToFetchStoredEvents(error) } @@ -118,4 +123,16 @@ final class UpdateEventsLocalStore: UpdateEventsLocalStoreProtocol { } } + public func deleteEventEnvelope( + atIndex index: Int64 + ) async throws { + try await context.perform { [context] in + let request = StoredUpdateEventEnvelope.fetchRequest(sortIndex: index) + guard let envelope = try context.fetch(request).first else { return } + WireLogger.sync.debug("deleting stored envelope at index \(index)") + context.delete(envelope) + try context.save() + } + } + } diff --git a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsRepository.swift b/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsRepository.swift deleted file mode 100644 index 41ccb6da4a6..00000000000 --- a/WireDomain/Sources/WireDomain/Repositories/UpdateEvents/UpdateEventsRepository.swift +++ /dev/null @@ -1,138 +0,0 @@ -// -// Wire -// Copyright (C) 2025 Wire Swiss GmbH -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see http://www.gnu.org/licenses/. -// - -import Foundation -import WireAPI -import WireDataModel -import WireFoundation -import WireLogging - -final class UpdateEventsRepository: UpdateEventsRepositoryProtocol { - - // MARK: - Properties - - private let userID: UUID - private let selfClientID: String - private let updateEventsAPI: any UpdateEventsAPI - private let pushChannel: any PushChannelProtocol - private let updateEventDecryptor: any UpdateEventDecryptorProtocol - private let updateEventsLocalStore: any UpdateEventsLocalStoreProtocol - private let encoder = JSONEncoder() - private let decoder = JSONDecoder() - private let pullPendingEventsSync: PullPendingUpdateEventsSync - - private let pullLastUpdateEventIDSync: PullLastUpdateEventIDSync - - // MARK: - Object lifecycle - - init( - userID: UUID, - selfClientID: String, - updateEventsAPI: any UpdateEventsAPI, - pushChannel: any PushChannelProtocol, - updateEventDecryptor: any UpdateEventDecryptorProtocol, - updateEventsLocalStore: any UpdateEventsLocalStoreProtocol - ) { - self.userID = userID - self.selfClientID = selfClientID - self.updateEventsAPI = updateEventsAPI - self.pushChannel = pushChannel - self.updateEventDecryptor = updateEventDecryptor - self.updateEventsLocalStore = updateEventsLocalStore - self.pullLastUpdateEventIDSync = PullLastUpdateEventIDSync( - selfClientID: selfClientID, - api: updateEventsAPI, - store: updateEventsLocalStore - ) - self.pullPendingEventsSync = PullPendingUpdateEventsSync( - selfClientID: selfClientID, - api: updateEventsAPI, - store: updateEventsLocalStore, - decryptor: updateEventDecryptor - ) - } - - // MARK: - Pull pending events - - func pullPendingEvents() async throws { - try await pullPendingEventsSync.pull() - } - - func pullLastEventID() async throws { - try await pullLastUpdateEventIDSync.pull() - } - - // MARK: - Fetch pending events - - func fetchNextPendingEvents(limit: UInt) async throws -> [UpdateEventEnvelope] { - let payloads = try await updateEventsLocalStore.fetchStoredEventEnvelopePayloads(limit: limit) - return try decodeEventEnvelopes(payloads) - } - - private func decodeEventEnvelopes(_ payloads: [Data]) throws -> [UpdateEventEnvelope] { - try payloads.map { - do { - return try decoder.decode(UpdateEventEnvelope.self, from: $0) - } catch { - throw UpdateEventsRepositoryError.failedToDecodeStoredEvent(error) - } - } - } - - // MARK: - Delete pending events - - func deleteNextPendingEvents(limit: UInt) async throws { - try await updateEventsLocalStore.deleteNextPendingEvents(limit: limit) - } - - // MARK: - Live events - - func startBufferingLiveEvents() async throws -> AsyncThrowingStream { - try pushChannel.open().compactMap { - do { - WireLogger.sync.debug( - "decrypting live event", - attributes: [.eventEnvelopeID: $0.id] - ) - var envelope = $0 - envelope.events = try await self.updateEventDecryptor.decryptEvents(in: envelope) - return envelope - } catch { - WireLogger.sync.error( - "failed to decrypt live event, dropping: \(error)", - attributes: [.eventEnvelopeID: $0.id] - ) - return nil - } - }.toStream() - } - - func stopReceivingLiveEvents() async { - pushChannel.close() - } - - func storeLastEventEnvelopeID(_ id: UUID) { - WireLogger.sync.debug( - "storing last event id", - attributes: [.eventEnvelopeID: id] - ) - - updateEventsLocalStore.storeLastEventID(id: id) - } - -} diff --git a/WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift b/WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift new file mode 100644 index 00000000000..e8f5e011fea --- /dev/null +++ b/WireDomain/Sources/WireDomain/Synchronization/IncrementalSync.swift @@ -0,0 +1,114 @@ +// +// Wire +// Copyright (C) 2025 Wire Swiss GmbH +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see http://www.gnu.org/licenses/. +// + +import Foundation +import WireAPI +import WireLogging + +public struct IncrementalSync { + + private let selfClientID: String + private let pushChannelAPI: any PushChannelAPI + private let updateEventsSync: any PullPendingUpdateEventsSyncProtocol + private let decryptor: any UpdateEventDecryptorProtocol + private let store: any UpdateEventsLocalStoreProtocol + private let processor: any UpdateEventProcessorProtocol + private let logger = WireLogger.sync + + public init( + selfClientID: String, + pushChannelAPI: any PushChannelAPI, + updateEventsSync: any PullPendingUpdateEventsSyncProtocol, + decryptor: any UpdateEventDecryptorProtocol, + store: any UpdateEventsLocalStoreProtocol, + processor: any UpdateEventProcessorProtocol + ) { + self.selfClientID = selfClientID + self.pushChannelAPI = pushChannelAPI + self.updateEventsSync = updateEventsSync + self.decryptor = decryptor + self.store = store + self.processor = processor + } + + public func perform() async throws -> Task { + logger.debug("performing incremental sync") + let pushChannel = try await pushChannelAPI.createPushChannel(clientID: selfClientID) + logger.debug("opening push channel") + let liveEventStream = try await pushChannel.open() + try await updateEventsSync.pull() + try await processStoredEvents() + + return Task { @Sendable [logger, decryptor, store, processor] in + let jsonEncoder = JSONEncoder() + for try await var envelope in liveEventStream { + logger.debug("received live event envelope") + try Task.checkCancellation() + // TODO: [WPB-16165] skip if duplicate. + + // Decrypt. + logger.debug("decrypting live event envelope") + envelope.events = try await decryptor.decryptEvents(in: envelope) + + // Store. + logger.debug("storing live event envelope") + let index = try await store.indexOfLastEventEnvelope() + 1 + try await store.persistEventEnvelope(envelope, index: index) + + // Process. + for event in envelope.events { + logger.debug("processing live event: \(event)") + try await processor.processEvent(event) + } + + // Delete. + logger.debug("deleting live event envelope") + try await store.deleteEventEnvelope(atIndex: index) + } + } + } + + private func processStoredEvents() async throws { + let batchSize: UInt = 500 + + while true { + // If we need to abort, do it before processing the next batch. + try Task.checkCancellation() + + let envelopes = try await store.fetchStoredEventEnvelopes(limit: batchSize) + + guard !envelopes.isEmpty else { + break + } + + logger.debug("fetched \(envelopes.count) stored envelopes for processing") + + for event in envelopes.flatMap(\.events) { + do { + logger.debug("processing pending event: \(event)") + try await processor.processEvent(event) + } catch { + logger.error("failed to process stored event, dropping: \(error)") + } + } + + try await store.deleteNextPendingEvents(limit: batchSize) + } + } + +} diff --git a/WireDomain/Sources/WireDomain/Synchronization/PullPendingUpdateEventsSync.swift b/WireDomain/Sources/WireDomain/Synchronization/PullPendingUpdateEventsSync.swift index 14e006dcf7c..c553acfd8f3 100644 --- a/WireDomain/Sources/WireDomain/Synchronization/PullPendingUpdateEventsSync.swift +++ b/WireDomain/Sources/WireDomain/Synchronization/PullPendingUpdateEventsSync.swift @@ -59,14 +59,18 @@ public struct PullPendingUpdateEventsSync: PullPendingUpdateEventsSyncProtocol { let batchCount = envelopes.count var count = 0 - WireLogger.sync.debug("received batch of \(batchCount) envelopes") + if batchCount > 0 { + WireLogger.sync.debug("fetched \(batchCount) envelopes from remote") + } else { + WireLogger.sync.debug("no new events on remote") + } // If we need to abort, do it before processing the next page. try Task.checkCancellation() func log(_ message: String, envelopeID: UUID?) { WireLogger.sync.debug( - "event \(count) os \(batchCount): \(message)", + "event \(count) of \(batchCount): \(message)", attributes: [.eventEnvelopeID: envelopeID] ) } @@ -78,13 +82,9 @@ public struct PullPendingUpdateEventsSync: PullPendingUpdateEventsSyncProtocol { var decryptedEnvelope = envelope decryptedEnvelope.events = try await decryptor.decryptEvents(in: envelope) - // We can only decrypt once so store the decrypted events for later retrieval. - log("encoding...", envelopeID: envelope.id) - let decryptedEnvelopeData = try jsonEncoder.encode(decryptedEnvelope) - log("storing...", envelopeID: envelope.id) try await store.persistEventEnvelope( - decryptedEnvelopeData, + decryptedEnvelope, index: currentIndex ) diff --git a/WireDomain/Sources/WireDomain/Synchronization/SyncManager.swift b/WireDomain/Sources/WireDomain/Synchronization/SyncManager.swift deleted file mode 100644 index 118407a5e04..00000000000 --- a/WireDomain/Sources/WireDomain/Synchronization/SyncManager.swift +++ /dev/null @@ -1,182 +0,0 @@ -// -// Wire -// Copyright (C) 2025 Wire Swiss GmbH -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see http://www.gnu.org/licenses/. -// - -import CoreData -import Foundation -import WireAPI -import WireDataModel -import WireLogging -import WireSystem - -final class SyncManager: SyncManagerProtocol { - - enum Failure: Error { - case failedToPerformSlowSync(Error) - } - - // MARK: - Properties - - private(set) var syncState: SyncState = .suspended - private var isSuspending = false - - // MARK: - Repositories - - private let updateEventsRepository: any UpdateEventsRepositoryProtocol - private let mlsProvider: MLSProvider - private let context: NSManagedObjectContext - - // MARK: - Update event processor - - private let updateEventProcessor: any UpdateEventProcessorProtocol - - // MARK: - Object lifecycle - - init( - updateEventsRepository: any UpdateEventsRepositoryProtocol, - updateEventProcessor: any UpdateEventProcessorProtocol, - mlsProvider: MLSProvider, - context: NSManagedObjectContext - ) { - self.updateEventsRepository = updateEventsRepository - self.updateEventProcessor = updateEventProcessor - self.mlsProvider = mlsProvider - self.context = context - } - - func performQuickSync() async throws { - if case .quickSync = syncState { - return - } - - WireLogger.sync.info("performing quick sync") - - // Opens the push channel, but events are buffered. - let liveEventsStream = try await updateEventsRepository.startBufferingLiveEvents() - - let quickSyncTask = Task { - try await updateEventsRepository.pullPendingEvents() - try await processStoredEvents() - } - - do { - syncState = .quickSync(quickSyncTask) - try await quickSyncTask.value - } catch { - try await suspend() - throw error - } - - let liveTask = Task { - do { - for try await envelope in liveEventsStream { - WireLogger.sync.info( - "received live event", - attributes: [.eventEnvelopeID: envelope.id] - ) - try Task.checkCancellation() - await processLiveEvents(in: envelope) - } - } catch is CancellationError { - WireLogger.sync.info("live task was cancelled") - } catch { - WireLogger.sync.error("live task encountered error: \(error)") - try await suspend() - throw error - } - } - - syncState = .live(liveTask) - } - - func suspend() async throws { - if case .suspended = syncState { - return - } - - guard !isSuspending else { - return - } - - WireLogger.sync.info("suspending") - - isSuspending = true - await closePushChannel() - ongoingTask?.cancel() - syncState = .suspended - isSuspending = false - } - - private var ongoingTask: Task? { - switch syncState { - case let .quickSync(task): - task - default: - nil - } - } - - // MARK: - Live events - - private func closePushChannel() async { - await updateEventsRepository.stopReceivingLiveEvents() - } - - private func processLiveEvents(in envelope: UpdateEventEnvelope) async { - for event in envelope.events { - do { - try await updateEventProcessor.processEvent(event) - } catch { - WireLogger.sync.error("failed to process live event, dropping: \(error)") - } - } - - if !envelope.isTransient { - updateEventsRepository.storeLastEventEnvelopeID(envelope.id) - } - } - - // MARK: - Event processing - - private func processStoredEvents() async throws { - let batchSize: UInt = 500 - - while true { - // If we need to abort, do it before processing the next batch. - try Task.checkCancellation() - - let envelopes = try await updateEventsRepository.fetchNextPendingEvents(limit: batchSize) - - guard !envelopes.isEmpty else { - break - } - - WireLogger.sync.debug("fetched \(envelopes.count) stored envelopes for processing") - - for event in envelopes.flatMap(\.events) { - do { - try await updateEventProcessor.processEvent(event) - } catch { - WireLogger.sync.error("failed to process stored event, dropping: \(error)") - } - } - - try await updateEventsRepository.deleteNextPendingEvents(limit: batchSize) - } - } - -} diff --git a/WireDomain/Sources/WireDomain/Synchronization/SyncState.swift b/WireDomain/Sources/WireDomain/Synchronization/SyncState.swift deleted file mode 100644 index 7cbd0b64506..00000000000 --- a/WireDomain/Sources/WireDomain/Synchronization/SyncState.swift +++ /dev/null @@ -1,37 +0,0 @@ -// -// Wire -// Copyright (C) 2025 Wire Swiss GmbH -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see http://www.gnu.org/licenses/. -// - -import Foundation - -/// Describes the apps synchronization state. - -enum SyncState { - - /// The app is fetching and processing all pending events. - - case quickSync(Task) - - /// The app is processing live events via the push channel. - - case live(Task) - - /// The app is neither receiving nor processing any events. - - case suspended - -} diff --git a/WireDomain/Sources/WireDomainSupport/Sourcery/generated/AutoMockable.generated.swift b/WireDomain/Sources/WireDomainSupport/Sourcery/generated/AutoMockable.generated.swift index f96979e0c43..791d76764a5 100644 --- a/WireDomain/Sources/WireDomainSupport/Sourcery/generated/AutoMockable.generated.swift +++ b/WireDomain/Sources/WireDomainSupport/Sourcery/generated/AutoMockable.generated.swift @@ -1208,65 +1208,65 @@ public class MockImportBackupUseCaseProtocol: ImportBackupUseCaseProtocol { } -public class MockIndividualToTeamMigrationUseCaseProtocol: IndividualToTeamMigrationUseCaseProtocol { +public class MockIncrementalSyncProvider: IncrementalSyncProvider { // MARK: - Life cycle public init() {} - // MARK: - invoke + // MARK: - provideIncrementalSync - public var invokeTeamName_Invocations: [String] = [] - public var invokeTeamName_MockError: Error? - public var invokeTeamName_MockMethod: ((String) async throws -> IndividualToTeamMigrationResult)? - public var invokeTeamName_MockValue: IndividualToTeamMigrationResult? + public var provideIncrementalSync_Invocations: [Void] = [] + public var provideIncrementalSync_MockError: Error? + public var provideIncrementalSync_MockMethod: (() throws -> IncrementalSync)? + public var provideIncrementalSync_MockValue: IncrementalSync? - public func invoke(teamName: String) async throws -> IndividualToTeamMigrationResult { - invokeTeamName_Invocations.append(teamName) + public func provideIncrementalSync() throws -> IncrementalSync { + provideIncrementalSync_Invocations.append(()) - if let error = invokeTeamName_MockError { + if let error = provideIncrementalSync_MockError { throw error } - if let mock = invokeTeamName_MockMethod { - return try await mock(teamName) - } else if let mock = invokeTeamName_MockValue { + if let mock = provideIncrementalSync_MockMethod { + return try mock() + } else if let mock = provideIncrementalSync_MockValue { return mock } else { - fatalError("no mock for `invokeTeamName`") + fatalError("no mock for `provideIncrementalSync`") } } } -public class MockInitialSyncBuilderProtocol: InitialSyncBuilderProtocol { +public class MockIndividualToTeamMigrationUseCaseProtocol: IndividualToTeamMigrationUseCaseProtocol { // MARK: - Life cycle public init() {} - // MARK: - buildInitialSync + // MARK: - invoke - public var buildInitialSync_Invocations: [Void] = [] - public var buildInitialSync_MockError: Error? - public var buildInitialSync_MockMethod: (() throws -> Sync)? - public var buildInitialSync_MockValue: Sync? + public var invokeTeamName_Invocations: [String] = [] + public var invokeTeamName_MockError: Error? + public var invokeTeamName_MockMethod: ((String) async throws -> IndividualToTeamMigrationResult)? + public var invokeTeamName_MockValue: IndividualToTeamMigrationResult? - public func buildInitialSync() throws -> Sync { - buildInitialSync_Invocations.append(()) + public func invoke(teamName: String) async throws -> IndividualToTeamMigrationResult { + invokeTeamName_Invocations.append(teamName) - if let error = buildInitialSync_MockError { + if let error = invokeTeamName_MockError { throw error } - if let mock = buildInitialSync_MockMethod { - return try mock() - } else if let mock = buildInitialSync_MockValue { + if let mock = invokeTeamName_MockMethod { + return try await mock(teamName) + } else if let mock = invokeTeamName_MockValue { return mock } else { - fatalError("no mock for `buildInitialSync`") + fatalError("no mock for `invokeTeamName`") } } @@ -1301,6 +1301,38 @@ public class MockInitialSyncProtocol: InitialSyncProtocol { } +public class MockInitialSyncProvider: InitialSyncProvider { + + // MARK: - Life cycle + + public init() {} + + + // MARK: - provideInitialSync + + public var provideInitialSync_Invocations: [Void] = [] + public var provideInitialSync_MockError: Error? + public var provideInitialSync_MockMethod: (() throws -> AnyInitialSync)? + public var provideInitialSync_MockValue: AnyInitialSync? + + public func provideInitialSync() throws -> AnyInitialSync { + provideInitialSync_Invocations.append(()) + + if let error = provideInitialSync_MockError { + throw error + } + + if let mock = provideInitialSync_MockMethod { + return try mock() + } else if let mock = provideInitialSync_MockValue { + return mock + } else { + fatalError("no mock for `provideInitialSync`") + } + } + +} + class MockMLSMessageDecryptorProtocol: MLSMessageDecryptorProtocol { // MARK: - Life cycle @@ -2589,19 +2621,20 @@ public class MockUpdateEventDecryptorProtocol: UpdateEventDecryptorProtocol { } -class MockUpdateEventProcessorProtocol: UpdateEventProcessorProtocol { +public class MockUpdateEventProcessorProtocol: UpdateEventProcessorProtocol { // MARK: - Life cycle + public init() {} // MARK: - processEvent - var processEvent_Invocations: [UpdateEvent] = [] - var processEvent_MockError: Error? - var processEvent_MockMethod: ((UpdateEvent) async throws -> Void)? + public var processEvent_Invocations: [UpdateEvent] = [] + public var processEvent_MockError: Error? + public var processEvent_MockMethod: ((UpdateEvent) async throws -> Void)? - func processEvent(_ event: UpdateEvent) async throws { + public func processEvent(_ event: UpdateEvent) async throws { processEvent_Invocations.append(event) if let error = processEvent_MockError { @@ -2682,12 +2715,12 @@ public class MockUpdateEventsLocalStoreProtocol: UpdateEventsLocalStoreProtocol // MARK: - persistEventEnvelope - public var persistEventEnvelopeIndex_Invocations: [(data: Data, index: Int64)] = [] + public var persistEventEnvelopeIndex_Invocations: [(eventEnvelope: UpdateEventEnvelope, index: Int64)] = [] public var persistEventEnvelopeIndex_MockError: Error? - public var persistEventEnvelopeIndex_MockMethod: ((Data, Int64) async throws -> Void)? + public var persistEventEnvelopeIndex_MockMethod: ((UpdateEventEnvelope, Int64) async throws -> Void)? - public func persistEventEnvelope(_ data: Data, index: Int64) async throws { - persistEventEnvelopeIndex_Invocations.append((data: data, index: index)) + public func persistEventEnvelope(_ eventEnvelope: UpdateEventEnvelope, index: Int64) async throws { + persistEventEnvelopeIndex_Invocations.append((eventEnvelope: eventEnvelope, index: index)) if let error = persistEventEnvelopeIndex_MockError { throw error @@ -2697,29 +2730,29 @@ public class MockUpdateEventsLocalStoreProtocol: UpdateEventsLocalStoreProtocol fatalError("no mock for `persistEventEnvelopeIndex`") } - try await mock(data, index) + try await mock(eventEnvelope, index) } - // MARK: - fetchStoredEventEnvelopePayloads + // MARK: - fetchStoredEventEnvelopes - public var fetchStoredEventEnvelopePayloadsLimit_Invocations: [UInt] = [] - public var fetchStoredEventEnvelopePayloadsLimit_MockError: Error? - public var fetchStoredEventEnvelopePayloadsLimit_MockMethod: ((UInt) async throws -> [Data])? - public var fetchStoredEventEnvelopePayloadsLimit_MockValue: [Data]? + public var fetchStoredEventEnvelopesLimit_Invocations: [UInt] = [] + public var fetchStoredEventEnvelopesLimit_MockError: Error? + public var fetchStoredEventEnvelopesLimit_MockMethod: ((UInt) async throws -> [UpdateEventEnvelope])? + public var fetchStoredEventEnvelopesLimit_MockValue: [UpdateEventEnvelope]? - public func fetchStoredEventEnvelopePayloads(limit: UInt) async throws -> [Data] { - fetchStoredEventEnvelopePayloadsLimit_Invocations.append(limit) + public func fetchStoredEventEnvelopes(limit: UInt) async throws -> [UpdateEventEnvelope] { + fetchStoredEventEnvelopesLimit_Invocations.append(limit) - if let error = fetchStoredEventEnvelopePayloadsLimit_MockError { + if let error = fetchStoredEventEnvelopesLimit_MockError { throw error } - if let mock = fetchStoredEventEnvelopePayloadsLimit_MockMethod { + if let mock = fetchStoredEventEnvelopesLimit_MockMethod { return try await mock(limit) - } else if let mock = fetchStoredEventEnvelopePayloadsLimit_MockValue { + } else if let mock = fetchStoredEventEnvelopesLimit_MockValue { return mock } else { - fatalError("no mock for `fetchStoredEventEnvelopePayloadsLimit`") + fatalError("no mock for `fetchStoredEventEnvelopesLimit`") } } @@ -2743,148 +2776,24 @@ public class MockUpdateEventsLocalStoreProtocol: UpdateEventsLocalStoreProtocol try await mock(limit) } -} - -class MockUpdateEventsRepositoryProtocol: UpdateEventsRepositoryProtocol { - - // MARK: - Life cycle - - - - // MARK: - pullPendingEvents - - var pullPendingEvents_Invocations: [Void] = [] - var pullPendingEvents_MockError: Error? - var pullPendingEvents_MockMethod: (() async throws -> Void)? - - func pullPendingEvents() async throws { - pullPendingEvents_Invocations.append(()) - - if let error = pullPendingEvents_MockError { - throw error - } - - guard let mock = pullPendingEvents_MockMethod else { - fatalError("no mock for `pullPendingEvents`") - } - - try await mock() - } - - // MARK: - fetchNextPendingEvents - - var fetchNextPendingEventsLimit_Invocations: [UInt] = [] - var fetchNextPendingEventsLimit_MockError: Error? - var fetchNextPendingEventsLimit_MockMethod: ((UInt) async throws -> [UpdateEventEnvelope])? - var fetchNextPendingEventsLimit_MockValue: [UpdateEventEnvelope]? - - func fetchNextPendingEvents(limit: UInt) async throws -> [UpdateEventEnvelope] { - fetchNextPendingEventsLimit_Invocations.append(limit) - - if let error = fetchNextPendingEventsLimit_MockError { - throw error - } - - if let mock = fetchNextPendingEventsLimit_MockMethod { - return try await mock(limit) - } else if let mock = fetchNextPendingEventsLimit_MockValue { - return mock - } else { - fatalError("no mock for `fetchNextPendingEventsLimit`") - } - } - - // MARK: - deleteNextPendingEvents - - var deleteNextPendingEventsLimit_Invocations: [UInt] = [] - var deleteNextPendingEventsLimit_MockError: Error? - var deleteNextPendingEventsLimit_MockMethod: ((UInt) async throws -> Void)? - - func deleteNextPendingEvents(limit: UInt) async throws { - deleteNextPendingEventsLimit_Invocations.append(limit) - - if let error = deleteNextPendingEventsLimit_MockError { - throw error - } - - guard let mock = deleteNextPendingEventsLimit_MockMethod else { - fatalError("no mock for `deleteNextPendingEventsLimit`") - } - - try await mock(limit) - } - - // MARK: - startBufferingLiveEvents + // MARK: - deleteEventEnvelope - var startBufferingLiveEvents_Invocations: [Void] = [] - var startBufferingLiveEvents_MockError: Error? - var startBufferingLiveEvents_MockMethod: (() async throws -> AsyncThrowingStream)? - var startBufferingLiveEvents_MockValue: AsyncThrowingStream? + public var deleteEventEnvelopeAtIndex_Invocations: [Int64] = [] + public var deleteEventEnvelopeAtIndex_MockError: Error? + public var deleteEventEnvelopeAtIndex_MockMethod: ((Int64) async throws -> Void)? - func startBufferingLiveEvents() async throws -> AsyncThrowingStream { - startBufferingLiveEvents_Invocations.append(()) + public func deleteEventEnvelope(atIndex index: Int64) async throws { + deleteEventEnvelopeAtIndex_Invocations.append(index) - if let error = startBufferingLiveEvents_MockError { + if let error = deleteEventEnvelopeAtIndex_MockError { throw error } - if let mock = startBufferingLiveEvents_MockMethod { - return try await mock() - } else if let mock = startBufferingLiveEvents_MockValue { - return mock - } else { - fatalError("no mock for `startBufferingLiveEvents`") - } - } - - // MARK: - stopReceivingLiveEvents - - var stopReceivingLiveEvents_Invocations: [Void] = [] - var stopReceivingLiveEvents_MockMethod: (() async -> Void)? - - func stopReceivingLiveEvents() async { - stopReceivingLiveEvents_Invocations.append(()) - - guard let mock = stopReceivingLiveEvents_MockMethod else { - fatalError("no mock for `stopReceivingLiveEvents`") + guard let mock = deleteEventEnvelopeAtIndex_MockMethod else { + fatalError("no mock for `deleteEventEnvelopeAtIndex`") } - await mock() - } - - // MARK: - storeLastEventEnvelopeID - - var storeLastEventEnvelopeID_Invocations: [UUID] = [] - var storeLastEventEnvelopeID_MockMethod: ((UUID) -> Void)? - - func storeLastEventEnvelopeID(_ id: UUID) { - storeLastEventEnvelopeID_Invocations.append(id) - - guard let mock = storeLastEventEnvelopeID_MockMethod else { - fatalError("no mock for `storeLastEventEnvelopeID`") - } - - mock(id) - } - - // MARK: - pullLastEventID - - var pullLastEventID_Invocations: [Void] = [] - var pullLastEventID_MockError: Error? - var pullLastEventID_MockMethod: (() async throws -> Void)? - - func pullLastEventID() async throws { - pullLastEventID_Invocations.append(()) - - if let error = pullLastEventID_MockError { - throw error - } - - guard let mock = pullLastEventID_MockMethod else { - fatalError("no mock for `pullLastEventID`") - } - - try await mock() + try await mock(index) } } diff --git a/WireDomain/Tests/WireDomainTests/LocalStores/UpdateEventsLocalStoreTests.swift b/WireDomain/Tests/WireDomainTests/LocalStores/UpdateEventsLocalStoreTests.swift index 925f3244a56..d961f33114b 100644 --- a/WireDomain/Tests/WireDomainTests/LocalStores/UpdateEventsLocalStoreTests.swift +++ b/WireDomain/Tests/WireDomainTests/LocalStores/UpdateEventsLocalStoreTests.swift @@ -62,14 +62,10 @@ final class UpdateEventsLocalStoreTests: XCTestCase { // MARK: - Tests func testPersistEventEnvelope_It_Stores_Envelope_Locally() async throws { - // Given - - let envelopeData = try JSONEncoder().encode(Scaffolding.envelope1) - // When try await sut.persistEventEnvelope( - envelopeData, + Scaffolding.envelope1, index: 1 ) @@ -90,7 +86,7 @@ final class UpdateEventsLocalStoreTests: XCTestCase { // When - let fetchedEnvelopes = try await sut.fetchStoredEventEnvelopePayloads(limit: 3) + let fetchedEnvelopes = try await sut.fetchStoredEventEnvelopes(limit: 3) // Then it returns no envelopes. @@ -105,11 +101,11 @@ final class UpdateEventsLocalStoreTests: XCTestCase { // When - let fetchedEnvelopes = try await sut.fetchStoredEventEnvelopePayloads(limit: 3) + let fetchedEnvelopes = try await sut.fetchStoredEventEnvelopes(limit: 3) // Then it returns the one and only envelope. - - let fetchedEnvelope1 = try JSONDecoder().decode(UpdateEventEnvelope.self, from: fetchedEnvelopes[0]) + try XCTAssertCount(fetchedEnvelopes, count: 1) + let fetchedEnvelope1 = fetchedEnvelopes[0] XCTAssertEqual(fetchedEnvelope1, Scaffolding.envelope3) } @@ -127,26 +123,17 @@ final class UpdateEventsLocalStoreTests: XCTestCase { // When - let fetchedEnvelopes = try await sut.fetchStoredEventEnvelopePayloads(limit: 3) + let fetchedEnvelopes = try await sut.fetchStoredEventEnvelopes(limit: 3) // Then the first 3 envelopes were returned. + try XCTAssertCount(fetchedEnvelopes, count: 3) - guard fetchedEnvelopes.count == 3 else { - XCTFail("expected 3 envelopes, got \(fetchedEnvelopes.count)") - return - } - - let fetchedEnvelope1 = try JSONDecoder().decode(UpdateEventEnvelope.self, from: fetchedEnvelopes[0]) - let fetchedEnvelope2 = try JSONDecoder().decode(UpdateEventEnvelope.self, from: fetchedEnvelopes[1]) - let fetchedEnvelope3 = try JSONDecoder().decode(UpdateEventEnvelope.self, from: fetchedEnvelopes[2]) - - XCTAssertEqual(fetchedEnvelope1, Scaffolding.envelope3) - XCTAssertEqual(fetchedEnvelope2, Scaffolding.envelope4) - XCTAssertEqual(fetchedEnvelope3, Scaffolding.envelope1) + XCTAssertEqual(fetchedEnvelopes[0], Scaffolding.envelope3) + XCTAssertEqual(fetchedEnvelopes[1], Scaffolding.envelope4) + XCTAssertEqual(fetchedEnvelopes[2], Scaffolding.envelope1) } - func testDeleteNextPendingEvents_It_Deletes_All_Stored_Envelopes_If_Limit_Exceeds_Total_Number_Of_Envelopes( - ) async throws { + func testDeleteNextPendingEvents_It_Deletes_All_Stored_Envelopes_If_Limit_Exceeds_Total_Number_Of_Envelopes() async throws { // Given there are stored envelopes. try await insertStoredEventEnvelopes([ @@ -198,6 +185,28 @@ final class UpdateEventsLocalStoreTests: XCTestCase { } } + func testDeleteEventEnvelope_It_Deletes_The_Stored_Envelope() async throws { + // Given there are stored envelopes. + try await insertStoredEventEnvelopes([ + Scaffolding.envelope1, + Scaffolding.envelope2, + Scaffolding.envelope3 + ]) + + // When it deletes the first one + try await sut.deleteEventEnvelope(atIndex: 0) + + // Then the first envelope (index 0) was deleted and indices 1 and 2 remain. + try await context.perform { [context] in + let request = StoredUpdateEventEnvelope.sortedFetchRequest(asending: true) + let result = try context.fetch(request) + XCTAssertEqual(result.count, 2) + + let indicesOfStoredEnvelopes = result.map(\.sortIndex) + XCTAssertEqual(indicesOfStoredEnvelopes, [1, 2]) + } + } + func testStoreLastEventID_It_Stores_Last_Event_Envelope_ID() throws { // Given diff --git a/WireDomain/Tests/WireDomainTests/Repositories/UpdateEventsRepositoryTests.swift b/WireDomain/Tests/WireDomainTests/Repositories/UpdateEventsRepositoryTests.swift deleted file mode 100644 index f6abc24ec9c..00000000000 --- a/WireDomain/Tests/WireDomainTests/Repositories/UpdateEventsRepositoryTests.swift +++ /dev/null @@ -1,363 +0,0 @@ -// -// Wire -// Copyright (C) 2025 Wire Swiss GmbH -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see http://www.gnu.org/licenses/. -// - -import WireAPISupport -import WireDataModel -import WireDataModelSupport -import WireTestingPackage -import XCTest -@testable import WireAPI -@testable import WireDomain -@testable import WireDomainSupport - -final class UpdateEventsRepositoryTests: XCTestCase { - - private var sut: UpdateEventsRepository! - private var updateEventsAPI: MockUpdateEventsAPI! - private var pushChannel: MockPushChannelProtocol! - private var updateEventDecryptor: MockUpdateEventDecryptorProtocol! - private var updateEventsLocalStore: MockUpdateEventsLocalStoreProtocol! - private var stack: CoreDataStack! - private var coreDataStackHelper: CoreDataStackHelper! - - private var context: NSManagedObjectContext { - stack.eventContext - } - - override func setUp() async throws { - coreDataStackHelper = CoreDataStackHelper() - stack = try await coreDataStackHelper.createStack() - updateEventsAPI = MockUpdateEventsAPI() - pushChannel = MockPushChannelProtocol() - updateEventDecryptor = MockUpdateEventDecryptorProtocol() - updateEventsLocalStore = MockUpdateEventsLocalStoreProtocol() - - sut = UpdateEventsRepository( - userID: Scaffolding.selfUserID.uuid, - selfClientID: Scaffolding.selfClientID, - updateEventsAPI: updateEventsAPI, - pushChannel: pushChannel, - updateEventDecryptor: updateEventDecryptor, - updateEventsLocalStore: updateEventsLocalStore - ) - - // Base mocks - updateEventDecryptor.decryptEventsIn_MockMethod = { $0.events } - } - - override func tearDown() async throws { - coreDataStackHelper = CoreDataStackHelper() - stack = nil - updateEventsAPI = nil - pushChannel = nil - updateEventDecryptor = nil - sut = nil - updateEventsLocalStore = nil - try coreDataStackHelper.cleanupDirectory() - } - - private func insertStoredEventEnvelopes(_ envelopes: [UpdateEventEnvelope]) async throws { - try await context.perform { [context] in - let encoder = JSONEncoder() - - for (index, envelope) in envelopes.enumerated() { - let storedEventEnvelope = StoredUpdateEventEnvelope(context: context) - storedEventEnvelope.data = try encoder.encode(envelope) - storedEventEnvelope.sortIndex = Int64(index) - } - - try context.save() - } - } - - // MARK: - Pull pending events - - func testPullPendingEvents_It_Throws_Error_When_Pulling_Pending_Events_Without_Last_Event_ID() async throws { - // Mock - - updateEventsLocalStore.lastEventID_MockMethod = { nil } - - do { - // When - try await sut.pullPendingEvents() - XCTFail("expected an error, but none was thrown") - } catch PullPendingUpdateEventsSyncError.noLastEventID { - // Then it threw the right error. - } catch { - XCTFail("unexpected error: \(error)") - } - } - - func testPullPendingEvents_It_Pulls_Pending_Events() async throws { - // There is a last event id. - - updateEventsLocalStore.lastEventID_MockValue = Scaffolding.lastEventID - updateEventsLocalStore.indexOfLastEventEnvelope_MockValue = 1 - updateEventsLocalStore.persistEventEnvelopeIndex_MockMethod = { _, _ in } - updateEventsLocalStore.storeLastEventIDId_MockMethod = { _ in } - - // There are two pages of events waiting to be pulled. - - updateEventsAPI.getUpdateEventsSelfClientIDSinceEventID_MockValue = PayloadPager(start: "page1") { start in - switch start { - case "page1": - return Scaffolding.page1 - - case "page2": - return Scaffolding.page2 - - default: - throw TestError(message: "unknown page: \(start ?? "nil")") - } - } - - // When - try await sut.pullPendingEvents() - - // Then we used the api to fetch pending events. - let apiInvocations = updateEventsAPI.getUpdateEventsSelfClientIDSinceEventID_Invocations - - guard apiInvocations.count == 1 else { - XCTFail("expected 1 invocation, got \(apiInvocations.count)") - return - } - - XCTAssertEqual(apiInvocations[0].selfClientID, Scaffolding.selfClientID) - XCTAssertEqual(apiInvocations[0].sinceEventID, Scaffolding.lastEventID) - - // Then the events were decrypted, one call per envelope. - let decryptorInvocations = updateEventDecryptor.decryptEventsIn_Invocations - - guard decryptorInvocations.count == 4 else { - XCTFail("expected 4 invocations, got \(decryptorInvocations.count)") - return - } - - XCTAssertEqual(decryptorInvocations[0].id, Scaffolding.envelope3.id) - XCTAssertEqual(decryptorInvocations[1].id, Scaffolding.envelope4.id) - XCTAssertEqual(decryptorInvocations[2].id, Scaffolding.envelope5.id) - XCTAssertEqual(decryptorInvocations[3].id, Scaffolding.envelope6.id) - - // Then - - XCTAssertEqual(updateEventsLocalStore.persistEventEnvelopeIndex_Invocations.count, 4) - XCTAssertEqual(updateEventsLocalStore.storeLastEventIDId_Invocations.count, 3) - XCTAssertEqual(updateEventsLocalStore.lastEventID_Invocations.count, 1) - XCTAssertEqual(updateEventsLocalStore.indexOfLastEventEnvelope_Invocations.count, 1) - } - - // MARK: - Live events - - func testStartBufferingLiveEvents_It_Buffers_Live_Events_Until_Iteration_Starts() async throws { - // Mock push channel. - - var liveEventsContinuation: AsyncThrowingStream.Continuation? - pushChannel.open_MockValue = AsyncThrowingStream { - liveEventsContinuation = $0 - } - - // Given it starts buffering. - - let liveEventStream = try await sut.startBufferingLiveEvents() - - // Given live events arrive. - - liveEventsContinuation?.yield(Scaffolding.envelope1) - liveEventsContinuation?.yield(Scaffolding.envelope2) - liveEventsContinuation?.yield(Scaffolding.envelope3) - - // When iteration starts. - - let task = Task { - var receivedEnvelopes = [UpdateEventEnvelope]() - for try await envelope in liveEventStream { - receivedEnvelopes.append(envelope) - } - return receivedEnvelopes - } - - liveEventsContinuation?.finish() - let receivedEnvelopes = try await task.value - - // Then all three envelopes are received. - - guard receivedEnvelopes.count == 3 else { - XCTFail("Expected 3 envelopes, got \(receivedEnvelopes.count)") - return - } - - XCTAssertEqual(receivedEnvelopes[0], Scaffolding.envelope1) - XCTAssertEqual(receivedEnvelopes[1], Scaffolding.envelope2) - XCTAssertEqual(receivedEnvelopes[2], Scaffolding.envelope3) - - // Then each envelope was decrypted. - - let decryptionInvocations = updateEventDecryptor.decryptEventsIn_Invocations - guard decryptionInvocations.count == 3 else { - XCTFail("expected 4 decryption invocations, got \(decryptionInvocations.count)") - return - } - - XCTAssertEqual(decryptionInvocations[0], Scaffolding.envelope1) - XCTAssertEqual(decryptionInvocations[1], Scaffolding.envelope2) - XCTAssertEqual(decryptionInvocations[2], Scaffolding.envelope3) - } - - func testStoreLastEventEnvelopeID_It_Invokes_Local_Store_Method() throws { - // Mock - - updateEventsLocalStore.storeLastEventIDId_MockMethod = { _ in } - - // When - - sut.storeLastEventEnvelopeID(Scaffolding.lastEventID) - - // Then - - XCTAssertEqual(updateEventsLocalStore.storeLastEventIDId_Invocations.count, 1) - } - - func testPullLastEventID_It_Invokes_Local_Store_Method() async throws { - // Mock - - updateEventsAPI.getLastUpdateEventSelfClientID_MockValue = Scaffolding.envelope1 - updateEventsLocalStore.storeLastEventIDId_MockMethod = { _ in } - - // When - - try await sut.pullLastEventID() - - // Then - - XCTAssertEqual(updateEventsLocalStore.storeLastEventIDId_Invocations.count, 1) - } - - private enum Scaffolding { - - // MARK: - Local domain - - static let localDomain = "local.com" - static let selfUserID = UserID(uuid: .mockID1, domain: localDomain) - static let selfClientID = "abcd1234" - static let conversationID = ConversationID(uuid: .mockID2, domain: localDomain) - - static let lastEventID = UUID(uuidString: "571d22a5-026c-48b4-90bf-78d00354f121")! - - // MARK: - Other domain - - static let otherDomain = "other.com" - static let aliceID = UserID(uuid: .mockID3, domain: otherDomain) - static let aliceClientID = "efgh5678" - - // MARK: - Pending events - - // 6 envelopes, the first 2 will be already stored in the DB - // and the rest will come from the backend. - - static let envelope1 = UpdateEventEnvelope( - id: id1, - events: [.user(.pushRemove)], - isTransient: false - ) - - static let envelope2 = UpdateEventEnvelope( - id: id2, - events: [.user(.pushRemove)], - isTransient: false - ) - - static let envelope3 = UpdateEventEnvelope( - id: id3, - events: [.conversation(.proteusMessageAdd(proteusMessage1))], - isTransient: false - ) - - static let envelope4 = UpdateEventEnvelope( - id: id4, - events: [.user(.pushRemove)], - isTransient: true - ) - - static let envelope5 = UpdateEventEnvelope( - id: id5, - events: [.conversation(.proteusMessageAdd(proteusMessage2))], - isTransient: false - ) - - static let envelope6 = UpdateEventEnvelope( - id: id6, - events: [.conversation(.proteusMessageAdd(proteusMessage3))], - isTransient: false - ) - - static let proteusMessage1 = ConversationProteusMessageAddEvent( - conversationID: conversationID, - senderID: aliceID, - timestamp: time30SecondsAgo, - message: .init(encryptedMessage: "xxxxx"), - externalData: nil, - messageSenderClientID: aliceClientID, - messageRecipientClientID: selfClientID - ) - - static let proteusMessage2 = ConversationProteusMessageAddEvent( - conversationID: conversationID, - senderID: aliceID, - timestamp: time20SecondsAgo, - message: .init(encryptedMessage: "yyyyy"), - externalData: nil, - messageSenderClientID: aliceClientID, - messageRecipientClientID: selfClientID - ) - - static let proteusMessage3 = ConversationProteusMessageAddEvent( - conversationID: conversationID, - senderID: aliceID, - timestamp: time10SecondsAgo, - message: .init(encryptedMessage: "zzzzz"), - externalData: nil, - messageSenderClientID: aliceClientID, - messageRecipientClientID: selfClientID - ) - - static let id1 = UUID.mockID1 - static let id2 = UUID.mockID2 - static let id3 = UUID.mockID3 - static let id4 = UUID.mockID4 - static let id5 = UUID.mockID5 - static let id6 = UUID.mockID6 - - static let time30SecondsAgo = Date(timeIntervalSinceNow: -30) - static let time20SecondsAgo = Date(timeIntervalSinceNow: -20) - static let time10SecondsAgo = Date(timeIntervalSinceNow: -10) - - nonisolated(unsafe) static let page1 = PayloadPager.Page( - element: [envelope3, envelope4], - hasMore: true, - nextStart: "page2" - ) - - nonisolated(unsafe) static let page2 = PayloadPager.Page( - element: [envelope5, envelope6], - hasMore: false, - nextStart: "" - ) - - } - -} diff --git a/WireDomain/Tests/WireDomainTests/Synchronization/SyncManagerTests.swift b/WireDomain/Tests/WireDomainTests/Synchronization/SyncManagerTests.swift deleted file mode 100644 index 92bc55f192f..00000000000 --- a/WireDomain/Tests/WireDomainTests/Synchronization/SyncManagerTests.swift +++ /dev/null @@ -1,357 +0,0 @@ -// -// Wire -// Copyright (C) 2025 Wire Swiss GmbH -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see http://www.gnu.org/licenses/. -// - -import Combine -import WireAPI -import WireAPISupport -import WireDataModel -import WireDataModelSupport -import XCTest -@testable import WireDomain -@testable import WireDomainSupport - -final class SyncManagerTests: XCTestCase { - - private var sut: SyncManager! - private var coreDataStackHelper: CoreDataStackHelper! - private var stack: CoreDataStack! - private var modelHelper: ModelHelper! - private var updateEventsRepository: MockUpdateEventsRepositoryProtocol! - private var updateEventProcessor: MockUpdateEventProcessorProtocol! - private var mlsService: MockMLSServiceInterface! - - var context: NSManagedObjectContext { - stack.syncContext - } - - override func setUp() async throws { - coreDataStackHelper = CoreDataStackHelper() - stack = try await coreDataStackHelper.createStack() - mlsService = MockMLSServiceInterface() - modelHelper = ModelHelper() - updateEventsRepository = MockUpdateEventsRepositoryProtocol() - updateEventProcessor = MockUpdateEventProcessorProtocol() - - sut = SyncManager( - updateEventsRepository: updateEventsRepository, - updateEventProcessor: updateEventProcessor, - mlsProvider: MLSProvider(service: mlsService, isMLSEnabled: true), - context: context - ) - - // Base mocks. - updateEventsRepository.startBufferingLiveEvents_MockValue = AsyncThrowingStream { _ in } - updateEventsRepository.stopReceivingLiveEvents_MockMethod = {} - updateEventsRepository.pullPendingEvents_MockMethod = {} - updateEventsRepository.fetchNextPendingEventsLimit_MockValue = [] - updateEventsRepository.deleteNextPendingEventsLimit_MockMethod = { _ in } - updateEventsRepository.storeLastEventEnvelopeID_MockMethod = { _ in } - updateEventProcessor.processEvent_MockMethod = { _ in } - } - - override func tearDown() async throws { - sut = nil - modelHelper = nil - try coreDataStackHelper.cleanupDirectory() - coreDataStackHelper = nil - stack = nil - mlsService = nil - updateEventsRepository = nil - updateEventProcessor = nil - } - - // MARK: - Tests - - func testItStartsSuspended() async throws { - // Given we just initialized the sync manager. - - // Then it's suspended. - guard case .suspended = sut.syncState else { - XCTFail("unexpected sync state: \(sut.syncState)") - return - } - - // Then is has not requested any live events. - XCTAssertTrue(updateEventsRepository.startBufferingLiveEvents_Invocations.isEmpty) - } - - // MARK: - Suspension - - func testItDoesNotSuspendIfItIsAlreadySuspended() async throws { - // Given it's already suspended. - guard case .suspended = sut.syncState else { - XCTFail("unexpected sync state: \(sut.syncState)") - return - } - - // When - try await sut.suspend() - - // Then it didn't do anything. - XCTAssertTrue(updateEventsRepository.stopReceivingLiveEvents_Invocations.isEmpty) - } - - func testItSuspendsWhenLive() async throws { - // Given it goes live. - try await sut.performQuickSync() - - guard case .live = sut.syncState else { - XCTFail("unexpected sync state: \(sut.syncState)") - return - } - - // When it suspends. - try await sut.suspend() - - // Then the push channel was closed. - XCTAssertEqual(updateEventsRepository.stopReceivingLiveEvents_Invocations.count, 1) - - // Then it goes to the suspended state. - guard case .suspended = sut.syncState else { - XCTFail("unexpected sync state: \(sut.syncState)") - return - } - } - - func testItSuspendsWhenQuickSyncing() async throws { - let didPullEvents = XCTestExpectation() - let didSuspend = XCTestExpectation() - - updateEventsRepository.pullPendingEvents_MockMethod = { - didPullEvents.fulfill() - } - - updateEventsRepository.fetchNextPendingEventsLimit_MockMethod = { _ in - // Wait here until we suspend. - await self.fulfillment(of: [didSuspend]) - return [Scaffolding.makeEnvelope(with: Scaffolding.event1)] - } - - let ongoingQuickSync = Task { - // Given we are quick syncing. - try await sut.performQuickSync() - } - - // Let quick sync run, wait until it pulls events. - await fulfillment(of: [didPullEvents]) - - // When it suspends. - try await sut.suspend() - didSuspend.fulfill() - - do { - // Wait for the quick sync to finish. - try await ongoingQuickSync.value - XCTFail("expected the quick sync to cancel but it did not") - return - } catch is CancellationError { - // Then the quick sync was cancelled. - } catch { - XCTFail("expected a cancellation error but got: \(error)") - return - } - - // Then the push channel was closed. - XCTAssertEqual(updateEventsRepository.stopReceivingLiveEvents_Invocations.count, 1) - - // Then it goes to the suspended state. - guard case .suspended = sut.syncState else { - XCTFail("unexpected sync state: \(sut.syncState)") - return - } - } - - // MARK: - Quick sync - - func testItQuickSyncs() async throws { - // Given no stored events. - var storedEvents = [UpdateEventEnvelope]() - - // Mock live event stream. - var liveEventsContinuation: AsyncThrowingStream.Continuation? - updateEventsRepository.startBufferingLiveEvents_MockValue = AsyncThrowingStream { continuation in - liveEventsContinuation = continuation - } - - // Mock pull events from remote, store locally. - updateEventsRepository.pullPendingEvents_MockMethod = { - storedEvents = [ - Scaffolding.makeEnvelope(with: Scaffolding.event1), - Scaffolding.makeEnvelope(with: Scaffolding.event2), - Scaffolding.makeEnvelope(with: Scaffolding.event3) - ] - } - - // Mock fetch the next batch. - updateEventsRepository.fetchNextPendingEventsLimit_MockMethod = { limit in - Array(storedEvents.prefix(Int(limit))) - } - - // Mock delete the next batch. - updateEventsRepository.deleteNextPendingEventsLimit_MockMethod = { limit in - storedEvents = Array(storedEvents.dropFirst(Int(limit))) - } - - let didProcessEvent = XCTestExpectation() - let didProcessEvent5 = XCTestExpectation() - let didPushLiveEvents = XCTestExpectation() - - updateEventProcessor.processEvent_MockMethod = { event in - switch event { - case Scaffolding.event1: - didProcessEvent.fulfill() - - case Scaffolding.event2: - // Stop processing, wait for live events. - await self.fulfillment(of: [didPushLiveEvents]) - - case Scaffolding.event5: - didProcessEvent5.fulfill() - - default: - break - } - } - - // Run in another task so we can send events through the push channel. - let whenTask = Task.detached { - // When - try await self.sut.performQuickSync() - } - - // Wait for event 1 to be processed. - await fulfillment(of: [didProcessEvent]) - - // Push 2 live events through push channel - let liveEnvelope1 = Scaffolding.makeEnvelope( - with: Scaffolding.event4, - isTransient: true - ) - liveEventsContinuation?.yield(liveEnvelope1) - - let liveEnvelope2 = Scaffolding.makeEnvelope( - with: Scaffolding.event5, - isTransient: false - ) - liveEventsContinuation?.yield(liveEnvelope2) - didPushLiveEvents.fulfill() - - // Wait for "when" to finish. - try await whenTask.value - - // We also need to wait to get the two live events... - await fulfillment(of: [didProcessEvent5]) - - // Then it requested live events. - XCTAssertEqual(updateEventsRepository.startBufferingLiveEvents_Invocations.count, 1) - - // Then it pulls pending events. - XCTAssertEqual(updateEventsRepository.pullPendingEvents_Invocations.count, 1) - - // Then it tries to fetch 2 event batches (1st is non-empty, 2nd is empty). - XCTAssertEqual(updateEventsRepository.fetchNextPendingEventsLimit_Invocations, [500, 500]) - - // Then it processed 5 events, in the correct order. - let processEventInvocations = updateEventProcessor.processEvent_Invocations - - guard processEventInvocations.count == 5 else { - XCTFail("expected 5 events to be processed, got \(processEventInvocations.count)") - return - } - - // These were the stored events. - XCTAssertEqual(processEventInvocations[0], Scaffolding.event1) - XCTAssertEqual(processEventInvocations[1], Scaffolding.event2) - XCTAssertEqual(processEventInvocations[2], Scaffolding.event3) - - // These were the buffered events. - XCTAssertEqual(processEventInvocations[3], Scaffolding.event4) - XCTAssertEqual(processEventInvocations[4], Scaffolding.event5) - - // Then it deleted 1 batch (i.e all) of the stored events. - XCTAssertEqual(updateEventsRepository.deleteNextPendingEventsLimit_Invocations, [500]) - XCTAssertTrue(storedEvents.isEmpty) - - // Then it update the last event id for the non-transient live events. - XCTAssertEqual( - updateEventsRepository.storeLastEventEnvelopeID_Invocations, - [liveEnvelope2.id] - ) - - // Then it is live. - guard case .live = sut.syncState else { - XCTFail("unexpected sync state: \(sut.syncState)") - return - } - - XCTAssertEqual(updateEventsRepository.stopReceivingLiveEvents_Invocations.count, 0) - } - - private enum Scaffolding { - - static let localDomain = "example.com" - static let conversationID1 = ConversationID(uuid: UUID(), domain: localDomain) - static let conversationID2 = ConversationID(uuid: UUID(), domain: localDomain) - static let aliceID = UserID(uuid: UUID(), domain: localDomain) - - static let event1 = UpdateEvent.user(.clientAdd(UserClientAddEvent(client: SelfUserClient( - id: "userClientID", - type: .permanent, - activationDate: .now, - capabilities: [.legalholdConsent] - )))) - - static let event2 = UpdateEvent.conversation(.typing(ConversationTypingEvent( - conversationID: conversationID1, - senderID: aliceID, - isTyping: true - ))) - - static let event3 = UpdateEvent.conversation(.delete(ConversationDeleteEvent( - conversationID: conversationID1, - senderID: aliceID, - timestamp: .now - ))) - - static let event4 = UpdateEvent.conversation(.rename(ConversationRenameEvent( - conversationID: conversationID2, - senderID: aliceID, - timestamp: .now, - newName: "Foo" - ))) - - static let event5 = UpdateEvent.conversation(.rename(ConversationRenameEvent( - conversationID: conversationID2, - senderID: aliceID, - timestamp: .now, - newName: "Bar" - ))) - - static func makeEnvelope( - with event: UpdateEvent, - isTransient: Bool = false - ) -> UpdateEventEnvelope { - .init( - id: UUID(), - events: [event], - isTransient: isTransient - ) - } - - } -} diff --git a/wire-ios-data-model/Source/Model/ZMEventModel/StoredUpdateEventEnvelope.swift b/wire-ios-data-model/Source/Model/ZMEventModel/StoredUpdateEventEnvelope.swift index 8a6db4210a1..23286950b1e 100644 --- a/wire-ios-data-model/Source/Model/ZMEventModel/StoredUpdateEventEnvelope.swift +++ b/wire-ios-data-model/Source/Model/ZMEventModel/StoredUpdateEventEnvelope.swift @@ -59,4 +59,17 @@ public final class StoredUpdateEventEnvelope: NSManagedObject { return request } + + /// Create a fetch request to retrieve a single event envelope. + /// + /// - Parameter sortIndex: The sort index of the desired envelope. + /// - Returns: A fetch request for a single event envelope. + + public static func fetchRequest(sortIndex: Int64) -> NSFetchRequest { + let request = NSFetchRequest(entityName: entityName) + request.predicate = NSPredicate(format: "\(#keyPath(StoredUpdateEventEnvelope.sortIndex)) == \(sortIndex)") + request.fetchLimit = 1 + return request + } + } diff --git a/wire-ios-sync-engine/Source/Synchronization/SyncAgent.swift b/wire-ios-sync-engine/Source/Synchronization/SyncAgent.swift index 2789f24aff7..3cb4ae0458f 100644 --- a/wire-ios-sync-engine/Source/Synchronization/SyncAgent.swift +++ b/wire-ios-sync-engine/Source/Synchronization/SyncAgent.swift @@ -30,9 +30,12 @@ final class SyncAgent: NSObject { weak var delegate: SyncAgentDelegate? private let lastUpdateEventIDRepository: any LastEventIDRepositoryInterface - private let initialSyncBuilder: any InitialSyncBuilderProtocol + private let initialSyncProvider: any InitialSyncProvider + private let incrementalSyncProvider: any IncrementalSyncProvider private let legacySyncStatus: any SyncStatusProtocol + private var incrementalSyncTask: Task? + private var hasCompletedInitialSync: Bool { lastUpdateEventIDRepository.fetchLastEventID() != nil } @@ -41,11 +44,13 @@ final class SyncAgent: NSObject { init( lastUpdateEventIDRepository: any LastEventIDRepositoryInterface, - initialSyncBuilder: any InitialSyncBuilderProtocol, + initialSyncProvider: any InitialSyncProvider, + incrementalSyncProvider: any IncrementalSyncProvider, legacySyncStatus: any SyncStatusProtocol ) { self.lastUpdateEventIDRepository = lastUpdateEventIDRepository - self.initialSyncBuilder = initialSyncBuilder + self.initialSyncProvider = initialSyncProvider + self.incrementalSyncProvider = incrementalSyncProvider self.legacySyncStatus = legacySyncStatus super.init() } @@ -72,7 +77,7 @@ final class SyncAgent: NSObject { do { delegate?.syncAgentDidStartInitialSync(self) WireLogger.sync.debug("did start new initial sync") - try await initialSyncBuilder.buildInitialSync().perform(skipPullingLastUpdateEventID: false) + try await initialSyncProvider.provideInitialSync().perform(skipPullingLastUpdateEventID: false) WireLogger.sync.debug("did finish new initial sync") delegate?.syncAgentDidFinishInitialSync(self) } catch { @@ -94,7 +99,7 @@ final class SyncAgent: NSObject { do { delegate?.syncAgentDidStartInitialSync(self) WireLogger.sync.debug("did start new resource sync") - try await initialSyncBuilder.buildInitialSync().perform(skipPullingLastUpdateEventID: true) + try await initialSyncProvider.provideInitialSync().perform(skipPullingLastUpdateEventID: true) WireLogger.sync.debug("did finish new resource sync") delegate?.syncAgentDidFinishInitialSync(self) } catch { @@ -112,7 +117,25 @@ final class SyncAgent: NSObject { /// Perform an incremental sync. func performIncrementalSync() async throws { - await legacySyncStatus.performQuickSync() + if DeveloperFlag.newInitialSync.isOn { + do { + if let incrementalSyncTask { + WireLogger.sync.info("incremental sync already running, waiting for it instead") + try await incrementalSyncTask.value + } else { + delegate?.syncAgentDidStartIncrementalSync(self) + incrementalSyncTask = try await incrementalSyncProvider.provideIncrementalSync().perform() + delegate?.syncAgentDidFinishIncrementalSync(self) + incrementalSyncTask = nil + } + } catch { + WireLogger.sync.error("failed to perform new incremental sync: \(String(describing: error))") + incrementalSyncTask = nil + throw error + } + } else { + await legacySyncStatus.performQuickSync() + } } } diff --git a/wire-ios-sync-engine/Source/Synchronization/SyncAgentDelegate.swift b/wire-ios-sync-engine/Source/Synchronization/SyncAgentDelegate.swift index 21a85f45daa..d1ad603d248 100644 --- a/wire-ios-sync-engine/Source/Synchronization/SyncAgentDelegate.swift +++ b/wire-ios-sync-engine/Source/Synchronization/SyncAgentDelegate.swift @@ -22,6 +22,9 @@ protocol SyncAgentDelegate: AnyObject { func syncAgentDidStartInitialSync(_ syncAgent: SyncAgent) func syncAgentDidFinishInitialSync(_ syncAgent: SyncAgent) + func syncAgentDidStartIncrementalSync(_ syncAgent: SyncAgent) + func syncAgentDidFinishIncrementalSync(_ syncAgent: SyncAgent) + func syncAgentDidStartLegacyInitialSync(_ syncAgent: SyncAgent) func syncAgentDidFinishLegacyInitialSync(_ syncAgent: SyncAgent) func syncAgentDidStartLegacyIncrementalSync(_ syncAgent: SyncAgent) diff --git a/wire-ios-sync-engine/Source/Synchronization/ZMOperationLoop+OperationStatus.swift b/wire-ios-sync-engine/Source/Synchronization/ZMOperationLoop+OperationStatus.swift index fdc92865be9..816e914780e 100644 --- a/wire-ios-sync-engine/Source/Synchronization/ZMOperationLoop+OperationStatus.swift +++ b/wire-ios-sync-engine/Source/Synchronization/ZMOperationLoop+OperationStatus.swift @@ -28,7 +28,13 @@ extension ZMOperationLoop: OperationStatusDelegate { transportSession.enterBackground() } - transportSession.pushChannel.keepOpen = state == .foreground || state == .backgroundCall + if DeveloperFlag.newInitialSync.isOn { + // The new sync also includes a new push channel, so we don't + // what to open the legacy one. + transportSession.pushChannel.keepOpen = false + } else { + transportSession.pushChannel.keepOpen = state == .foreground || state == .backgroundCall + } } } diff --git a/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession+LifeCycle.swift b/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession+LifeCycle.swift index e5c5f29529c..ad07c45b01a 100644 --- a/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession+LifeCycle.swift +++ b/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession+LifeCycle.swift @@ -72,7 +72,7 @@ public extension ZMUserSession { internal func processPendingEvents() { syncContext.performGroupedBlock { - self.processEvents() + self.processLegacyEvents() } } diff --git a/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession+UserSession.swift b/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession+UserSession.swift index 36fd3befcb3..a7060d477b2 100644 --- a/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession+UserSession.swift +++ b/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession+UserSession.swift @@ -114,7 +114,7 @@ extension ZMUserSession: UserSession { DatabaseEncryptionLockNotification(databaseIsEncrypted: false).post(in: notificationContext) - processEvents() + processLegacyEvents() } public func deleteAppLockPasscode() throws { diff --git a/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession.swift b/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession.swift index dd67e4e9316..c5db5c60b87 100644 --- a/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession.swift +++ b/wire-ios-sync-engine/Source/UserSession/ZMUserSession/ZMUserSession.swift @@ -59,7 +59,7 @@ public final class ZMUserSession: NSObject { private(set) var transportSession: TransportSessionType let storedDidSaveNotifications: ContextDidSaveNotificationPersistence let userExpirationObserver: UserExpirationObserver - private(set) var updateEventProcessor: UpdateEventProcessor? + private(set) var legacyUpdateEventProcessor: UpdateEventProcessor? private(set) var strategyDirectory: StrategyDirectoryProtocol? private(set) var syncStrategy: ZMSyncStrategy? private(set) var operationLoop: ZMOperationLoop? @@ -491,7 +491,7 @@ public final class ZMUserSession: NSObject { self .strategyDirectory = strategyDirectory ?? createStrategyDirectory(useLegacyPushNotifications: configuration.useLegacyPushNotifications) - updateEventProcessor = eventProcessor ?? createUpdateEventProcessor() + legacyUpdateEventProcessor = eventProcessor ?? createUpdateEventProcessor() self.syncStrategy = syncStrategy ?? createSyncStrategy() self.operationLoop = operationLoop ?? createOperationLoop(isDeveloperModeEnabled: isDeveloperModeEnabled) urlActionProcessors = createURLActionProcessors() @@ -559,7 +559,8 @@ public final class ZMUserSession: NSObject { let clientSessionComponent = userSessionComponent.clientSessionComponent(clientID: clientID) let syncAgent = SyncAgent( lastUpdateEventIDRepository: lastEventIDRepository, - initialSyncBuilder: clientSessionComponent, + initialSyncProvider: clientSessionComponent, + incrementalSyncProvider: clientSessionComponent, legacySyncStatus: applicationStatusDirectory.syncStatus ) applicationStatusDirectory.syncStatus.syncStateDelegate = syncAgent @@ -657,7 +658,7 @@ public final class ZMUserSession: NSObject { private func createURLActionProcessors() -> [URLActionProcessor] { [ ImportEventsURLActionProcessor( - eventProcessor: updateEventProcessor! + eventProcessor: legacyUpdateEventProcessor! ), DeepLinkURLActionProcessor( contextProvider: coreDataStack, @@ -688,7 +689,7 @@ public final class ZMUserSession: NSObject { ZMOperationLoop( transportSession: transportSession, requestStrategy: syncStrategy, - updateEventProcessor: updateEventProcessor!, + updateEventProcessor: legacyUpdateEventProcessor!, operationStatus: applicationStatusDirectory.operationStatus, syncStatus: applicationStatusDirectory.syncStatus, pushNotificationStatus: applicationStatusDirectory.pushNotificationStatus, @@ -831,7 +832,7 @@ public final class ZMUserSession: NSObject { // might be replaced by something more elegant public func processUpdateEvents(_ events: [ZMUpdateEvent]) { WaitingGroupTask(context: syncContext) { - try? await self.updateEventProcessor?.processEvents(events) + try? await self.legacyUpdateEventProcessor?.processEvents(events) } } @@ -965,15 +966,15 @@ extension ZMUserSession: ZMNetworkStateDelegate { // TODO: [WPB-9089] find another way of providing the event processor to ZMissingEventTranscoder extension ZMUserSession: UpdateEventProcessor { public func bufferEvents(_ events: [WireTransport.ZMUpdateEvent]) async { - await updateEventProcessor?.bufferEvents(events) + await legacyUpdateEventProcessor?.bufferEvents(events) } public func processEvents(_ events: [WireTransport.ZMUpdateEvent]) async throws { - try await updateEventProcessor?.processEvents(events) + try await legacyUpdateEventProcessor?.processEvents(events) } public func processBufferedEvents() async throws { - try await updateEventProcessor?.processBufferedEvents() + try await legacyUpdateEventProcessor?.processBufferedEvents() } } @@ -989,6 +990,14 @@ extension ZMUserSession: SyncAgentDelegate { didFinishInitialSync() } + func syncAgentDidStartIncrementalSync(_ syncAgent: SyncAgent) { + didStartIncrementalSync() + } + + func syncAgentDidFinishIncrementalSync(_ syncAgent: SyncAgent) { + didFinishIncrementalSync() + } + func syncAgentDidStartLegacyInitialSync(_ syncAgent: SyncAgent) { didStartInitialSync() } @@ -1058,7 +1067,7 @@ extension ZMUserSession: SyncAgentDelegate { syncContext.performGroupedBlock { [weak self] in guard let self else { return } WireLogger.sync.debug("did finish incremental sync") - processEvents() + processLegacyEvents() NotificationInContext( name: .quickSyncCompletedNotification, @@ -1190,7 +1199,7 @@ extension ZMUserSession: SyncAgentDelegate { } } - func processEvents() { + func processLegacyEvents() { managedObjectContext.performGroupedBlock { [weak self] in self?.isPerformingSync = true self?.updateNetworkState() @@ -1200,7 +1209,7 @@ extension ZMUserSession: SyncAgentDelegate { Task { var processingInterrupted = false do { - try await updateEventProcessor?.processBufferedEvents() + try await legacyUpdateEventProcessor?.processBufferedEvents() } catch { processingInterrupted = true } @@ -1228,7 +1237,7 @@ extension ZMUserSession: SyncAgentDelegate { Task { do { // TODO: [WPB-15391] why not processing only the call events (should be stored here?) - try await updateEventProcessor!.processBufferedEvents() + try await legacyUpdateEventProcessor!.processBufferedEvents() await managedObjectContext.perform { completionHandler() } diff --git a/wire-ios-sync-engine/Tests/Source/UserSession/ZMUserSessionTests+Syncing.swift b/wire-ios-sync-engine/Tests/Source/UserSession/ZMUserSessionTests+Syncing.swift index 42a98321a9b..969b1706e18 100644 --- a/wire-ios-sync-engine/Tests/Source/UserSession/ZMUserSessionTests+Syncing.swift +++ b/wire-ios-sync-engine/Tests/Source/UserSession/ZMUserSessionTests+Syncing.swift @@ -185,7 +185,7 @@ final class ZMUserSessionTests_Syncing: ZMUserSessionTestsBase { XCTAssertTrue(sut.isPerformingSync) // when - sut.processEvents() + sut.processLegacyEvents() XCTAssertTrue(waitForAllGroupsToBeEmpty(withTimeout: 0.5)) // then @@ -202,7 +202,7 @@ final class ZMUserSessionTests_Syncing: ZMUserSessionTestsBase { networkStateRecorder.observe(in: sut.managedObjectContext.notificationContext) // when - sut.processEvents() + sut.processLegacyEvents() XCTAssertTrue(waitForAllGroupsToBeEmpty(withTimeout: 0.5)) // then diff --git a/wire-ios-transport/Source/PushChannel/StarscreamPushChannel.swift b/wire-ios-transport/Source/PushChannel/StarscreamPushChannel.swift index c380c6fd473..a49fdaf58ec 100644 --- a/wire-ios-transport/Source/PushChannel/StarscreamPushChannel.swift +++ b/wire-ios-transport/Source/PushChannel/StarscreamPushChannel.swift @@ -56,7 +56,9 @@ final class StarscreamPushChannel: NSObject, PushChannelType { } var canOpenConnection: Bool { - keepOpen && websocketURL != nil && consumer != nil + // This is a legacy push channel, so don't open it if we should use the new one. + guard !DeveloperFlag.newInitialSync.isOn else { return false } + return keepOpen && websocketURL != nil && consumer != nil } var websocketURL: URL? {