|
8 | 8 | import static org.mockito.Mockito.mock;
|
9 | 9 | import static org.mockito.Mockito.times;
|
10 | 10 | import static org.mockito.Mockito.verify;
|
| 11 | +import static org.testng.Assert.assertFalse; |
| 12 | +import static org.testng.Assert.assertNotNull; |
| 13 | +import static org.testng.Assert.assertThrows; |
| 14 | +import static org.testng.Assert.assertTrue; |
11 | 15 |
|
12 | 16 | import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
|
13 | 17 | import com.linkedin.davinci.consumer.stats.BasicConsumerStats;
|
|
65 | 69 | import java.util.HashSet;
|
66 | 70 | import java.util.List;
|
67 | 71 | import java.util.Map;
|
| 72 | +import java.util.Properties; |
68 | 73 | import java.util.Set;
|
69 | 74 | import java.util.concurrent.CompletableFuture;
|
70 | 75 | import java.util.concurrent.ConcurrentHashMap;
|
@@ -104,6 +109,19 @@ public void setUp() {
|
104 | 109 | valueSerializer = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(valueSchema);
|
105 | 110 | }
|
106 | 111 |
|
| 112 | + @Test |
| 113 | + public void testConfig() { |
| 114 | + ChangelogClientConfig config = new ChangelogClientConfig(); |
| 115 | + assertNotNull(config.getConsumerProperties()); |
| 116 | + assertTrue(config.getConsumerProperties().isEmpty()); |
| 117 | + assertThrows(NullPointerException.class, () -> config.setConsumerProperties(null)); |
| 118 | + Properties newProps = new Properties(); |
| 119 | + newProps.setProperty("foo", "bar"); |
| 120 | + config.setConsumerProperties(newProps); |
| 121 | + assertNotNull(config.getConsumerProperties()); |
| 122 | + assertFalse(config.getConsumerProperties().isEmpty()); |
| 123 | + } |
| 124 | + |
107 | 125 | @Test
|
108 | 126 | public void testConsumeBeforeAndAfterImage() throws ExecutionException, InterruptedException {
|
109 | 127 | D2ControllerClient d2ControllerClient = mock(D2ControllerClient.class);
|
@@ -394,7 +412,7 @@ public void testBootstrapState() {
|
394 | 412 | 0);
|
395 | 413 | doReturn(currentTimestamp).when(veniceChangelogConsumer).getSubscribeTime();
|
396 | 414 | veniceChangelogConsumer.maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
|
397 |
| - Assert.assertFalse(bootstrapStateMap.get(0)); |
| 415 | + assertFalse(bootstrapStateMap.get(0)); |
398 | 416 | kafkaMessageEnvelope.producerMetadata.messageTimestamp = currentTimestamp - TimeUnit.SECONDS.toMillis(30);
|
399 | 417 | veniceChangelogConsumer.maybeUpdatePartitionToBootstrapMap(message, pubSubTopicPartition);
|
400 | 418 | Assert.assertTrue(bootstrapStateMap.get(0));
|
@@ -453,7 +471,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept
|
453 | 471 | prepareVersionTopicRecordsToBePolled(5L, 15L, mockPubSubConsumer, oldVersionTopic, 0, true);
|
454 | 472 | pubSubMessages =
|
455 | 473 | (List<PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100);
|
456 |
| - Assert.assertFalse(pubSubMessages.isEmpty()); |
| 474 | + assertFalse(pubSubMessages.isEmpty()); |
457 | 475 | Assert.assertEquals(pubSubMessages.size(), 10);
|
458 | 476 | for (int i = 5; i < 15; i++) {
|
459 | 477 | PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i - 5);
|
@@ -526,7 +544,7 @@ public void testConsumeAfterImageWithCompaction() throws ExecutionException, Int
|
526 | 544 |
|
527 | 545 | prepareVersionTopicRecordsToBePolled(5L, 15L, mockPubSubConsumer, oldVersionTopic, 0, true);
|
528 | 546 | pubSubMessages = new ArrayList<>(veniceChangelogConsumer.poll(100));
|
529 |
| - Assert.assertFalse(pubSubMessages.isEmpty()); |
| 547 | + assertFalse(pubSubMessages.isEmpty()); |
530 | 548 | Assert.assertEquals(pubSubMessages.size(), 10);
|
531 | 549 | for (int i = 5; i < 15; i++) {
|
532 | 550 | PubSubMessage<String, ChangeEvent<Utf8>, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i - 5);
|
|
0 commit comments