Skip to content

Commit fecf7f2

Browse files
sixpluszeroclaude
andcommitted
[da-vinci] Remove deprecated Ingestion Isolation feature
Remove the Ingestion Isolation feature which enabled running ingestion in a separate forked JVM process communicating via HTTP/Netty IPC. This feature is no longer needed, and removing it reduces codebase complexity and maintenance burden. - Delete ~30 production and test files: IsolatedIngestionBackend, IsolatedIngestionServer, MainIngestionMonitorService, and related classes in ingestion/isolated/, ingestion/main/, and utils/ - Delete IngestionMode and IngestionAction enums - Remove all SERVER_INGESTION_ISOLATION_* config keys from ConfigKeys - Simplify DaVinciBackend to always use DefaultIngestionBackend and StorageEngineMetadataService - Remove isIsolatedIngestion parameter from StoreIngestionTask, KafkaStoreIngestionService, BlobTransferUtils, and VeniceMetadataRepositoryBuilder - Remove ingestion isolation guards from DefaultIngestionBackend, AvroGenericDaVinciClient, and VersionBackend - Clean up SpotBugs exclusions, server properties, and build configs - Update integration tests to remove isolation-specific parameterization Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 16a81ce commit fecf7f2

File tree

81 files changed

+98
-6377
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+98
-6377
lines changed

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,6 @@ ext.createDiffFile = { ->
755755
// Keep this sorted
756756
// da-vinci-client
757757
':!clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java',
758-
':!clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.java',
759758
':!clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatStatReporter.java',
760759

761760
// venice-client

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 11 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,18 @@
2020
import com.linkedin.davinci.config.VeniceServerConfig;
2121
import com.linkedin.davinci.ingestion.DefaultIngestionBackend;
2222
import com.linkedin.davinci.ingestion.IngestionBackend;
23-
import com.linkedin.davinci.ingestion.IsolatedIngestionBackend;
24-
import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService;
25-
import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
2623
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
2724
import com.linkedin.davinci.notifier.VeniceNotifier;
2825
import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder;
2926
import com.linkedin.davinci.stats.AggBlobTransferStats;
3027
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
3128
import com.linkedin.davinci.stats.AggVersionedStorageEngineStats;
3229
import com.linkedin.davinci.stats.HeartbeatMonitoringServiceStats;
33-
import com.linkedin.davinci.stats.MetadataUpdateStats;
3430
import com.linkedin.davinci.stats.RocksDBMemoryStats;
3531
import com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatMonitoringService;
3632
import com.linkedin.davinci.storage.StorageEngineMetadataService;
3733
import com.linkedin.davinci.storage.StorageMetadataService;
3834
import com.linkedin.davinci.storage.StorageService;
39-
import com.linkedin.davinci.store.StorageEngine;
4035
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
4136
import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig;
4237
import com.linkedin.venice.annotation.VisibleForTesting;
@@ -51,7 +46,6 @@
5146
import com.linkedin.venice.kafka.protocol.state.PartitionState;
5247
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
5348
import com.linkedin.venice.meta.ClusterInfoProvider;
54-
import com.linkedin.venice.meta.IngestionMode;
5549
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
5650
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
5751
import com.linkedin.venice.meta.Store;
@@ -156,7 +150,7 @@ public DaVinciBackend(
156150
metricsRepository = Optional.ofNullable(clientConfig.getMetricsRepository())
157151
.orElse(TehutiUtils.getMetricsRepository(DAVINCI_CLIENT.getName()));
158152
VeniceMetadataRepositoryBuilder veniceMetadataRepositoryBuilder =
159-
new VeniceMetadataRepositoryBuilder(configLoader, clientConfig, metricsRepository, icProvider, false);
153+
new VeniceMetadataRepositoryBuilder(configLoader, clientConfig, metricsRepository, icProvider);
160154

161155
ClusterInfoProvider clusterInfoProvider = veniceMetadataRepositoryBuilder.getClusterInfoProvider();
162156
ReadOnlyStoreRepository readOnlyStoreRepository = veniceMetadataRepositoryBuilder.getStoreRepo();
@@ -195,35 +189,17 @@ public DaVinciBackend(
195189
backendConfig.getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled())
196190
: null;
197191

198-
// Add extra safeguards here to ensure we have released RocksDB database locks before we initialize storage
199-
// services.
200-
IsolatedIngestionUtils.destroyLingeringIsolatedIngestionProcess(configLoader);
201192
/**
202193
* The constructor of {@link #storageService} will take care of unused store/store version cleanup.
203-
*
204-
* When Ingestion Isolation is enabled, we don't want to restore data partitions here:
205-
* 1. It is a waste of effort since all the opened partitioned will be closed right after.
206-
* 2. When DaVinci memory limiter is enabled, currently, SSTFileManager doesn't clean up the entries belonging
207-
* to the closed database, which means when the Isolated process hands the database back to the main process,
208-
* some removed SST files (some SST files can be removed in Isolated Process because of log compaction) will
209-
* remain in SSTFileManager tracked file list.
210-
* 3. We still want to open metadata partition, otherwise {@link StorageService} won't scan the local db folder.
211-
* Also opening metadata partition in main process won't cause much side effect from DaVinci memory limiter's
212-
* POV, since metadata partition won't be handed back to main process in the future.
213-
* 4. When Ingestion Isolation is enabled with suppressing live update feature, main process needs to open all the
214-
* data partitions since Isolated Process won't re-ingest the existing partitions.
215194
*/
216-
boolean whetherToRestoreDataPartitions = !isIsolatedIngestion()
217-
|| configLoader.getVeniceServerConfig().freezeIngestionIfReadyToServeOrLocalDataExists();
218-
LOGGER.info("DaVinci {} restore data partitions.", whetherToRestoreDataPartitions ? "will" : "won't");
219195
storageService = new StorageService(
220196
configLoader,
221197
aggVersionedStorageEngineStats,
222198
rocksDBMemoryStats,
223199
storeVersionStateSerializer,
224200
partitionStateSerializer,
225201
storeRepository,
226-
whetherToRestoreDataPartitions,
202+
true,
227203
true,
228204
functionToCheckWhetherStorageEngineShouldBeKeptOrNot(managedClients));
229205
storageService.start();
@@ -245,14 +221,8 @@ public DaVinciBackend(
245221
LOGGER.info("Successfully verified the latest protocols at runtime are valid in Venice backend.");
246222
}
247223

248-
storageMetadataService = backendConfig.getIngestionMode().equals(IngestionMode.ISOLATED)
249-
? new MainIngestionStorageMetadataService(
250-
backendConfig.getIngestionServicePort(),
251-
partitionStateSerializer,
252-
new MetadataUpdateStats(metricsRepository),
253-
configLoader,
254-
storageService.getStoreVersionStateSyncer())
255-
: new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
224+
storageMetadataService =
225+
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
256226
// Start storage metadata service
257227
((AbstractVeniceService) storageMetadataService).start();
258228
compressorFactory = new StorageEngineBackedCompressorFactory(storageMetadataService);
@@ -282,7 +252,6 @@ public DaVinciBackend(
282252
partitionStateSerializer,
283253
Optional.empty(),
284254
null,
285-
false,
286255
compressorFactory,
287256
cacheBackend,
288257
true,
@@ -315,17 +284,7 @@ public DaVinciBackend(
315284

316285
ingestionService.start();
317286

318-
if (isIsolatedIngestion() && cacheConfig.isPresent()) {
319-
// TODO: There are 'some' cases where this mix might be ok, (like a batch only store, or with certain TTL
320-
// settings),
321-
// could add further validation. If the process isn't ingesting data, then it can't maintain the object cache
322-
// with
323-
// a correct view of the data.
324-
throw new IllegalArgumentException(
325-
"Ingestion isolated and Cache are incompatible configs!! Aborting start up!");
326-
}
327-
328-
if (BlobTransferUtils.isBlobTransferManagerEnabled(backendConfig, isIsolatedIngestion())) {
287+
if (BlobTransferUtils.isBlobTransferManagerEnabled(backendConfig)) {
329288
aggVersionedBlobTransferStats =
330289
new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig());
331290
aggBlobTransferStats =
@@ -441,41 +400,12 @@ private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKep
441400

442401
@VisibleForTesting
443402
final synchronized void bootstrap() {
444-
/**
445-
* In order to make bootstrap logic compatible with ingestion isolation, we first scan all local storage engines,
446-
* record all store versions that are up-to-date and close all storage engines. This will make sure child process
447-
* can open RocksDB stores.
448-
*/
449-
if (isIsolatedIngestion()) {
450-
if (configLoader.getVeniceServerConfig().freezeIngestionIfReadyToServeOrLocalDataExists()) {
451-
/**
452-
* In this case we will only need to close metadata partition, as it is supposed to be opened and managed by
453-
* forked ingestion process via following subscribe call.
454-
*/
455-
for (StorageEngine storageEngine: getStorageService().getStorageEngineRepository()
456-
.getAllLocalStorageEngines()) {
457-
storageEngine.closeMetadataPartition();
458-
}
459-
} else {
460-
getStorageService().closeAllStorageEngines();
461-
}
462-
}
463-
464-
ingestionBackend = isIsolatedIngestion()
465-
? new IsolatedIngestionBackend(
466-
configLoader,
467-
metricsRepository,
468-
storageMetadataService,
469-
ingestionService,
470-
getStorageService(),
471-
blobTransferManager,
472-
this::getVeniceCurrentVersionNumber)
473-
: new DefaultIngestionBackend(
474-
storageMetadataService,
475-
ingestionService,
476-
getStorageService(),
477-
blobTransferManager,
478-
configLoader.getVeniceServerConfig());
403+
ingestionBackend = new DefaultIngestionBackend(
404+
storageMetadataService,
405+
ingestionService,
406+
getStorageService(),
407+
blobTransferManager,
408+
configLoader.getVeniceServerConfig());
479409
ingestionBackend.addIngestionNotifier(ingestionListener);
480410
}
481411

@@ -636,10 +566,6 @@ protected void deleteStore(String storeName) {
636566
}
637567
}
638568

639-
public final boolean isIsolatedIngestion() {
640-
return configLoader.getVeniceServerConfig().getIngestionMode().equals(IngestionMode.ISOLATED);
641-
}
642-
643569
// Move the logic to this protected method to make it visible for unit test.
644570
protected void handleStoreChanged(StoreBackend storeBackend) {
645571
// Skip version swaps for version-specific stores

clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import com.linkedin.venice.compute.ComputeRequestWrapper;
1818
import com.linkedin.venice.compute.ComputeUtils;
1919
import com.linkedin.venice.exceptions.VeniceException;
20-
import com.linkedin.venice.meta.IngestionMode;
2120
import com.linkedin.venice.meta.Store;
2221
import com.linkedin.venice.meta.Version;
2322
import com.linkedin.venice.partitioner.VenicePartitioner;
@@ -93,14 +92,6 @@ public class VersionBackend {
9392
this.config = backend.getConfigLoader().getStoreConfig(version.kafkaTopicName());
9493
this.batchReportEOIPStatusEnabled = config.getBatchReportEOIPEnabled();
9594

96-
if (this.config.getIngestionMode().equals(IngestionMode.ISOLATED)) {
97-
/*
98-
* Explicitly disable the store restore since we don't want to open other partitions that should be controlled by
99-
* child process. All the finished partitions will be closed by child process and reopened in parent process.
100-
*/
101-
this.config.setRestoreDataPartitions(false);
102-
this.config.setRestoreMetadataPartition(false);
103-
}
10495
this.partitioner = PartitionUtils.getUserPartitionLevelVenicePartitioner(version.getPartitionerConfig());
10596
this.suppressLiveUpdates = this.config.freezeIngestionIfReadyToServeOrLocalDataExists();
10697
this.storageEngine.set(backend.getStorageService().getStorageEngine(version.kafkaTopicName()));

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -218,20 +218,12 @@ private static boolean isBlobTransferAclValidationEnabled(VeniceConfigLoader con
218218
/**
219219
* A config check to determine if blob transfer manager is enabled
220220
* @param backendConfig the Venice server config
221-
* @param isIsolatedIngestionEnabled whether isolated ingestion is enabled
222221
* @return true if blob transfer manager is enabled, false otherwise
223222
*/
224-
public static boolean isBlobTransferManagerEnabled(
225-
VeniceServerConfig backendConfig,
226-
boolean isIsolatedIngestionEnabled) {
227-
// Blob transfer feature and isolated ingestion feature are mutually exclusive
223+
public static boolean isBlobTransferManagerEnabled(VeniceServerConfig backendConfig) {
228224
if (backendConfig.isBlobTransferManagerEnabled() && backendConfig.isBlobTransferSslEnabled()
229225
&& backendConfig.isBlobTransferAclEnabled()) {
230-
if (isIsolatedIngestionEnabled) {
231-
throw new VeniceException("Blob transfer manager is not supported with isolated ingestion");
232-
} else {
233-
return true;
234-
}
226+
return true;
235227
} else if (backendConfig.isBlobTransferManagerEnabled()) {
236228
throw new VeniceException("Blob transfer manager is not supported without SSL and ACL enabled");
237229
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -277,27 +277,18 @@ private Optional<Version> getVersion() {
277277
}
278278

279279
protected CompletableFuture<Void> seekToTail() {
280-
if (getBackend().isIsolatedIngestion()) {
281-
throw new VeniceClientException("Isolated Ingestion is not supported with seekToCheckpoint");
282-
}
283280
throwIfNotReady();
284281
addPartitionsToSubscription(ComplementSet.universalSet());
285282
return getStoreBackend().seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, null, true), getVersion());
286283
}
287284

288285
protected CompletableFuture<Void> seekToTail(Set<Integer> partitionSet) {
289-
if (getBackend().isIsolatedIngestion()) {
290-
throw new VeniceClientException("Isolated Ingestion is not supported with seekToCheckpoint");
291-
}
292286
throwIfNotReady();
293287
addPartitionsToSubscription(ComplementSet.wrap(partitionSet));
294288
return getStoreBackend().seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, null, true), getVersion());
295289
}
296290

297291
protected CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> checkpoints) {
298-
if (getBackend().isIsolatedIngestion()) {
299-
throw new VeniceClientException("Isolated Ingestion is not supported with seekToCheckpoint");
300-
}
301292
throwIfNotReady();
302293
Map<Integer, PubSubPosition> positionMap = new HashMap<>();
303294
for (VeniceChangeCoordinate changeCoordinate: checkpoints) {
@@ -313,19 +304,13 @@ protected CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> c
313304
}
314305

315306
protected CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps) {
316-
if (getBackend().isIsolatedIngestion()) {
317-
throw new VeniceClientException("Isolated Ingestion is not supported with seekToTimestamps");
318-
}
319307
throwIfNotReady();
320308
addPartitionsToSubscription(ComplementSet.wrap(timestamps.keySet()));
321309
return getStoreBackend()
322310
.seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, timestamps, null, false), getVersion());
323311
}
324312

325313
protected CompletableFuture<Void> seekToTimestamps(Long timestamp) {
326-
if (getBackend().isIsolatedIngestion()) {
327-
throw new VeniceClientException("Isolated Ingestion is not supported with seekToTimestamps");
328-
}
329314
throwIfNotReady();
330315
addPartitionsToSubscription(ComplementSet.universalSet());
331316
return getStoreBackend()

clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,6 @@
131131
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_CHECKPOINT_DURING_GRACEFUL_SHUTDOWN_ENABLED;
132132
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS;
133133
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_INFO_LOG_LINE_LIMIT;
134-
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_APPLICATION_PORT;
135-
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_SERVICE_PORT;
136-
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_MODE;
137134
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_MAX_IDLE_COUNT;
138135
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_REUSABLE_OBJECTS_STRATEGY;
139136
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
@@ -249,7 +246,6 @@
249246
import com.linkedin.venice.authorization.DefaultIdentityParser;
250247
import com.linkedin.venice.exceptions.ConfigurationException;
251248
import com.linkedin.venice.exceptions.VeniceException;
252-
import com.linkedin.venice.meta.IngestionMode;
253249
import com.linkedin.venice.pubsub.PubSubClientsFactory;
254250
import com.linkedin.venice.throttle.VeniceRateLimiter;
255251
import com.linkedin.venice.utils.ConfigCommonUtils;
@@ -275,7 +271,7 @@
275271

276272

277273
/**
278-
* VeniceServerConfig maintains configs specific to Venice Server, Da Vinci client and Isolated Ingestion Service.
274+
* VeniceServerConfig maintains configs specific to Venice Server, Da Vinci client.
279275
*/
280276
public class VeniceServerConfig extends VeniceClusterConfig {
281277
private static final Logger LOGGER = LogManager.getLogger(VeniceServerConfig.class);
@@ -496,9 +492,6 @@ public class VeniceServerConfig extends VeniceClusterConfig {
496492
private final int consumerPoolSizePerKafkaCluster;
497493
private final boolean leakedResourceCleanupEnabled;
498494

499-
private final IngestionMode ingestionMode;
500-
private final int ingestionServicePort;
501-
private final int ingestionApplicationPort;
502495
private final boolean databaseChecksumVerificationEnabled;
503496
private final boolean rocksDbStorageEngineConfigCheckEnabled;
504497

@@ -923,10 +916,6 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
923916
}
924917
leakedResourceCleanupEnabled = serverProperties.getBoolean(SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED, true);
925918

926-
ingestionMode =
927-
IngestionMode.valueOf(serverProperties.getString(SERVER_INGESTION_MODE, IngestionMode.BUILT_IN.toString()));
928-
ingestionServicePort = serverProperties.getInt(SERVER_INGESTION_ISOLATION_SERVICE_PORT, 27015);
929-
ingestionApplicationPort = serverProperties.getInt(SERVER_INGESTION_ISOLATION_APPLICATION_PORT, 27016);
930919
databaseChecksumVerificationEnabled =
931920
serverProperties.getBoolean(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, false);
932921

@@ -1543,18 +1532,6 @@ public boolean isLeakedResourceCleanupEnabled() {
15431532
return leakedResourceCleanupEnabled;
15441533
}
15451534

1546-
public IngestionMode getIngestionMode() {
1547-
return ingestionMode;
1548-
}
1549-
1550-
public int getIngestionServicePort() {
1551-
return ingestionServicePort;
1552-
}
1553-
1554-
public int getIngestionApplicationPort() {
1555-
return ingestionApplicationPort;
1556-
}
1557-
15581535
public boolean isDatabaseChecksumVerificationEnabled() {
15591536
return databaseChecksumVerificationEnabled;
15601537
}

0 commit comments

Comments
 (0)