Skip to content

Commit eece4f6

Browse files
committed
handle remote candidates
1 parent 18c70ce commit eece4f6

File tree

3 files changed

+88
-43
lines changed

3 files changed

+88
-43
lines changed

app/src/main/java/io/xconn/wampwebrtc/Offerer.kt

+36-37
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.xconn.wampwebrtc
22

33
import android.content.Context
4+
import kotlinx.coroutines.delay
5+
import kotlinx.coroutines.withTimeoutOrNull
46
import org.webrtc.DataChannel
57
import org.webrtc.IceCandidate
68
import org.webrtc.MediaConstraints
@@ -30,6 +32,8 @@ class Offerer(
3032
var dataChannel: DataChannel? = null
3133
val peerConnectionFactory = PeerConnectionFactory.builder().createPeerConnectionFactory()
3234
private var assembler = MessageAssembler()
35+
private var onDataChannelOpen: (() -> Unit)? = null
36+
private var dataChannelTimeoutMillis: Long = 20000
3337

3438
suspend fun createOffer(offerConfig: OfferConfig): SessionDescription? {
3539
val configuration = PeerConnection.RTCConfiguration(offerConfig.iceServers)
@@ -45,17 +49,7 @@ class Offerer(
4549
}
4650
}
4751

48-
override fun onDataChannel(channel: DataChannel?) {
49-
channel?.registerObserver(
50-
object : DataChannel.Observer {
51-
override fun onMessage(buffer: DataChannel.Buffer?) {}
52-
53-
override fun onBufferedAmountChange(p0: Long) {}
54-
55-
override fun onStateChange() {}
56-
},
57-
)
58-
}
52+
override fun onDataChannel(channel: DataChannel?) {}
5953

6054
override fun onSignalingChange(p0: PeerConnection.SignalingState?) {}
6155

@@ -83,6 +77,29 @@ class Offerer(
8377
protocol = offerConfig.protocol
8478
}
8579
dataChannel = peerConnection?.createDataChannel("wamp", conf)
80+
dataChannel?.registerObserver(
81+
object : DataChannel.Observer {
82+
override fun onStateChange() {
83+
if (dataChannel?.state() == DataChannel.State.OPEN) {
84+
onDataChannelOpen?.invoke()
85+
}
86+
}
87+
88+
override fun onBufferedAmountChange(p0: Long) {}
89+
90+
override fun onMessage(buffer: DataChannel.Buffer?) {
91+
buffer?.data?.let {
92+
val data = ByteArray(it.remaining())
93+
it.get(data)
94+
95+
val message = assembler.feed(data)
96+
if (message != null) {
97+
queue.put(message)
98+
}
99+
}
100+
}
101+
},
102+
)
86103

87104
return suspendCoroutine { continuation ->
88105
peerConnection?.createOffer(
@@ -115,32 +132,14 @@ class Offerer(
115132
}
116133
}
117134

118-
suspend fun waitForDataChannelOpen(): Unit =
119-
suspendCoroutine { continuation ->
120-
dataChannel?.registerObserver(
121-
object : DataChannel.Observer {
122-
override fun onStateChange() {
123-
if (dataChannel?.state() == DataChannel.State.OPEN) {
124-
continuation.resume(Unit)
125-
}
126-
}
127-
128-
override fun onBufferedAmountChange(p0: Long) {}
129-
130-
override fun onMessage(buffer: DataChannel.Buffer?) {
131-
buffer?.data?.let {
132-
val data = ByteArray(it.remaining())
133-
it.get(data)
134-
135-
val message = assembler.feed(data)
136-
if (message != null) {
137-
queue.put(message)
138-
}
139-
}
140-
}
141-
},
142-
)
143-
}
135+
suspend fun waitForDataChannelToOpen() {
136+
withTimeoutOrNull(dataChannelTimeoutMillis) {
137+
while (true) {
138+
if (dataChannel?.state() == DataChannel.State.OPEN) return@withTimeoutOrNull
139+
delay(100)
140+
}
141+
} ?: throw IllegalStateException("Data channel failed to open within $dataChannelTimeoutMillis milliseconds")
142+
}
144143

145144
fun setRemoteDescription(sessionDescription: SessionDescription) {
146145
peerConnection?.setRemoteDescription(

app/src/main/java/io/xconn/wampwebrtc/Types.kt

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ data class ClientConfig(
5555
val realm: String,
5656
val procedureWebRTCOffer: String,
5757
val topicAnswererOnCandidate: String,
58+
val topicOffererOnCandidate: String,
5859
val serializer: Serializer,
5960
val subProtocol: String,
6061
val iceServers: List<IceServer>,

app/src/main/java/io/xconn/wampwebrtc/WebRTC.kt

+51-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package io.xconn.wampwebrtc
22

33
import android.content.Context
4-
import io.xconn.wampproto.serializers.CBORSerializer
54
import io.xconn.xconn.Client
5+
import io.xconn.xconn.Event
6+
import kotlinx.coroutines.CoroutineScope
7+
import kotlinx.coroutines.Dispatchers
68
import kotlinx.coroutines.GlobalScope
9+
import kotlinx.coroutines.Job
710
import kotlinx.coroutines.launch
811
import org.json.JSONArray
912
import org.json.JSONObject
@@ -16,8 +19,11 @@ class WebRTC(
1619
private val context: Context,
1720
private val queue: LinkedBlockingDeque<ByteArray>,
1821
) {
22+
private lateinit var offerer: Offerer
23+
private val couroutineScope = CoroutineScope(Dispatchers.Default + Job())
24+
1925
suspend fun connect(config: ClientConfig): WebRTCSession {
20-
val client = Client(serializer = CBORSerializer())
26+
val client = Client(serializer = config.serializer)
2127
val session = client.connect(config.url, config.realm)
2228

2329
val requestID = UUID.randomUUID().toString()
@@ -31,7 +37,7 @@ class WebRTC(
3137
)
3238

3339
val candidates: MutableList<IceCandidate> = mutableListOf()
34-
val offerer =
40+
offerer =
3541
Offerer(
3642
context,
3743
queue,
@@ -55,6 +61,10 @@ class WebRTC(
5561
},
5662
)
5763

64+
couroutineScope.launch {
65+
session.subscribe(config.topicOffererOnCandidate, ::candidateHandler).await()
66+
}
67+
5868
val offer = offerer.createOffer(offerConfig)
5969
Thread.sleep(200)
6070

@@ -95,13 +105,48 @@ class WebRTC(
95105
val sdpString = descriptionMap.getString("sdp")
96106
val sdpType = descriptionMap.getString("type")
97107

98-
val remoteDescription =
99-
SessionDescription(SessionDescription.Type.fromCanonicalForm(sdpType), sdpString)
108+
val remoteDescription = SessionDescription(SessionDescription.Type.fromCanonicalForm(sdpType), sdpString)
100109

101110
offerer.setRemoteDescription(remoteDescription)
102111

103-
offerer.waitForDataChannelOpen()
112+
offerer.waitForDataChannelToOpen()
104113

105114
return WebRTCSession(offerer.peerConnection!!, offerer.dataChannel!!)
106115
}
116+
117+
private fun candidateHandler(event: Event) {
118+
if (event.args == null || event.args!!.size < 2) {
119+
throw Exception("invalid arguments length")
120+
}
121+
122+
val jsonString =
123+
event.args?.get(1) as? String
124+
?: throw Exception("Invalid argument type: Second argument must be a JSON string")
125+
126+
val result =
127+
try {
128+
convertJsonToMap(jsonString)
129+
} catch (e: Exception) {
130+
throw Exception("Invalid JSON: Unable to parse JSON string")
131+
}
132+
133+
val candidate =
134+
result["candidate"] as? String
135+
?: throw Exception("Invalid candidate: 'candidate' field is missing or not a string")
136+
137+
val sdpMLineIndex =
138+
result["sdpMLineIndex"] as? Int
139+
?: throw Exception("Invalid sdpMLineIndex: 'sdpMLineIndex' field is missing or not an integer")
140+
141+
val sdpMid =
142+
result["sdpMid"] as? String
143+
?: throw Exception("Invalid sdpMid: 'sdpMid' field is missing or not a string")
144+
145+
try {
146+
val iceCandidate = IceCandidate(sdpMid, sdpMLineIndex, candidate)
147+
offerer.addIceCandidate(iceCandidate)
148+
} catch (e: Exception) {
149+
throw Exception("Failed to add ICE candidate: ${e.message}")
150+
}
151+
}
107152
}

0 commit comments

Comments
 (0)