Skip to content

Commit 35ee2c5

Browse files
committed
added conditions for unknown schemas for sink record
1 parent 6b72eb8 commit 35ee2c5

File tree

2 files changed

+26
-19
lines changed

2 files changed

+26
-19
lines changed

config/bkworker.properties

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ value.converter=org.apache.kafka.connect.json.JsonConverter
88
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
99
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
1010

11+
#to deserialize plain JSON data keep schemas.enable=false
1112
internal.key.converter.schemas.enable=false
1213
key.converter.schemas.enable=false
1314
value.converter.schemas.enable=false

src/main/java/com/bkatwal/kafkaproject/SolrSinkTask.java

+25-19
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
import com.bkatwal.kafkaproject.utils.SolrSinkService;
55
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
66
import org.apache.kafka.common.TopicPartition;
7+
import org.apache.kafka.connect.data.Schema;
78
import org.apache.kafka.connect.sink.SinkRecord;
89
import org.apache.kafka.connect.sink.SinkTask;
9-
import org.apache.solr.client.solrj.response.UpdateResponse;
1010
import org.slf4j.Logger;
1111
import org.slf4j.LoggerFactory;
1212

@@ -39,31 +39,37 @@ public void put(Collection<SinkRecord> kafkaRecords) {
3939
for (SinkRecord record : kafkaRecords) {
4040
String id = record.key() != null ? record.key().toString() : null;
4141

42+
Schema valueSchema = record.valueSchema();
4243

43-
if (record.value() == null) {
44-
log.error("No value passed for doc ID, {}", id);
45-
continue;
46-
}
44+
//not a plain json data/schema less data
45+
//Expecting schema less record
46+
//TODO handle schema based record later
47+
if (valueSchema == null) {
48+
49+
if (record.value() == null) {
50+
log.error("No value passed for doc ID, {}", id);
51+
continue;
52+
}
4753

48-
UpdateResponse updateResponse;
49-
Map<String, Object> jsonValueMap = (Map<String, Object>) record.value();
54+
Map<String, Object> jsonValueMap = (Map<String, Object>) record.value();
5055

51-
Object delVal = jsonValueMap.get("_delete_");
56+
Object delVal = jsonValueMap.get("_delete_");
5257

53-
//delete the field "_delete_" after reading the value from it
54-
jsonValueMap.remove("_delete_");
58+
//delete the field "_delete_" after reading the value from it
59+
jsonValueMap.remove("_delete_");
5560

56-
//if _delete_ is passed in doc and is false, will try to delete doc
57-
if (isDeleteRequest(delVal)) {
58-
sinkService.deleteById(id);
59-
} else {
60-
try {
61-
sinkService.insert(id, record);
62-
} catch (InvalidObjectException e) {
63-
e.printStackTrace();
61+
//if _delete_ is passed in doc and is false, will try to delete doc
62+
if (isDeleteRequest(delVal)) {
63+
sinkService.deleteById(id);
64+
} else {
65+
try {
66+
sinkService.insert(id, record);
67+
} catch (InvalidObjectException e) {
68+
e.printStackTrace();
69+
}
6470
}
6571
}
66-
72+
log.error("Check if record in topic is plain json data and value is schema less. Set schema.enable=false for value.");
6773

6874
}
6975

0 commit comments

Comments
 (0)