Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add export command for data loader CLI #2490

Draft
wants to merge 36 commits into
base: feat/data-loader/cli-import
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
497b375
Merge branch 'feat/data-loader/export-tasks' into feat/data-loader/cl…
inv-jishnu Jan 28, 2025
a43a8c6
Initial commit
inv-jishnu Jan 28, 2025
e33cc77
Merge branch 'feat/data-loader/cli-import' into feat/data-loader/cli-…
inv-jishnu Jan 28, 2025
0155c59
Changes
inv-jishnu Jan 28, 2025
1439523
Changes
inv-jishnu Jan 29, 2025
2a96abc
Changes
inv-jishnu Jan 29, 2025
3196b1d
change
inv-jishnu Jan 30, 2025
c7a491c
Merged changes from feat/data-loader/cli-import
inv-jishnu Feb 4, 2025
dd5e0b9
Merge branch 'feat/data-loader/cli-import' into feat/data-loader/cli-…
inv-jishnu Feb 4, 2025
53b1fe8
Merged export tasks branch after resolving conflicts
inv-jishnu Feb 4, 2025
ba9fa8f
Removed scalardb manager class as was removed earlier [skip ci]
inv-jishnu Feb 5, 2025
1bcd3a7
Changes
inv-jishnu Feb 5, 2025
268fbc2
Changes
inv-jishnu Feb 6, 2025
6f0066d
Export count related changes
inv-jishnu Feb 7, 2025
c5d3fcb
Revert "Export count related changes"
inv-jishnu Feb 7, 2025
e0a9e27
Export row count addition updated
inv-jishnu Feb 7, 2025
f93a9ae
Newline added again [skip ci]
inv-jishnu Feb 7, 2025
9c7a94b
Newline added again [skip ci]
inv-jishnu Feb 7, 2025
af51049
Merge branch 'master' into feat/data-loader/export-tasks
inv-jishnu Feb 7, 2025
4e5e06d
Resolved conflicts and merged latest changes from master
inv-jishnu Feb 7, 2025
c2248f3
Removed duplicated build code from core error
inv-jishnu Feb 10, 2025
55f711b
Merge branch 'feat/data-loader/cli-import' into feat/data-loader/cli-…
inv-jishnu Feb 11, 2025
b31742d
Update error message [skip ci]
inv-jishnu Feb 11, 2025
636e737
Updated java doc
inv-jishnu Feb 12, 2025
05eee13
Update ExportManager logic
inv-jishnu Feb 12, 2025
a3a17f8
changes
inv-jishnu Feb 13, 2025
de52b56
Resolved conflicts and merged latest changes from main
inv-jishnu Feb 13, 2025
a4b33d2
Core error formatting
inv-jishnu Feb 13, 2025
4b550bd
Core error class formatted
inv-jishnu Feb 13, 2025
15b0df7
Feedback changes
inv-jishnu Feb 14, 2025
af624e1
Merged changes from export tasks [skip ci]
inv-jishnu Feb 17, 2025
bdfca3d
Changes from export tasks [skip ci]
inv-jishnu Feb 17, 2025
15395c0
Resolved conflicts and merged latest changes from main
inv-jishnu Apr 9, 2025
fc27f01
Export options changes
inv-jishnu Apr 9, 2025
213698f
Merge branch 'feat/data-loader/cli-import' into feat/data-loader/cli-…
inv-jishnu Apr 9, 2025
a2e4c47
Removed duplicate error code
inv-jishnu Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,12 @@ public enum CoreError implements ScalarDbError {
Category.INTERNAL_ERROR, "0052", "Failed to read JSON file. Details: %s.", "", ""),
DATA_LOADER_JSONLINES_FILE_READ_FAILED(
Category.INTERNAL_ERROR, "0053", "Failed to read JSON Lines file. Details: %s.", "", ""),
DATA_LOADER_CONVERT_TO_STRING_FAILED(
Category.INTERNAL_ERROR,
"0054",
"Unable to convert value to string from data format: %s",
"",
""),

//
// Errors for the unknown transaction status error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ public class ErrorMessage {
"Failed to create the log directory %s";
public static final String ERROR_CONTROL_FILE_INVALID_JSON =
"Not able to parse the %s control file";
public static final String ERROR_DIRECTORY_WRITE_ACCESS =
"Write access denied for directory: %s. Please check the permissions and try again.";
public static final String ERROR_CREATE_DIRECTORY_FAILED = "TBD %s";
}
Original file line number Diff line number Diff line change
@@ -1,70 +1,241 @@
package com.scalar.db.dataloader.cli.command.dataexport;

import com.scalar.db.common.error.CoreError;
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
import com.scalar.db.dataloader.cli.exception.InvalidFileExtensionException;
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
import java.io.File;
import java.util.Arrays;
import com.scalar.db.dataloader.cli.util.FileUtils;
import com.scalar.db.dataloader.cli.util.InvalidFilePathException;
import com.scalar.db.dataloader.core.ColumnKeyValue;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.dataloader.core.dataexport.JsonExportManager;
import com.scalar.db.dataloader.core.dataexport.JsonLineExportManager;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.exception.Base64Exception;
import com.scalar.db.dataloader.core.exception.ColumnParsingException;
import com.scalar.db.dataloader.core.exception.KeyParsingException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import java.io.BufferedWriter;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Spec;

@CommandLine.Command(name = "export", description = "Export data from a ScalarDB table")
@CommandLine.Command(name = "export", description = "export data from a ScalarDB table")
public class ExportCommand extends ExportCommandOptions implements Callable<Integer> {

private static final List<String> ALLOWED_EXTENSIONS = Arrays.asList("csv", "json", "jsonl");
private static final String EXPORT_FILE_NAME_FORMAT = "export_%s.%s_%s.%s";
private static final Logger LOGGER = LoggerFactory.getLogger(ExportCommand.class);

@Spec CommandSpec spec;

@Override
public Integer call() throws Exception {
validateOutputDirectory(outputFilePath);
return 0;
}
String scalarDbPropertiesFilePath = getScalarDbPropertiesFilePath();

try {
validateOutputDirectory();
FileUtils.validateFilePath(scalarDbPropertiesFilePath);

StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
new TableMetadataService(storageFactory.getStorageAdmin());
ScalarDBDao scalarDBDao = new ScalarDBDao();

ExportManager exportManager = createExportManager(storageFactory, scalarDBDao, outputFormat);

TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);

private void validateOutputDirectory(@Nullable String path)
throws DirectoryValidationException, InvalidFileExtensionException {
if (path == null || path.isEmpty()) {
// It is ok for the output file path to be null or empty as a default file name will be used
// if not provided
return;
Key partitionKey =
partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null;
Key scanStartKey =
scanStartKeyValue != null
? getKey(scanStartKeyValue, namespace, table, tableMetadata)
: null;
Key scanEndKey =
scanEndKeyValue != null ? getKey(scanEndKeyValue, namespace, table, tableMetadata) : null;

ScanRange scanRange =
new ScanRange(scanStartKey, scanEndKey, scanStartInclusive, scanEndInclusive);
ExportOptions exportOptions = buildExportOptions(partitionKey, scanRange);

String filePath =
getOutputAbsoluteFilePath(
outputDirectory, outputFileName, exportOptions.getOutputFileFormat());
LOGGER.info("Exporting data to file: {}", filePath);

try (BufferedWriter writer =
Files.newBufferedWriter(Paths.get(filePath), Charset.defaultCharset(), CREATE, APPEND)) {
exportManager.startExport(exportOptions, tableMetadata, writer);
}

} catch (DirectoryValidationException e) {
LOGGER.error("Invalid output directory path: {}", outputDirectory);
return 1;
} catch (InvalidFilePathException e) {
LOGGER.error(
"The ScalarDB connection settings file path is invalid or the file is missing: {}",
scalarDbPropertiesFilePath);
return 1;
} catch (TableMetadataException e) {
LOGGER.error("Failed to retrieve table metadata: {}", e.getMessage());
return 1;
}
return 0;
}

File file = new File(path);
/**
 * Resolves the absolute path of the ScalarDB properties file. When the configured path equals
 * the default configuration file name, the file is looked up in the current working directory;
 * otherwise the user-supplied path is returned unchanged.
 *
 * @return the ScalarDB properties file path as a string
 */
private String getScalarDbPropertiesFilePath() {
  if (Objects.equals(configFilePath, DEFAULT_CONFIG_FILE_NAME)) {
    return Paths.get("").toAbsolutePath().resolve(DEFAULT_CONFIG_FILE_NAME).toString();
  }
  return configFilePath;
}

if (file.isDirectory()) {
validateDirectory(path);
/**
* Validates the output directory. If the output directory is blank, it validates the current
* working directory. Otherwise, it validates the specified output directory.
*
* @throws DirectoryValidationException if the directory validation fails.
*/
private void validateOutputDirectory() throws DirectoryValidationException {
if (StringUtils.isBlank(outputDirectory)) {
DirectoryUtils.validateWorkingDirectory();
} else {
validateFileExtension(file.getName());
validateDirectory(file.getParent());
DirectoryUtils.validateTargetDirectory(outputDirectory);
}
}

private void validateDirectory(String directoryPath) throws DirectoryValidationException {
// If the directory path is null or empty, use the current working directory
if (directoryPath == null || directoryPath.isEmpty()) {
DirectoryUtils.validateTargetDirectory(DirectoryUtils.getCurrentWorkingDirectory());
} else {
DirectoryUtils.validateTargetDirectory(directoryPath);
/**
 * Creates an instance of {@code ExportManager} based on the specified file format.
 *
 * @param storageFactory the factory to create a storage instance
 * @param scalarDBDao the DAO instance for ScalarDB interactions
 * @param fileFormat the file format for export
 * @return an appropriate {@code ExportManager} implementation for the given file format
 * @throws AssertionError if an invalid file format is provided
 */
private ExportManager createExportManager(
    StorageFactory storageFactory, ScalarDBDao scalarDBDao, FileFormat fileFormat) {
  ProducerTaskFactory taskFactory =
      new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
  DistributedStorage storage = storageFactory.getStorage();
  switch (fileFormat) {
    case JSON:
      return new JsonExportManager(storage, scalarDBDao, taskFactory);
    case JSONL:
      return new JsonLineExportManager(storage, scalarDBDao, taskFactory);
    case CSV:
      return new CsvExportManager(storage, scalarDBDao, taskFactory);
    default:
      // Separator added: previously rendered as e.g. "Invalid file formatCSV".
      throw new AssertionError("Invalid file format: " + fileFormat);
  }
}

private void validateFileExtension(String filename) throws InvalidFileExtensionException {
String extension = FilenameUtils.getExtension(filename);
if (StringUtils.isBlank(extension)) {
throw new InvalidFileExtensionException(
CoreError.DATA_LOADER_MISSING_FILE_EXTENSION.buildMessage(filename));
/**
* Builds and returns an {@code ExportOptions} instance configured with the provided parameters.
*
* @param partitionKey the partition key to filter the export data.
* @param scanRange the scan range for exporting data.
* @return an {@code ExportOptions} instance with the specified settings.
*/
private ExportOptions buildExportOptions(Key partitionKey, ScanRange scanRange) {
ExportOptions.ExportOptionsBuilder builder =
ExportOptions.builder(namespace, table, partitionKey, outputFormat)
.sortOrders(sortOrders)
.excludeHeaderRow(excludeHeader)
.includeTransactionMetadata(includeTransactionMetadata)
.delimiter(delimiter)
.limit(limit)
.maxThreadCount(maxThreads)
.dataChunkSize(dataChunkSize)
.prettyPrintJson(prettyPrintJson)
.scanRange(scanRange);

if (projectionColumns != null) {
builder.projectionColumns(projectionColumns);
}
if (!ALLOWED_EXTENSIONS.contains(extension.toLowerCase())) {
throw new InvalidFileExtensionException(
CoreError.DATA_LOADER_INVALID_FILE_EXTENSION.buildMessage(
extension, String.join(", ", ALLOWED_EXTENSIONS)));

return builder.build();
}

/**
 * Generates the absolute file path for the output file. If no output file name is provided, a
 * default name is constructed from the namespace, table name, and a nanosecond timestamp.
 *
 * @param outputDirectory the directory where the output file should be placed (may be blank, in
 *     which case the current working directory is used)
 * @param outputFileName the name of the output file (may be blank)
 * @param outputFormat the file format for the output
 * @return the absolute output file path as a string
 */
private String getOutputAbsoluteFilePath(
    String outputDirectory, String outputFileName, FileFormat outputFormat) {
  String fileName =
      StringUtils.isBlank(outputFileName)
          ? String.format(
              EXPORT_FILE_NAME_FORMAT,
              namespace,
              table,
              System.nanoTime(),
              outputFormat.toString().toLowerCase())
          : outputFileName;

  // Paths.get("") resolves against the current working directory; a single toAbsolutePath()
  // call is sufficient (the original chained it twice in the blank-directory branch).
  String baseDirectory = StringUtils.isBlank(outputDirectory) ? "" : outputDirectory;
  return Paths.get(baseDirectory).resolve(fileName).toAbsolutePath().toString();
}

/**
 * Converts a list of {@code ColumnKeyValue} entries into a single {@code Key}.
 *
 * @param keyValueList the key-value pairs to convert
 * @param tableMetadata metadata of the target table
 * @return the parsed key
 * @throws Base64Exception if any error occurs while decoding a key value
 * @throws ColumnParsingException if any error occurs while parsing a column value
 */
private Key getKeysFromList(List<ColumnKeyValue> keyValueList, TableMetadata tableMetadata)
    throws Base64Exception, ColumnParsingException {
  Key parsedKey = KeyUtils.parseMultipleKeyValues(keyValueList, tableMetadata);
  return parsedKey;
}

/**
 * Converts a single {@code ColumnKeyValue} into a {@code Key} for the given table.
 *
 * @param keyValue the key-value pair to convert
 * @param namespace the namespace of the target table
 * @param table the name of the target table
 * @param tableMetadata metadata of the target table
 * @return the parsed key
 * @throws KeyParsingException if any error occurs while parsing the key value
 */
private Key getKey(
    ColumnKeyValue keyValue, String namespace, String table, TableMetadata tableMetadata)
    throws KeyParsingException {
  Key parsedKey = KeyUtils.parseKeyValue(keyValue, namespace, table, tableMetadata);
  return parsedKey;
}
}
Loading
Loading