Skip to content

Commit

Permalink
Refactor - init all store paths together and do not mutate the storeD…
Browse files Browse the repository at this point in the history
…irPaths. Added test
  • Loading branch information
shekhars-li committed Apr 4, 2024
1 parent 711a02a commit 0dab28b
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -179,12 +180,16 @@ public ContainerStorageManager(
loggedStoreBaseDirectory);
}

this.storeDirectoryPaths = new HashSet<>();
// Note: The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage
// of the store directories. The stores itself does not need to be created but the store directory paths need to be
// set to be able to monitor them, once they're created and in use.
this.storeDirectoryPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);

this.inMemoryStores = ContainerStorageManagerUtil.createInMemoryStores(
activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames,
storeDirectoryPaths, containerModel, jobContext, containerContext,
taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
activeTaskChangelogSystemStreams, storageEngineFactories, sideInputStoreNames, containerModel, jobContext,
containerContext, taskInstanceMetrics, taskInstanceCollectors, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

// Refactor Note (prateekm): in previous version, there's a subtle difference between 'this.changelogSystemStreams'
Expand All @@ -193,21 +198,6 @@ public ContainerStorageManager(
this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(
activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config);

// The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage of the store
// directories. The stores itself does not need to be created but the store directory paths need to be set to be
// able to monitor them, once they're created and in use.
Set<String> storesToCreate =
ContainerStorageManagerUtil.getNonSideInputNonInMemoryStores(storageEngineFactories, sideInputStoreNames, config);
for (String storeName : storesToCreate) {
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
File storeDirPath =
ContainerStorageManagerUtil.getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams,
sideInputStoreNames, task.getKey(), task.getValue(), storageManagerUtil, loggedStoreBaseDirectory,
nonLoggedStoreBaseDirectory);
storeDirectoryPaths.add(storeDirPath.toPath());
}
}

JobConfig jobConfig = new JobConfig(config);
int restoreThreadPoolSize =
Math.min(
Expand Down Expand Up @@ -242,8 +232,7 @@ public Map<TaskName, Checkpoint> start() throws SamzaException, InterruptedExcep
this.sideInputsManager = new SideInputsManager(
sideInputSystemStreams, systemFactories,
changelogSystemStreams, activeTaskChangelogSystemStreams,
storageEngineFactories, storeDirectoryPaths,
containerModel, jobContext, containerContext,
storageEngineFactories, containerModel, jobContext, containerContext,
samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors,
streamMetadataCache, systemAdmins, serdeManager, serdes, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
Expand Down Expand Up @@ -355,8 +344,7 @@ private Map<TaskName, Checkpoint> restoreStores() throws InterruptedException {
.filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
this.taskStores = ContainerStorageManagerUtil.createTaskStores(
storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
this.containerModel, this.jobContext, this.containerContext,
this.activeTaskChangelogSystemStreams, this.containerModel, this.jobContext, this.containerContext,
this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
this.loggedStoreBaseDirectory, this.nonLoggedStoreBaseDirectory, this.config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
Map<String, Serde<Object>> serdes,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Expand All @@ -91,7 +90,6 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
for (String storeName : storesToCreate) {
File storeDirectory = getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams,
sideInputStoreNames, taskName, taskModel, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
storeDirectoryPaths.add(storeDirectory.toPath());

// if taskInstanceMetrics are specified use those for store metrics,
// otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
Expand Down Expand Up @@ -156,7 +154,6 @@ public static Map<TaskName, Map<String, StorageEngine>> createInMemoryStores(
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
Expand All @@ -167,8 +164,7 @@ public static Map<TaskName, Map<String, StorageEngine>> createInMemoryStores(
Set<String> inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config);
return ContainerStorageManagerUtil.createTaskStores(
inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, storeDirectoryPaths,
containerModel, jobContext, containerContext, serdes,
activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
}
Expand Down Expand Up @@ -417,6 +413,28 @@ public static Set<String> getNonSideInputNonInMemoryStores(Map<String, StorageEn
return storeNames;
}

public static Set<Path> getStoreDirPaths(Config config, Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Map<String, SystemStream> activeTaskChangelogSystemStreams, Set<String> sideInputStoreNames,
ContainerModel containerModel, StorageManagerUtil storageManagerUtil, File loggedStoreBaseDirectory,
File nonLoggedStoreBaseDirectory) {
Set<Path> storeDirectoryPaths = new HashSet<>();
StorageConfig storageConfig = new StorageConfig(config);
Set<String> storeNames = new HashSet<>();
// Add all side input and regular stores
storeNames.addAll(storageConfig.getStoreNames());
// Add all in-memory store names
storeNames.addAll(getInMemoryStoreNames(storageEngineFactories, config));

for (String storeName : storeNames) {
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
File storeDirPath =
getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams, sideInputStoreNames, task.getKey(),
task.getValue(), storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
storeDirectoryPaths.add(storeDirPath.toPath());
}
}
return storeDirectoryPaths;
}
public static File getStoreDirPath(String storeName, Config config, Map<String, SystemStream> activeTaskChangelogSystemStreams,
Set<String> sideInputStoreNames, TaskName taskName, TaskModel taskModel, StorageManagerUtil storageManagerUtil,
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
Map<String, SystemStream> changelogSystemStreams,
Map<String, SystemStream> activeTaskChangelogSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<Path> storeDirectoryPaths,
ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
SamzaContainerMetrics samzaContainerMetrics,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
Expand Down Expand Up @@ -147,8 +146,7 @@ public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
// create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(
sideInputStoreNames, storageEngineFactories, sideInputStoreNames,
activeTaskChangelogSystemStreams, storeDirectoryPaths,
containerModel, jobContext, containerContext, serdes,
activeTaskChangelogSystemStreams, containerModel, jobContext, containerContext, serdes,
taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -731,6 +732,39 @@ public void testRestoreRecoversFromDeletedException() throws Exception {
inOrder.verify(blobStoreManager).close(); // close called on blobStoreManager passed to taskRestoreManager
}

@Test
public void testStoreDirectoriesInitialized() {
String sideInputStore = "sideInputStore";
String inMemoryStore = "inMemoryStore";
String regularStore = "regularStore";
Map<String, String> storeFactories = new HashMap<>();
storeFactories.put(String.format("stores.%s.side.inputs.processor.factory", sideInputStore), "sideinputfactory");
storeFactories.put(String.format("stores.%s.factory", regularStore), "regularstorefactory");
storeFactories.put(String.format("stores.%s.factory", inMemoryStore),
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory");
Map<String, String> configMap = new HashMap<>(storeFactories);
Config config = new MapConfig(configMap);
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap<>();
storageEngineFactories.put(sideInputStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
storageEngineFactories.put(inMemoryStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));
storageEngineFactories.put(regularStore, (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class));

Map<String, SystemStream> activeTaskChangelogSystemStreams = new HashMap<>();
activeTaskChangelogSystemStreams.put(regularStore, new SystemStream("kafka", "changelog"));
Set<String> sideInputStoreNames = new HashSet<>();
sideInputStoreNames.add(sideInputStore);
ContainerModel containerModel = mock(ContainerModel.class);
when(containerModel.getTasks())
.thenReturn(ImmutableMap.of(new TaskName("task"),
new TaskModel(new TaskName("task"), Collections.emptySet(), new Partition(1))));

Set<Path> storeDirPaths = ContainerStorageManagerUtil.getStoreDirPaths(config, storageEngineFactories,
activeTaskChangelogSystemStreams, sideInputStoreNames, containerModel, new StorageManagerUtil(),
new File("/tmp"), new File("/tmp2"));

assertEquals(3, storeDirPaths.size());
}

@Test
public void getActiveTaskChangelogSystemStreams() {
Map<String, SystemStream> storeToChangelogSystemStreams =
Expand Down

0 comments on commit 0dab28b

Please sign in to comment.