diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java index 8a1d54feb03..63345892dcc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java @@ -12,6 +12,7 @@ public class ChangelogClientConfig { private Properties consumerProperties; private SchemaReader schemaReader; private String viewName; + private Boolean isBeforeImageView = false; private String consumerName = ""; @@ -219,7 +220,17 @@ public static ChangelogClientConfig cloneConfig(Ch .setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes()) .setConsumerName(config.consumerName) .setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval()) - .setShouldCompactMessages(config.shouldCompactMessages()); + .setShouldCompactMessages(config.shouldCompactMessages()) + .setIsBeforeImageView(config.isBeforeImageView()); return newConfig; } + + protected Boolean isBeforeImageView() { + return isBeforeImageView; + } + + public ChangelogClientConfig setIsBeforeImageView(Boolean beforeImageView) { + isBeforeImageView = beforeImageView; + return this; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java index 0ce73ad438f..5ff9c4598c7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumer.java @@ -31,6 +31,7 @@ import com.linkedin.venice.kafka.protocol.state.PartitionState; import 
com.linkedin.venice.kafka.protocol.state.StoreVersionState; import com.linkedin.venice.meta.PersistenceType; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; @@ -93,7 +94,8 @@ public InternalLocalBootstrappingVeniceChangelogConsumer( bootstrapStateMap = new VeniceConcurrentHashMap<>(); syncBytesInterval = changelogClientConfig.getDatabaseSyncBytesInterval(); metricsRepository = changelogClientConfig.getInnerClientConfig().getMetricsRepository(); - String localStateTopicNameTemp = changelogClientConfig.getStoreName() + LOCAL_STATE_TOPIC_SUFFIX; + String viewNamePath = changelogClientConfig.getViewName() == null ? "" : "-" + changelogClientConfig.getViewName(); + String localStateTopicNameTemp = changelogClientConfig.getStoreName() + viewNamePath + LOCAL_STATE_TOPIC_SUFFIX; String bootstrapFileSystemPath = changelogClientConfig.getBootstrapFileSystemPath(); if (StringUtils.isNotEmpty(consumerId)) { localStateTopicNameTemp += "-" + consumerId; @@ -183,7 +185,8 @@ private Function functionToCheckWhetherStorageEngineShouldBeKep protected boolean handleVersionSwapControlMessage( ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, - String topicSuffix) { + String topicSuffix, + Integer upstreamPartition) { ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage); if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) { VersionSwap versionSwap = (VersionSwap) controlMessage.controlMessageUnion; @@ -506,10 +509,9 @@ public CompletableFuture start(Set partitions) { storageService.start(); try { - storeRepository.start(); storeRepository.subscribe(storeName); } catch (InterruptedException e) { - throw new RuntimeException(e); + throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e); } return 
seekWithBootStrap(partitions); @@ -518,7 +520,13 @@ public CompletableFuture start(Set partitions) { @Override public CompletableFuture start() { Set allPartitions = new HashSet<>(); - for (int partition = 0; partition < partitionCount; partition++) { + try { + storeRepository.subscribe(storeName); + } catch (InterruptedException e) { + throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e); + } + Store store = storeRepository.getStore(storeName); + for (int partition = 0; partition < store.getVersion(store.getCurrentVersion()).getPartitionCount(); partition++) { allPartitions.add(partition); } return this.start(allPartitions); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java index f016dc829f7..e1df68aed26 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceAfterImageConsumerImpl.java @@ -2,6 +2,7 @@ import com.linkedin.alpini.base.concurrency.Executors; import com.linkedin.alpini.base.concurrency.ScheduledExecutorService; +import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; @@ -92,6 +93,11 @@ public CompletableFuture subscribe(Set partitions) { if (partitions.isEmpty()) { return CompletableFuture.completedFuture(null); } + try { + storeRepository.subscribe(storeName); + } catch (InterruptedException e) { + throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e); + } if (!versionSwapThreadScheduled.get()) { // schedule the version swap thread and set up the callback listener 
this.storeRepository.registerStoreDataChangedListener(versionSwapListener); @@ -208,4 +214,10 @@ public void run() { versionSwapListener.handleStoreChanged(null); } } + + @Override + public void setStoreRepository(NativeMetadataRepositoryViewAdapter repository) { + super.setStoreRepository(repository); + versionSwapListener.setStoreRepository(repository); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java index d20bcc5b17f..bf2d30294f3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java @@ -78,6 +78,9 @@ public VeniceChangelogConsumer getChangelogConsumer(String storeNam String viewClass = getViewClass(newStoreChangelogClientConfig, storeName); String consumerName = suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), consumerId); if (viewClass.equals(ChangeCaptureView.class.getCanonicalName())) { + // TODO: This is a little bit of a hack. This is to deal with an issue where the before image change + // capture topic doesn't follow the same naming convention as view topics. 
+ newStoreChangelogClientConfig.setIsBeforeImageView(true); return new VeniceChangelogConsumerImpl( newStoreChangelogClientConfig, consumer != null diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java index 30ce5739596..462fb43df19 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java @@ -5,6 +5,7 @@ import com.google.common.annotations.VisibleForTesting; import com.linkedin.davinci.consumer.stats.BasicConsumerStats; +import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter; import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository; import com.linkedin.davinci.storage.chunking.AbstractAvroChunkingAdapter; import com.linkedin.davinci.storage.chunking.GenericChunkingAdapter; @@ -16,7 +17,6 @@ import com.linkedin.venice.compression.NoopCompressor; import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.controllerapi.D2ControllerClient; -import com.linkedin.venice.controllerapi.StoreResponse; import com.linkedin.venice.exceptions.StoreVersionNotFoundException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.ControlMessage; @@ -28,7 +28,6 @@ import com.linkedin.venice.kafka.protocol.enums.MessageType; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.meta.Store; -import com.linkedin.venice.meta.StoreInfo; import com.linkedin.venice.meta.Version; import com.linkedin.venice.offsets.OffsetRecord; import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; @@ -53,6 +52,7 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; import 
com.linkedin.venice.views.ChangeCaptureView; +import com.linkedin.venice.views.VeniceView; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -61,12 +61,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -82,7 +84,6 @@ public class VeniceChangelogConsumerImpl implements VeniceChangelogConsumer { private static final Logger LOGGER = LogManager.getLogger(VeniceChangelogConsumerImpl.class); private static final int MAX_SUBSCRIBE_RETRIES = 5; - protected final int partitionCount; protected long subscribeTime = Long.MAX_VALUE; protected static final VeniceCompressor NO_OP_COMPRESSOR = new NoopCompressor(); @@ -94,12 +95,11 @@ public class VeniceChangelogConsumerImpl implements VeniceChangelogConsume protected StoreDeserializerCache rmdDeserializerCache; protected Class specificValueClass; - protected ThinClientMetaStoreBasedRepository storeRepository; + protected NativeMetadataRepositoryViewAdapter storeRepository; protected final AbstractAvroChunkingAdapter userEventChunkingAdapter; protected final SchemaReader schemaReader; - private final String viewClassName; protected final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); protected final Map partitionToPutMessageCount = new VeniceConcurrentHashMap<>(); @@ -120,9 +120,13 @@ public class VeniceChangelogConsumerImpl implements VeniceChangelogConsume protected final String storeName; protected final PubSubConsumerAdapter pubSubConsumer; - protected final Map> currentVersionHighWatermarks = new HashMap<>(); - protected final Map 
currentVersionLastHeartbeat = new VeniceConcurrentHashMap<>(); - protected final int[] currentValuePayloadSize; + + // This member is a map of maps in order to accommodate view topics. If the message we consume has the appropriate + // footer then we'll use that to infer entry into the wrapped map and compare with it, otherwise we'll infer it from + // the consumed partition for the given message. We do all this because for a view topic, it may have many + // upstream RT partitions writing to a given view partition. + protected final Map>> currentVersionHighWatermarks = new VeniceConcurrentHashMap<>(); + protected final ConcurrentHashMap currentVersionLastHeartbeat = new VeniceConcurrentHashMap<>(); protected final ChangelogClientConfig changelogClientConfig; @@ -135,7 +139,22 @@ public VeniceChangelogConsumerImpl( ChangelogClientConfig changelogClientConfig, PubSubConsumerAdapter pubSubConsumer) { this.pubSubConsumer = pubSubConsumer; - this.storeName = changelogClientConfig.getStoreName(); + + // TODO: putting the change capture case here is a little bit weird. The view abstraction should probably + // accommodate + // this case, but it doesn't seem that clean in this case. Change capture topics don't behave like view topics as + // they only contain nearline data whereas views are full version topics. So for now it seems legit to have this + // caveat, but we should perhaps think through this in the future if we think we'll have more nearline data only + // cases for view topics. Change capture topic naming also doesn't follow the view_store_store_name_view_name + // naming pattern, making tracking it even weirder. 
+ if (changelogClientConfig.getViewName() == null || changelogClientConfig.getViewName().isEmpty() + || changelogClientConfig.isBeforeImageView()) { + this.storeName = changelogClientConfig.getStoreName(); + } else { + this.storeName = + VeniceView.getViewStoreName(changelogClientConfig.getStoreName(), changelogClientConfig.getViewName()); + } + this.d2ControllerClient = changelogClientConfig.getD2ControllerClient(); if (changelogClientConfig.getInnerClientConfig().getMetricsRepository() != null) { this.changeCaptureStats = new BasicConsumerStats( @@ -145,16 +164,7 @@ public VeniceChangelogConsumerImpl( changeCaptureStats = null; } heartbeatReporterThread = new HeartbeatReporterThread(); - StoreResponse storeResponse = changelogClientConfig.getD2ControllerClient().getStore(storeName); - if (storeResponse.isError()) { - throw new VeniceException( - "Failed to get store info for store: " + storeName + " with error: " + storeResponse.getError()); - } - StoreInfo store = storeResponse.getStore(); this.changelogClientConfig = ChangelogClientConfig.cloneConfig(changelogClientConfig); - this.partitionCount = store.getPartitionCount(); - this.currentValuePayloadSize = new int[partitionCount]; - this.viewClassName = changelogClientConfig.getViewName(); this.replicationMetadataSchemaRepository = new ReplicationMetadataSchemaRepository(d2ControllerClient); this.schemaReader = changelogClientConfig.getSchemaReader(); Schema keySchema = schemaReader.getKeySchema(); @@ -162,10 +172,13 @@ public VeniceChangelogConsumerImpl( this.chunkAssembler = new ChunkAssembler(storeName); this.startTimestamp = System.currentTimeMillis(); LOGGER.info("VeniceChangelogConsumer created at timestamp: {}", startTimestamp); - this.storeRepository = new ThinClientMetaStoreBasedRepository( + + ThinClientMetaStoreBasedRepository repository = new ThinClientMetaStoreBasedRepository( changelogClientConfig.getInnerClientConfig(), VeniceProperties.empty(), null); + repository.start(); + this.storeRepository 
= new NativeMetadataRepositoryViewAdapter(repository); this.rmdDeserializerCache = new RmdDeserializerCache<>(replicationMetadataSchemaRepository, storeName, 1, false); if (changelogClientConfig.getInnerClientConfig().isSpecificClient()) { // If a value class is supplied, we'll use a Specific record adapter @@ -182,16 +195,13 @@ public VeniceChangelogConsumerImpl( } - LOGGER.info( - "Start a change log consumer client for store: {}, with partition count: {} and view class: {} ", - storeName, - partitionCount, - viewClassName); + LOGGER.info("Start a change log consumer client for store: {}", storeName); } @Override public int getPartitionCount() { - return partitionCount; + Store store = getStore(); + return store.getVersion(store.getCurrentVersion()).getPartitionCount(); } @Override @@ -206,7 +216,6 @@ public CompletableFuture subscribe(Set partitions) { protected CompletableFuture internalSubscribe(Set partitions, PubSubTopic topic) { return CompletableFuture.supplyAsync(() -> { try { - storeRepository.start(); for (int i = 0; i <= MAX_SUBSCRIBE_RETRIES; i++) { try { storeRepository.subscribe(storeName); @@ -221,11 +230,6 @@ protected CompletableFuture internalSubscribe(Set partitions, Pub } } storeRepository.refresh(); - if (changeCaptureStats != null) { - if (!heartbeatReporterThread.isAlive()) { - heartbeatReporterThread.start(); - } - } } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -264,6 +268,11 @@ protected CompletableFuture internalSubscribe(Set partitions, Pub currentVersionLastHeartbeat.put(topicPartition.getPartitionNumber(), System.currentTimeMillis()); } } + if (changeCaptureStats != null) { + if (!heartbeatReporterThread.isAlive()) { + heartbeatReporterThread.start(); + } + } isSubscribed.set(true); return null; }); @@ -272,7 +281,7 @@ protected CompletableFuture internalSubscribe(Set partitions, Pub protected VeniceCompressor getVersionCompressor(PubSubTopicPartition topicPartition) { Store store = 
storeRepository.getStore(storeName); String topicName = topicPartition.getPubSubTopic().getName(); - Version version = store.getVersionOrThrow(Version.parseVersionFromVersionTopicName(topicName)); + Version version = store.getVersionOrThrow(Version.parseVersionFromKafkaTopicName(topicName)); VeniceCompressor compressor; if (CompressionStrategy.ZSTD_WITH_DICT.equals(version.getCompressionStrategy())) { compressor = compressorFactory.getVersionSpecificCompressor(topicName); @@ -309,7 +318,7 @@ public CompletableFuture seekToBeginningOfPush() { public CompletableFuture seekToEndOfPush(Set partitions) { // Get the latest change capture topic storeRepository.refresh(); - Store store = storeRepository.getStore(storeName); + Store store = getStore(); int currentVersion = store.getCurrentVersion(); PubSubTopic topic = pubSubTopicRepository .getTopic(Version.composeKafkaTopic(storeName, currentVersion) + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); @@ -388,7 +397,11 @@ public CompletableFuture internalSeekToTail(Set partitions, Strin private PubSubTopic getCurrentServingVersionTopic() { Store store = storeRepository.getStore(storeName); int currentVersion = store.getCurrentVersion(); - return pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, currentVersion)); + if (changelogClientConfig.getViewName() == null || changelogClientConfig.getViewName().isEmpty()) { + return pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, currentVersion)); + } + + return pubSubTopicRepository.getTopic(store.getVersion(currentVersion).kafkaTopicName()); } @Override @@ -445,7 +458,7 @@ private void pubSubConsumerSeek(PubSubTopicPartition topicPartition, Long offset @Override public CompletableFuture subscribeAll() { - return this.subscribe(IntStream.range(0, partitionCount).boxed().collect(Collectors.toSet())); + return this.subscribe(IntStream.range(0, getPartitionCount()).boxed().collect(Collectors.toSet())); } @Override @@ -570,12 +583,21 @@ public void 
unsubscribe(Set partitions) { @Override public void unsubscribeAll() { Set allPartitions = new HashSet<>(); - for (int partition = 0; partition < partitionCount; partition++) { + for (int partition = 0; partition < getPartitionCount(); partition++) { allPartitions.add(partition); } this.unsubscribe(allPartitions); } + private Store getStore() { + try { + storeRepository.subscribe(storeName); + } catch (InterruptedException e) { + throw new VeniceException("Failed to get store info with exception:", e); + } + return storeRepository.getStore(storeName); + } + @Override public Collection, VeniceChangeCoordinate>> poll(long timeoutInMs) { return internalPoll(timeoutInMs, ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); @@ -687,7 +709,14 @@ protected boolean handleControlMessage( pubSubTopicPartition.getPartitionNumber()); } if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) { - return handleVersionSwapControlMessage(controlMessage, pubSubTopicPartition, topicSuffix); + // TODO: In view topics, we need to know the partition of the upstream RT + // how we transmit this information has yet to be determined, so once we finalize + // that, we'll need to tweak this. 
For now, we'll just pass in the same partition number + return handleVersionSwapControlMessage( + controlMessage, + pubSubTopicPartition, + topicSuffix, + pubSubTopicPartition.getPartitionNumber()); } if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue() && Arrays.equals(key, KafkaKey.HEART_BEAT.getKey())) { @@ -760,24 +789,18 @@ protected Optional, VeniceChangeCoordinate>> con } if (messageType.equals(MessageType.PUT)) { Put put = (Put) message.getValue().payloadUnion; - // Select appropriate deserializers + // Select appropriate deserializers and compressors Lazy deserializerProvider; int readerSchemaId; - if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) { - deserializerProvider = Lazy.of(() -> storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId)); - readerSchemaId = put.schemaId; - } else { + VeniceCompressor compressor; + if (pubSubTopicPartition.getPubSubTopic().isViewTopic() && changelogClientConfig.isBeforeImageView()) { deserializerProvider = Lazy.of(() -> recordChangeDeserializer); readerSchemaId = this.schemaReader.getLatestValueSchemaId(); - } - - // Select compressor. We'll only construct compressors for version topics so this will return null for - // events from change capture. This is fine as today they are not compressed. 
- VeniceCompressor compressor; - if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) { - compressor = compressorMap.get(pubSubTopicPartition.getPartitionNumber()); - } else { compressor = NO_OP_COMPRESSOR; + } else { + deserializerProvider = Lazy.of(() -> storeDeserializerCache.getDeserializer(put.schemaId, put.schemaId)); + readerSchemaId = put.schemaId; + compressor = compressorMap.get(pubSubTopicPartition.getPartitionNumber()); } assembledObject = chunkAssembler.bufferAndAssembleRecord( @@ -860,8 +883,13 @@ protected Optional, VeniceChangeCoordinate>> con } partitionToPutMessageCount.computeIfAbsent(message.getPartition(), x -> new AtomicLong(0)).incrementAndGet(); } + + // TODO: Once we settle on how to extract upstream RT partition we need to update this with the correct RT partition // Determine if the event should be filtered or not - if (filterRecordByVersionSwapHighWatermarks(replicationCheckpoint, pubSubTopicPartition)) { + if (filterRecordByVersionSwapHighWatermarks( + replicationCheckpoint, + pubSubTopicPartition, + pubSubTopicPartition.getPartitionNumber())) { return Optional.empty(); } @@ -890,7 +918,8 @@ protected List extractOffsetVectorFromMessage( protected boolean handleVersionSwapControlMessage( ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, - String topicSuffix) { + String topicSuffix, + Integer upstreamPartition) { ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage); if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) { VersionSwap versionSwap = (VersionSwap) controlMessage.controlMessageUnion; @@ -908,11 +937,14 @@ protected boolean handleVersionSwapControlMessage( // To make the client // handle this gracefully, we instate the below condition that says the hwm in the client should never go // backwards. 
- if (RmdUtils.hasOffsetAdvanced( - currentVersionHighWatermarks.getOrDefault(pubSubTopicPartition.getPartitionNumber(), Collections.EMPTY_LIST), - versionSwap.getLocalHighWatermarks())) { - currentVersionHighWatermarks - .put(pubSubTopicPartition.getPartitionNumber(), versionSwap.getLocalHighWatermarks()); + List localOffset = (List) currentVersionHighWatermarks + .getOrDefault(pubSubTopicPartition.getPartitionNumber(), Collections.EMPTY_MAP) + .getOrDefault(upstreamPartition, Collections.EMPTY_LIST); + if (RmdUtils.hasOffsetAdvanced(localOffset, versionSwap.getLocalHighWatermarks())) { + + currentVersionHighWatermarks.putIfAbsent(pubSubTopicPartition.getPartitionNumber(), new HashMap<>()); + currentVersionHighWatermarks.get(pubSubTopicPartition.getPartitionNumber()) + .put(upstreamPartition, versionSwap.getLocalHighWatermarks()); } switchToNewTopic(newServingVersionTopic, topicSuffix, pubSubTopicPartition.getPartitionNumber()); chunkAssembler.clearInMemoryDB(); @@ -930,8 +962,6 @@ private PubSubMessage, VeniceChangeCoordinate> convertChangeEv int payloadSize) { V currentValue = null; if (recordChangeEvent.currentValue != null && recordChangeEvent.currentValue.getSchemaId() > 0) { - currentValuePayloadSize[pubSubTopicPartition.getPartitionNumber()] = - recordChangeEvent.currentValue.getValue().array().length; currentValue = deserializeValueFromBytes( recordChangeEvent.currentValue.getValue(), recordChangeEvent.currentValue.getSchemaId()); @@ -963,12 +993,21 @@ private V deserializeValueFromBytes(ByteBuffer byteBuffer, int valueSchemaId) { private boolean filterRecordByVersionSwapHighWatermarks( List recordCheckpointVector, - PubSubTopicPartition pubSubTopicPartition) { + PubSubTopicPartition pubSubTopicPartition, + Integer upstreamPartition) { int partitionId = pubSubTopicPartition.getPartitionNumber(); - if (recordCheckpointVector != null && currentVersionHighWatermarks.containsKey(partitionId)) { - List partitionCurrentVersionHighWatermarks = - 
currentVersionHighWatermarks.getOrDefault(partitionId, Collections.EMPTY_LIST); - return !RmdUtils.hasOffsetAdvanced(partitionCurrentVersionHighWatermarks, recordCheckpointVector); + List localOffset = (List) currentVersionHighWatermarks.getOrDefault(partitionId, Collections.EMPTY_MAP) + .getOrDefault(upstreamPartition, Collections.EMPTY_LIST); + if (localOffset != null) { + if (RmdUtils.hasOffsetAdvanced(localOffset, recordCheckpointVector)) { + currentVersionHighWatermarks.putIfAbsent(pubSubTopicPartition.getPartitionNumber(), new HashMap<>()); + // We need to merge + currentVersionHighWatermarks.get(pubSubTopicPartition.getPartitionNumber()) + .put(upstreamPartition, RmdUtils.mergeOffsetVectors(localOffset, recordCheckpointVector)); + return false; + } else { + return true; + } } // Has not met version swap message after client initialization. return false; @@ -1011,7 +1050,7 @@ public void close() { } @VisibleForTesting - protected void setStoreRepository(ThinClientMetaStoreBasedRepository repository) { + protected void setStoreRepository(NativeMetadataRepositoryViewAdapter repository) { this.storeRepository = repository; if (changelogClientConfig.getInnerClientConfig().isSpecificClient()) { // If a value class is supplied, we'll use a Specific record adapter @@ -1093,12 +1132,17 @@ public void run() { } protected void recordStats( - Map currentVersionLastHeartbeat, + ConcurrentHashMap currentVersionLastHeartbeat, BasicConsumerStats changeCaptureStats, Set assignment) { - for (Map.Entry lastHeartbeat: currentVersionLastHeartbeat.entrySet()) { - changeCaptureStats.recordLag(System.currentTimeMillis() - lastHeartbeat.getValue()); + + Iterator> heartbeatIterator = currentVersionLastHeartbeat.entrySet().iterator(); + while (heartbeatIterator.hasNext()) { + changeCaptureStats.recordLag(System.currentTimeMillis() - heartbeatIterator.next().getValue()); } + + currentVersionLastHeartbeat.forEachValue(1, changeCaptureStats::recordLag); + int maxVersion = -1; int 
minVersion = Integer.MAX_VALUE; synchronized (assignment) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java index 6f8d24fb952..ae1c9bc1507 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VersionSwapDataChangeListener.java @@ -1,10 +1,12 @@ package com.linkedin.davinci.consumer; -import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository; +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreDataChangedListener; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.views.VeniceView; import java.util.HashSet; import java.util.Set; import org.apache.logging.log4j.LogManager; @@ -14,13 +16,13 @@ class VersionSwapDataChangeListener implements StoreDataChangedListener { private static final Logger LOGGER = LogManager.getLogger(VersionSwapDataChangeListener.class); private final VeniceAfterImageConsumerImpl consumer; - private final ThinClientMetaStoreBasedRepository storeRepository; + private NativeMetadataRepositoryViewAdapter storeRepository; private final String storeName; private final String consumerName; VersionSwapDataChangeListener( VeniceAfterImageConsumerImpl consumer, - ThinClientMetaStoreBasedRepository storeRepository, + NativeMetadataRepositoryViewAdapter storeRepository, String storeName, String consumerName) { this.consumer = consumer; @@ -31,12 +33,12 @@ class VersionSwapDataChangeListener implements StoreDataChangedListener { @Override public void handleStoreChanged(Store store) { - // store may be null 
as this is called by other repair tasks - if (!consumer.subscribed()) { - // skip this for now as the consumer hasn't even been set up yet - return; - } synchronized (this) { + // store may be null as this is called by other repair tasks + if (!consumer.subscribed()) { + // skip this for now as the consumer hasn't even been set up yet + return; + } Set partitions = new HashSet<>(); try { // Check the current version of the server @@ -50,7 +52,12 @@ public void handleStoreChanged(Store store) { // for all partition subscriptions that are not subscribed to the current version, resubscribe them for (PubSubTopicPartition topicPartition: subscriptions) { - int version = Version.parseVersionFromVersionTopicName(topicPartition.getPubSubTopic().getName()); + int version; + if (topicPartition.getPubSubTopic().isViewTopic()) { + version = VeniceView.parseVersionFromViewTopic(topicPartition.getPubSubTopic().getName()); + } else { + version = Version.parseVersionFromVersionTopicName(topicPartition.getPubSubTopic().getName()); + } if (version != currentVersion) { partitions.add(topicPartition.getPartitionNumber()); } @@ -71,4 +78,12 @@ public void handleStoreChanged(Store store) { } } } + + @VisibleForTesting + void setStoreRepository(NativeMetadataRepositoryViewAdapter storeRepository) { + // This is chiefly to make static analysis happy + synchronized (this) { + this.storeRepository = storeRepository; + } + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java index 707215c8651..b1b6f8b1495 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/view/ChangeCaptureViewWriter.java @@ -151,6 +151,11 @@ public Map getTopicNamesAndConfigsForVersion(int versi return 
internalView.getTopicNamesAndConfigsForVersion(version); } + @Override + public String composeTopicName(int version) { + return internalView.composeTopicName(version); + } + @Override public String getWriterClassName() { return internalView.getWriterClassName(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumerTest.java index b7df6bf447f..3428dd1a710 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/InternalLocalBootstrappingVeniceChangelogConsumerTest.java @@ -18,7 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; -import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository; +import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter; import com.linkedin.davinci.storage.StorageEngineMetadataService; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; @@ -113,7 +113,7 @@ public class InternalLocalBootstrappingVeniceChangelogConsumerTest { private RecordSerializer keySerializer; private RecordSerializer valueSerializer; private PubSubConsumerAdapter pubSubConsumer; - private ThinClientMetaStoreBasedRepository metadataRepository; + private NativeMetadataRepositoryViewAdapter metadataRepository; private PubSubTopic changeCaptureTopic; private SchemaReader schemaReader; private Schema valueSchema; @@ -129,7 +129,7 @@ public class InternalLocalBootstrappingVeniceChangelogConsumerTest { @BeforeMethod public void setUp() { storeName = Utils.getUniqueString(); - localStateTopicName = 
storeName + "_Bootstrap_v1"; + localStateTopicName = storeName + "-changeCaptureView" + "_Bootstrap_v1"; schemaReader = mock(SchemaReader.class); Schema keySchema = AvroCompatibilityHelper.parse("\"string\""); doReturn(keySchema).when(schemaReader).getKeySchema(); @@ -180,18 +180,22 @@ public void setUp() { .setConsumerProperties(consumerProperties) .setLocalD2ZkHosts(TEST_ZOOKEEPER_ADDRESS) .setRocksDBBlockCacheSizeInBytes(TEST_ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES) - .setDatabaseSyncBytesInterval(TEST_DB_SYNC_BYTES_INTERVAL); + .setDatabaseSyncBytesInterval(TEST_DB_SYNC_BYTES_INTERVAL) + .setIsBeforeImageView(true); changelogClientConfig.getInnerClientConfig().setMetricsRepository(new MetricsRepository()); bootstrappingVeniceChangelogConsumer = new InternalLocalBootstrappingVeniceChangelogConsumer<>(changelogClientConfig, pubSubConsumer, null); - metadataRepository = mock(ThinClientMetaStoreBasedRepository.class); + metadataRepository = mock(NativeMetadataRepositoryViewAdapter.class); Store store = mock(Store.class); Version mockVersion = new VersionImpl(storeName, 1, "foo"); + mockVersion.setPartitionCount(2); when(store.getCurrentVersion()).thenReturn(1); when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + when(store.getPartitionCount()).thenReturn(2); when(metadataRepository.getStore(anyString())).thenReturn(store); when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); + when(store.getVersion(Mockito.anyInt())).thenReturn(mockVersion); when(metadataRepository.getValueSchema(storeName, TEST_SCHEMA_ID)) .thenReturn(new SchemaEntry(TEST_SCHEMA_ID, valueSchema)); bootstrappingVeniceChangelogConsumer.setStoreRepository(metadataRepository); @@ -230,8 +234,7 @@ public void testStart() throws ExecutionException, InterruptedException { verify(mockStorageService, times(1)).start(); verify(mockStorageService, times(1)).openStoreForNewPartition(any(), eq(0), any()); verify(mockStorageService, 
times(1)).openStoreForNewPartition(any(), eq(1), any()); - verify(metadataRepository, times(1)).start(); - verify(metadataRepository, times(1)).subscribe(storeName); + verify(metadataRepository, times(2)).subscribe(storeName); verify(pubSubConsumer, times(1)).subscribe(topicPartition_0, LOWEST_OFFSET); verify(pubSubConsumer, times(1)).subscribe(topicPartition_1, LOWEST_OFFSET); verify(pubSubConsumer, times(1)).subscribe(topicPartition_0, 0L); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactoryTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactoryTest.java index ae02c6a5113..2a7c1d7156f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactoryTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactoryTest.java @@ -3,18 +3,24 @@ import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import com.fasterxml.jackson.core.JsonProcessingException; import com.linkedin.d2.balancer.D2Client; import com.linkedin.data.ByteString; +import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter; import com.linkedin.r2.message.rest.RestResponse; import com.linkedin.venice.client.store.schemas.TestKeyRecord; +import com.linkedin.venice.compression.CompressionStrategy; import com.linkedin.venice.controllerapi.D2ControllerClient; import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreInfo; +import 
com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.meta.ViewConfigImpl; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; @@ -234,7 +240,8 @@ public void testGetBootstrappingChangelogConsumer() new ChangelogClientConfig().setConsumerProperties(consumerProperties) .setSchemaReader(mockSchemaReader) .setBootstrapFileSystemPath(TEST_BOOTSTRAP_FILE_SYSTEM_PATH) - .setLocalD2ZkHosts(TEST_ZOOKEEPER_ADDRESS); + .setLocalD2ZkHosts(TEST_ZOOKEEPER_ADDRESS) + .setIsBeforeImageView(true); VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, new MetricsRepository()); D2ControllerClient mockControllerClient = Mockito.mock(D2ControllerClient.class); @@ -259,6 +266,20 @@ public void testGetBootstrappingChangelogConsumer() Assert.assertTrue(consumer instanceof LocalBootstrappingVeniceChangelogConsumer); globalChangelogClientConfig.setViewName(VIEW_NAME); + + NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class); + Store store = mock(Store.class); + Version mockVersion = new VersionImpl(STORE_NAME, 1, "foo"); + mockVersion.setPartitionCount(2); + Mockito.when(store.getCurrentVersion()).thenReturn(1); + Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + Mockito.when(store.getPartitionCount()).thenReturn(2); + Mockito.when(store.getVersion(anyInt())).thenReturn(mockVersion); + Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); + Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); + + ((LocalBootstrappingVeniceChangelogConsumer) consumer).setStoreRepository(mockRepository); + consumer = veniceChangelogConsumerClientFactory.getBootstrappingChangelogConsumer(STORE_NAME); Assert.assertTrue(consumer instanceof LocalBootstrappingVeniceChangelogConsumer); diff 
--git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index c29da42aaa9..752571c1c6c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -12,7 +12,7 @@ import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.davinci.consumer.stats.BasicConsumerStats; import com.linkedin.davinci.kafka.consumer.TestPubSubTopic; -import com.linkedin.davinci.repository.ThinClientMetaStoreBasedRepository; +import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter; import com.linkedin.venice.client.change.capture.protocol.RecordChangeEvent; import com.linkedin.venice.client.change.capture.protocol.ValueBytes; import com.linkedin.venice.compression.CompressionStrategy; @@ -64,6 +64,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; @@ -129,21 +130,25 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup false, false); ChangelogClientConfig changelogClientConfig = - getChangelogClientConfig(d2ControllerClient).setViewName("changeCaptureView"); + getChangelogClientConfig(d2ControllerClient).setViewName("changeCaptureView").setIsBeforeImageView(true); VeniceChangelogConsumerImpl veniceChangelogConsumer = new VeniceChangelogConsumerImpl<>(changelogClientConfig, mockPubSubConsumer); - Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); - ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class); + 
NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class); Store store = mock(Store.class); Version mockVersion = new VersionImpl(storeName, 1, "foo"); + mockVersion.setPartitionCount(2); Mockito.when(store.getCurrentVersion()).thenReturn(1); Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); - + Mockito.when(store.getPartitionCount()).thenReturn(2); + Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(mockVersion); veniceChangelogConsumer.setStoreRepository(mockRepository); + + Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); + veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get(); veniceChangelogConsumer.seekToTimestamp(System.currentTimeMillis() - 10000L); PubSubTopicPartition oldVersionTopicPartition = new PubSubTopicPartitionImpl(oldVersionTopic, 0); @@ -215,16 +220,20 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE mockPubSubConsumer, Lazy.of(() -> mockInternalSeekConsumer)); veniceChangelogConsumer.versionSwapDetectionIntervalTimeInMs = 1; - Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); - - ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class); + NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class); Store store = mock(Store.class); Version mockVersion = new VersionImpl(storeName, 1, "foo"); + mockVersion.setPartitionCount(2); Mockito.when(store.getCurrentVersion()).thenReturn(1); Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + Mockito.when(store.getPartitionCount()).thenReturn(2); 
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); - Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); + Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(mockVersion); veniceChangelogConsumer.setStoreRepository(mockRepository); + + Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); + + Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get(); Set partitionSet = new HashSet<>(); @@ -288,24 +297,25 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept PubSubConsumerAdapter mockPubSubConsumer = mock(PubSubConsumerAdapter.class); PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1)); - PubSubTopic oldChangeCaptureTopic = - pubSubTopicRepository.getTopic(oldVersionTopic + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); - prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true); ChangelogClientConfig changelogClientConfig = getChangelogClientConfig(d2ControllerClient).setViewName(""); VeniceChangelogConsumerImpl veniceChangelogConsumer = new VeniceAfterImageConsumerImpl<>(changelogClientConfig, mockPubSubConsumer); - Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); - ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class); + NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class); Store store = mock(Store.class); Version mockVersion = new VersionImpl(storeName, 1, "foo"); + mockVersion.setPartitionCount(2); Mockito.when(store.getCurrentVersion()).thenReturn(1); Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + Mockito.when(store.getPartitionCount()).thenReturn(2); Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); 
Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); + Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(mockVersion); veniceChangelogConsumer.setStoreRepository(mockRepository); + + Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get(); verify(mockPubSubConsumer).subscribe(new PubSubTopicPartitionImpl(oldVersionTopic, 0), OffsetRecord.LOWEST_OFFSET); @@ -316,28 +326,20 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept Utf8 messageStr = pubSubMessage.getValue().getCurrentValue(); Assert.assertEquals(messageStr.toString(), "newValue" + i); } - prepareChangeCaptureRecordsToBePolled( - 0L, - 10L, - mockPubSubConsumer, - oldChangeCaptureTopic, - 0, - oldVersionTopic, - null, - false, - false); + + prepareVersionTopicRecordsToBePolled(5L, 15L, mockPubSubConsumer, oldVersionTopic, 0, true); pubSubMessages = (List, VeniceChangeCoordinate>>) veniceChangelogConsumer.poll(100); Assert.assertFalse(pubSubMessages.isEmpty()); Assert.assertEquals(pubSubMessages.size(), 10); - for (int i = 0; i < 10; i++) { - PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i); + for (int i = 5; i < 15; i++) { + PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i - 5); Utf8 pubSubMessageValue = pubSubMessage.getValue().getCurrentValue(); Assert.assertEquals(pubSubMessageValue.toString(), "newValue" + i); } veniceChangelogConsumer.close(); - verify(mockPubSubConsumer, times(2)).batchUnsubscribe(any()); + verify(mockPubSubConsumer, times(3)).batchUnsubscribe(any()); verify(mockPubSubConsumer).close(); } @@ -361,8 +363,6 @@ public void testConsumeAfterImageWithCompaction() throws ExecutionException, Int PubSubConsumerAdapter mockPubSubConsumer = mock(PubSubConsumerAdapter.class); PubSubTopic 
oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1)); - PubSubTopic oldChangeCaptureTopic = - pubSubTopicRepository.getTopic(oldVersionTopic + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX); prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true); ChangelogClientConfig changelogClientConfig = @@ -374,17 +374,22 @@ public void testConsumeAfterImageWithCompaction() throws ExecutionException, Int changelogClientConfig.getInnerClientConfig().setMetricsRepository(new MetricsRepository()); VeniceChangelogConsumerImpl veniceChangelogConsumer = new VeniceAfterImageConsumerImpl<>(changelogClientConfig, mockPubSubConsumer); - Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); - ThinClientMetaStoreBasedRepository mockRepository = mock(ThinClientMetaStoreBasedRepository.class); + NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class); Store store = mock(Store.class); Version mockVersion = new VersionImpl(storeName, 1, "foo"); + mockVersion.setPartitionCount(2); Mockito.when(store.getCurrentVersion()).thenReturn(1); Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + Mockito.when(store.getPartitionCount()).thenReturn(2); Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); + Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(mockVersion); veniceChangelogConsumer.setStoreRepository(mockRepository); + + Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); + veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get(); verify(mockPubSubConsumer).subscribe(new PubSubTopicPartitionImpl(oldVersionTopic, 0), OffsetRecord.LOWEST_OFFSET); @@ -395,27 +400,19 @@ public void 
testConsumeAfterImageWithCompaction() throws ExecutionException, Int Utf8 messageStr = pubSubMessage.getValue().getCurrentValue(); Assert.assertEquals(messageStr.toString(), "newValue" + i); } - prepareChangeCaptureRecordsToBePolled( - 0L, - 10L, - mockPubSubConsumer, - oldChangeCaptureTopic, - 0, - oldVersionTopic, - null, - false, - true); + + prepareVersionTopicRecordsToBePolled(5L, 15L, mockPubSubConsumer, oldVersionTopic, 0, true); pubSubMessages = new ArrayList<>(veniceChangelogConsumer.poll(100)); Assert.assertFalse(pubSubMessages.isEmpty()); Assert.assertEquals(pubSubMessages.size(), 10); - for (int i = 0; i < 10; i++) { - PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i); + for (int i = 5; i < 15; i++) { + PubSubMessage, VeniceChangeCoordinate> pubSubMessage = pubSubMessages.get(i - 5); Utf8 pubSubMessageValue = pubSubMessage.getValue().getCurrentValue(); Assert.assertEquals(pubSubMessageValue.toString(), "newValue" + i); } veniceChangelogConsumer.close(); - verify(mockPubSubConsumer, times(2)).batchUnsubscribe(any()); + verify(mockPubSubConsumer, times(3)).batchUnsubscribe(any()); verify(mockPubSubConsumer).close(); } @@ -434,7 +431,7 @@ public void testVersionSwapDataChangeListener() { String storeName = "LeppalĂșĂ°i_store"; - ThinClientMetaStoreBasedRepository mockRepository = Mockito.mock(ThinClientMetaStoreBasedRepository.class); + NativeMetadataRepositoryViewAdapter mockRepository = Mockito.mock(NativeMetadataRepositoryViewAdapter.class); Store mockStore = Mockito.mock(Store.class); Mockito.when(mockStore.getCurrentVersion()).thenReturn(5); Mockito.when(mockRepository.getStore(storeName)).thenReturn(mockStore); @@ -469,12 +466,26 @@ public void testMetricReportingThread() throws InterruptedException { ChangelogClientConfig changelogClientConfig = getChangelogClientConfig(d2ControllerClient).setViewName(""); VeniceChangelogConsumerImpl veniceChangelogConsumer = new VeniceAfterImageConsumerImpl<>(changelogClientConfig, 
mockPubSubConsumer); + + NativeMetadataRepositoryViewAdapter mockRepository = mock(NativeMetadataRepositoryViewAdapter.class); + Store store = mock(Store.class); + Version mockVersion = new VersionImpl(storeName, 1, "foo"); + mockVersion.setPartitionCount(2); + Mockito.when(store.getCurrentVersion()).thenReturn(1); + Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP); + Mockito.when(store.getPartitionCount()).thenReturn(2); + Mockito.when(mockRepository.getStore(anyString())).thenReturn(store); + Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema)); + Mockito.when(store.getVersionOrThrow(Mockito.anyInt())).thenReturn(mockVersion); + Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(mockVersion); + veniceChangelogConsumer.setStoreRepository(mockRepository); + Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2); VeniceChangelogConsumerImpl.HeartbeatReporterThread reporterThread = veniceChangelogConsumer.getHeartbeatReporterThread(); - Map lastHeartbeat = new HashMap<>(); + ConcurrentHashMap lastHeartbeat = new VeniceConcurrentHashMap<>(); BasicConsumerStats consumerStats = Mockito.mock(BasicConsumerStats.class); Set topicPartitionSet = new HashSet<>(); topicPartitionSet.add(new PubSubTopicPartitionImpl(oldVersionTopic, 1)); @@ -619,7 +630,7 @@ private PubSubMessage constructChangeCaptu KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope( MessageType.PUT.getValue(), producerMetadata, - new Put(ByteBuffer.wrap(recordChangeSerializer.serialize(recordChangeEvent)), 0, 0, ByteBuffer.allocate(0)), + new Put(ByteBuffer.wrap(recordChangeSerializer.serialize(recordChangeEvent)), 1, 0, ByteBuffer.allocate(0)), null); kafkaMessageEnvelope.setProducerMetadata(producerMetadata); KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, keySerializer.serialize(key)); diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java 
b/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java index dd818f6805f..9cd403cd544 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/schema/rmd/RmdUtils.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.schema.rmd.RmdConstants.REPLICATION_CHECKPOINT_VECTOR_FIELD_POS; import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_POS; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.antlr.v4.runtime.misc.NotNull; @@ -100,4 +101,29 @@ static public boolean hasOffsetAdvanced(@NotNull List baseOffset, @NotNull } return true; } + + static public List mergeOffsetVectors(@NotNull List baseOffset, @NotNull List advancedOffset) { + List shortVector; + List longVector; + + if (baseOffset.size() > advancedOffset.size()) { + shortVector = advancedOffset; + longVector = baseOffset; + } else { + shortVector = baseOffset; + longVector = advancedOffset; + } + + List mergedVector = new ArrayList<>(longVector.size()); + + for (int i = 0; i < shortVector.size(); i++) { + mergedVector.add(Math.max(shortVector.get(i), longVector.get(i))); + } + + if (longVector.size() != shortVector.size()) { + mergedVector.addAll(longVector.subList(shortVector.size(), longVector.size())); + } + + return mergedVector; + } } diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java index c65392c6609..a4c48013937 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/schema/rmd/TestRmdUtils.java @@ -70,10 +70,36 @@ public void testHasOffsetAdvanced() { list2.add(1L); list2.add(9L); + List list3 = new ArrayList<>(); + 
list3.add(10L); + + List list4 = new ArrayList<>(); + list4.add(-1L); + Assert.assertFalse(RmdUtils.hasOffsetAdvanced(list1, list2)); Assert.assertFalse(RmdUtils.hasOffsetAdvanced(list1, Collections.emptyList())); Assert.assertTrue(RmdUtils.hasOffsetAdvanced(list2, list1)); Assert.assertTrue(RmdUtils.hasOffsetAdvanced(Collections.emptyList(), list2)); + + List mergedList = RmdUtils.mergeOffsetVectors(list2, list1); + Assert.assertEquals(mergedList.get(0), list2.get(0)); + Assert.assertEquals(mergedList.get(1), list1.get(1)); + + mergedList = RmdUtils.mergeOffsetVectors(list2, Collections.EMPTY_LIST); + Assert.assertEquals(mergedList.get(0), list2.get(0)); + Assert.assertEquals(mergedList.get(1), list2.get(1)); + + mergedList = RmdUtils.mergeOffsetVectors(Collections.EMPTY_LIST, list2); + Assert.assertEquals(mergedList.get(0), list2.get(0)); + Assert.assertEquals(mergedList.get(1), list2.get(1)); + + mergedList = RmdUtils.mergeOffsetVectors(list1, list3); + Assert.assertEquals(mergedList.get(0), list3.get(0)); + Assert.assertEquals(mergedList.get(1), list1.get(1)); + + mergedList = RmdUtils.mergeOffsetVectors(list3, list1); + Assert.assertEquals(mergedList.get(0), list3.get(0)); + Assert.assertEquals(mergedList.get(1), list1.get(1)); } @Test(expectedExceptions = IllegalStateException.class) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java index a602bdf906d..0ea1b6e4024 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/ChangeCaptureView.java @@ -53,6 +53,11 @@ public Map getTopicNamesAndConfigsForVersion(int versi .singletonMap(Version.composeKafkaTopic(storeName, version) + CHANGE_CAPTURE_TOPIC_SUFFIX, properties); } + @Override + public String composeTopicName(int version) { + return 
Version.composeKafkaTopic(storeName, version) + CHANGE_CAPTURE_TOPIC_SUFFIX; + } + @Override public String getWriterClassName() { return CHANGE_CAPTURE_VIEW_WRITER_CLASS_NAME; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java index d0656401e13..f82011349a6 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/MaterializedView.java @@ -66,6 +66,12 @@ public Map getTopicNamesAndConfigsForVersion(int versi properties); } + @Override + public String composeTopicName(int version) { + return Version.composeKafkaTopic(storeName, version) + VIEW_NAME_SEPARATOR + viewName + + MATERIALIZED_VIEW_TOPIC_SUFFIX; + } + /** * {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITION_COUNT} is required to configure a new re-partition view. * {@link MaterializedViewParameters#MATERIALIZED_VIEW_PARTITIONER} is optional. The re-partition view will use the store level diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java index cac0706ba87..b0debb798a9 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/views/VeniceView.java @@ -59,6 +59,10 @@ public Map getTopicNamesAndConfigsForVersion(int versi return Collections.emptyMap(); } + public String composeTopicName(int version) { + return Version.composeKafkaTopic(storeName, version); + } + /** * Implementations should return the fully specified class name for the component VeniceViewWriter * implementation. 
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java index 81584ae7f01..1d9aa9b0735 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/TestChangelogConsumer.java @@ -57,6 +57,7 @@ import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper; import com.linkedin.venice.integration.utils.ZkServerWrapper; +import com.linkedin.venice.meta.MaterializedViewParameters; import com.linkedin.venice.meta.VeniceUserStoreType; import com.linkedin.venice.meta.ViewConfig; import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition; @@ -71,6 +72,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.view.TestView; import com.linkedin.venice.views.ChangeCaptureView; +import com.linkedin.venice.views.MaterializedView; import io.tehuti.metrics.MetricsRepository; import java.io.File; import java.time.Instant; @@ -103,7 +105,7 @@ public class TestChangelogConsumer { - private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE; + private static final int TEST_TIMEOUT = 3 * Time.MS_PER_MINUTE; private static final String[] CLUSTER_NAMES = IntStream.range(0, 1).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new); @@ -226,14 +228,29 @@ public void testAAIngestionWithStoreView() throws Exception { setupControllerClient .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams4)); + UpdateStoreQueryParams storeParams5 = new UpdateStoreQueryParams().setViewName("materializedView") + .setViewClassName(MaterializedView.class.getCanonicalName()) + .setViewClassParams( + 
Collections.singletonMap(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name(), "1")); + setupControllerClient + .retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParams5)); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { Map viewConfigMap = setupControllerClient.getStore(storeName).getStore().getViewConfigs(); - Assert.assertEquals(viewConfigMap.size(), 2); + Assert.assertEquals(viewConfigMap.size(), 3); Assert.assertEquals(viewConfigMap.get("testView").getViewClassName(), TestView.class.getCanonicalName()); Assert.assertEquals( viewConfigMap.get("changeCaptureView").getViewClassName(), ChangeCaptureView.class.getCanonicalName()); Assert.assertEquals(viewConfigMap.get("changeCaptureView").getViewParameters().size(), 1); + Assert.assertEquals( + viewConfigMap.get("materializedView").getViewClassName(), + MaterializedView.class.getCanonicalName()); + Assert.assertEquals( + viewConfigMap.get("materializedView") + .getViewParameters() + .get(MaterializedViewParameters.MATERIALIZED_VIEW_PARTITION_COUNT.name()), + "1"); }); // Write Records to the store for version v1, the push job will contain 100 records. 
@@ -255,7 +272,8 @@ public void testAAIngestionWithStoreView() throws Exception { .setControllerD2ServiceName(D2_SERVICE_NAME) .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .setLocalD2ZkHosts(localZkServer.getAddress()) - .setControllerRequestRetryCount(3); + .setControllerRequestRetryCount(3) + .setIsBeforeImageView(true); VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = new VeniceChangelogConsumerClientFactory(globalChangelogClientConfig, metricsRepository); @@ -269,11 +287,29 @@ public void testAAIngestionWithStoreView() throws Exception { Assert.assertTrue(versionTopicConsumer instanceof VeniceAfterImageConsumerImpl); versionTopicConsumer.subscribeAll().get(); + ChangelogClientConfig viewChangeLogClientConfig = new ChangelogClientConfig().setViewName("materializedView") + .setConsumerProperties(consumerProperties) + .setControllerD2ServiceName(D2_SERVICE_NAME) + .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .setLocalD2ZkHosts(localZkServer.getAddress()) + .setControllerRequestRetryCount(3); + VeniceChangelogConsumerClientFactory veniceViewChangelogConsumerClientFactory = + new VeniceChangelogConsumerClientFactory(viewChangeLogClientConfig, metricsRepository); + + VeniceChangelogConsumer viewTopicConsumer = + veniceViewChangelogConsumerClientFactory.getChangelogConsumer(storeName); + Assert.assertTrue(viewTopicConsumer instanceof VeniceAfterImageConsumerImpl); + viewTopicConsumer.subscribeAll().get(); + // Let's consume those 100 records off of version 1 Map versionTopicEvents = new HashMap<>(); pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); Assert.assertEquals(versionTopicEvents.size(), 100); + Map viewTopicEvents = new HashMap<>(); + pollAfterImageEventsFromChangeCaptureConsumer(viewTopicEvents, viewTopicConsumer); + Assert.assertEquals(viewTopicEvents.size(), 100); + VeniceChangelogConsumer veniceChangelogConsumer = 
veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName); veniceChangelogConsumer.subscribeAll().get(); @@ -546,7 +582,7 @@ public void testAAIngestionWithStoreView() throws Exception { veniceChangelogConsumer.seekToBeginningOfPush().join(); TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); - Assert.assertEquals(polledChangeEvents.size(), 30); + Assert.assertEquals(polledChangeEvents.size(), 10); }); // Save a checkpoint and clear the map @@ -572,7 +608,7 @@ public void testAAIngestionWithStoreView() throws Exception { TestUtils.waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> { pollChangeEventsFromChangeCaptureConsumer(polledChangeEvents, veniceChangelogConsumer); // Repush with TTL will include delete events in the topic - Assert.assertEquals(polledChangeEvents.size(), 16); + Assert.assertEquals(polledChangeEvents.size(), 5); }); allChangeEvents.putAll(polledChangeEvents); polledChangeEvents.clear(); @@ -620,7 +656,7 @@ public void testAAIngestionWithStoreView() throws Exception { pollAfterImageEventsFromChangeCaptureConsumer(versionTopicEvents, versionTopicConsumer); // Reconsuming the events from the version topic, which at this point should just contain the same 16 // events we consumed with the before/after image consumer earlier. - Assert.assertEquals(versionTopicEvents.size(), 30); + Assert.assertEquals(versionTopicEvents.size(), 10); }); // Verify version swap count matches with version count - 1 (since we don't transmit from version 0 to version 1).