Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle remote candidates #3

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 36 additions & 37 deletions app/src/main/java/io/xconn/wampwebrtc/Offerer.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.xconn.wampwebrtc

import android.content.Context
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeoutOrNull
import org.webrtc.DataChannel
import org.webrtc.IceCandidate
import org.webrtc.MediaConstraints
Expand Down Expand Up @@ -30,6 +32,8 @@ class Offerer(
var dataChannel: DataChannel? = null
val peerConnectionFactory = PeerConnectionFactory.builder().createPeerConnectionFactory()
private var assembler = MessageAssembler()
private var onDataChannelOpen: (() -> Unit)? = null
private var dataChannelTimeoutMillis: Long = 20000

suspend fun createOffer(offerConfig: OfferConfig): SessionDescription? {
val configuration = PeerConnection.RTCConfiguration(offerConfig.iceServers)
Expand All @@ -45,17 +49,7 @@ class Offerer(
}
}

override fun onDataChannel(channel: DataChannel?) {
channel?.registerObserver(
object : DataChannel.Observer {
override fun onMessage(buffer: DataChannel.Buffer?) {}

override fun onBufferedAmountChange(p0: Long) {}

override fun onStateChange() {}
},
)
}
override fun onDataChannel(channel: DataChannel?) {}

override fun onSignalingChange(p0: PeerConnection.SignalingState?) {}

Expand Down Expand Up @@ -83,6 +77,29 @@ class Offerer(
protocol = offerConfig.protocol
}
dataChannel = peerConnection?.createDataChannel("wamp", conf)
dataChannel?.registerObserver(
object : DataChannel.Observer {
override fun onStateChange() {
if (dataChannel?.state() == DataChannel.State.OPEN) {
onDataChannelOpen?.invoke()
}
}

override fun onBufferedAmountChange(p0: Long) {}

override fun onMessage(buffer: DataChannel.Buffer?) {
buffer?.data?.let {
val data = ByteArray(it.remaining())
it.get(data)

val message = assembler.feed(data)
if (message != null) {
queue.put(message)
}
}
}
},
)

return suspendCoroutine { continuation ->
peerConnection?.createOffer(
Expand Down Expand Up @@ -115,32 +132,14 @@ class Offerer(
}
}

suspend fun waitForDataChannelOpen(): Unit =
suspendCoroutine { continuation ->
dataChannel?.registerObserver(
object : DataChannel.Observer {
override fun onStateChange() {
if (dataChannel?.state() == DataChannel.State.OPEN) {
continuation.resume(Unit)
}
}

override fun onBufferedAmountChange(p0: Long) {}

override fun onMessage(buffer: DataChannel.Buffer?) {
buffer?.data?.let {
val data = ByteArray(it.remaining())
it.get(data)

val message = assembler.feed(data)
if (message != null) {
queue.put(message)
}
}
}
},
)
}
suspend fun waitForDataChannelToOpen() {
withTimeoutOrNull(dataChannelTimeoutMillis) {
while (true) {
if (dataChannel?.state() == DataChannel.State.OPEN) return@withTimeoutOrNull
delay(100)
}
} ?: throw IllegalStateException("Data channel failed to open within $dataChannelTimeoutMillis milliseconds")
}

fun setRemoteDescription(sessionDescription: SessionDescription) {
peerConnection?.setRemoteDescription(
Expand Down
1 change: 1 addition & 0 deletions app/src/main/java/io/xconn/wampwebrtc/Types.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ data class ClientConfig(
val realm: String,
val procedureWebRTCOffer: String,
val topicAnswererOnCandidate: String,
val topicOffererOnCandidate: String,
val serializer: Serializer,
val subProtocol: String,
val iceServers: List<IceServer>,
Expand Down
57 changes: 51 additions & 6 deletions app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.xconn.wampwebrtc

import android.content.Context
import io.xconn.wampproto.serializers.CBORSerializer
import io.xconn.xconn.Client
import io.xconn.xconn.Event
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import org.json.JSONArray
import org.json.JSONObject
Expand All @@ -16,8 +19,11 @@ class WebRTC(
private val context: Context,
private val queue: LinkedBlockingDeque<ByteArray>,
) {
private lateinit var offerer: Offerer
private val couroutineScope = CoroutineScope(Dispatchers.Default + Job())

suspend fun connect(config: ClientConfig): WebRTCSession {
val client = Client(serializer = CBORSerializer())
val client = Client(serializer = config.serializer)
val session = client.connect(config.url, config.realm)

val requestID = UUID.randomUUID().toString()
Expand All @@ -31,7 +37,7 @@ class WebRTC(
)

val candidates: MutableList<IceCandidate> = mutableListOf()
val offerer =
offerer =
Offerer(
context,
queue,
Expand All @@ -55,6 +61,10 @@ class WebRTC(
},
)

couroutineScope.launch {
session.subscribe(config.topicOffererOnCandidate, ::candidateHandler).await()
}

val offer = offerer.createOffer(offerConfig)
Thread.sleep(200)

Expand Down Expand Up @@ -95,13 +105,48 @@ class WebRTC(
val sdpString = descriptionMap.getString("sdp")
val sdpType = descriptionMap.getString("type")

val remoteDescription =
SessionDescription(SessionDescription.Type.fromCanonicalForm(sdpType), sdpString)
val remoteDescription = SessionDescription(SessionDescription.Type.fromCanonicalForm(sdpType), sdpString)

offerer.setRemoteDescription(remoteDescription)

offerer.waitForDataChannelOpen()
offerer.waitForDataChannelToOpen()

return WebRTCSession(offerer.peerConnection!!, offerer.dataChannel!!)
}

private fun candidateHandler(event: Event) {
if (event.args == null || event.args!!.size < 2) {
throw Exception("invalid arguments length")
}

val jsonString =
event.args?.get(1) as? String
?: throw Exception("Invalid argument type: Second argument must be a JSON string")

val result =
try {
convertJsonToMap(jsonString)
} catch (e: Exception) {
throw Exception("Invalid JSON: Unable to parse JSON string")
}

val candidate =
result["candidate"] as? String
?: throw Exception("Invalid candidate: 'candidate' field is missing or not a string")

val sdpMLineIndex =
result["sdpMLineIndex"] as? Int
?: throw Exception("Invalid sdpMLineIndex: 'sdpMLineIndex' field is missing or not an integer")

val sdpMid =
result["sdpMid"] as? String
?: throw Exception("Invalid sdpMid: 'sdpMid' field is missing or not a string")

try {
val iceCandidate = IceCandidate(sdpMid, sdpMLineIndex, candidate)
offerer.addIceCandidate(iceCandidate)
} catch (e: Exception) {
throw Exception("Failed to add ICE candidate: ${e.message}")
}
}
}
Loading