Skip to content

Commit 2f814e4

Browse files
authored
fix: set default neo4j db port (#636)
* fix: set default neo4j db port * fix: gitignore docker plugin folder
1 parent a3765c1 commit 2f814e4

File tree

4 files changed

+38
-2
lines changed

4 files changed

+38
-2
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ Thumbs.db
3131
bin
3232
doc/node
3333
doc/node_modules
34+
*/docker/plugins/*

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/common/Neo4jConnectorConfig.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,11 @@ open class Neo4jConnectorConfig(configDef: ConfigDef,
7676
authenticationPassword = getPassword(AUTHENTICATION_BASIC_PASSWORD).value()
7777
authenticationKerberosTicket = getPassword(AUTHENTICATION_KERBEROS_TICKET).value()
7878

79-
serverUri = getString(SERVER_URI).split(",").map { URI(it) }
79+
serverUri = getString(SERVER_URI).split(",").map {
80+
val uri = URI(it)
81+
if (uri.port == -1) URI("$it:7687") else uri
82+
}
83+
8084
connectionLivenessCheckTimeout = getLong(CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS)
8185
connectionMaxConnectionLifetime = getLong(CONNECTION_MAX_CONNECTION_LIFETIME_MSECS)
8286
connectionPoolMaxSize = getInt(CONNECTION_POOL_MAX_SIZE)

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,20 @@ class Neo4jSinkConnectorConfigTest {
9595
assertEquals(c, config.serverUri[2].toString())
9696
}
9797

98+
@Test
99+
fun `should return URIs with default port if port does not exist`() {
100+
val a = "bolt://neo4j.com"
101+
val b = "bolt://neo4j2.com"
102+
103+
val originals = mapOf(SinkConnector.TOPICS_CONFIG to "foo",
104+
"${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to "CREATE (p:Person{name: event.firstName})",
105+
Neo4jConnectorConfig.SERVER_URI to "$a,$b")
106+
val config = Neo4jSinkConnectorConfig(originals)
107+
108+
assertEquals("$a:7687", config.serverUri[0].toString())
109+
assertEquals("$b:7687", config.serverUri[1].toString())
110+
}
111+
98112
@Test
99113
fun `should return the configuration with shuffled topic order`() {
100114
val originals = mapOf(SinkConnector.TOPICS_CONFIG to "bar,foo",

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceConnectorConfigTest.kt

+18-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package streams.kafka.connect.source
22

33
import org.apache.kafka.common.config.ConfigException
44
import org.junit.Test
5+
import streams.kafka.connect.common.Neo4jConnectorConfig
56
import kotlin.test.assertEquals
6-
import kotlin.test.assertNull
77

88
class Neo4jSourceConnectorConfigTest {
99

@@ -61,4 +61,21 @@ class Neo4jSourceConnectorConfigTest {
6161
val config = Neo4jSourceConnectorConfig(originals)
6262
assertEquals("", config.streamingProperty)
6363
}
64+
65+
@Test
66+
fun `should return URIs with default port if port does not exist`() {
67+
val a = "bolt://neo4j.com"
68+
val b = "bolt://neo4j2.com"
69+
70+
val originals = mapOf(Neo4jSourceConnectorConfig.SOURCE_TYPE to SourceType.QUERY.toString(),
71+
Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY to "MATCH (n) RETURN n",
72+
Neo4jSourceConnectorConfig.TOPIC to "topic",
73+
Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL to "10",
74+
Neo4jSourceConnectorConfig.STREAMING_FROM to StreamingFrom.NOW.toString(),
75+
Neo4jConnectorConfig.SERVER_URI to "$a,$b")
76+
val config = Neo4jSourceConnectorConfig(originals)
77+
78+
assertEquals("$a:7687", config.serverUri[0].toString())
79+
assertEquals("$b:7687", config.serverUri[1].toString())
80+
}
6481
}

0 commit comments

Comments
 (0)