Skip to content

Commit 74f141a

Browse files
authored
KAFKA-388: Add NullFieldValueRemover post processor (#170)
1 parent 94e23a8 commit 74f141a

File tree

3 files changed

+184
-1
lines changed

3 files changed

+184
-1
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
## 1.13.1
66

77
### Bug Fixes
8-
- [KAFKA-428](https://jira.mongodb.org/browse/KAFKA-428) Avoid unnecessary copy of data on restart
8+
- [KAFKA-428](https://jira.mongodb.org/browse/KAFKA-428) Avoid unnecessary copy of data on restart
99

1010
## 1.13.0
1111

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.kafka.connect.sink.processor;
18+
19+
import org.apache.kafka.connect.sink.SinkRecord;
20+
21+
import org.bson.BsonDocument;
22+
import org.bson.BsonValue;
23+
24+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
25+
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
26+
27+
public class NullFieldValueRemover extends PostProcessor {
28+
29+
public NullFieldValueRemover(final MongoSinkTopicConfig config) {
30+
super(config);
31+
}
32+
33+
@Override
34+
public void process(final SinkDocument doc, final SinkRecord orig) {
35+
doc.getValueDoc().ifPresent(this::removeNullFieldValues);
36+
}
37+
38+
private void removeNullFieldValues(final BsonDocument doc) {
39+
doc.entrySet()
40+
.removeIf(
41+
entry -> {
42+
BsonValue value = entry.getValue();
43+
if (value.isDocument()) {
44+
removeNullFieldValues(value.asDocument());
45+
}
46+
if (value.isArray()) {
47+
value.asArray().stream()
48+
.filter(BsonValue::isDocument)
49+
.forEach(element -> removeNullFieldValues(element.asDocument()));
50+
}
51+
return value.isNull();
52+
});
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.kafka.connect.sink.processor;
18+
19+
import static com.mongodb.kafka.connect.sink.SinkTestHelper.createTopicConfig;
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
22+
import java.util.Optional;
23+
24+
import org.junit.jupiter.api.DisplayName;
25+
import org.junit.jupiter.api.Test;
26+
27+
import org.bson.BsonDocument;
28+
29+
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
30+
31+
public class NullFieldValueRemoverTest {
32+
@Test
33+
@DisplayName("test NullFieldValueRemoverTest flat document")
34+
void testNullFieldValueRemoverFlatDocument() {
35+
BsonDocument document =
36+
BsonDocument.parse(
37+
"{"
38+
+ " 'myNull1': null,"
39+
+ " 'myString': 'a',"
40+
+ " 'myEmptyString': ''"
41+
+ " 'myNull2': null,"
42+
+ " 'myTrueBool': true,"
43+
+ " 'myFalseBool': false,"
44+
+ " 'myInt': 123,"
45+
+ " 'myNull3': null"
46+
+ "}");
47+
SinkDocument sinkDocWithValueDoc = new SinkDocument(null, document);
48+
new NullFieldValueRemover(createTopicConfig()).process(sinkDocWithValueDoc, null);
49+
BsonDocument expected =
50+
BsonDocument.parse(
51+
"{"
52+
+ " 'myString': 'a',"
53+
+ " 'myEmptyString': '',"
54+
+ " 'myTrueBool': true,"
55+
+ " 'myFalseBool': false,"
56+
+ " 'myInt': 123"
57+
+ "}");
58+
59+
assertEquals(Optional.of(expected), sinkDocWithValueDoc.getValueDoc());
60+
}
61+
62+
@Test
63+
@DisplayName("test NullFieldValueRemoverTest document with only null values")
64+
void testNullFieldValueRemoverObjectWithOnlyNullValues() {
65+
BsonDocument document =
66+
BsonDocument.parse("{ 'myNull1': null, 'myNull2': null, 'myNull3': null }");
67+
SinkDocument sinkDocWithValueDoc = new SinkDocument(null, document);
68+
new NullFieldValueRemover(createTopicConfig()).process(sinkDocWithValueDoc, null);
69+
BsonDocument expected = BsonDocument.parse("{}");
70+
assertEquals(Optional.of(expected), sinkDocWithValueDoc.getValueDoc());
71+
}
72+
73+
@Test
74+
@DisplayName("test NullFieldValueRemoverTest empty document")
75+
void testNullFieldValueRemoverEmptyDocument() {
76+
BsonDocument empty = new BsonDocument();
77+
SinkDocument sinkDocWithValueDoc = new SinkDocument(null, empty);
78+
new NullFieldValueRemover(createTopicConfig()).process(sinkDocWithValueDoc, null);
79+
BsonDocument expected = BsonDocument.parse("{}");
80+
81+
assertEquals(Optional.of(expected), sinkDocWithValueDoc.getValueDoc());
82+
}
83+
84+
@Test
85+
@DisplayName("test NullFieldValueRemoverTest nested document")
86+
void testNullFieldValueRemoverNestedDocument() {
87+
BsonDocument document =
88+
BsonDocument.parse(
89+
"{"
90+
+ " 'myNull1': null,"
91+
+ " 'mySubDoc1': {"
92+
+ " 'myDocumentString': 'a',"
93+
+ " 'myDocumentNull': null,"
94+
+ " 'mySubDoc2': {"
95+
+ " 'myDocumentString': 'b',"
96+
+ " 'myDocumentNull': null,"
97+
+ " 'mySubDoc3': {"
98+
+ " 'myDocumentString': 'c',"
99+
+ " 'myDocumentNull': null"
100+
+ " }"
101+
+ " }"
102+
+ " },"
103+
+ " 'myArray': [null, 'a', { 'a': null, 'b': null, 'c': null }, 123],"
104+
+ " 'myNull2': null,"
105+
+ " 'myDocument': { 'myDocumentString': 'a', 'myDocumentNull': null }"
106+
+ "}");
107+
SinkDocument sinkDocWithValueDoc = new SinkDocument(null, document);
108+
new NullFieldValueRemover(createTopicConfig()).process(sinkDocWithValueDoc, null);
109+
BsonDocument expected =
110+
BsonDocument.parse(
111+
"{"
112+
+ " 'mySubDoc1': {"
113+
+ " 'myDocumentString': 'a',"
114+
+ " 'mySubDoc2': {"
115+
+ " 'myDocumentString': 'b',"
116+
+ " 'mySubDoc3': {"
117+
+ " 'myDocumentString': 'c'"
118+
+ " }"
119+
+ " }"
120+
+ " }, "
121+
+ " 'myArray': [null, 'a', {}, 123],"
122+
+ " 'myDocument': {"
123+
+ " 'myDocumentString': 'a'"
124+
+ " }"
125+
+ "}");
126+
127+
assertEquals(Optional.of(expected), sinkDocWithValueDoc.getValueDoc());
128+
}
129+
}

0 commit comments

Comments
 (0)