Skip to content

Commit

Permalink
update to ktor 3.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
DatL4g committed Oct 10, 2024
1 parent 6ef4547 commit e670778
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 80 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
alias(libs.plugins.multiplatform)
alias(libs.plugins.serialization)
alias(libs.plugins.complete.kotlin)
// alias(libs.plugins.complete.kotlin)
alias(libs.plugins.publish)
alias(libs.plugins.versions)
`maven-publish`
Expand Down
8 changes: 4 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[versions]
complete-kotlin = "1.1.0"
coroutines = "1.8.1"
coroutines = "1.9.0"
immutable = "0.3.7"
kotlin = "2.0.0"
ktor = "2.3.12"
kotlin = "2.0.20"
ktor = "3.0.0"
publish = "0.29.0"
serialization = "1.7.1"
serialization = "1.7.3"
tooling = "1.6.2"
versions = "0.51.0"

Expand Down
22 changes: 3 additions & 19 deletions src/commonMain/kotlin/dev/datlag/k2k/connect/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import kotlin.properties.Delegates

class Connection private constructor(
private val port: Int,
private val immediate: Boolean,
private val scope: CoroutineScope
) : AutoCloseable {

private val client = ConnectionClient(immediate)
private val server = ConnectionServer(immediate)
private val client = ConnectionClient()
private val server = ConnectionServer()

private var sendJob: Job? = null

Expand Down Expand Up @@ -50,31 +49,16 @@ class Connection private constructor(

class Builder(private var scope: CoroutineScope = CoroutineScope(Dispatcher.IO)) {
private var port by Delegates.notNull<Int>()
private var immediate: Boolean = false

fun setPort(port: Int) = apply {
this.port = port
}

/**
* Set TCP_NODELAY socket option to disable the Nagle algorithm.
*/
fun noDelay() = apply {
this.immediate = true
}

/**
* Set TCP_DELAY socket option to enable the Nagle algorithm.
*/
fun delay() = apply {
this.immediate = false
}

fun setScope(scope: CoroutineScope) = apply {
this.scope = scope
}

fun build() = Connection(port, immediate, scope)
fun build() = Connection(port, scope)
}
}

Expand Down
31 changes: 5 additions & 26 deletions src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,10 @@ import io.ktor.network.sockets.tcpNoDelay
import io.ktor.utils.io.close
import io.ktor.utils.io.writeFully

internal class ConnectionClient(
private val immediate: Boolean
) : AutoCloseable {
internal class ConnectionClient : AutoCloseable {

private var socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
aSocket(SelectorManager(Dispatcher.IO)).tcp()
}.getOrNull()

private var connectedSocket: Socket? = null
Expand All @@ -36,22 +28,15 @@ internal class ConnectionClient(
) = suspendCatching {
val socketAddress = InetSocketAddress(host.hostAddress, port)
val useSocket = socket ?: suspendCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
aSocket(SelectorManager(Dispatcher.IO)).tcp()
}.getOrNull()?.also { socket = it } ?: return@suspendCatching

connectedSocket = useSocket.connect(socketAddress) {
reuseAddress = true
}.also {
val channel = it.openWriteChannel(autoFlush = true)
channel.writeFully(byteArray, 0, byteArray.size)
channel.flush()
channel.close()
channel.flushAndClose()
}
}

Expand All @@ -60,13 +45,7 @@ internal class ConnectionClient(
connectedSocket = null

socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
aSocket(SelectorManager(Dispatcher.IO)).tcp()
}.getOrNull()
}
}
29 changes: 5 additions & 24 deletions src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.ktor.network.sockets.Socket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.tcpNoDelay
import io.ktor.utils.io.availableForRead
import io.ktor.utils.io.core.use
import io.ktor.utils.io.readAvailable
import kotlinx.coroutines.CoroutineScope
Expand All @@ -19,18 +20,10 @@ import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

internal class ConnectionServer(
private val immediate: Boolean
) : AutoCloseable {
internal class ConnectionServer : AutoCloseable {
private var receiveJob: Job? = null
private var socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
aSocket(SelectorManager(Dispatcher.IO)).tcp()
}.getOrNull()

private var serverSocket: ServerSocket? = null
Expand All @@ -54,13 +47,7 @@ internal class ConnectionServer(

val socketAddress = InetSocketAddress(NetInterface.getLocalAddress(), port)
val useSocket = socket ?: suspendCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
aSocket(SelectorManager(Dispatcher.IO)).tcp()
}.getOrNull() ?: return@suspendCatching

serverSocket = useSocket.bind(socketAddress) {
Expand Down Expand Up @@ -104,13 +91,7 @@ internal class ConnectionServer(
connectedSocket = null

socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
aSocket(SelectorManager(Dispatcher.IO)).tcp()
}.getOrNull()
}
}
13 changes: 9 additions & 4 deletions src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import dev.datlag.k2k.Dispatcher
import dev.datlag.k2k.NetInterface
import dev.datlag.tooling.async.scopeCatching
import dev.datlag.tooling.async.suspendCatching
import io.ktor.network.sockets.Datagram
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.close
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.writeFully
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -53,10 +55,13 @@ internal class DiscoveryClient : AutoCloseable {
reuseAddress = true
}

val output = socketConnection.openWriteChannel(autoFlush = true)
output.writeFully(data, 0, data.size)
output.flush()
output.close()
socketConnection.send(
Datagram(
packet = ByteReadPacket(array = data),
address = socketConnection.remoteAddress
)
)

socketConnection.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.io.readByteArray
import kotlinx.serialization.decodeFromByteArray

internal class DiscoveryServer : AutoCloseable {
Expand Down Expand Up @@ -62,10 +63,9 @@ internal class DiscoveryServer : AutoCloseable {
}

while (currentCoroutineContext().isActive) {
serverSocket.openReadChannel()
serverSocket.incoming.consumeEach { datagram ->
suspendCatching {
val receivedPacket = datagram.packet.readBytes()
val receivedPacket = datagram.packet.readByteArray()
if (receivedPacket.isNotEmpty()) {
val host = Constants.protobuf.decodeFromByteArray<Host>(receivedPacket).apply {
val inetSocketAddress = datagram.address as InetSocketAddress
Expand Down

0 comments on commit e670778

Please sign in to comment.