Skip to content

Commit 9f852b0

Browse files
authored
[controller] fix for NPE during HybridStoreConfigImpl creation (#1357)
* fix for NPE during HybridStoreConfigImpl creation * address review comments
1 parent 8342860 commit 9f852b0

File tree

6 files changed

+592
-18
lines changed

6 files changed

+592
-18
lines changed

Diff for: internal/venice-common/src/main/java/com/linkedin/venice/meta/HybridStoreConfigImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public HybridStoreConfigImpl(
5656
: dataReplicationPolicy.getValue();
5757
this.hybridConfig.bufferReplayPolicy =
5858
bufferReplayPolicy == null ? BufferReplayPolicy.REWIND_FROM_EOP.getValue() : bufferReplayPolicy.getValue();
59-
this.hybridConfig.realTimeTopicName = realTimeTopicName;
59+
this.hybridConfig.realTimeTopicName = realTimeTopicName == null ? DEFAULT_REAL_TIME_TOPIC_NAME : realTimeTopicName;
6060
}
6161

6262
HybridStoreConfigImpl(StoreHybridConfig config) {

Diff for: internal/venice-common/src/test/java/com/linkedin/venice/helix/TestStoreJsonSerializer.java

+43
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,49 @@
2727
* Test cases for StoreJsonSerializer.
2828
*/
2929
public class TestStoreJsonSerializer {
30+
@Test
31+
void testRealTimeTopicNameDefault() throws IOException {
32+
StoreJSONSerializer serializer = new StoreJSONSerializer();
33+
Store store = serializer
34+
.deserialize(TestUtils.loadFileAsString("TestStoreJsonSerializer/testHybridStore.json").getBytes(), "");
35+
36+
Assert.assertNotNull(store.getHybridStoreConfig().getRealTimeTopicName(), "realTimeTopicName should not be null");
37+
Assert.assertEquals(
38+
HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME,
39+
store.getHybridStoreConfig().getRealTimeTopicName(),
40+
"realTimeTopicName should have default value of an empty string");
41+
42+
store.getVersions().forEach(v -> {
43+
Assert.assertNotNull(v.getHybridStoreConfig().getRealTimeTopicName(), "realTimeTopicName should not be null");
44+
Assert.assertEquals(
45+
HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME,
46+
v.getHybridStoreConfig().getRealTimeTopicName(),
47+
"realTimeTopicName should have default value of an empty string");
48+
});
49+
}
50+
51+
@Test
52+
void testRealTimeTopicName() throws IOException {
53+
StoreJSONSerializer serializer = new StoreJSONSerializer();
54+
Store store = serializer
55+
.deserialize(TestUtils.loadFileAsString("TestStoreJsonSerializer/testHybridStore2.json").getBytes(), "");
56+
57+
Assert.assertEquals(
58+
"TEST_RT_TOPIC_NAME",
59+
store.getHybridStoreConfig().getRealTimeTopicName(),
60+
"realTimeTopicName should have default value of an empty string");
61+
62+
store.getVersions().forEach(v -> {
63+
if (v.getNumber() == 817) {
64+
Assert.assertEquals("TEST_RT_TOPIC_NAME_817", v.getHybridStoreConfig().getRealTimeTopicName());
65+
} else if (v.getNumber() == 818) {
66+
Assert.assertEquals("TEST_RT_TOPIC_NAME_818", v.getHybridStoreConfig().getRealTimeTopicName());
67+
} else {
68+
Assert.fail("Unexpected version number!");
69+
}
70+
});
71+
}
72+
3073
@Test
3174
public void testSerializeAndDeserializeStore() throws IOException {
3275
Store store = TestUtils.createTestStore("s1", "owner", 1l);

Diff for: internal/venice-common/src/test/java/com/linkedin/venice/schema/SchemaAdapterTest.java

+3-17
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
package com.linkedin.venice.schema;
22

3-
import com.linkedin.alpini.io.IOUtils;
43
import com.linkedin.venice.utils.TestUtils;
5-
import java.nio.charset.StandardCharsets;
64
import java.util.Arrays;
75
import java.util.LinkedHashMap;
86
import java.util.List;
97
import java.util.Map;
10-
import java.util.Objects;
118
import org.apache.avro.Schema;
129
import org.apache.avro.generic.GenericData;
1310
import org.apache.avro.generic.GenericRecord;
@@ -20,22 +17,11 @@
2017
public class SchemaAdapterTest {
2118
private static final Logger LOGGER = LogManager.getLogger(SchemaAdapterTest.class);
2219
private static final Schema SIMPLE_RECORD_SCHEMA = AvroSchemaParseUtils
23-
.parseSchemaFromJSONStrictValidation(loadFileAsString("AvroRecordUtilsTest/SimpleRecordSchema.avsc"));
20+
.parseSchemaFromJSONStrictValidation(TestUtils.loadFileAsString("AvroRecordUtilsTest/SimpleRecordSchema.avsc"));
2421
private static final Schema OLD_RECORD_SCHEMA = AvroSchemaParseUtils
25-
.parseSchemaFromJSONStrictValidation(loadFileAsString("AvroRecordUtilsTest/OldRecordSchema.avsc"));
22+
.parseSchemaFromJSONStrictValidation(TestUtils.loadFileAsString("AvroRecordUtilsTest/OldRecordSchema.avsc"));
2623
private static final Schema EVOLVED_RECORD_SCHEMA = AvroSchemaParseUtils
27-
.parseSchemaFromJSONStrictValidation(loadFileAsString("AvroRecordUtilsTest/EvolvedRecordSchema.avsc"));
28-
29-
private static String loadFileAsString(String fileName) {
30-
try {
31-
return IOUtils.toString(
32-
Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName)),
33-
StandardCharsets.UTF_8);
34-
} catch (Exception e) {
35-
LOGGER.error(e);
36-
return null;
37-
}
38-
}
24+
.parseSchemaFromJSONStrictValidation(TestUtils.loadFileAsString("AvroRecordUtilsTest/EvolvedRecordSchema.avsc"));
3925

4026
@Test
4127
public void testAdaptRecordWithSameSchema() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
{
2+
"name": "testStore",
3+
"owner": "urn:li:corpuser:testuser",
4+
"createdTime": 1661227581365,
5+
"persistenceType": "ROCKS_DB",
6+
"routingStrategy": "CONSISTENT_HASH",
7+
"readStrategy": "ANY_OF_ONLINE",
8+
"offLinePushStrategy": "WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION",
9+
"currentVersion": 818,
10+
"storageQuotaInByte": 53687091200,
11+
"readQuotaInCU": 5000,
12+
"hybridStoreConfig": {
13+
"rewindTimeInSeconds": 86400,
14+
"offsetLagThresholdToGoOnline": 10,
15+
"producerTimestampLagThresholdToGoOnlineInSeconds": -1,
16+
"dataReplicationPolicy": "NON_AGGREGATE",
17+
"bufferReplayPolicy": "REWIND_FROM_EOP"
18+
},
19+
"partitionerConfig": {
20+
"partitionerClass": "com.linkedin.venice.partitioner.DefaultVenicePartitioner",
21+
"partitionerParams": {},
22+
"amplificationFactor": 1
23+
},
24+
"replicationFactor": 3,
25+
"lowWatermark": 0,
26+
"partitionCount": 0,
27+
"daVinciPushStatusStoreEnabled": true,
28+
"hybrid": false,
29+
"nativeReplicationSourceFabric": "prod",
30+
"rmdChunkingEnabled": false,
31+
"accessControlled": true,
32+
"rmdVersion": -1,
33+
"schemaAutoRegisterFromPushJobEnabled": false,
34+
"readComputationEnabled": false,
35+
"latestSuperSetValueSchemaId": -1,
36+
"hybridStoreDiskQuotaEnabled": false,
37+
"storeMetadataSystemStoreEnabled": false,
38+
"latestVersionPromoteToCurrentTimestamp": 1733110839107,
39+
"migrationDuplicateStore": false,
40+
"systemStores": {
41+
"venice_system_store_davinci_push_status_store": {
42+
"currentVersion": 3,
43+
"latestVersionPromoteToCurrentTimestamp": 1729632343583,
44+
"versions": [
45+
{
46+
"storeName": "testStore",
47+
"number": 3,
48+
"createdTime": 1729632323698,
49+
"pushJobId": "SYSTEM_STORE_PUSH_1729632306180",
50+
"partitionCount": 1,
51+
"partitionerConfig": {
52+
"partitionerClass": "com.linkedin.venice.partitioner.DefaultVenicePartitioner",
53+
"partitionerParams": {},
54+
"amplificationFactor": 1
55+
},
56+
"replicationFactor": 3,
57+
"hybridStoreConfig": {
58+
"rewindTimeInSeconds": 86400,
59+
"offsetLagThresholdToGoOnline": 10,
60+
"producerTimestampLagThresholdToGoOnlineInSeconds": -1,
61+
"dataReplicationPolicy": "NON_AGGREGATE",
62+
"bufferReplayPolicy": "REWIND_FROM_EOP"
63+
},
64+
"nativeReplicationSourceFabric": "prod",
65+
"rmdChunkingEnabled": false,
66+
"blobTransferEnabled": false,
67+
"versionSwapDeferred": false,
68+
"minActiveReplicas": 2,
69+
"useVersionLevelIncrementalPushEnabled": true,
70+
"useVersionLevelHybridConfig": true,
71+
"dataRecoveryVersionConfig": null,
72+
"repushSourceVersion": -1,
73+
"chunkingEnabled": true,
74+
"incrementalPushEnabled": false,
75+
"separateRealTimeTopicEnabled": false,
76+
"nativeReplicationEnabled": true,
77+
"pushStreamSourceAddress": "kafka:16637",
78+
"activeActiveReplicationEnabled": false,
79+
"pushType": "BATCH",
80+
"compressionStrategy": "NO_OP",
81+
"status": "ONLINE",
82+
"leaderFollowerModelEnabled": true,
83+
"timestampMetadataVersionId": 1,
84+
"viewConfigs": {}
85+
}
86+
],
87+
"largestUsedVersionNumber": 3
88+
},
89+
"venice_system_store_meta_store": {
90+
"currentVersion": 3,
91+
"latestVersionPromoteToCurrentTimestamp": 1729632199446,
92+
"versions": [
93+
{
94+
"storeName": "venice_system_store_meta_store_V1",
95+
"number": 3,
96+
"createdTime": 1729632179842,
97+
"pushJobId": "SYSTEM_STORE_PUSH_1729632163228",
98+
"partitionCount": 1,
99+
"partitionerConfig": {
100+
"partitionerClass": "com.linkedin.venice.partitioner.DefaultVenicePartitioner",
101+
"partitionerParams": {},
102+
"amplificationFactor": 1
103+
},
104+
"replicationFactor": 3,
105+
"hybridStoreConfig": {
106+
"rewindTimeInSeconds": 300,
107+
"offsetLagThresholdToGoOnline": 10,
108+
"producerTimestampLagThresholdToGoOnlineInSeconds": -1,
109+
"dataReplicationPolicy": "NON_AGGREGATE",
110+
"bufferReplayPolicy": "REWIND_FROM_EOP"
111+
},
112+
"nativeReplicationSourceFabric": "prod",
113+
"rmdChunkingEnabled": false,
114+
"blobTransferEnabled": false,
115+
"versionSwapDeferred": false,
116+
"minActiveReplicas": 2,
117+
"useVersionLevelIncrementalPushEnabled": true,
118+
"useVersionLevelHybridConfig": true,
119+
"dataRecoveryVersionConfig": null,
120+
"repushSourceVersion": -1,
121+
"chunkingEnabled": true,
122+
"incrementalPushEnabled": false,
123+
"separateRealTimeTopicEnabled": false,
124+
"nativeReplicationEnabled": true,
125+
"pushStreamSourceAddress": "kafka:16637",
126+
"activeActiveReplicationEnabled": false,
127+
"pushType": "BATCH",
128+
"compressionStrategy": "NO_OP",
129+
"status": "ONLINE",
130+
"leaderFollowerModelEnabled": true,
131+
"timestampMetadataVersionId": 1,
132+
"viewConfigs": {}
133+
}
134+
],
135+
"largestUsedVersionNumber": 3
136+
}
137+
},
138+
"versions": [
139+
{
140+
"number": 817,
141+
"replicationFactor": 3,
142+
"partitionCount": 20,
143+
"storeName": "testStore",
144+
"hybridStoreConfig": {
145+
"rewindTimeInSeconds": 86400,
146+
"offsetLagThresholdToGoOnline": 10,
147+
"producerTimestampLagThresholdToGoOnlineInSeconds": -1,
148+
"dataReplicationPolicy": "NON_AGGREGATE",
149+
"bufferReplayPolicy": "REWIND_FROM_EOP"
150+
},
151+
"nativeReplicationSourceFabric": "prod",
152+
"createdTime": 1733024396416,
153+
"rmdChunkingEnabled": false,
154+
"blobTransferEnabled": false,
155+
"versionSwapDeferred": false,
156+
"minActiveReplicas": 2,
157+
"useVersionLevelIncrementalPushEnabled": true,
158+
"useVersionLevelHybridConfig": true,
159+
"dataRecoveryVersionConfig": {
160+
"dataRecoverySourceFabric": "prod",
161+
"dataRecoveryComplete": false,
162+
"dataRecoverySourceVersionNumber": 817
163+
},
164+
"repushSourceVersion": -1,
165+
"chunkingEnabled": false,
166+
"incrementalPushEnabled": false,
167+
"separateRealTimeTopicEnabled": false,
168+
"nativeReplicationEnabled": true,
169+
"pushStreamSourceAddress": "kafka:16637",
170+
"activeActiveReplicationEnabled": false,
171+
"pushType": "BATCH",
172+
"partitionerConfig": {
173+
"partitionerClass": "com.linkedin.venice.partitioner.DefaultVenicePartitioner",
174+
"partitionerParams": {},
175+
"amplificationFactor": 1
176+
},
177+
"compressionStrategy": "NO_OP",
178+
"viewConfigs": {},
179+
"status": "ONLINE",
180+
"leaderFollowerModelEnabled": true,
181+
"timestampMetadataVersionId": 1
182+
},
183+
{
184+
"number": 818,
185+
"replicationFactor": 3,
186+
"partitionCount": 20,
187+
"storeName": "testStore",
188+
"hybridStoreConfig": {
189+
"rewindTimeInSeconds": 86400,
190+
"offsetLagThresholdToGoOnline": 10,
191+
"producerTimestampLagThresholdToGoOnlineInSeconds": -1,
192+
"dataReplicationPolicy": "NON_AGGREGATE",
193+
"bufferReplayPolicy": "REWIND_FROM_EOP"
194+
},
195+
"nativeReplicationSourceFabric": "prod",
196+
"createdTime": 1733109912684,
197+
"rmdChunkingEnabled": false,
198+
"blobTransferEnabled": false,
199+
"versionSwapDeferred": false,
200+
"minActiveReplicas": 2,
201+
"useVersionLevelIncrementalPushEnabled": true,
202+
"useVersionLevelHybridConfig": true,
203+
"dataRecoveryVersionConfig": {
204+
"dataRecoverySourceFabric": "prod",
205+
"dataRecoveryComplete": false,
206+
"dataRecoverySourceVersionNumber": 818
207+
},
208+
"repushSourceVersion": -1,
209+
"chunkingEnabled": false,
210+
"incrementalPushEnabled": false,
211+
"separateRealTimeTopicEnabled": false,
212+
"nativeReplicationEnabled": true,
213+
"pushStreamSourceAddress": "kafka:16637",
214+
"activeActiveReplicationEnabled": false,
215+
"pushType": "BATCH",
216+
"partitionerConfig": {
217+
"partitionerClass": "com.linkedin.venice.partitioner.DefaultVenicePartitioner",
218+
"partitionerParams": {},
219+
"amplificationFactor": 1
220+
},
221+
"compressionStrategy": "NO_OP",
222+
"viewConfigs": {},
223+
"status": "ONLINE",
224+
"leaderFollowerModelEnabled": true,
225+
"timestampMetadataVersionId": 1
226+
}
227+
],
228+
"storageNodeReadQuotaEnabled": false,
229+
"unusedSchemaDeletionEnabled": false,
230+
"blobTransferEnabled": false,
231+
"nearlineProducerCompressionEnabled": true,
232+
"clientDecompressionEnabled": true,
233+
"chunkingEnabled": false,
234+
"incrementalPushEnabled": false,
235+
"separateRealTimeTopicEnabled": false,
236+
"batchGetLimit": -1,
237+
"numVersionsToPreserve": 0,
238+
"writeComputationEnabled": false,
239+
"bootstrapToOnlineTimeoutInHours": 24,
240+
"nativeReplicationEnabled": true,
241+
"pushStreamSourceAddress": "",
242+
"backupStrategy": "DELETE_ON_NEW_PUSH_START",
243+
"backupVersionRetentionMs": 864000000,
244+
"activeActiveReplicationEnabled": false,
245+
"largestUsedVersionNumber": 818,
246+
"storeMetaSystemStoreEnabled": true,
247+
"migrating": false,
248+
"compressionStrategy": "NO_OP",
249+
"enableReads": true,
250+
"enableWrites": true,
251+
"retentionTime": 432000000,
252+
"minCompactionLagSeconds": -1,
253+
"maxCompactionLagSeconds": -1,
254+
"maxRecordSizeBytes": -1,
255+
"maxNearlineRecordSizeBytes": -1,
256+
"nearlineProducerCountPerWriter": 1,
257+
"etlStoreConfig": {
258+
"etledUserProxyAccount": "",
259+
"regularVersionETLEnabled": false,
260+
"futureVersionETLEnabled": false
261+
},
262+
"systemStore": false,
263+
"views": {}
264+
}

0 commit comments

Comments
 (0)