Skip to content

Commit 256991c

Browse files
committed
If value is null, delete document in solr
1 parent 1c90095 commit 256991c

File tree

1 file changed

+4
-8
lines changed

1 file changed

+4
-8
lines changed

src/main/java/com/bkatwal/kafkaproject/SolrSinkTask.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void start(Map<String, String> configMap) {
4949
}
5050

5151
@Override
52-
public void put(Collection<SinkRecord> kafkaRecords) {
52+
public void put(Collection<SinkRecord> kafkaRecords){
5353

5454
for (SinkRecord record : kafkaRecords) {
5555
String id = record.key() != null ? record.key().toString() : null;
@@ -61,20 +61,16 @@ public void put(Collection<SinkRecord> kafkaRecords) {
6161
//TODO handle schema based record later
6262
if (valueSchema == null) {
6363

64-
if (record.value() == null) {
65-
log.error("No value passed for doc ID, {}", id);
66-
continue;
67-
}
68-
6964
Map<String, Object> jsonValueMap = (Map<String, Object>) record.value();
7065

7166
Object delVal = jsonValueMap.get("_delete_");
7267

7368
//delete the field "_delete_" after reading the value from it
7469
jsonValueMap.remove("_delete_");
7570

76-
//if _delete_ is passed in doc and is false, will try to delete doc
77-
if (isDeleteRequest(delVal)) {
71+
//if _delete_ field is passed as false in value or if value is null, respective
72+
// doc will be deleted from solr
73+
if (isDeleteRequest(delVal) || record.value() == null) {
7874

7975
sinkService.deleteById(id);
8076

0 commit comments

Comments
 (0)