Skip to content

Commit b1782d0

Browse files
BillFarberbgeorge
and
bgeorge
committed
Merge branch 'develop'
* Removing some erroneous comments in the example connect-standalone properties file. * Changes for additional Security options * Updated build.gradle to new kafka connect and datahub * Updated documentation for v1.2.2 * Updated Test Cases. * Some forgotten cleanup left over from building the Confluent archive. * Support for AVRO, JSON Schema, ProtoBuf messages. * Also support of ID strategies - UUID, JSONPATH,HASH, KAFKA_META_WITH_SLASH, KAFKA_META_HASHED m> Co-authored-by: bgeorge <[email protected]>
2 parents c8f0637 + 2dcc733 commit b1782d0

23 files changed

+465
-74
lines changed

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
build
55
out
66
gradle-local.properties
7-
testConfig
87

98
bin
109
.vscode

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
# Change Log
22

3+
## [1.4.0](https://github.com/marklogic-community/kafka-marklogic-connector/releases/tag/1.4.0) (2020-10-12)
4+
[Full Changelog](https://github.com/marklogic-community/kafka-marklogic-connector/compare/1.4.0...master)
5+
6+
- Support for JSON Schema, ProtoBuf messages
7+
- Support of ID strategies: UUID, JSONPATH,HASH, KAFKA_META_WITH_SLASH, KAFKA_META_HASHED
8+
- Changes for additional Security options
9+
- Updated build.gradle to new kafka connect and datahub
10+
11+
312
## [1.3.0](https://github.com/marklogic-community/kafka-marklogic-connector/releases/tag/1.3.0) (2020-08-18)
413
[Full Changelog](https://github.com/marklogic-community/kafka-marklogic-connector/compare/1.3.0...master)
514

MarkLogic_Kafka_Connector_v1.2.2.pdf

870 KB
Binary file not shown.

MarkLogic_Kafka_Connector_v1.2.3.pdf

1.14 MB
Binary file not shown.

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ dependencies {
2020
compileOnly "org.apache.kafka:connect-api:2.5.0"
2121
compileOnly "org.apache.kafka:connect-json:2.5.0"
2222

23+
2324
compile ("com.marklogic:marklogic-data-hub:5.2.2") {
2425
// Excluding these because there's no need for them
2526
exclude module: "spring-boot-autoconfigure"
@@ -34,6 +35,7 @@ dependencies {
3435
testCompile "org.junit.jupiter:junit-jupiter-api:5.3.0"
3536
testCompile "org.apache.kafka:connect-api:2.5.0"
3637
testCompile "org.apache.kafka:connect-json:2.5.0"
38+
testCompile "com.google.code.gson:gson:2.8.6"
3739

3840
// Needed by Gradle 4.6+ - see https://www.petrikainulainen.net/programming/testing/junit-5-tutorial-running-unit-tests-with-gradle/
3941
testRuntime "org.junit.jupiter:junit-jupiter-engine:5.3.0"

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
group=com.marklogic
2-
version=1.3.0
2+
version=1.4.0-SNAPSHOT
33

44
# For the Confluent Connector Archive
55
componentOwner=marklogic

src/main/java/com/marklogic/client/ext/document/ContentIdExtractor.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

src/main/java/com/marklogic/client/ext/document/DefaultContentIdExtractor.java

Lines changed: 0 additions & 13 deletions
This file was deleted.

src/main/java/com/marklogic/client/ext/document/DocumentWriteOperationBuilder.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package com.marklogic.client.ext.document;
22

3+
import java.io.IOException;
4+
35
import com.marklogic.client.document.DocumentWriteOperation;
6+
import com.marklogic.client.id.strategy.IdStrategy;
47
import com.marklogic.client.ext.util.DefaultDocumentPermissionsParser;
58
import com.marklogic.client.impl.DocumentWriteOperationImpl;
69
import com.marklogic.client.io.DocumentMetadataHandle;
710
import com.marklogic.client.io.marker.AbstractWriteHandle;
11+
import com.marklogic.client.document.RecordContent;
812

913
public class DocumentWriteOperationBuilder {
1014

@@ -13,34 +17,31 @@ public class DocumentWriteOperationBuilder {
1317
private String uriSuffix;
1418
private String collections;
1519
private String permissions;
16-
17-
private ContentIdExtractor contentIdExtractor = new DefaultContentIdExtractor();
18-
19-
public DocumentWriteOperation build(AbstractWriteHandle content, DocumentMetadataHandle metadata ) {
20+
21+
public DocumentWriteOperation build(RecordContent record) throws IOException {
22+
23+
AbstractWriteHandle content = record.getContent();
24+
DocumentMetadataHandle metadata = record.getAdditionalMetadata();
25+
String uri = record.getId();
26+
2027
if (content == null) {
2128
throw new NullPointerException("'content' must not be null");
2229
}
23-
2430
if (hasText(collections)) {
2531
metadata.getCollections().addAll(collections.trim().split(","));
2632
}
2733
if (hasText(permissions)) {
2834
new DefaultDocumentPermissionsParser().parsePermissions(permissions.trim(), metadata.getPermissions());
2935
}
3036

31-
String uri = buildUri(content);
32-
return build(operationType, uri, metadata, content);
33-
}
34-
35-
protected String buildUri(AbstractWriteHandle content) {
36-
String uri = contentIdExtractor.extractId(content);
3737
if (hasText(uriPrefix)) {
3838
uri = uriPrefix + uri;
3939
}
4040
if (hasText(uriSuffix)) {
4141
uri += uriSuffix;
4242
}
43-
return uri;
43+
44+
return build(operationType, uri, metadata, content);
4445
}
4546

4647
/**
@@ -86,8 +87,4 @@ public DocumentWriteOperationBuilder withOperationType(DocumentWriteOperation.Op
8687
return this;
8788
}
8889

89-
public DocumentWriteOperationBuilder withContentIdExtractor(ContentIdExtractor contentIdExtractor) {
90-
this.contentIdExtractor = contentIdExtractor;
91-
return this;
92-
}
9390
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.marklogic.client.document;
2+
3+
import com.marklogic.client.io.marker.AbstractWriteHandle;
4+
import com.marklogic.client.io.DocumentMetadataHandle;
5+
6+
public class RecordContent {
7+
8+
AbstractWriteHandle content;
9+
DocumentMetadataHandle additionalMetadata;
10+
String id;
11+
12+
public AbstractWriteHandle getContent() {
13+
return content;
14+
}
15+
16+
public void setContent(AbstractWriteHandle content) {
17+
this.content = content;
18+
}
19+
20+
public DocumentMetadataHandle getAdditionalMetadata() {
21+
return additionalMetadata;
22+
}
23+
24+
public void setAdditionalMetadata(DocumentMetadataHandle additionalMetadata) {
25+
this.additionalMetadata = additionalMetadata;
26+
}
27+
28+
public String getId() {
29+
return id;
30+
}
31+
public void setId(String id) {
32+
this.id = id;
33+
}
34+
35+
}
36+
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.marklogic.client.id.strategy;
2+
3+
import java.util.UUID;
4+
5+
import com.marklogic.client.io.marker.AbstractWriteHandle;
6+
7+
8+
public class DefaultStrategy implements IdStrategy{
9+
10+
@Override
11+
public String generateId(AbstractWriteHandle content, String topic, Integer partition, long offset) {
12+
return UUID.randomUUID().toString();
13+
}
14+
15+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.marklogic.client.id.strategy;
2+
3+
import java.io.IOException;
4+
import java.security.MessageDigest;
5+
import java.security.NoSuchAlgorithmException;
6+
import java.util.UUID;
7+
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import com.fasterxml.jackson.databind.JsonNode;
12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import com.marklogic.client.io.marker.AbstractWriteHandle;
14+
15+
16+
public class HashedJSONPathsStrategy implements IdStrategy{
17+
18+
private static final Logger logger = LoggerFactory.getLogger(HashedJSONPathsStrategy.class);
19+
20+
private String [] paths;
21+
22+
public HashedJSONPathsStrategy(String [] paths) {
23+
this.paths = paths;
24+
}
25+
26+
@Override
27+
public String generateId(AbstractWriteHandle content, String topic, Integer partition, long offset) {
28+
ObjectMapper om = new ObjectMapper();
29+
String valueString = "";
30+
try {
31+
MessageDigest md = MessageDigest.getInstance("MD5");
32+
JsonNode node = om.readTree(content.toString());
33+
for (int i=0; i<paths.length; i++) {
34+
valueString = valueString + node.at(paths[i].trim()).asText();
35+
}
36+
String id = bytesToHex(md.digest(valueString.getBytes()));
37+
return id;
38+
}
39+
catch (IOException e) {
40+
logger.warn("IOException. Not creating MD5 URI, instead generating UUID");
41+
return UUID.randomUUID().toString();
42+
}
43+
catch (NoSuchAlgorithmException e) {
44+
logger.warn("NoSuchAlgorithmException. Not creating MD5 URI, instead generating UUID");
45+
return UUID.randomUUID().toString();
46+
}
47+
}
48+
49+
private static String bytesToHex(byte[] bytes) {
50+
StringBuilder sb = new StringBuilder();
51+
for (byte b : bytes) {
52+
sb.append(String.format("%02x", b));
53+
}
54+
return sb.toString();
55+
}
56+
57+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.marklogic.client.id.strategy;
2+
3+
import java.security.MessageDigest;
4+
import java.security.NoSuchAlgorithmException;
5+
import java.util.UUID;
6+
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import com.marklogic.client.io.marker.AbstractWriteHandle;
11+
12+
public class HashedKafkaMetaStrategy implements IdStrategy {
13+
14+
private static final Logger logger = LoggerFactory.getLogger(HashedKafkaMetaStrategy.class);
15+
16+
@Override
17+
public String generateId(AbstractWriteHandle content,String topic, Integer partition, long offset) {
18+
String id = "";
19+
try {
20+
MessageDigest md = MessageDigest.getInstance("MD5");
21+
String tmp = topic + partition.toString() + String.valueOf(offset);
22+
id = bytesToHex(md.digest(tmp.getBytes()));
23+
return id;
24+
} catch (NoSuchAlgorithmException e) {
25+
logger.warn("NoSuchAlgorithmException. Not creating MD5 URI, instead generating UUID");
26+
return UUID.randomUUID().toString();
27+
}
28+
}
29+
30+
private static String bytesToHex(byte[] bytes) {
31+
StringBuilder sb = new StringBuilder();
32+
for (byte b : bytes) {
33+
sb.append(String.format("%02x", b));
34+
}
35+
return sb.toString();
36+
}
37+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.marklogic.client.id.strategy;
2+
3+
import java.util.UUID;
4+
import com.marklogic.client.io.marker.AbstractWriteHandle;
5+
6+
public interface IdStrategy {
7+
default String generateId(AbstractWriteHandle content, String topic, Integer partition, long offset) {
8+
return UUID.randomUUID().toString();
9+
}
10+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.marklogic.client.id.strategy;
2+
3+
import java.util.Map;
4+
5+
import com.marklogic.kafka.connect.sink.MarkLogicSinkConfig;
6+
7+
public class IdStrategyFactory {
8+
9+
public static IdStrategy getIdStrategy(Map<String, String> kafkaConfig) {
10+
String strategyType = kafkaConfig.get(MarkLogicSinkConfig.ID_STRATEGY);
11+
String strategyPaths= kafkaConfig.get(MarkLogicSinkConfig.ID_STRATEGY_PATH);
12+
13+
switch((strategyType != null) ? strategyType : "UUID") {
14+
case "JSONPATH":
15+
return (new JSONPathStrategy(strategyPaths.trim().split(",")[0]));
16+
case "HASH":
17+
return (new HashedJSONPathsStrategy(strategyPaths.trim().split(",")));
18+
case "UUID":
19+
return (new DefaultStrategy());
20+
case "KAFKA_META_WITH_SLASH":
21+
return (new KafkaMetaStrategy());
22+
case "KAFKA_META_HASHED":
23+
return (new HashedKafkaMetaStrategy());
24+
default:
25+
return (new DefaultStrategy());
26+
}
27+
}
28+
29+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.marklogic.client.id.strategy;
2+
3+
import java.io.IOException;
4+
import java.util.UUID;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import com.fasterxml.jackson.databind.JsonNode;
10+
import com.fasterxml.jackson.databind.ObjectMapper;
11+
import com.marklogic.client.io.marker.AbstractWriteHandle;
12+
13+
public class JSONPathStrategy implements IdStrategy{
14+
15+
private static final Logger logger = LoggerFactory.getLogger(JSONPathStrategy.class);
16+
17+
private String path;
18+
19+
public JSONPathStrategy (String path) {
20+
this.path = path;
21+
}
22+
23+
@Override
24+
public String generateId(AbstractWriteHandle content, String topic, Integer partition, long offset) {
25+
ObjectMapper om = new ObjectMapper();
26+
try {
27+
JsonNode node = om.readTree(content.toString());
28+
String id = node.at(path).asText();
29+
return id;
30+
}
31+
catch (IOException e) {
32+
logger.warn("IOException. Not creating JSONPATH URI, instead generating UUID");
33+
return UUID.randomUUID().toString();
34+
}
35+
}
36+
37+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.marklogic.client.id.strategy;
2+
3+
import com.marklogic.client.io.marker.AbstractWriteHandle;
4+
5+
public class KafkaMetaStrategy implements IdStrategy {
6+
7+
@Override
8+
public String generateId(AbstractWriteHandle content, String topic, Integer partition, long offset) {
9+
String id = "";
10+
id = topic + "/" + partition.toString() + "/" + String.valueOf(offset);
11+
return id;
12+
}
13+
14+
}

0 commit comments

Comments
 (0)