@@ -5,8 +5,10 @@ import org.apache.kafka.clients.producer.ProducerConfig
5
5
import org.apache.kafka.common.serialization.ByteArraySerializer
6
6
import org.apache.kafka.common.serialization.LongSerializer
7
7
import org.apache.kafka.common.serialization.StringSerializer
8
+ import org.codehaus.jackson.map.ObjectMapper
8
9
import streams.getInt
9
10
import streams.serialization.JacksonUtil
11
+ import streams.toPointCase
10
12
import java.util.*
11
13
12
14
private val configPrefix = " kafka."
@@ -23,12 +25,23 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
23
25
val connectionTimeoutMs : Int = 10 * 1000 ,
24
26
val replication : Int = 1 ,
25
27
val transactionalId : String = StringUtils .EMPTY ,
26
- val lingerMs : Int = 1 ){
28
+ val lingerMs : Int = 1 ,
29
+ val extraProperties : Map <String , String > = emptyMap()) {
30
+
31
+ private fun asMap (): Map <String , Any ?> {
32
+ return ObjectMapper ().convertValue(this , Map ::class .java)
33
+ .mapKeys { it.key.toString() }
34
+ }
35
+
27
36
companion object {
28
37
fun from (cfg : Map <String , String >) : KafkaConfiguration {
29
38
val config = cfg.filterKeys { it.startsWith(configPrefix) }.mapKeys { it.key.substring(configPrefix.length) }
30
39
31
40
val default = KafkaConfiguration ()
41
+
42
+ val keys = default.asMap().keys.map { it.toPointCase() }
43
+ val extraProperties = config.filterKeys { ! keys.contains(it) }
44
+
32
45
return default.copy(zookeeperConnect = config.getOrDefault(" zookeeper.connect" ,default.zookeeperConnect),
33
46
bootstrapServers = config.getOrDefault(" bootstrap.servers" , default.bootstrapServers),
34
47
acks = config.getOrDefault(" acks" , default.acks),
@@ -41,24 +54,26 @@ data class KafkaConfiguration(val zookeeperConnect: String = "localhost:2181",
41
54
connectionTimeoutMs = config.getInt(" connection.timeout.ms" , default.connectionTimeoutMs),
42
55
replication = config.getInt(" replication" , default.replication),
43
56
transactionalId = config.getOrDefault(" transactional.id" , default.transactionalId),
44
- lingerMs = config.getInt(" linger.ms" , default.lingerMs)
57
+ lingerMs = config.getInt(" linger.ms" , default.lingerMs),
58
+ extraProperties = extraProperties // for what we don't provide a default configuration
45
59
)
46
60
}
47
61
}
48
62
49
63
fun asProperties (): Properties {
50
64
val props = Properties ()
51
- val map = JacksonUtil .getMapper().convertValue(this , Map ::class .java)
52
- .mapKeys { it.key.toString().split(" (?<=[a-z])(?=[A-Z])" .toRegex()).joinToString(separator = " ." ).toLowerCase() }
65
+ val map = this .asMap()
53
66
.filter {
54
- if (it.key == " transactional.id " ) {
67
+ if (it.key == " transactionalId " ) {
55
68
it.value != StringUtils .EMPTY
56
69
} else {
57
- it.key != " node.routing " && it.key != " rel.routing "
70
+ true
58
71
}
59
72
}
73
+ .mapKeys { it.key.toPointCase() }
60
74
props.putAll(map)
61
- props.putAll(addSerializers())
75
+ props.putAll(extraProperties)
76
+ props.putAll(addSerializers()) // Fixed serializers
62
77
return props
63
78
}
64
79
0 commit comments