Skip to content

Commit cc16c1e

Browse files
committed
Clear local logged stores if input checkpoints are empty
1 parent 63c86b5 commit cc16c1e

File tree

2 files changed

+65
-5
lines changed

2 files changed

+65
-5
lines changed

samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.samza.system.SystemStream;
5555
import org.apache.samza.task.TaskInstanceCollector;
5656
import org.apache.samza.util.Clock;
57+
import org.apache.samza.util.FileUtil;
5758
import org.slf4j.Logger;
5859
import org.slf4j.LoggerFactory;
5960

@@ -274,7 +275,11 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
274275
taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
275276
LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
276277
}
277-
taskCheckpoints.put(taskName, taskCheckpoint);
278+
279+
// Only insert non-null checkpoints
280+
if (taskCheckpoint != null) {
281+
taskCheckpoints.put(taskName, taskCheckpoint);
282+
}
278283

279284
Map<String, Set<String>> backendFactoryToStoreNames =
280285
ContainerStorageManagerUtil.getBackendFactoryStoreNames(
@@ -308,6 +313,15 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
308313
taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
309314
});
310315

316+
// if we have received no input checkpoints, it can only be due to two reasons:
317+
// a) Samza job is new, so it has no previous checkpoints.
318+
// b) The checkpoints were cleared.
319+
// We should be able to safely clear local logged stores in either case
320+
if (taskCheckpoints.isEmpty()) {
321+
LOG.info("No checkpoints read. Attempting to clear logged stores.");
322+
clearLoggedStores(loggedStoreBaseDirectory);
323+
}
324+
311325
// Init all taskRestores and if successful, restores all the task stores concurrently
312326
LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
313327
CompletableFuture<Map<TaskName, Checkpoint>> initRestoreAndNewCheckpointFuture =
@@ -357,6 +371,19 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
357371
return taskCheckpoints;
358372
}
359373

374+
private static void clearLoggedStores(File loggedStoreBaseDir) {
375+
final FileUtil fileUtil = new FileUtil();
376+
final File[] storeDirs = loggedStoreBaseDir.listFiles();
377+
if (storeDirs == null || storeDirs.length == 0) {
378+
LOG.info("No stores to delete");
379+
return;
380+
}
381+
for (File storeDir: storeDirs) {
382+
LOG.info("Clearing store dir {} from logged stores.", storeDir);
383+
fileUtil.rm(storeDir);
384+
}
385+
}
386+
360387
/**
361388
* Get the {@link StorageEngine} instance with a given name for a given task.
362389
* @param taskName the task name for which the storage engine is desired.

samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.common.collect.ImmutableSet;
2424
import java.io.ByteArrayOutputStream;
2525
import java.io.File;
26+
import java.io.IOException;
2627
import java.nio.file.Path;
2728
import java.util.ArrayList;
2829
import java.util.Arrays;
@@ -102,11 +103,10 @@
102103
@RunWith(PowerMockRunner.class)
103104
@PrepareForTest({ReflectionUtil.class, ContainerStorageManagerRestoreUtil.class})
104105
public class TestContainerStorageManager {
105-
106106
private static final String STORE_NAME = "store";
107107
private static final String SYSTEM_NAME = "kafka";
108108
private static final String STREAM_NAME = "store-stream";
109-
private static final File DEFAULT_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "store");
109+
private static final File DEFAULT_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "store");
110110
private static final File
111111
DEFAULT_LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore");
112112

@@ -116,6 +116,7 @@ public class TestContainerStorageManager {
116116
private SamzaContainerMetrics samzaContainerMetrics;
117117
private Map<TaskName, TaskModel> tasks;
118118
private StandbyTestContext testContext;
119+
private CheckpointManager checkpointManager;
119120

120121
private volatile int systemConsumerCreationCount;
121122
private volatile int systemConsumerStartCount;
@@ -143,7 +144,7 @@ private void addMockedTask(String taskname, int changelogPartition) {
143144
* Method to create a containerStorageManager with mocked dependencies
144145
*/
145146
@Before
146-
public void setUp() throws InterruptedException {
147+
public void setUp() throws InterruptedException, IOException {
147148
taskRestoreMetricGauges = new HashMap<>();
148149
this.tasks = new HashMap<>();
149150
this.taskInstanceMetrics = new HashMap<>();
@@ -248,7 +249,7 @@ public Void answer(InvocationOnMock invocation) {
248249
.thenReturn(
249250
new scala.collection.immutable.Map.Map1(new SystemStream(SYSTEM_NAME, STREAM_NAME), systemStreamMetadata));
250251

251-
CheckpointManager checkpointManager = mock(CheckpointManager.class);
252+
this.checkpointManager = mock(CheckpointManager.class);
252253
when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>()));
253254

254255
SSPMetadataCache mockSSPMetadataCache = mock(SSPMetadataCache.class);
@@ -320,6 +321,38 @@ public void testParallelismAndMetrics() throws InterruptedException {
320321
Assert.assertEquals("systemConsumerStartCount count should be 1", 1, this.systemConsumerStartCount);
321322
}
322323

324+
/**
325+
* This test will attempt to verify if logged stores are deleted if the input checkpoints are empty.
326+
* */
327+
@Test
328+
@SuppressWarnings("ResultOfMethodCallIgnored")
329+
public void testDeleteLoggedStoreOnNoCheckpoints() {
330+
// reset the mock to reset the stubs in setup method
331+
reset(this.checkpointManager);
332+
// redo stubbing to return null checkpoints
333+
when(this.checkpointManager.readLastCheckpoint(any())).thenReturn(null);
334+
// create store under logged stores to demonstrate deletion
335+
final File storeFile = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME);
336+
// add contents to store
337+
final File storeFilePartition = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME + File.separator + "Partition_0");
338+
storeFile.deleteOnExit();
339+
try {
340+
storeFile.mkdirs();
341+
storeFilePartition.createNewFile();
342+
Assert.assertTrue("Assert that stores are present prior to the test.", storeFile.exists());
343+
Assert.assertTrue("Assert that store files are present prior to the test.", storeFilePartition.exists());
344+
this.containerStorageManager.start();
345+
this.containerStorageManager.shutdown();
346+
Assert.assertFalse("Assert that stores are deleted after the test.", storeFile.exists());
347+
Assert.assertFalse("Assert that store files are deleted after the test.", storeFile.exists());
348+
} catch (Exception e) {
349+
System.out.printf("File %s could not be created.", storeFile);
350+
Assert.fail();
351+
} finally {
352+
storeFile.delete();
353+
}
354+
}
355+
323356
@Test
324357
public void testNoConfiguredDurableStores() throws InterruptedException {
325358
taskRestoreMetricGauges = new HashMap<>();

0 commit comments

Comments
 (0)