Skip to content

Backport to branch (3.13): Add export command for data loader CLI #2686

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,21 @@ public enum CoreError implements ScalarDbError {
DATA_LOADER_FILE_PATH_IS_BLANK(
Category.USER_ERROR, "0197", "File path must not be blank.", "", ""),
DATA_LOADER_FILE_NOT_FOUND(Category.USER_ERROR, "0198", "File not found: %s", "", ""),
DATA_LOADER_INVALID_DATE_TIME_FOR_COLUMN_VALUE(
Category.USER_ERROR,
"0199",
"Invalid date time value specified for column %s in table %s in namespace %s.",
"",
""),
DATA_LOADER_NULL_OR_EMPTY_KEY_VALUE_INPUT(
Category.USER_ERROR, "0200", "Key-value cannot be null or empty", "", ""),
DATA_LOADER_INVALID_KEY_VALUE_INPUT(
Category.USER_ERROR, "0201", "Invalid key-value format: %s", "", ""),
DATA_LOADER_SPLIT_INPUT_VALUE_NULL(Category.USER_ERROR, "0202", "Value must not be null", "", ""),
DATA_LOADER_SPLIT_INPUT_DELIMITER_NULL(
Category.USER_ERROR, "0203", "Delimiter must not be null", "", ""),
DATA_LOADER_CONFIG_FILE_PATH_BLANK(
Category.USER_ERROR, "0204", "Config file path must not be blank", "", ""),

//
// Errors for the concurrency error category
Expand Down
207 changes: 172 additions & 35 deletions ...ader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,70 +1,207 @@
package com.scalar.db.dataloader.cli.command.dataexport;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
import com.scalar.db.dataloader.cli.exception.InvalidFileExtensionException;
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
import com.scalar.db.dataloader.cli.util.FileUtils;
import com.scalar.db.dataloader.cli.util.InvalidFilePathException;
import com.scalar.db.dataloader.core.ColumnKeyValue;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.dataloader.core.dataexport.JsonExportManager;
import com.scalar.db.dataloader.core.dataexport.JsonLineExportManager;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.exception.ColumnParsingException;
import com.scalar.db.dataloader.core.exception.KeyParsingException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import java.io.BufferedWriter;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Spec;

@CommandLine.Command(name = "export", description = "Export data from a ScalarDB table")
@CommandLine.Command(name = "export", description = "export data from a ScalarDB table")
public class ExportCommand extends ExportCommandOptions implements Callable<Integer> {

private static final List<String> ALLOWED_EXTENSIONS = Arrays.asList("csv", "json", "jsonl");
private static final String EXPORT_FILE_NAME_FORMAT = "export.%s.%s.%s.%s";
private static final Logger logger = LoggerFactory.getLogger(ExportCommand.class);

@Spec CommandSpec spec;

@Override
public Integer call() throws Exception {
validateOutputDirectory(outputFilePath);
String scalarDbPropertiesFilePath = getScalarDbPropertiesFilePath();

try {
validateOutputDirectory();
FileUtils.validateFilePath(scalarDbPropertiesFilePath);

StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
TableMetadataService metaDataService =
new TableMetadataService(storageFactory.getStorageAdmin());
ScalarDbDao scalarDbDao = new ScalarDbDao();

ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);

TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);

Key partitionKey =
partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null;
Key scanStartKey =
scanStartKeyValue != null
? getKey(scanStartKeyValue, namespace, table, tableMetadata)
: null;
Key scanEndKey =
scanEndKeyValue != null ? getKey(scanEndKeyValue, namespace, table, tableMetadata) : null;

ScanRange scanRange =
new ScanRange(scanStartKey, scanEndKey, scanStartInclusive, scanEndInclusive);
ExportOptions exportOptions = buildExportOptions(partitionKey, scanRange);

String filePath =
getOutputAbsoluteFilePath(
outputDirectory, outputFileName, exportOptions.getOutputFileFormat());
logger.info("Exporting data to file: {}", filePath);

try (BufferedWriter writer =
Files.newBufferedWriter(Paths.get(filePath), Charset.defaultCharset(), CREATE, APPEND)) {
exportManager.startExport(exportOptions, tableMetadata, writer);
}

} catch (DirectoryValidationException e) {
logger.error("Invalid output directory path: {}", outputDirectory);
return 1;
} catch (InvalidFilePathException e) {
logger.error(
"The ScalarDB connection settings file path is invalid or the file is missing: {}",
scalarDbPropertiesFilePath);
return 1;
} catch (TableMetadataException e) {
logger.error("Failed to retrieve table metadata: {}", e.getMessage());
return 1;
}
return 0;
}

private void validateOutputDirectory(@Nullable String path)
throws DirectoryValidationException, InvalidFileExtensionException {
if (path == null || path.isEmpty()) {
// It is ok for the output file path to be null or empty as a default file name will be used
// if not provided
return;
private String getScalarDbPropertiesFilePath() {
if (StringUtils.isBlank(configFilePath)) {
throw new IllegalArgumentException(
CoreError.DATA_LOADER_CONFIG_FILE_PATH_BLANK.buildMessage());
}
return Objects.equals(configFilePath, DEFAULT_CONFIG_FILE_NAME)
? Paths.get("").toAbsolutePath().resolve(DEFAULT_CONFIG_FILE_NAME).toString()
: configFilePath;
}

File file = new File(path);

if (file.isDirectory()) {
validateDirectory(path);
private void validateOutputDirectory() throws DirectoryValidationException {
if (StringUtils.isBlank(outputDirectory)) {
DirectoryUtils.validateWorkingDirectory();
} else {
validateFileExtension(file.getName());
validateDirectory(file.getParent());
DirectoryUtils.validateOrCreateTargetDirectory(outputDirectory);
}
}

private void validateDirectory(String directoryPath) throws DirectoryValidationException {
// If the directory path is null or empty, use the current working directory
if (directoryPath == null || directoryPath.isEmpty()) {
DirectoryUtils.validateOrCreateTargetDirectory(DirectoryUtils.getCurrentWorkingDirectory());
} else {
DirectoryUtils.validateOrCreateTargetDirectory(directoryPath);
private ExportManager createExportManager(
StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
ProducerTaskFactory taskFactory =
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
DistributedStorage storage = storageFactory.getStorage();
switch (fileFormat) {
case JSON:
return new JsonExportManager(storage, scalarDbDao, taskFactory);
case JSONL:
return new JsonLineExportManager(storage, scalarDbDao, taskFactory);
case CSV:
return new CsvExportManager(storage, scalarDbDao, taskFactory);
default:
throw new AssertionError("Invalid file format" + fileFormat);
}
}

private void validateFileExtension(String filename) throws InvalidFileExtensionException {
String extension = FilenameUtils.getExtension(filename);
if (StringUtils.isBlank(extension)) {
throw new InvalidFileExtensionException(
CoreError.DATA_LOADER_MISSING_FILE_EXTENSION.buildMessage(filename));
private ExportOptions buildExportOptions(Key partitionKey, ScanRange scanRange) {
ExportOptions.ExportOptionsBuilder builder =
ExportOptions.builder(namespace, table, partitionKey, outputFormat)
.sortOrders(sortOrders)
.excludeHeaderRow(excludeHeader)
.includeTransactionMetadata(includeTransactionMetadata)
.delimiter(delimiter)
.limit(limit)
.maxThreadCount(maxThreads)
.dataChunkSize(dataChunkSize)
.prettyPrintJson(prettyPrintJson)
.scanRange(scanRange);

if (projectionColumns != null) {
builder.projectionColumns(projectionColumns);
}
if (!ALLOWED_EXTENSIONS.contains(extension.toLowerCase())) {
throw new InvalidFileExtensionException(
CoreError.DATA_LOADER_INVALID_FILE_EXTENSION.buildMessage(
extension, String.join(", ", ALLOWED_EXTENSIONS)));

return builder.build();
}

private String getOutputAbsoluteFilePath(
String outputDirectory, String outputFileName, FileFormat outputFormat) {
String fileName =
StringUtils.isBlank(outputFileName)
? String.format(
EXPORT_FILE_NAME_FORMAT,
namespace,
table,
System.nanoTime(),
outputFormat.toString().toLowerCase())
: outputFileName;

if (StringUtils.isBlank(outputDirectory)) {
return Paths.get("").toAbsolutePath().resolve(fileName).toAbsolutePath().toString();
} else {
return Paths.get(outputDirectory).resolve(fileName).toAbsolutePath().toString();
}
}

/**
* Convert ColumnKeyValue list to a key
*
* @param keyValueList key value list
* @param tableMetadata table metadata
* @return key
* @throws ColumnParsingException if any error occur during parsing column value
*/
private Key getKeysFromList(List<ColumnKeyValue> keyValueList, TableMetadata tableMetadata)
throws ColumnParsingException {
return KeyUtils.parseMultipleKeyValues(keyValueList, tableMetadata);
}

/**
* Convert ColumnKeyValue to a key
*
* @param keyValue key value
* @param tableMetadata table metadata
* @return key
* @throws KeyParsingException if any error occur during decoding key
*/
private Key getKey(
ColumnKeyValue keyValue, String namespace, String table, TableMetadata tableMetadata)
throws KeyParsingException {
return KeyUtils.parseKeyValue(keyValue, namespace, table, tableMetadata);
}
}
Loading