Skip to content

Commit f0550fb

Browse files
inv-jishnuypeckstadtbrfrn169
committed
Add export command for data loader CLI (#2617)
Co-authored-by: Peckstadt Yves <[email protected]> Co-authored-by: Toshihiro Suzuki <[email protected]>
1 parent 645ccbf commit f0550fb

File tree

15 files changed

+806
-135
lines changed

15 files changed

+806
-135
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,21 @@ public enum CoreError implements ScalarDbError {
774774
DATA_LOADER_FILE_PATH_IS_BLANK(
775775
Category.USER_ERROR, "0197", "File path must not be blank.", "", ""),
776776
DATA_LOADER_FILE_NOT_FOUND(Category.USER_ERROR, "0198", "File not found: %s", "", ""),
777+
DATA_LOADER_INVALID_DATE_TIME_FOR_COLUMN_VALUE(
778+
Category.USER_ERROR,
779+
"0199",
780+
"Invalid date time value specified for column %s in table %s in namespace %s.",
781+
"",
782+
""),
783+
DATA_LOADER_NULL_OR_EMPTY_KEY_VALUE_INPUT(
784+
Category.USER_ERROR, "0200", "Key-value cannot be null or empty", "", ""),
785+
DATA_LOADER_INVALID_KEY_VALUE_INPUT(
786+
Category.USER_ERROR, "0201", "Invalid key-value format: %s", "", ""),
787+
DATA_LOADER_SPLIT_INPUT_VALUE_NULL(Category.USER_ERROR, "0202", "Value must not be null", "", ""),
788+
DATA_LOADER_SPLIT_INPUT_DELIMITER_NULL(
789+
Category.USER_ERROR, "0203", "Delimiter must not be null", "", ""),
790+
DATA_LOADER_CONFIG_FILE_PATH_BLANK(
791+
Category.USER_ERROR, "0204", "Config file path must not be blank", "", ""),
777792

778793
//
779794
// Errors for the concurrency error category
Lines changed: 172 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,207 @@
11
package com.scalar.db.dataloader.cli.command.dataexport;
22

3+
import static java.nio.file.StandardOpenOption.APPEND;
4+
import static java.nio.file.StandardOpenOption.CREATE;
5+
6+
import com.scalar.db.api.DistributedStorage;
7+
import com.scalar.db.api.TableMetadata;
38
import com.scalar.db.common.error.CoreError;
49
import com.scalar.db.dataloader.cli.exception.DirectoryValidationException;
5-
import com.scalar.db.dataloader.cli.exception.InvalidFileExtensionException;
610
import com.scalar.db.dataloader.cli.util.DirectoryUtils;
7-
import java.io.File;
8-
import java.util.Arrays;
11+
import com.scalar.db.dataloader.cli.util.FileUtils;
12+
import com.scalar.db.dataloader.cli.util.InvalidFilePathException;
13+
import com.scalar.db.dataloader.core.ColumnKeyValue;
14+
import com.scalar.db.dataloader.core.FileFormat;
15+
import com.scalar.db.dataloader.core.ScanRange;
16+
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
17+
import com.scalar.db.dataloader.core.dataexport.ExportManager;
18+
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
19+
import com.scalar.db.dataloader.core.dataexport.JsonExportManager;
20+
import com.scalar.db.dataloader.core.dataexport.JsonLineExportManager;
21+
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
22+
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbDao;
23+
import com.scalar.db.dataloader.core.exception.ColumnParsingException;
24+
import com.scalar.db.dataloader.core.exception.KeyParsingException;
25+
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
26+
import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
27+
import com.scalar.db.dataloader.core.util.KeyUtils;
28+
import com.scalar.db.io.Key;
29+
import com.scalar.db.service.StorageFactory;
30+
import java.io.BufferedWriter;
31+
import java.nio.charset.Charset;
32+
import java.nio.file.Files;
33+
import java.nio.file.Paths;
934
import java.util.List;
35+
import java.util.Objects;
1036
import java.util.concurrent.Callable;
11-
import javax.annotation.Nullable;
12-
import org.apache.commons.io.FilenameUtils;
1337
import org.apache.commons.lang3.StringUtils;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
1440
import picocli.CommandLine;
1541
import picocli.CommandLine.Model.CommandSpec;
1642
import picocli.CommandLine.Spec;
1743

18-
@CommandLine.Command(name = "export", description = "Export data from a ScalarDB table")
44+
@CommandLine.Command(name = "export", description = "export data from a ScalarDB table")
1945
public class ExportCommand extends ExportCommandOptions implements Callable<Integer> {
2046

21-
private static final List<String> ALLOWED_EXTENSIONS = Arrays.asList("csv", "json", "jsonl");
47+
private static final String EXPORT_FILE_NAME_FORMAT = "export.%s.%s.%s.%s";
48+
private static final Logger logger = LoggerFactory.getLogger(ExportCommand.class);
2249

2350
@Spec CommandSpec spec;
2451

2552
@Override
2653
public Integer call() throws Exception {
27-
validateOutputDirectory(outputFilePath);
54+
String scalarDbPropertiesFilePath = getScalarDbPropertiesFilePath();
55+
56+
try {
57+
validateOutputDirectory();
58+
FileUtils.validateFilePath(scalarDbPropertiesFilePath);
59+
60+
StorageFactory storageFactory = StorageFactory.create(scalarDbPropertiesFilePath);
61+
TableMetadataService metaDataService =
62+
new TableMetadataService(storageFactory.getStorageAdmin());
63+
ScalarDbDao scalarDbDao = new ScalarDbDao();
64+
65+
ExportManager exportManager = createExportManager(storageFactory, scalarDbDao, outputFormat);
66+
67+
TableMetadata tableMetadata = metaDataService.getTableMetadata(namespace, table);
68+
69+
Key partitionKey =
70+
partitionKeyValue != null ? getKeysFromList(partitionKeyValue, tableMetadata) : null;
71+
Key scanStartKey =
72+
scanStartKeyValue != null
73+
? getKey(scanStartKeyValue, namespace, table, tableMetadata)
74+
: null;
75+
Key scanEndKey =
76+
scanEndKeyValue != null ? getKey(scanEndKeyValue, namespace, table, tableMetadata) : null;
77+
78+
ScanRange scanRange =
79+
new ScanRange(scanStartKey, scanEndKey, scanStartInclusive, scanEndInclusive);
80+
ExportOptions exportOptions = buildExportOptions(partitionKey, scanRange);
81+
82+
String filePath =
83+
getOutputAbsoluteFilePath(
84+
outputDirectory, outputFileName, exportOptions.getOutputFileFormat());
85+
logger.info("Exporting data to file: {}", filePath);
86+
87+
try (BufferedWriter writer =
88+
Files.newBufferedWriter(Paths.get(filePath), Charset.defaultCharset(), CREATE, APPEND)) {
89+
exportManager.startExport(exportOptions, tableMetadata, writer);
90+
}
91+
92+
} catch (DirectoryValidationException e) {
93+
logger.error("Invalid output directory path: {}", outputDirectory);
94+
return 1;
95+
} catch (InvalidFilePathException e) {
96+
logger.error(
97+
"The ScalarDB connection settings file path is invalid or the file is missing: {}",
98+
scalarDbPropertiesFilePath);
99+
return 1;
100+
} catch (TableMetadataException e) {
101+
logger.error("Failed to retrieve table metadata: {}", e.getMessage());
102+
return 1;
103+
}
28104
return 0;
29105
}
30106

31-
private void validateOutputDirectory(@Nullable String path)
32-
throws DirectoryValidationException, InvalidFileExtensionException {
33-
if (path == null || path.isEmpty()) {
34-
// It is ok for the output file path to be null or empty as a default file name will be used
35-
// if not provided
36-
return;
107+
private String getScalarDbPropertiesFilePath() {
108+
if (StringUtils.isBlank(configFilePath)) {
109+
throw new IllegalArgumentException(
110+
CoreError.DATA_LOADER_CONFIG_FILE_PATH_BLANK.buildMessage());
37111
}
112+
return Objects.equals(configFilePath, DEFAULT_CONFIG_FILE_NAME)
113+
? Paths.get("").toAbsolutePath().resolve(DEFAULT_CONFIG_FILE_NAME).toString()
114+
: configFilePath;
115+
}
38116

39-
File file = new File(path);
40-
41-
if (file.isDirectory()) {
42-
validateDirectory(path);
117+
private void validateOutputDirectory() throws DirectoryValidationException {
118+
if (StringUtils.isBlank(outputDirectory)) {
119+
DirectoryUtils.validateWorkingDirectory();
43120
} else {
44-
validateFileExtension(file.getName());
45-
validateDirectory(file.getParent());
121+
DirectoryUtils.validateOrCreateTargetDirectory(outputDirectory);
46122
}
47123
}
48124

49-
private void validateDirectory(String directoryPath) throws DirectoryValidationException {
50-
// If the directory path is null or empty, use the current working directory
51-
if (directoryPath == null || directoryPath.isEmpty()) {
52-
DirectoryUtils.validateOrCreateTargetDirectory(DirectoryUtils.getCurrentWorkingDirectory());
53-
} else {
54-
DirectoryUtils.validateOrCreateTargetDirectory(directoryPath);
125+
private ExportManager createExportManager(
126+
StorageFactory storageFactory, ScalarDbDao scalarDbDao, FileFormat fileFormat) {
127+
ProducerTaskFactory taskFactory =
128+
new ProducerTaskFactory(delimiter, includeTransactionMetadata, prettyPrintJson);
129+
DistributedStorage storage = storageFactory.getStorage();
130+
switch (fileFormat) {
131+
case JSON:
132+
return new JsonExportManager(storage, scalarDbDao, taskFactory);
133+
case JSONL:
134+
return new JsonLineExportManager(storage, scalarDbDao, taskFactory);
135+
case CSV:
136+
return new CsvExportManager(storage, scalarDbDao, taskFactory);
137+
default:
138+
throw new AssertionError("Invalid file format" + fileFormat);
55139
}
56140
}
57141

58-
private void validateFileExtension(String filename) throws InvalidFileExtensionException {
59-
String extension = FilenameUtils.getExtension(filename);
60-
if (StringUtils.isBlank(extension)) {
61-
throw new InvalidFileExtensionException(
62-
CoreError.DATA_LOADER_MISSING_FILE_EXTENSION.buildMessage(filename));
142+
private ExportOptions buildExportOptions(Key partitionKey, ScanRange scanRange) {
143+
ExportOptions.ExportOptionsBuilder builder =
144+
ExportOptions.builder(namespace, table, partitionKey, outputFormat)
145+
.sortOrders(sortOrders)
146+
.excludeHeaderRow(excludeHeader)
147+
.includeTransactionMetadata(includeTransactionMetadata)
148+
.delimiter(delimiter)
149+
.limit(limit)
150+
.maxThreadCount(maxThreads)
151+
.dataChunkSize(dataChunkSize)
152+
.prettyPrintJson(prettyPrintJson)
153+
.scanRange(scanRange);
154+
155+
if (projectionColumns != null) {
156+
builder.projectionColumns(projectionColumns);
63157
}
64-
if (!ALLOWED_EXTENSIONS.contains(extension.toLowerCase())) {
65-
throw new InvalidFileExtensionException(
66-
CoreError.DATA_LOADER_INVALID_FILE_EXTENSION.buildMessage(
67-
extension, String.join(", ", ALLOWED_EXTENSIONS)));
158+
159+
return builder.build();
160+
}
161+
162+
private String getOutputAbsoluteFilePath(
163+
String outputDirectory, String outputFileName, FileFormat outputFormat) {
164+
String fileName =
165+
StringUtils.isBlank(outputFileName)
166+
? String.format(
167+
EXPORT_FILE_NAME_FORMAT,
168+
namespace,
169+
table,
170+
System.nanoTime(),
171+
outputFormat.toString().toLowerCase())
172+
: outputFileName;
173+
174+
if (StringUtils.isBlank(outputDirectory)) {
175+
return Paths.get("").toAbsolutePath().resolve(fileName).toAbsolutePath().toString();
176+
} else {
177+
return Paths.get(outputDirectory).resolve(fileName).toAbsolutePath().toString();
68178
}
69179
}
180+
181+
/**
182+
* Convert ColumnKeyValue list to a key
183+
*
184+
* @param keyValueList key value list
185+
* @param tableMetadata table metadata
186+
* @return key
187+
* @throws ColumnParsingException if any error occur during parsing column value
188+
*/
189+
private Key getKeysFromList(List<ColumnKeyValue> keyValueList, TableMetadata tableMetadata)
190+
throws ColumnParsingException {
191+
return KeyUtils.parseMultipleKeyValues(keyValueList, tableMetadata);
192+
}
193+
194+
/**
195+
* Convert ColumnKeyValue to a key
196+
*
197+
* @param keyValue key value
198+
* @param tableMetadata table metadata
199+
* @return key
200+
* @throws KeyParsingException if any error occur during decoding key
201+
*/
202+
private Key getKey(
203+
ColumnKeyValue keyValue, String namespace, String table, TableMetadata tableMetadata)
204+
throws KeyParsingException {
205+
return KeyUtils.parseKeyValue(keyValue, namespace, table, tableMetadata);
206+
}
70207
}

0 commit comments

Comments (0)