Skip to content

Commit

Permalink
[changelog] Make changes for venice view consumption (#1497)
Browse files Browse the repository at this point in the history
* [changelog] Make changes for venice view consumption

These changes make it so that the venice after image consumer is able to now consume view topics. View topics internally very much resemble version topics, which the after image consumer understands today.  So the logic is largely the same.  The only difference we add here is that we now have to consult store repositories which are view aware, and we have to chnage how we maintain local highwatermark information.  highwatermarks are meant to be compared on RT partitions, and new view types map 1:N and N:N RT partitions to view partitions.  This means we need to add an additional dimensionality to highwatermark filtering where instead of keeping a single map of topic partitions to offsets, but a map of topic partitions to maps of RT partitions to highwatermarks.

There's still some pending work here, we need to figure out chunk assembly (as the current implementation of chunk assembly doesn't account for interleaving writes) and we need to put a bow on how we articulate to the consumers what the upstream RT partition is (right now we're just hardcoding the 1:1 pairing).
  • Loading branch information
ZacAttack authored Feb 12, 2025
1 parent bb092b9 commit 32c03ba
Show file tree
Hide file tree
Showing 16 changed files with 380 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
private Properties consumerProperties;
private SchemaReader schemaReader;
private String viewName;
private Boolean isBeforeImageView = false;

private String consumerName = "";

Expand Down Expand Up @@ -219,7 +220,17 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
.setRocksDBBlockCacheSizeInBytes(config.getRocksDBBlockCacheSizeInBytes())
.setConsumerName(config.consumerName)
.setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval())
.setShouldCompactMessages(config.shouldCompactMessages());
.setShouldCompactMessages(config.shouldCompactMessages())
.setIsBeforeImageView(config.isBeforeImageView());
return newConfig;
}

protected Boolean isBeforeImageView() {
return isBeforeImageView;
}

public ChangelogClientConfig setIsBeforeImageView(Boolean beforeImageView) {
isBeforeImageView = beforeImageView;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
Expand Down Expand Up @@ -93,7 +94,8 @@ public InternalLocalBootstrappingVeniceChangelogConsumer(
bootstrapStateMap = new VeniceConcurrentHashMap<>();
syncBytesInterval = changelogClientConfig.getDatabaseSyncBytesInterval();
metricsRepository = changelogClientConfig.getInnerClientConfig().getMetricsRepository();
String localStateTopicNameTemp = changelogClientConfig.getStoreName() + LOCAL_STATE_TOPIC_SUFFIX;
String viewNamePath = changelogClientConfig.getViewName() == null ? "" : "-" + changelogClientConfig.getViewName();
String localStateTopicNameTemp = changelogClientConfig.getStoreName() + viewNamePath + LOCAL_STATE_TOPIC_SUFFIX;
String bootstrapFileSystemPath = changelogClientConfig.getBootstrapFileSystemPath();
if (StringUtils.isNotEmpty(consumerId)) {
localStateTopicNameTemp += "-" + consumerId;
Expand Down Expand Up @@ -183,7 +185,8 @@ private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKep
protected boolean handleVersionSwapControlMessage(
ControlMessage controlMessage,
PubSubTopicPartition pubSubTopicPartition,
String topicSuffix) {
String topicSuffix,
Integer upstreamPartition) {
ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
if (controlMessageType.equals(ControlMessageType.VERSION_SWAP)) {
VersionSwap versionSwap = (VersionSwap) controlMessage.controlMessageUnion;
Expand Down Expand Up @@ -506,10 +509,9 @@ public CompletableFuture<Void> start(Set<Integer> partitions) {

storageService.start();
try {
storeRepository.start();
storeRepository.subscribe(storeName);
} catch (InterruptedException e) {
throw new RuntimeException(e);
throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e);
}

return seekWithBootStrap(partitions);
Expand All @@ -518,7 +520,13 @@ public CompletableFuture<Void> start(Set<Integer> partitions) {
@Override
public CompletableFuture<Void> start() {
Set<Integer> allPartitions = new HashSet<>();
for (int partition = 0; partition < partitionCount; partition++) {
try {
storeRepository.subscribe(storeName);
} catch (InterruptedException e) {
throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e);
}
Store store = storeRepository.getStore(storeName);
for (int partition = 0; partition < store.getVersion(store.getCurrentVersion()).getPartitionCount(); partition++) {
allPartitions.add(partition);
}
return this.start(allPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.alpini.base.concurrency.Executors;
import com.linkedin.alpini.base.concurrency.ScheduledExecutorService;
import com.linkedin.davinci.repository.NativeMetadataRepositoryViewAdapter;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
Expand Down Expand Up @@ -92,6 +93,11 @@ public CompletableFuture<Void> subscribe(Set<Integer> partitions) {
if (partitions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
try {
storeRepository.subscribe(storeName);
} catch (InterruptedException e) {
throw new VeniceException("Failed to start bootstrapping changelog consumer with error:", e);
}
if (!versionSwapThreadScheduled.get()) {
// schedule the version swap thread and set up the callback listener
this.storeRepository.registerStoreDataChangedListener(versionSwapListener);
Expand Down Expand Up @@ -208,4 +214,10 @@ public void run() {
versionSwapListener.handleStoreChanged(null);
}
}

@Override
public void setStoreRepository(NativeMetadataRepositoryViewAdapter repository) {
super.setStoreRepository(repository);
versionSwapListener.setStoreRepository(repository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public <K, V> VeniceChangelogConsumer<K, V> getChangelogConsumer(String storeNam
String viewClass = getViewClass(newStoreChangelogClientConfig, storeName);
String consumerName = suffixConsumerIdToStore(storeName + "-" + viewClass.getClass().getSimpleName(), consumerId);
if (viewClass.equals(ChangeCaptureView.class.getCanonicalName())) {
// TODO: This is a little bit of a hack. This is to deal with the an issue where the before image change
// capture topic doesn't follow the same naming convention as view topics.
newStoreChangelogClientConfig.setIsBeforeImageView(true);
return new VeniceChangelogConsumerImpl(
newStoreChangelogClientConfig,
consumer != null
Expand Down
Loading

0 comments on commit 32c03ba

Please sign in to comment.