Skip to content

Commit

Permalink
Add more error handling cases for RPCSubscriptionService.
Browse files Browse the repository at this point in the history
  • Loading branch information
kukabi committed Jan 26, 2024
1 parent 7e39da3 commit 2bf62a1
Showing 1 changed file with 45 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,48 +121,60 @@ abstract class RPCSubscriptionService<K, T>(
return
}
rpcId = (0..Int.MAX_VALUE).random().toLong()
client.wss(host = host, port = port) {
Logger.d("WebSockets session initialized.")
_status.value = RPCSubscriptionServiceStatus.Connected
send(
try {
client.wss(host = host, port = port) {
Logger.d("WebSockets session initialized.")
_status.value = RPCSubscriptionServiceStatus.Connected
try {
send(
gson.toJson(
RPCRequest(
id = rpcId,
method = subscribeMethod,
params = params,
),
),
)
val textFrame = readNextTextFrame(incoming)
val subscriptionStatus =
gson.fromJson(
textFrame.readText(),
RPCSubscribeStatus::class.java,
)
if (subscriptionStatus.subscriptionId <= 0) {
_status.value = RPCSubscriptionServiceStatus.Error(null)
throw SubscriptionException("Invalid subscription id: ${subscriptionStatus.subscriptionId}")
}
session = this
subscriptionId = subscriptionStatus.subscriptionId
Logger.d("Subscribed with id: $subscriptionId")
_status.value = RPCSubscriptionServiceStatus.Subscribed(subscriptionId)
beginIncomingProcessing(incoming)
} catch (error: Throwable) {
_status.value = RPCSubscriptionServiceStatus.Error(error)
}
}
} catch (error: Throwable) {
_status.value = RPCSubscriptionServiceStatus.Error(error)
}
}

suspend fun unsubscribe() {
try {
session?.send(
gson.toJson(
RPCRequest(
id = rpcId,
method = subscribeMethod,
params = params,
method = unsubscribeMethod,
params = listOf(subscriptionId),
),
),
)
val textFrame = readNextTextFrame(incoming)
val subscriptionStatus =
gson.fromJson(
textFrame.readText(),
RPCSubscribeStatus::class.java,
)
if (subscriptionStatus.subscriptionId <= 0) {
_status.value = RPCSubscriptionServiceStatus.Error(null)
throw SubscriptionException("Invalid subscription id: ${subscriptionStatus.subscriptionId}")
}
session = this
subscriptionId = subscriptionStatus.subscriptionId
Logger.d("Subscribed with id: $subscriptionId")
_status.value = RPCSubscriptionServiceStatus.Subscribed(subscriptionId)
beginIncomingProcessing(incoming)
} catch (error: Throwable) {
_status.value = RPCSubscriptionServiceStatus.Error(error)
}
}

suspend fun unsubscribe() {
session?.send(
gson.toJson(
RPCRequest(
id = rpcId,
method = unsubscribeMethod,
params = listOf(subscriptionId),
),
),
)
}

abstract suspend fun processOnSubscribed(json: String)

abstract suspend fun processUpdate(json: String)
Expand Down

0 comments on commit 2bf62a1

Please sign in to comment.