Commit 7c582e6
[controller] Delete backup version before start of new push in target regions (#1049)
When target region push is enabled, the push goes to the target region first, but the remaining regions do not delete the backup version before their own push starts. As a result, disk usage for the duration of the push grows to 3x the store size (backup + current + future versions). This PR deletes the backup version before ingestion starts in the non-target regions, capping usage at 2x.
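To make the disk math concrete, here is a minimal sketch of the worst-case accounting on one server, where storeSizeBytes is a hypothetical stand-in for the size of a single store version (not a value from this PR):

// Hypothetical accounting for one store on one server during a push.
long storeSizeBytes = 10L * 1024 * 1024 * 1024; // assume a 10 GiB version

// Before this PR: backup + current + future versions coexist in
// non-target regions for the whole duration of the push.
long peakBefore = 3 * storeSizeBytes; // 30 GiB

// After this PR: the backup version is deleted before ingestion starts,
// so only the current + future versions coexist.
long peakAfter = 2 * storeSizeBytes; // 20 GiB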
1 parent c81f413 commit 7c582e6

File tree

2 files changed: +50 -13 lines

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestPushJobWithNativeReplication.java

Lines changed: 35 additions & 7 deletions
@@ -75,6 +75,7 @@
 import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
 import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
 import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
+import com.linkedin.venice.integration.utils.VeniceServerWrapper;
 import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
 import com.linkedin.venice.meta.DataReplicationPolicy;
 import com.linkedin.venice.meta.HybridStoreConfig;
@@ -142,7 +143,6 @@
 public class TestPushJobWithNativeReplication {
   private static final Logger LOGGER = LogManager.getLogger(TestPushJobWithNativeReplication.class);
   private static final int TEST_TIMEOUT = 2 * Time.MS_PER_MINUTE;
-
   private static final int NUMBER_OF_CHILD_DATACENTERS = 2;
   private static final int NUMBER_OF_CLUSTERS = 1;
   private static final String[] CLUSTER_NAMES =
@@ -161,6 +161,7 @@ public class TestPushJobWithNativeReplication {
 
   private PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
   private D2Client d2Client;
+  private VeniceServerWrapper serverWrapper;
 
   @DataProvider(name = "storeSize")
   public static Object[][] storeSize() {
@@ -179,7 +180,6 @@ public void setUp() {
     serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false");
     serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "true");
     serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
-
     Properties controllerProps = new Properties();
     // This property is required for test stores that have 10 partitions
     controllerProps.put(DEFAULT_MAX_NUMBER_OF_PARTITIONS, 10);
@@ -209,7 +209,7 @@ public void setUp() {
         .setZkStartupTimeout(3, TimeUnit.SECONDS)
         .build();
     D2ClientUtils.startClient(d2Client);
-
+    serverWrapper = clusterWrapper.getVeniceServers().get(0);
   }
 
   @AfterClass(alwaysRun = true)
@@ -957,19 +957,47 @@ public void testTargetedRegionPushJobFullConsumptionForBatchStore() throws Exception
         }
       });
     }
+    String dataDBPathV1 = serverWrapper.getDataDirectory() + "/rocksdb/" + storeName + "_v1";
+    long storeSize = FileUtils.sizeOfDirectory(new File(dataDBPathV1));
+    try (VenicePushJob job = new VenicePushJob("Test push job 2", props)) {
+      job.run();
+
+      TestUtils.waitForNonDeterministicAssertion(45, TimeUnit.SECONDS, () -> {
+        for (int version: parentControllerClient.getStore(storeName)
+            .getStore()
+            .getColoToCurrentVersions()
+            .values()) {
+          Assert.assertEquals(version, 2);
+        }
+      });
+    }
     props.put(TARGETED_REGION_PUSH_ENABLED, true);
-    // props.put(POST_VALIDATION_CONSUMPTION_ENABLED, true);
     TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir, 20);
-    try (VenicePushJob job = new VenicePushJob("Test push job 2", props)) {
+    try (VenicePushJob job = new VenicePushJob("Test push job 3", props)) {
       job.run(); // the job should succeed
-
+      File directory = new File(serverWrapper.getDataDirectory() + "/rocksdb/");
+      File[] storeDBDirs = directory.listFiles(File::isDirectory);
+      long totalStoreSize = 0;
+      if (storeDBDirs != null) {
+        for (File storeDB: storeDBDirs) {
+          if (storeDB.getName().startsWith(storeName)) {
+            long size = FileUtils
+                .sizeOfDirectory(new File(serverWrapper.getDataDirectory() + "/rocksdb/" + storeDB.getName()));
+            totalStoreSize += size;
+          }
+        }
+        Assert.assertTrue(
+            storeSize * 2 >= totalStoreSize,
+            "2x of store size " + storeSize + " is more than total " + totalStoreSize);
+      }
       TestUtils.waitForNonDeterministicAssertion(45, TimeUnit.SECONDS, () -> {
         // Current version should become 3
         for (int version: parentControllerClient.getStore(storeName)
             .getStore()
             .getColoToCurrentVersions()
             .values()) {
-          Assert.assertEquals(version, 2);
+          Assert.assertEquals(version, 3);
         }
         // should be able to read all 20 records.
         validateDaVinciClient(storeName, 20);
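The size assertion above totals the on-disk footprint of every retained version of the store via Apache Commons IO's FileUtils.sizeOfDirectory. A self-contained sketch of the same check outside the test harness (the /tmp/rocksdb path and test_store name are illustrative, not taken from the test):

import java.io.File;
import org.apache.commons.io.FileUtils;

public class StoreSizeCheck {
  public static void main(String[] args) {
    File rocksDbDir = new File("/tmp/rocksdb"); // illustrative data path
    String storeName = "test_store";            // illustrative store name
    long totalStoreSize = 0;
    File[] storeDBDirs = rocksDbDir.listFiles(File::isDirectory);
    if (storeDBDirs != null) {
      for (File storeDB: storeDBDirs) {
        // Version directories are named <storeName>_v<N>, so a prefix match
        // sums every version of this store that is still on disk.
        if (storeDB.getName().startsWith(storeName)) {
          totalStoreSize += FileUtils.sizeOfDirectory(storeDB);
        }
      }
    }
    System.out.println("Total on-disk size across versions: " + totalStoreSize + " bytes");
  }
}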

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 15 additions & 6 deletions
@@ -2206,17 +2206,26 @@ public void createHelixResourceAndStartMonitoring(String clusterName, String storeName
     if (store == null) {
       throwStoreDoesNotExist(clusterName, storeName);
     } else {
-      helixAdminClient.createVeniceStorageClusterResources(
-          clusterName,
-          version.kafkaTopicName(),
-          version.getPartitionCount(),
-          store.getReplicationFactor());
       startMonitorOfflinePush(
           clusterName,
           version.kafkaTopicName(),
           version.getPartitionCount(),
           store.getReplicationFactor(),
           store.getOffLinePushStrategy());
+      helixAdminClient.createVeniceStorageClusterResources(
+          clusterName,
+          version.kafkaTopicName(),
+          version.getPartitionCount(),
+          store.getReplicationFactor());
+      try {
+        retireOldStoreVersions(clusterName, storeName, true, store.getCurrentVersion());
+      } catch (Throwable t) {
+        LOGGER.error(
+            "Failed to delete previous backup version during data recovery for store {} in cluster {}",
+            storeName,
+            clusterName,
+            t);
+      }
     }
   }
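Two details in this hunk matter: Helix resource creation now happens after offline-push monitoring is started, and backup retirement is wrapped so a cleanup failure can never abort the new push. A minimal, self-contained sketch of that best-effort pattern (the retireBackup helper is hypothetical; only the catch-Throwable-and-log shape mirrors the diff):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class BestEffortCleanup {
  private static final Logger LOGGER = LogManager.getLogger(BestEffortCleanup.class);

  // Hypothetical stand-in for retireOldStoreVersions: cleanup that may fail.
  static void retireBackup(String clusterName, String storeName) {
    throw new RuntimeException("simulated cleanup failure");
  }

  public static void main(String[] args) {
    // Best-effort semantics: reclaiming disk early is desirable but optional,
    // so any failure is logged and swallowed rather than propagated.
    try {
      retireBackup("cluster0", "test_store");
    } catch (Throwable t) {
      LOGGER.error("Failed to delete previous backup version for store {} in cluster {}", "test_store", "cluster0", t);
    }
    LOGGER.info("Push setup continues despite the cleanup failure");
  }
}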

@@ -3395,7 +3404,7 @@ public void retireOldStoreVersions(
       return;
     }
 
-    if (store.getBackupStrategy() == BackupStrategy.DELETE_ON_NEW_PUSH_START) {
+    if (deleteBackupOnStartPush) {
       LOGGER.info("Deleting backup versions as the new push started for upcoming version for store: {}", storeName);
     } else {
       LOGGER.info("Retiring old versions after successful push for store: {}", storeName);
