Skip to content

Optimize ice candidates cache & publish #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions app/src/main/java/io/xconn/wampwebrtc/Types.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ data class ClientConfig(
val url: String,
val realm: String,
val procedureWebRTCOffer: String,
val topicAnswererOnCandidate: String,
val topicOffererOnCandidate: String,
val topicAnswererOnCandidates: String,
val topicOffererOnCandidates: String,
val serializer: Serializer,
val subProtocol: String,
val iceServers: List<IceServer>,
Expand All @@ -66,7 +66,7 @@ data class OfferConfig(
val iceServers: List<IceServer>,
val ordered: Boolean,
val id: Int,
val topicAnswererOnCandidate: String,
val topicAnswererOnCandidates: String,
)

data class WebRTCSession(
Expand Down
110 changes: 79 additions & 31 deletions app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package io.xconn.wampwebrtc
import android.content.Context
import io.xconn.xconn.Client
import io.xconn.xconn.Event
import io.xconn.xconn.Session
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.json.JSONArray
import org.json.JSONObject
Expand All @@ -20,7 +22,7 @@ class WebRTC(
private val queue: LinkedBlockingDeque<ByteArray>,
) {
private lateinit var offerer: Offerer
private val couroutineScope = CoroutineScope(Dispatchers.Default + Job())
private val coroutineScope = CoroutineScope(Dispatchers.Default + Job())

suspend fun connect(config: ClientConfig): WebRTCSession {
val client = Client(serializer = config.serializer)
Expand All @@ -33,58 +35,60 @@ class WebRTC(
config.iceServers,
true,
1,
config.topicAnswererOnCandidate,
config.topicAnswererOnCandidates,
)

val cachedCandidates: MutableList<IceCandidate> = mutableListOf()
val candidates: MutableList<IceCandidate> = mutableListOf()
var cachingDone = false
offerer =
Offerer(
context,
queue,
signalIceCandidate = { candidate ->
candidates.add(candidate)
GlobalScope.launch {
session.publish(
offerConfig.topicAnswererOnCandidate,
listOf(
requestID,
JSONObject(
mapOf(
"sdpMid" to candidate.sdpMid,
"sdpMLineIndex" to candidate.sdpMLineIndex,
"candidate" to candidate.sdp,
),
).toString(),
),
)
synchronized(cachedCandidates) {
if (cachingDone) {
candidates.add(candidate)
} else {
cachedCandidates.add(candidate)
}
}
},
)

couroutineScope.launch {
session.subscribe(config.topicOffererOnCandidate, ::candidateHandler).await()
coroutineScope.launch {
session.subscribe(config.topicOffererOnCandidates, ::candidateHandler).await()
}

val offer = offerer.createOffer(offerConfig)
Thread.sleep(200)

val candidatesList =
candidates.map { candidate ->
mapOf(
"sdpMid" to candidate.sdpMid,
"sdpMLineIndex" to candidate.sdpMLineIndex,
"candidate" to candidate.sdp,
)
}

// gather candidates within 200 millisecond
delay(200)

val initialCandidates =
synchronized(cachedCandidates) {
cachedCandidates.map { candidate ->
mapOf(
"sdpMid" to candidate.sdpMid,
"sdpMLineIndex" to candidate.sdpMLineIndex,
"candidate" to candidate.sdp,
)
}
}.toList()
cachingDone = true

val sdpData =
mapOf(
"description" to mapOf("type" to "offer", "sdp" to offer?.description),
"candidates" to candidatesList,
"candidates" to initialCandidates,
)

val json = JSONObject(sdpData).toString()

val res = session.call(config.procedureWebRTCOffer, listOf(requestID, json)).await()

publishRemainingCandidates(requestID, session, offerConfig, candidates)

val jsonString = res.args?.get(0) as String

val result = convertJsonToMap(jsonString)
Expand Down Expand Up @@ -149,4 +153,48 @@ class WebRTC(
throw Exception("Failed to add ICE candidate: ${e.message}")
}
}

private fun publishRemainingCandidates(
requestID: String,
session: Session,
offerConfig: OfferConfig,
candidates: MutableList<IceCandidate>,
) {
coroutineScope.launch {
while (true) {
delay(200)

val batchCandidates =
synchronized(candidates) {
val batch = candidates.toList()
candidates.clear()
batch
}

if (batchCandidates.isNotEmpty()) {
val jsonArray = JSONArray()
batchCandidates.forEach { candidate ->
val jsonObject =
JSONObject(
mapOf(
"sdpMid" to candidate.sdpMid,
"sdpMLineIndex" to candidate.sdpMLineIndex,
"candidate" to candidate.sdp,
),
)
jsonArray.put(jsonObject)
}

session.publish(
offerConfig.topicAnswererOnCandidates,
listOf(requestID, jsonArray.toString()),
)
}
}
}
}

fun close() {
coroutineScope.cancel()
}
}
Loading