@@ -19,14 +19,16 @@ import io.rsocket.kotlin.transport.RSocketClientTarget
1919import io.rsocket.kotlin.transport.RSocketConnection
2020import io.rsocket.kotlin.transport.RSocketTransportApi
2121import io.rsocket.kotlin.transport.ktor.websocket.internal.KtorWebSocketConnection
22+ import kotlinx.coroutines.CancellationException
2223import kotlinx.coroutines.Dispatchers
2324import kotlinx.coroutines.IO
2425import kotlinx.coroutines.currentCoroutineContext
26+ import kotlinx.coroutines.ensureActive
2527import kotlinx.coroutines.flow.Flow
2628import kotlinx.coroutines.flow.emitAll
2729import kotlinx.coroutines.flow.flow
2830import kotlinx.coroutines.flow.flowOn
29- import kotlinx.coroutines.flow.map
31+ import kotlinx.coroutines.flow.transform
3032import kotlinx.io.readByteArray
3133import kotlinx.serialization.SerialName
3234import kotlinx.serialization.Serializable
@@ -50,81 +52,115 @@ internal fun HttpClient.rSocketSyncStream(
5052 credentials : PowerSyncCredentials ,
5153): Flow <PowerSyncControlArguments > =
5254 flow {
53- val flowContext = currentCoroutineContext()
55+ try {
56+ val flowContext = currentCoroutineContext()
5457
55- val websocketUri =
56- URLBuilder (credentials.endpointUri(" sync/stream" )).apply {
57- protocol =
58- when (protocolOrNull) {
59- URLProtocol .HTTP -> URLProtocol .WS
60- else -> URLProtocol .WSS
61- }
62- }
63-
64- // Note: We're using a custom connector here because we need to set options for each request
65- // without creating a new HTTP client each time. The recommended approach would be to add an
66- // RSocket extension to the HTTP client, but that only allows us to set the SETUP metadata for
67- // all connections (bad because we need a short-lived token in there).
68- // https://github.com/rsocket/rsocket-kotlin/issues/311
69- val target =
70- object : RSocketClientTarget {
71- @RSocketTransportApi
72- override suspend fun connectClient (): RSocketConnection {
73- val ws =
74- webSocketSession {
75- url.takeFrom(websocketUri)
58+ val websocketUri =
59+ URLBuilder (credentials.endpointUri(" sync/stream" )).apply {
60+ protocol =
61+ when (protocolOrNull) {
62+ URLProtocol .HTTP -> URLProtocol .WS
63+ else -> URLProtocol .WSS
7664 }
77- return KtorWebSocketConnection (ws)
7865 }
7966
80- override val coroutineContext: CoroutineContext
81- get() = flowContext
82- }
67+ // Note: We're using a custom connector here because we need to set options for each request
68+ // without creating a new HTTP client each time. The recommended approach would be to add an
69+ // RSocket extension to the HTTP client, but that only allows us to set the SETUP metadata for
70+ // all connections (bad because we need a short-lived token in there).
71+ // https://github.com/rsocket/rsocket-kotlin/issues/311
72+ val target =
73+ object : RSocketClientTarget {
74+ @RSocketTransportApi
75+ override suspend fun connectClient (): RSocketConnection {
76+ val ws =
77+ webSocketSession {
78+ url.takeFrom(websocketUri)
79+ }
80+ return KtorWebSocketConnection (ws)
81+ }
8382
84- val connector =
85- RSocketConnector {
86- connectionConfig {
87- payloadMimeType =
88- PayloadMimeType (
89- metadata = " application/json" ,
90- data = " application/json" ,
91- )
83+ override val coroutineContext: CoroutineContext
84+ get() = flowContext
85+ }
9286
93- setupPayload {
94- buildPayload {
95- data(" {}" )
96- metadata(
97- JsonUtil .json.encodeToString(
98- ConnectionSetupMetadata (
99- token = " Bearer ${credentials.token} " ,
100- userAgent = userAgent,
101- ),
102- ),
87+ val connector =
88+ RSocketConnector {
89+ connectionConfig {
90+ payloadMimeType =
91+ PayloadMimeType (
92+ metadata = " application/json" ,
93+ data = " application/json" ,
10394 )
95+
96+ setupPayload {
97+ buildPayload {
98+ data(" {}" )
99+ metadata(
100+ JsonUtil .json.encodeToString(
101+ ConnectionSetupMetadata (
102+ token = " Bearer ${credentials.token} " ,
103+ userAgent = userAgent,
104+ ),
105+ ),
106+ )
107+ }
104108 }
105- }
106109
107- keepAlive = KeepAlive (interval = 20.0 .seconds, maxLifetime = 30.0 .seconds)
110+ keepAlive = KeepAlive (interval = 20.0 .seconds, maxLifetime = 30.0 .seconds)
111+ }
108112 }
109- }
110113
111- val rSocket = connector.connect(target)
112- emit(PowerSyncControlArguments .ConnectionEstablished )
113- val syncStream =
114- rSocket.requestStream(
115- buildPayload {
116- data(JsonUtil .json.encodeToString(req))
117- metadata(JsonUtil .json.encodeToString(RequestStreamMetadata (" /sync/stream" )))
118- },
119- )
114+ val rSocket = connector.connect(target)
115+ val syncStream =
116+ rSocket.requestStream(
117+ buildPayload {
118+ data(JsonUtil .json.encodeToString(req))
119+ metadata(JsonUtil .json.encodeToString(RequestStreamMetadata (" /sync/stream" )))
120+ },
121+ )
120122
121- emitAll(
122- syncStream
123- .map {
124- PowerSyncControlArguments .BinaryLine (it.data.readByteArray())
125- }.flowOn(Dispatchers .IO ),
126- )
127- emit(PowerSyncControlArguments .ResponseStreamEnd )
123+ // Emit ConnectionEstablished only when the first frame arrives from the server, mirroring
124+ // the HTTP path which emits it only after receiving a 200 OK. This prevents falsely
125+ // reporting a successful connection when the server rejects the token: the WebSocket
126+ // upgrade (HTTP 101) succeeds at the transport layer, but token validation happens when
127+ // the server processes the stream request and responds.
128+ var connectionEstablishedEmitted = false
129+ emitAll(
130+ syncStream
131+ .transform { payload ->
132+ if (! connectionEstablishedEmitted) {
133+ connectionEstablishedEmitted = true
134+ emit(PowerSyncControlArguments .ConnectionEstablished )
135+ }
136+ emit(PowerSyncControlArguments .BinaryLine (payload.data.readByteArray()))
137+ }.flowOn(Dispatchers .IO ),
138+ )
139+ emit(PowerSyncControlArguments .ResponseStreamEnd )
140+ } catch (e: CancellationException ) {
141+ // A CancellationException here means the transport layer failed without the server
142+ // sending an RSocket ERROR frame first (e.g. iOS OS detecting a dead socket, airplane
143+ // mode, wifi dropout). rsocket-kotlin cancels its internal connection scope with the
144+ // underlying IOException as the cause, which arrives here as a CancellationException
145+ // rather than an RSocketError. The iOS OS surfaces dead sockets promptly, so this
146+ // path is reached well before any keep-alive timeout fires.
147+ //
148+ // If our own collector coroutine is being cancelled (e.g. disconnect() was called),
149+ // ensureActive() re-throws and the cancellation propagates normally.
150+ // If only the RSocket-internal scope was cancelled, ensureActive() returns normally
151+ // and we wrap as RuntimeException so the enclosing launch{} is treated as *failed*
152+ // rather than *cancelled* — preventing a fetchLinesJob stall.
153+ currentCoroutineContext().ensureActive()
154+ // e.cause is the underlying error (e.g. IOException, RSocketError.ConnectionError)
155+ // that rsocket-kotlin wrapped in a CancellationException via connection.cancel().
156+ // Include it in the message so it surfaces in the streamingSync() error log.
157+ val rootCause = e.cause
158+ val message = " RSocket sync stream was interrupted: ${rootCause?.message ? : e.message} "
159+ if (rootCause?.message?.contains(" PSYNC_S21" ) == true ) {
160+ throw RSocketCredentialsExpiredException (message, e)
161+ }
162+ throw RuntimeException (message, e)
163+ }
128164 }
129165
130166/* *
@@ -146,3 +182,13 @@ private class ConnectionSetupMetadata(
146182private class RequestStreamMetadata (
147183 val path : String ,
148184)
185+
186+ /* *
187+ * Thrown from [rSocketSyncStream] when the server closes the RSocket connection with a
188+ * PowerSync authorization error (PSYNC_S21xx) embedded in the transport-level error message.
189+ * Caught by [StreamingSync] to trigger credential invalidation.
190+ */
191+ internal class RSocketCredentialsExpiredException (
192+ message : String ,
193+ cause : Throwable ,
194+ ) : RuntimeException(message, cause)
0 commit comments