Skip to content

Commit eeca995

Browse files
committed
dynamic fields support
1 parent c0f7099 commit eeca995

File tree

4 files changed

+32
-5
lines changed

4 files changed

+32
-5
lines changed

manifest.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,5 @@
5353

5454
"title": "Kafka Connect solr sink",
5555

56-
"version": "1.0"
56+
"version": "2.0"
5757
}

src/main/java/com/bkatwal/kafkaproject/SolrSinkConnectorConfig.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.bkatwal.kafkaproject;
1616

17+
import static com.bkatwal.kafkaproject.utils.SolrMode.CLOUD;
18+
1719
import com.bkatwal.kafkaproject.utils.SolrMode;
1820
import java.util.Map;
1921
import org.apache.kafka.common.config.AbstractConfig;
@@ -59,7 +61,7 @@ public static ConfigDef conf() {
5961
.define(SOLRURL_CONFIG, Type.STRING, Importance.HIGH, SOLRURL_DOC)
6062
.define(USERNAME_CONFIG, Type.STRING, "", Importance.MEDIUM, USERNAME_DOC)
6163
.define(PASSWORD_CONFIG, Type.PASSWORD, "", Importance.MEDIUM, PASSWORD_DOC)
62-
.define(SOLRMODE_CONFIG, Type.STRING, "", Importance.MEDIUM, SOLRMODE_DOC)
64+
.define(SOLRMODE_CONFIG, Type.STRING, CLOUD.name(), Importance.MEDIUM, SOLRMODE_DOC)
6365
.define(COMMIT_WITHIN_MS, Type.INT, 10, Importance.MEDIUM, COMMIT_WITHIN_MS_DOC);
6466
}
6567

src/main/java/com/bkatwal/kafkaproject/utils/PlainJsonSolrDocMappersImpl.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import java.math.BigDecimal;
1818
import java.util.ArrayList;
1919
import java.util.Collection;
20+
import java.util.LinkedHashMap;
2021
import java.util.List;
2122
import java.util.Map;
23+
import java.util.Map.Entry;
2224
import org.apache.kafka.connect.errors.ConnectException;
2325
import org.apache.kafka.connect.sink.SinkRecord;
2426
import org.apache.solr.common.SolrInputDocument;
@@ -52,7 +54,7 @@ public SolrInputDocument convertToSolrDocument(SinkRecord sinkRecord) {
5254

5355
Map<String, Object> obj = (Map<String, Object>) sinkRecord.value();
5456

55-
return toSolrDoc(obj);
57+
return toSolrDoc(createDynamicFieldsForRecordIfExists(obj));
5658
}
5759

5860
/*
@@ -75,6 +77,29 @@ private void addFieldsToDoc(Map<String, Object> objectMap, SolrInputDocument doc
7577

7678
}
7779

80+
private Map<String, Object> createDynamicFieldsForRecordIfExists(
81+
final Map<String, Object> record) {
82+
83+
Map<String, Object> newRecord = new LinkedHashMap<>();
84+
85+
for (Entry<String, Object> entry : record.entrySet()) {
86+
87+
String entryKey = entry.getKey();
88+
Object entryValue = entry.getValue();
89+
90+
if (entryValue instanceof Map) {
91+
Map<String, Object> columnVal = (Map<String, Object>) entry.getValue();
92+
String columnName = entry.getKey();
93+
columnVal.forEach(
94+
(key, val) -> newRecord.put(columnName.concat("_").concat(key), val));
95+
} else {
96+
newRecord.put(entryKey, entryValue);
97+
}
98+
}
99+
record.clear();
100+
return newRecord;
101+
}
102+
78103
private Collection<SolrInputDocument> getChildDocuments(Object childDocuments) {
79104

80105
List<Map<String, Object>> childDocsList = (List<Map<String, Object>>) childDocuments;

src/main/java/com/bkatwal/kafkaproject/utils/SolrSinkService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public boolean update(final String id, final SinkRecord data) {
4747
public boolean deleteById(final String id) {
4848
UpdateResponse updateResponse;
4949
try {
50-
updateResponse = solrClient.deleteById(collection, id, 10);
50+
updateResponse = solrClient.deleteById(collection, id, commitWithinMs);
5151
} catch (SolrServerException | IOException e) {
5252
log.error("Unable to send delete request to solr");
5353
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to send delete request to solr", e);
@@ -61,7 +61,7 @@ public boolean insert(final String id, final SinkRecord record) {
6161
UpdateResponse updateResponse;
6262
SolrInputDocument solrInputDocument = jsonSolrDocMapper.convertToSolrDocument(record);
6363
try {
64-
updateResponse = solrClient.add(collection, solrInputDocument, 10);
64+
updateResponse = solrClient.add(collection, solrInputDocument, commitWithinMs);
6565
log.debug("saved document: {}", solrInputDocument);
6666
} catch (SolrServerException | IOException e) {
6767
log.error("Unable to send update request to solr");

0 commit comments

Comments
 (0)