Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: integrate new incremental sync - WPB-15440 #2579

Draft
wants to merge 70 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
5538c17
inject SyncAgent to user session
johnxnguyen Feb 12, 2025
616cb74
make user session delegate of sync agent
johnxnguyen Feb 12, 2025
625c2f7
make sync agent delegate of sync status
johnxnguyen Feb 12, 2025
57aed9a
trigger syncs
johnxnguyen Feb 12, 2025
028517d
delete dead code
johnxnguyen Feb 12, 2025
2bd6117
fix todo
johnxnguyen Feb 12, 2025
633f69c
fix error
johnxnguyen Feb 12, 2025
4e5c087
fix log
johnxnguyen Feb 12, 2025
880506e
remove dead code
johnxnguyen Feb 12, 2025
cc18864
run sync status methods on sync context
johnxnguyen Feb 12, 2025
2344b7d
fix wrong method call
johnxnguyen Feb 12, 2025
aa02c53
trigger sync only if there is a self client
johnxnguyen Feb 12, 2025
1a92034
trigger sync after client registration
johnxnguyen Feb 12, 2025
f1ed4f2
handle sync events on sync context
johnxnguyen Feb 12, 2025
f2fabfc
update label
johnxnguyen Feb 12, 2025
3b683cb
update todo
johnxnguyen Feb 12, 2025
6f2e08b
add missing resource step
johnxnguyen Feb 12, 2025
2fac4d7
fix errors
johnxnguyen Feb 12, 2025
23be37a
format
johnxnguyen Feb 13, 2025
94437e2
fix tests
johnxnguyen Feb 13, 2025
12338ef
fix typo
johnxnguyen Feb 13, 2025
6563468
format
johnxnguyen Feb 13, 2025
38eb0fb
Merge branch 'develop' into refactor/integrate-sync-agent-wpb-10801
johnxnguyen Feb 14, 2025
1d361e6
Merge branch 'develop' into refactor/integrate-sync-agent-wpb-10801
johnxnguyen Feb 17, 2025
d29d2be
Merge branch 'refactor/integrate-sync-agent-wpb-10801' of github.com:…
johnxnguyen Feb 19, 2025
4861ea5
Merge branch 'develop' into refactor/integrate-sync-agent-wpb-10801
johnxnguyen Feb 19, 2025
358d21d
Merge branch 'develop' into refactor/integrate-sync-agent-wpb-10801
johnxnguyen Feb 19, 2025
63f1c90
fix typo
johnxnguyen Feb 19, 2025
c3b25eb
create components to hold dependencies
johnxnguyen Feb 19, 2025
e140bed
create UserSessionComponent
johnxnguyen Feb 20, 2025
a1d400d
create SyncAgent from ClientSessionComponent
johnxnguyen Feb 20, 2025
4625e67
fix tests
johnxnguyen Feb 20, 2025
a4bc246
format
johnxnguyen Feb 20, 2025
8a41184
Merge branch 'develop' into refactor/integrate-sync-agent-wpb-10801
johnxnguyen Feb 20, 2025
d03416d
delete dead code
johnxnguyen Feb 20, 2025
78be5d3
rename method
johnxnguyen Feb 20, 2025
56bdb28
Merge branch 'refactor/integrate-sync-agent-wpb-10801' of github.com:…
johnxnguyen Feb 20, 2025
8e2aa6c
fix tests
johnxnguyen Feb 20, 2025
c7aa6a2
Merge branch 'develop' into refactor/integrate-sync-agent-wpb-10801
johnxnguyen Feb 20, 2025
9d39dba
rename property
johnxnguyen Feb 20, 2025
6cf20dc
fix tests
johnxnguyen Feb 20, 2025
b5386d7
Merge branch 'refactor/integrate-sync-agent-wpb-10801' of github.com:…
johnxnguyen Feb 20, 2025
43e480e
rename method
johnxnguyen Feb 20, 2025
1661769
move extension to file
johnxnguyen Feb 20, 2025
00e55f2
remove task groups
johnxnguyen Feb 20, 2025
ba11ffb
format
johnxnguyen Feb 20, 2025
eb4f852
disable test
johnxnguyen Feb 20, 2025
1c35347
simply sync setup and invocation
johnxnguyen Feb 20, 2025
566801c
format
johnxnguyen Feb 20, 2025
7fefd5b
fix test compilation
johnxnguyen Feb 21, 2025
b306e82
make push channel sendable
johnxnguyen Feb 21, 2025
442b1f6
add event processor protocol conformances
johnxnguyen Feb 21, 2025
3d45908
add method to delete a stored event envelope by index
johnxnguyen Feb 21, 2025
bfe880a
create IncrementalSync
johnxnguyen Feb 21, 2025
d8b28af
build IncrementalSync
johnxnguyen Feb 21, 2025
53c5511
Merge branch 'develop' into refactor/incremental-sync-wpb-15440
johnxnguyen Feb 21, 2025
3adb2d9
add SyncAgentDelegate methods for incremental sync
johnxnguyen Feb 21, 2025
1955aa0
add incremental sync to SyncAgent
johnxnguyen Feb 21, 2025
aeeb3ef
don't open legacy push channel
johnxnguyen Feb 21, 2025
fc24382
move event encoding decoding inside store
johnxnguyen Feb 21, 2025
dd87b0d
delete dead code
johnxnguyen Feb 21, 2025
9c54a9c
wait for existing incremental sync
johnxnguyen Feb 21, 2025
397310b
remove todo
johnxnguyen Feb 21, 2025
532b5d8
clear incremental sync task when done
johnxnguyen Feb 21, 2025
4c2842e
fix not opening legacy push channel
johnxnguyen Feb 21, 2025
c4fca8e
rename legacy methods
johnxnguyen Feb 21, 2025
955ee62
change logger
johnxnguyen Feb 21, 2025
7f0e69e
fix logs
johnxnguyen Feb 21, 2025
ed9a630
drop invalid message insertions
johnxnguyen Feb 21, 2025
af82170
clean logs
johnxnguyen Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ extension NetworkService: URLSessionWebSocketDelegate {
didCloseWith closeCode: URLSessionWebSocketTask.CloseCode,
reason: Data?
) {
webSocketsByTask[webSocketTask]?.close()
webSocketsByTask[webSocketTask] = nil
Task {
await webSocketsByTask[webSocketTask]?.close()
webSocketsByTask[webSocketTask] = nil
}
}

}
Expand Down
16 changes: 8 additions & 8 deletions WireAPI/Sources/WireAPI/Network/PushChannel/PushChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@
import Foundation
import WireFoundation

final class PushChannel: PushChannelProtocol {
public final class PushChannel: PushChannelProtocol {

typealias Stream = AsyncThrowingStream<UpdateEventEnvelope, any Error>
public typealias Stream = AsyncThrowingStream<UpdateEventEnvelope, any Error>

private let webSocket: any WebSocketProtocol
private let decoder = JSONDecoder()

init(webSocket: any WebSocketProtocol) {
public init(webSocket: any WebSocketProtocol) {
self.webSocket = webSocket
}

func open() throws -> Stream {
public func open() async throws -> Stream {
print("opening new push channel")
return try webSocket.open().map { [weak self, decoder] message in
return try await webSocket.open().map { [weak self, decoder] message in
do {
switch message {
case let .data(data):
Expand All @@ -50,15 +50,15 @@ final class PushChannel: PushChannelProtocol {
}
} catch {
print("failed to get next web socket message: \(error)")
self?.close()
await self?.close()
throw error
}
}.toStream()
}

func close() {
public func close() async {
print("closing push channel")
webSocket.close()
await webSocket.close()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ import Foundation

// sourcery: AutoMockable
/// Make a direct connection to a server to receive update events.
public protocol PushChannelProtocol {
public protocol PushChannelProtocol: Sendable {

/// Open the push channel and start receiving update events.
///
/// - Returns: An async stream of live update event envelopes.

func open() throws -> AsyncThrowingStream<UpdateEventEnvelope, any Error>
func open() async throws -> AsyncThrowingStream<UpdateEventEnvelope, any Error>

/// Close the push channel and stop receiving update events.

func close()
func close() async

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class PushChannelService: PushChannelServiceProtocol {
private let networkService: NetworkService
private let authenticationManager: any AuthenticationManagerProtocol

init(
public init(
networkService: NetworkService,
authenticationManager: any AuthenticationManagerProtocol
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import Foundation

// sourcery: AutoMockable
protocol URLSessionWebSocketTaskProtocol {
public protocol URLSessionWebSocketTaskProtocol: Sendable {

var isOpen: Bool { get }

Expand All @@ -30,15 +30,15 @@ protocol URLSessionWebSocketTaskProtocol {
reason: Data?
)

func receive(completionHandler: @escaping (Result<URLSessionWebSocketTask.Message, any Error>) -> Void)
func receive(completionHandler: @escaping @Sendable (Result<URLSessionWebSocketTask.Message, any Error>) -> Void)

func receive() async throws -> URLSessionWebSocketTask.Message

}

extension URLSessionWebSocketTask: URLSessionWebSocketTaskProtocol {

var isOpen: Bool {
public var isOpen: Bool {
closeCode == .invalid
}

Expand Down
16 changes: 6 additions & 10 deletions WireAPI/Sources/WireAPI/Network/PushChannel/WebSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,18 @@

import Foundation

final class WebSocket: WebSocketProtocol {
public actor WebSocket: WebSocketProtocol {

typealias Stream = AsyncThrowingStream<URLSessionWebSocketTask.Message, any Error>
public typealias Stream = AsyncThrowingStream<URLSessionWebSocketTask.Message, any Error>

private let connection: any URLSessionWebSocketTaskProtocol
private var continuation: Stream.Continuation?

init(connection: any URLSessionWebSocketTaskProtocol) {
public init(connection: any URLSessionWebSocketTaskProtocol) {
self.connection = connection
}

deinit {
close()
}

func open() throws -> Stream {
public func open() async throws -> Stream {
connection.resume()

if #available(iOS 17, *) {
Expand Down Expand Up @@ -62,7 +58,7 @@ final class WebSocket: WebSocketProtocol {
return Stream { continuation in
self.continuation = continuation

func yieldNextMessage() {
@Sendable func yieldNextMessage() {
guard connection.isOpen else {
continuation.finish()
return
Expand All @@ -85,7 +81,7 @@ final class WebSocket: WebSocketProtocol {
}
}

func close() {
public func close() async {
connection.cancel(with: .goingAway, reason: nil)
continuation?.finish()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import Foundation

// sourcery: AutoMockable
protocol WebSocketProtocol {
public protocol WebSocketProtocol: Sendable {

func open() throws -> AsyncThrowingStream<URLSessionWebSocketTask.Message, any Error>
func open() async throws -> AsyncThrowingStream<URLSessionWebSocketTask.Message, any Error>

func close()
func close() async

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ final class PushChannelTests: XCTestCase {
}

// When the push channel is open and the stream is iterated
let liveEventEnvelopes = try sut.open()
let liveEventEnvelopes = try await sut.open()

var receivedEnvelopes = [UpdateEventEnvelope]()
for try await envelope in liveEventEnvelopes {
Expand All @@ -74,10 +74,10 @@ final class PushChannelTests: XCTestCase {
func testClosingPushChannel() async throws {
// Given an open push channel
webSocket.open_MockValue = AsyncThrowingStream { _ in }
_ = try sut.open()
_ = try await sut.open()

// When the push channel is closed
sut.close()
await sut.close()

// Then the web socket was closed
XCTAssertEqual(webSocket.close_Invocations.count, 1)
Expand All @@ -91,7 +91,7 @@ final class PushChannelTests: XCTestCase {
// Don't call finish, so the stream stays open.
}

let liveEventEnvelopes = try sut.open()
let liveEventEnvelopes = try await sut.open()

do {
for try await _ in liveEventEnvelopes {
Expand All @@ -115,7 +115,7 @@ final class PushChannelTests: XCTestCase {
// Don't call finish, so the stream stays open.
}

let liveEventEnvelopes = try sut.open()
let liveEventEnvelopes = try await sut.open()

do {
for try await _ in liveEventEnvelopes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,6 @@ final class WebSocketTests: XCTestCase {
try await super.tearDown()
}

func testWebSocketCancelsWhenItDeinitializes() async throws {
// When
{
_ = WebSocket(connection: self.connection)
}()

// Then
let invocations = connection.cancelWithReason_Invocations
try XCTAssertCount(invocations, count: 1)
XCTAssertEqual(invocations[0].closeCode, .goingAway)
XCTAssertNil(invocations[0].reason)
}

func testWebSocketCloses() async throws {
// Given we're iterating over the web socket
let sut = WebSocket(connection: connection)
Expand All @@ -69,7 +56,7 @@ final class WebSocketTests: XCTestCase {

Task {
do {
for try await _ in try sut.open() {
for try await _ in try await sut.open() {
didReceiveMessage.fulfill()
}
} catch {
Expand All @@ -83,7 +70,7 @@ final class WebSocketTests: XCTestCase {
await fulfillment(of: [didReceiveMessage], timeout: 1)

// When
sut.close()
await sut.close()

// Then the stream finished successfully
await fulfillment(of: [didFinishIterating], timeout: 1)
Expand Down Expand Up @@ -113,7 +100,7 @@ final class WebSocketTests: XCTestCase {

Task {
do {
for try await _ in try sut.open() {
for try await _ in try await sut.open() {
didReceiveMessage.fulfill()
}
} catch {
Expand Down Expand Up @@ -157,7 +144,7 @@ final class WebSocketTests: XCTestCase {

Task {
do {
for try await _ in try sut.open() {
for try await _ in try await sut.open() {
didReceiveMessage.fulfill()
}
} catch {
Expand Down Expand Up @@ -199,7 +186,7 @@ final class WebSocketTests: XCTestCase {

// When
do {
for try await message in try sut.open() {
for try await message in try await sut.open() {
if case let .data(data) = message {
receivedMessageData.append(data)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

import Foundation

enum UpdateEventsRepositoryError: Error {
extension ClientSessionComponent: IncrementalSyncProvider {

case lastEventIDMissing
case failedToDecodeStoredEvent(Error)
public func provideIncrementalSync() throws -> IncrementalSync {
incrementalSync
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import Foundation

extension ClientSessionComponent: InitialSyncBuilderProtocol {
extension ClientSessionComponent: InitialSyncProvider {

public func buildInitialSync() throws -> any InitialSyncProtocol {
public func provideInitialSync() throws -> any InitialSyncProtocol {
initialSync
}

Expand Down
Loading