1
1
package com.possible_triangle.atheneum_connector
2
2
3
- import com.rabbitmq.client.Channel
4
- import com.rabbitmq.client.ConnectionFactory
3
+ import com.possible_triangle.atheneum_connector.messages.ServerStatus
4
+ import com.possible_triangle.atheneum_connector.messages.ServerStatusMessage
5
+ import com.rabbitmq.client.*
6
+ import io.ktor.util.logging.*
5
7
import kotlinx.serialization.InternalSerializationApi
6
8
import kotlinx.serialization.serializer
7
9
import net.minecraftforge.fml.ModList
@@ -11,18 +13,26 @@ import kotlin.reflect.KParameter
11
13
import kotlin.reflect.full.hasAnnotation
12
14
import kotlin.reflect.full.memberFunctions
13
15
14
- object RabbitMQ {
16
+ object RabbitMQ : RecoveryListener, ShutdownListener {
15
17
16
18
private const val EXCHANGE_KEY = " exchange"
17
19
18
20
private lateinit var channel: Channel
19
21
20
- fun connect (url : String ) {
22
+ fun connect () {
21
23
AtheneumConnector .LOG .info(" Connecting to RabbitMQ" )
22
24
23
- val connection = ConnectionFactory ().newConnection(url)
25
+ val connection = ConnectionFactory ().newConnection(Config .SERVER .rabbitMQUrl)
26
+
27
+ connection.addShutdownListener(this )
28
+
29
+ if (connection is Recoverable ) {
30
+ connection.addRecoveryListener(this )
31
+ }
24
32
25
33
channel = connection.createChannel()
34
+ declareExchanges()
35
+
26
36
val queue = channel.queueDeclare().queue
27
37
28
38
val consumers = hashMapOf<String , MutableList <(ByteArray ) - > Unit >> ()
@@ -39,11 +49,38 @@ object RabbitMQ {
39
49
}, { _ -> })
40
50
}
41
51
42
- fun close () {
52
+ private fun declareExchanges () {
53
+ channel.exchangeDeclare(EXCHANGE_KEY , " direct" )
54
+ }
55
+
56
+ override fun handleRecovery (recoverable : Recoverable ) {
57
+ AtheneumConnector .LOG .warn(" RabbitMQ has recovered" )
58
+ declareExchanges()
59
+ publish(ServerStatusMessage (ServerStatus .RECOVERED ))
60
+ }
61
+
62
+ override fun handleRecoveryStarted (recoverable : Recoverable ) {
63
+ AtheneumConnector .LOG .warn(" Trying to reconnect to RabbitMQ..." )
64
+ }
65
+
66
+ override fun shutdownCompleted (exception : ShutdownSignalException ) {
67
+ AtheneumConnector .LOG .warn(" RabbitMQ has shutdown, reconnecting... ({})" , exception.message)
68
+ }
69
+
70
+ private fun tryCatching (action : () -> Unit ) {
71
+ try {
72
+ action()
73
+ } catch (ex: ShutdownSignalException ) {
74
+ AtheneumConnector .LOG .error(" Encountered exception when publishing to RabbitMQ" )
75
+ AtheneumConnector .LOG .error(ex)
76
+ }
77
+ }
78
+
79
+ fun close () = tryCatching {
43
80
channel.connection.close()
44
81
}
45
82
46
- fun publish (routingKey : String , bytes : ByteArray ) {
83
+ fun publish (routingKey : String , bytes : ByteArray ) = tryCatching {
47
84
channel.basicPublish(EXCHANGE_KEY , routingKey, null , bytes)
48
85
}
49
86
@@ -63,9 +100,7 @@ object RabbitMQ {
63
100
val topic = Class .forName(messageType.toString()).simpleName
64
101
val deserializer = serializer(messageType)
65
102
66
- if (method.parameters.size != 2 ) {
67
- throw IllegalArgumentException (" @SubscribeMessage handlers may only accept one parameter, the message itself" )
68
- }
103
+ require(method.parameters.size == 2 ) { " @SubscribeMessage handlers may only accept one parameter, the message itself" }
69
104
70
105
consume(topic) {
71
106
val message = AtheneumConnector .JSON .decodeFromString(deserializer, it.decodeToString())
0 commit comments