Add export command for data loader CLI #2617
Merged
Changes from all commits
40 commits
be9ee23  Initial commit (inv-jishnu)
89b9f05  Spotless applied again (inv-jishnu)
49c83b6  Removed unused code (inv-jishnu)
b2871fb  Merge branch 'master' into feat/data-loader/import-log-2 (ypeckstadt)
c5c9c0a  Removed unused classes and references (inv-jishnu)
4964e8d  Merge branch 'master' into feat/data-loader/import-log-2 (inv-jishnu)
ff81f5f  Merge branch 'master' into feat/data-loader/import-log-2 (inv-jishnu)
3934c2a  Improve Javadocs (ypeckstadt)
9958f95  Changes (inv-jishnu)
1afbc21  Renamed parameters (inv-jishnu)
8c5114d  logging changes (inv-jishnu)
ffab395  removed repeated code (inv-jishnu)
79df1ed  Merge branch 'master' into feat/data-loader/import-log-2 (inv-jishnu)
cf31672  Merge branch 'master' into feat/data-loader/import-log-2 (brfrn169)
6dd213e  Added excetpion throw (inv-jishnu)
6542177  Synchronisation changes (inv-jishnu)
603e46e  Added volatile back to fix spotbugs issue (inv-jishnu)
eaf9d88  Removed unused variable (inv-jishnu)
0a2518a  Changes (inv-jishnu)
378effb  Initial commit (inv-jishnu)
573cfae  Merged changes from master after resolving conflicts (inv-jishnu)
7f7d34b  Removed unused file (inv-jishnu)
a842dc6  Minor name changes (inv-jishnu)
508c311  Fixed unit test failure (inv-jishnu)
b9b6efd  Minor unit test changes (inv-jishnu)
c839066  Desription and default value corrections (inv-jishnu)
6b05948  Changes (inv-jishnu)
707f56e  Fix error message (inv-jishnu)
a8e8b89  Merge branch 'master' into feat/data-loader/cli-export-2 (inv-jishnu)
8f67bbc  Merge branch 'master' into feat/data-loader/cli-export-2 (inv-jishnu)
105367c  Merged latest changes from master after resolving conflicts (inv-jishnu)
145b239  Javadoc change (inv-jishnu)
210f62d  Changed exception thrown based on feedback (inv-jishnu)
676c12d  Revert "Changed exception thrown based on feedback" (inv-jishnu)
d05ec66  Changes (inv-jishnu)
07243a4  Merge branch 'master' into feat/data-loader/cli-export-2 (inv-jishnu)
e0ecb75  Additional check added (inv-jishnu)
f595434  Minor changes (inv-jishnu)
8366167  Scan order input read updated (inv-jishnu)
8992201  Empty line removed (inv-jishnu)
207 changes: 172 additions & 35 deletions
...ader/cli/src/main/java/com/scalar/db/dataloader/cli/command/dataexport/ExportCommand.java
100644 → 100755
@@ -1,70 +1,207 @@
package com.scalar.db.dataloader.cli.command.dataexport;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
import com.scalar.db.dataloader.cli.exception.InvalidFileExtensionException;
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
import java.io.File;
import java.util.Arrays;
import com.scalar.db.dataloader.cli.util.FileUtils;
import com.scalar.db.dataloader.cli.util.InvalidFilePathException;
import com.scalar.db.dataloader.core.ColumnKeyValue;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.dataloader.core.dataexport.JsonExportManager;
import com.scalar.db.dataloader.core.dataexport.JsonLineExportManager;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
import com.scalar.db.dataloader.core.exception.ColumnParsingException;
import com.scalar.db.dataloader.core.exception.KeyParsingException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
import com.scalar.db.dataloader.core.util.KeyUtils;
import com.scalar.db.io.Key;
import com.scalar.db.service.StorageFactory;
import java.io.BufferedWriter;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Spec;

@CommandLine.Command(name = "export", description = "Export data from a ScalarDB table")
@CommandLine.Command(name = "export", description = "export data from a ScalarDB table")
public class ExportCommand extends ExportCommandOptions implements Callable<Integer> {

  private static final List<String> ALLOWED_EXTENSIONS = Arrays.asList("csv", "json", "jsonl");
  private static final String EXPORT_FILE_NAME_FORMAT = "export.%s.%s.%s.%s";
  private static final Logger logger = LoggerFactory.getLogger(ExportCommand.class);

  @Spec CommandSpec spec;

  @Override
  public Integer call() throws Exception {
    validateOutputDirectory(outputFilePath);
    String scalarDbPropertiesFilePath = getScalarDbPropertiesFilePath();

    try {
      validateOutputDirectory();
      FileUtils.validateFilePath(scalarDbPropertiesFilePath);

      StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
      TableMetadataService metaDataService =
          new TableMetadataService(storageFactory.getStorageAdmin());
      ScalarDbDao scalarDbDao = new ScalarDbDao();

      ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);

      TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);

      Key partitionKey =
          partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null;
      Key scanStartKey =
          scanStartKeyValue != null
              ? getKey(scanStartKeyValue, namespace, table, tableMetadata)
              : null;
      Key scanEndKey =
          scanEndKeyValue != null ? getKey(scanEndKeyValue, namespace, table, tableMetadata) : null;

      ScanRange scanRange =
          new ScanRange(scanStartKey, scanEndKey, scanStartInclusive, scanEndInclusive);
      ExportOptions exportOptions = buildExportOptions(partitionKey, scanRange);

      String filePath =
          getOutputAbsoluteFilePath(
              outputDirectory, outputFileName, exportOptions.getOutputFileFormat());
      logger.info("Exporting data to file: {}", filePath);

      try (BufferedWriter writer =
          Files.newBufferedWriter(Paths.get(filePath), Charset.defaultCharset(), CREATE, APPEND)) {
        exportManager.startExport(exportOptions, tableMetadata, writer);
      }

    } catch (DirectoryValidationException e) {
      logger.error("Invalid output directory path: {}", outputDirectory);
      return 1;
    } catch (InvalidFilePathException e) {
      logger.error(
          "The ScalarDB connection settings file path is invalid or the file is missing: {}",
          scalarDbPropertiesFilePath);
      return 1;
    } catch (TableMetadataException e) {
      logger.error("Failed to retrieve table metadata: {}", e.getMessage());
      return 1;
    }
    return 0;
  }

  private void validateOutputDirectory(@Nullable String path)
      throws DirectoryValidationException, InvalidFileExtensionException {
    if (path == null || path.isEmpty()) {
      // It is ok for the output file path to be null or empty as a default file name will be used
      // if not provided
      return;
  private String getScalarDbPropertiesFilePath() {
    if (StringUtils.isBlank(configFilePath)) {
      throw new IllegalArgumentException(
          CoreError.DATA_LOADER_CONFIG_FILE_PATH_BLANK.buildMessage());
    }
    return Objects.equals(configFilePath, DEFAULT_CONFIG_FILE_NAME)
        ? Paths.get("").toAbsolutePath().resolve(DEFAULT_CONFIG_FILE_NAME).toString()
        : configFilePath;
  }

    File file = new File(path);

    if (file.isDirectory()) {
      validateDirectory(path);
  private void validateOutputDirectory() throws DirectoryValidationException {
    if (StringUtils.isBlank(outputDirectory)) {
      DirectoryUtils.validateWorkingDirectory();
    } else {
      validateFileExtension(file.getName());
      validateDirectory(file.getParent());
      DirectoryUtils.validateOrCreateTargetDirectory(outputDirectory);
    }
  }

  private void validateDirectory(String directoryPath) throws DirectoryValidationException {
    // If the directory path is null or empty, use the current working directory
    if (directoryPath == null || directoryPath.isEmpty()) {
      DirectoryUtils.validateOrCreateTargetDirectory(DirectoryUtils.getCurrentWorkingDirectory());
    } else {
      DirectoryUtils.validateOrCreateTargetDirectory(directoryPath);
  private ExportManager createExportManager(
      StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
    ProducerTaskFactory taskFactory =
        new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
    DistributedStorage storage = storageFactory.getStorage();
    switch (fileFormat) {
      case JSON:
        return new JsonExportManager(storage, scalarDbDao, taskFactory);
      case JSONL:
        return new JsonLineExportManager(storage, scalarDbDao, taskFactory);
      case CSV:
        return new CsvExportManager(storage, scalarDbDao, taskFactory);
      default:
        throw new AssertionError("Invalid file format" + fileFormat);
    }
  }

  private void validateFileExtension(String filename) throws InvalidFileExtensionException {
    String extension = FilenameUtils.getExtension(filename);
    if (StringUtils.isBlank(extension)) {
      throw new InvalidFileExtensionException(
          CoreError.DATA_LOADER_MISSING_FILE_EXTENSION.buildMessage(filename));
  private ExportOptions buildExportOptions(Key partitionKey, ScanRange scanRange) {
    ExportOptions.ExportOptionsBuilder builder =
        ExportOptions.builder(namespace, table, partitionKey, outputFormat)
            .sortOrders(sortOrders)
            .excludeHeaderRow(excludeHeader)
            .includeTransactionMetadata(includeTransactionMetadata)
            .delimiter(delimiter)
            .limit(limit)
            .maxThreadCount(maxThreads)
            .dataChunkSize(dataChunkSize)
            .prettyPrintJson(prettyPrintJson)
            .scanRange(scanRange);

    if (projectionColumns != null) {
      builder.projectionColumns(projectionColumns);
    }
    if (!ALLOWED_EXTENSIONS.contains(extension.toLowerCase())) {
      throw new InvalidFileExtensionException(
          CoreError.DATA_LOADER_INVALID_FILE_EXTENSION.buildMessage(
              extension, String.join(", ", ALLOWED_EXTENSIONS)));

    return builder.build();
  }

  private String getOutputAbsoluteFilePath(
      String outputDirectory, String outputFileName, FileFormat outputFormat) {
    String fileName =
        StringUtils.isBlank(outputFileName)
            ? String.format(
                EXPORT_FILE_NAME_FORMAT,
                namespace,
                table,
                System.nanoTime(),
                outputFormat.toString().toLowerCase())
            : outputFileName;

    if (StringUtils.isBlank(outputDirectory)) {
      return Paths.get("").toAbsolutePath().resolve(fileName).toAbsolutePath().toString();
    } else {
      return Paths.get(outputDirectory).resolve(fileName).toAbsolutePath().toString();
    }
  }

  /**
   * Convert ColumnKeyValue list to a key
   *
   * @param keyValueList key value list
   * @param tableMetadata table metadata
   * @return key
   * @throws ColumnParsingException if any error occur during parsing column value
   */
  private Key getKeysFromList(List<ColumnKeyValue> keyValueList, TableMetadata tableMetadata)
      throws ColumnParsingException {
    return KeyUtils.parseMultipleKeyValues(keyValueList, tableMetadata);
  }

  /**
   * Convert ColumnKeyValue to a key
   *
   * @param keyValue key value
   * @param tableMetadata table metadata
   * @return key
   * @throws KeyParsingException if any error occur during decoding key
   */
  private Key getKey(
      ColumnKeyValue keyValue, String namespace, String table, TableMetadata tableMetadata)
      throws KeyParsingException {
    return KeyUtils.parseKeyValue(keyValue, namespace, table, tableMetadata);
  }
}
Review comment (Copilot): Tests expect an IllegalArgumentException when configFilePath is blank, but the current implementation catches all invalid file paths and returns an exit code. Add an explicit check for blank configFilePath and throw IllegalArgumentException with CoreError.DATA_LOADER_FILE_PATH_IS_BLANK to match test expectations.
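For illustration, here is a minimal sketch of the fail-fast guard the comment asks for. It mirrors the getScalarDbPropertiesFilePath() method already shown in the diff above and is a fragment of the command class, relying on the configFilePath and DEFAULT_CONFIG_FILE_NAME members inherited from ExportCommandOptions. Note that the diff uses CoreError.DATA_LOADER_CONFIG_FILE_PATH_BLANK, whereas the comment cites DATA_LOADER_FILE_PATH_IS_BLANK; the constant below follows the diff and is not verified against the final codebase or the test suite.

  // Sketch only: resolve the ScalarDB properties path, failing fast on a blank value
  // so callers (and, per the comment, unit tests) see an IllegalArgumentException
  // instead of a logged error and non-zero exit code.
  private String getScalarDbPropertiesFilePath() {
    if (StringUtils.isBlank(configFilePath)) {
      // Blank path: throw immediately rather than deferring to file-path validation.
      throw new IllegalArgumentException(
          CoreError.DATA_LOADER_CONFIG_FILE_PATH_BLANK.buildMessage());
    }
    // The default config file name is resolved against the current working directory;
    // any other value is returned as provided.
    return Objects.equals(configFilePath, DEFAULT_CONFIG_FILE_NAME)
        ? Paths.get("").toAbsolutePath().resolve(DEFAULT_CONFIG_FILE_NAME).toString()
        : configFilePath;
  }

Which CoreError constant the tests actually reference would need to be confirmed against the test suite before acting on the comment.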