Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create store directory paths in CSM constructor for disk space monitor #1697

Merged
merged 6 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeManager;
import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
Expand Down Expand Up @@ -192,6 +193,21 @@ public ContainerStorageManager(
this.storeConsumers = ContainerStorageManagerUtil.createStoreChangelogConsumers(
activeTaskChangelogSystemStreams, systemFactories, samzaContainerMetrics.registry(), config);

// The store directory paths are used by SamzaContainer to add a metric to watch the disk space usage of the store
// directories. The stores itself does not need to be created but the store directory paths need to be set to be
// able to monitor them, once they're created and in use.
Set<String> storesToCreate =
ContainerStorageManagerUtil.getNonSideInputNonInMemoryStores(storageEngineFactories, sideInputStoreNames, config);
for (String storeName : storesToCreate) {
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
File storeDirPath =
ContainerStorageManagerUtil.getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams,
sideInputStoreNames, task.getKey(), task.getValue(), storageManagerUtil, loggedStoreBaseDirectory,
nonLoggedStoreBaseDirectory);
storeDirectoryPaths.add(storeDirPath.toPath());
shekhars-li marked this conversation as resolved.
Show resolved Hide resolved
}
}

JobConfig jobConfig = new JobConfig(config);
int restoreThreadPoolSize =
Math.min(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
Config config) {
Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
StorageConfig storageConfig = new StorageConfig(config);

// iterate over each task and each storeName
for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
Expand All @@ -90,15 +89,8 @@ public static Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<Str
}

for (String storeName : storesToCreate) {
List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
// A store is considered durable if it is backed by a changelog or another backupManager factory
boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
boolean isSideInput = sideInputStoreNames.contains(storeName);
// Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
// for non logged stores
File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
taskModel.getTaskMode());
File storeDirectory = getStoreDirPath(storeName, config, activeTaskChangelogSystemStreams,
sideInputStoreNames, taskName, taskModel, storageManagerUtil, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
storeDirectoryPaths.add(storeDirectory.toPath());

// if taskInstanceMetrics are specified use those for store metrics,
Expand Down Expand Up @@ -412,4 +404,32 @@ public static Set<String> getSideInputStoreNames(
}
return sideInputStores;
}

public static Set<String> getNonSideInputNonInMemoryStores(Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Set<String> sideInputStoreNames, Config config) {
Set<String> inMemoryStoreNames =
ContainerStorageManagerUtil.getInMemoryStoreNames(storageEngineFactories, config);
Set<String> nonSideInputStoreNames =
storageEngineFactories.keySet().stream().filter(storeName -> !sideInputStoreNames.contains(storeName))
.collect(Collectors.toSet());
Set<String> storeNames = nonSideInputStoreNames.stream()
.filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
return storeNames;
}

public static File getStoreDirPath(String storeName, Config config, Map<String, SystemStream> activeTaskChangelogSystemStreams,
Set<String> sideInputStoreNames, TaskName taskName, TaskModel taskModel, StorageManagerUtil storageManagerUtil,
File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory) {
StorageConfig storageConfig = new StorageConfig(config);
List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
// A store is considered durable if it is backed by a changelog or another backupManager factory
boolean isDurable = activeTaskChangelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
boolean isSideInput = sideInputStoreNames.contains(storeName);
// Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
// for non logged stores
File storeBaseDir = isDurable || isSideInput ? loggedStoreBaseDirectory : nonLoggedStoreBaseDirectory;
File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
taskModel.getTaskMode());
return storeDirectory;
}
}