
Commit 08a8d28

Add import command for data loader CLI (#2618)

Committed by inv-jishnuypeckstadtbrfrn169
Co-authored-by: Peckstadt Yves <[email protected]>
Co-authored-by: Toshihiro Suzuki <[email protected]>
1 parent: e451f22

3 files changed: 520 additions, 2 deletions
ImportCommand.java: 295 additions & 1 deletion
@@ -1,14 +1,308 @@
 package com.scalar.db.dataloader.cli.command.dataimport;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.scalar.db.api.DistributedStorageAdmin;
+import com.scalar.db.api.TableMetadata;
+import com.scalar.db.common.error.CoreError;
+import com.scalar.db.dataloader.core.FileFormat;
+import com.scalar.db.dataloader.core.ScalarDbMode;
+import com.scalar.db.dataloader.core.dataimport.ImportManager;
+import com.scalar.db.dataloader.core.dataimport.ImportOptions;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
+import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbStorageManager;
+import com.scalar.db.dataloader.core.dataimport.dao.ScalarDbTransactionManager;
+import com.scalar.db.dataloader.core.dataimport.log.ImportLoggerConfig;
+import com.scalar.db.dataloader.core.dataimport.log.LogMode;
+import com.scalar.db.dataloader.core.dataimport.log.SingleFileImportLogger;
+import com.scalar.db.dataloader.core.dataimport.log.SplitByDataChunkImportLogger;
+import com.scalar.db.dataloader.core.dataimport.log.writer.DefaultLogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
+import com.scalar.db.dataloader.core.dataimport.processor.DefaultImportProcessorFactory;
+import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
+import com.scalar.db.dataloader.core.tablemetadata.TableMetadataException;
+import com.scalar.db.dataloader.core.tablemetadata.TableMetadataService;
+import com.scalar.db.dataloader.core.util.TableMetadataUtil;
+import com.scalar.db.service.StorageFactory;
+import com.scalar.db.service.TransactionFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
+import org.apache.commons.lang3.StringUtils;
 import picocli.CommandLine;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.ParameterException;
+import picocli.CommandLine.Spec;
 
 @CommandLine.Command(name = "import", description = "Import data into a ScalarDB table")
 public class ImportCommand extends ImportCommandOptions implements Callable<Integer> {
-  @CommandLine.Spec CommandLine.Model.CommandSpec spec;
+
+  /** Spec injected by picocli */
+  @Spec CommandSpec spec;
 
   @Override
   public Integer call() throws Exception {
+    validateImportTarget(controlFilePath, namespace, tableName);
+    validateLogDirectory(logDirectory);
+    ControlFile controlFile = parseControlFileFromPath(controlFilePath).orElse(null);
+    ImportOptions importOptions = createImportOptions(controlFile);
+    ImportLoggerConfig config =
+        ImportLoggerConfig.builder()
+            .logDirectoryPath(logDirectory)
+            .isLogRawSourceRecordsEnabled(importOptions.isLogRawRecord())
+            .isLogSuccessRecordsEnabled(importOptions.isLogSuccessRecords())
+            .prettyPrint(prettyPrint)
+            .build();
+    LogWriterFactory logWriterFactory = createLogWriterFactory(config);
+    Map<String, TableMetadata> tableMetadataMap =
+        createTableMetadataMap(controlFile, namespace, tableName);
+    try (BufferedReader reader =
+        Files.newBufferedReader(Paths.get(sourceFilePath), Charset.defaultCharset())) {
+      ImportManager importManager =
+          createImportManager(importOptions, tableMetadataMap, reader, logWriterFactory, config);
+      importManager.startImport();
+    }
     return 0;
   }
+
+  /**
+   * Creates a LogWriterFactory object.
+   *
+   * @return LogWriterFactory object
+   */
+  private LogWriterFactory createLogWriterFactory(ImportLoggerConfig config) {
+    return new DefaultLogWriterFactory(config);
+  }
+
+  /**
+   * Creates a TableMetadata map from the provided control file, or from the namespace and table
+   * name.
+   *
+   * @param controlFile control file
+   * @param namespace namespace
+   * @param tableName single table name
+   * @return {@code Map<String, TableMetadata>} a table metadata map
+   * @throws IOException if the ScalarDB config file cannot be read
+   * @throws TableMetadataException if the table metadata cannot be retrieved
+   */
+  private Map<String, TableMetadata> createTableMetadataMap(
+      ControlFile controlFile, String namespace, String tableName)
+      throws IOException, TableMetadataException {
+    File configFile = new File(configFilePath);
+    StorageFactory storageFactory = StorageFactory.create(configFile);
+    try (DistributedStorageAdmin storageAdmin = storageFactory.getStorageAdmin()) {
+      TableMetadataService tableMetadataService = new TableMetadataService(storageAdmin);
+      Map<String, TableMetadata> tableMetadataMap = new HashMap<>();
+      if (controlFile != null) {
+        for (ControlFileTable table : controlFile.getTables()) {
+          tableMetadataMap.put(
+              TableMetadataUtil.getTableLookupKey(table.getNamespace(), table.getTable()),
+              tableMetadataService.getTableMetadata(table.getNamespace(), table.getTable()));
+        }
+      } else {
+        tableMetadataMap.put(
+            TableMetadataUtil.getTableLookupKey(namespace, tableName),
+            tableMetadataService.getTableMetadata(namespace, tableName));
+      }
+      return tableMetadataMap;
+    }
+  }
+
+  /**
+   * Creates an ImportManager object from the given data.
+   *
+   * @param importOptions import options
+   * @param tableMetadataMap table metadata map
+   * @param reader buffered reader with source data
+   * @param logWriterFactory log writer factory object
+   * @param config import logging config
+   * @return ImportManager object
+   */
+  private ImportManager createImportManager(
+      ImportOptions importOptions,
+      Map<String, TableMetadata> tableMetadataMap,
+      BufferedReader reader,
+      LogWriterFactory logWriterFactory,
+      ImportLoggerConfig config)
+      throws IOException {
+    File configFile = new File(configFilePath);
+    ImportProcessorFactory importProcessorFactory = new DefaultImportProcessorFactory();
+    ImportManager importManager;
+    if (scalarDbMode == ScalarDbMode.TRANSACTION) {
+      ScalarDbTransactionManager scalarDbTransactionManager =
+          new ScalarDbTransactionManager(TransactionFactory.create(configFile));
+      importManager =
+          new ImportManager(
+              tableMetadataMap,
+              reader,
+              importOptions,
+              importProcessorFactory,
+              ScalarDbMode.TRANSACTION,
+              null,
+              scalarDbTransactionManager.getDistributedTransactionManager());
+    } else {
+      ScalarDbStorageManager scalarDbStorageManager =
+          new ScalarDbStorageManager(StorageFactory.create(configFile));
+      importManager =
+          new ImportManager(
+              tableMetadataMap,
+              reader,
+              importOptions,
+              importProcessorFactory,
+              ScalarDbMode.STORAGE,
+              scalarDbStorageManager.getDistributedStorage(),
+              null);
+    }
+    if (importOptions.getLogMode().equals(LogMode.SPLIT_BY_DATA_CHUNK)) {
+      importManager.addListener(new SplitByDataChunkImportLogger(config, logWriterFactory));
+    } else {
+      importManager.addListener(new SingleFileImportLogger(config, logWriterFactory));
+    }
+    return importManager;
+  }
+
+  /**
+   * Validates the import targets.
+   *
+   * @param controlFilePath control file path
+   * @param namespace namespace
+   * @param tableName single table name
+   * @throws ParameterException if one of the argument values is wrong
+   */
+  private void validateImportTarget(String controlFilePath, String namespace, String tableName) {
+    // Throw an error if no clear import target was specified
+    if (StringUtils.isBlank(controlFilePath)
+        && (StringUtils.isBlank(namespace) || StringUtils.isBlank(tableName))) {
+      throw new ParameterException(
+          spec.commandLine(), CoreError.DATA_LOADER_IMPORT_TARGET_MISSING.buildMessage());
+    }
+
+    // Make sure the control file exists when a path is provided
+    if (!StringUtils.isBlank(controlFilePath)) {
+      Path path = Paths.get(controlFilePath);
+      if (!Files.exists(path)) {
+        throw new ParameterException(
+            spec.commandLine(),
+            CoreError.DATA_LOADER_MISSING_IMPORT_FILE.buildMessage(
+                controlFilePath, FILE_OPTION_NAME_LONG_FORMAT));
+      }
+    }
+  }
+
+  /**
+   * Validates the log directory path.
+   *
+   * @param logDirectory log directory path
+   * @throws ParameterException if the path is invalid
+   */
+  private void validateLogDirectory(String logDirectory) throws ParameterException {
+    Path logDirectoryPath;
+    if (!StringUtils.isBlank(logDirectory)) {
+      // User-provided log directory via CLI argument
+      logDirectoryPath = Paths.get(logDirectory);
+
+      if (Files.exists(logDirectoryPath)) {
+        // Check if the provided directory is writable
+        if (!Files.isWritable(logDirectoryPath)) {
+          throw new ParameterException(
+              spec.commandLine(),
+              CoreError.DATA_LOADER_LOG_DIRECTORY_CREATION_FAILED.buildMessage(
+                  logDirectoryPath.toAbsolutePath()));
+        }
+      } else {
+        // Create the log directory if it doesn't exist
+        try {
+          Files.createDirectories(logDirectoryPath);
+        } catch (IOException e) {
+          throw new ParameterException(
+              spec.commandLine(),
+              CoreError.DATA_LOADER_LOG_DIRECTORY_CREATION_FAILED.buildMessage(
+                  logDirectoryPath.toAbsolutePath()));
+        }
+      }
+      return;
+    }
+
+    // Use the current working directory as the log directory
+    logDirectoryPath = Paths.get(System.getProperty("user.dir"));
+
+    // Check if the current working directory is writable
+    if (!Files.isWritable(logDirectoryPath)) {
+      throw new ParameterException(
+          spec.commandLine(),
+          CoreError.DATA_LOADER_LOG_DIRECTORY_WRITE_ACCESS_DENIED.buildMessage(
+              logDirectoryPath.toAbsolutePath()));
+    }
+  }
+
+  /**
+   * Generates a ControlFile object from a valid control file path.
+   *
+   * @param controlFilePath control file path
+   * @return {@code Optional<ControlFile>} generated control file object
+   * @throws ParameterException if the file cannot be parsed
+   */
+  private Optional<ControlFile> parseControlFileFromPath(String controlFilePath) {
+    if (StringUtils.isBlank(controlFilePath)) {
+      return Optional.empty();
+    }
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      ControlFile controlFile =
+          objectMapper.readValue(new File(controlFilePath), ControlFile.class);
+      return Optional.of(controlFile);
+    } catch (IOException e) {
+      throw new ParameterException(
+          spec.commandLine(),
+          CoreError.DATA_LOADER_INVALID_CONTROL_FILE.buildMessage(controlFilePath));
+    }
+  }
+
+  /**
+   * Generates an ImportOptions object from the provided CLI parameter data.
+   *
+   * @param controlFile control file
+   * @return ImportOptions generated import options object
+   */
+  private ImportOptions createImportOptions(ControlFile controlFile) {
+    ImportOptions.ImportOptionsBuilder builder =
+        ImportOptions.builder()
+            .fileFormat(sourceFileFormat)
+            .requireAllColumns(requireAllColumns)
+            .prettyPrint(prettyPrint)
+            .controlFile(controlFile)
+            .controlFileValidationLevel(controlFileValidation)
+            .logRawRecord(logRawRecord)
+            .logSuccessRecords(logSuccessRecords)
+            .ignoreNullValues(ignoreNullValues)
+            .namespace(namespace)
+            .dataChunkSize(dataChunkSize)
+            .transactionBatchSize(transactionSize)
+            .maxThreads(maxThreads)
+            .dataChunkQueueSize(dataChunkQueueSize)
+            .tableName(tableName);
+
+    // Import mode
+    if (importMode != null) {
+      builder.importMode(importMode);
+    }
+    if (!splitLogMode) {
+      builder.logMode(LogMode.SINGLE_FILE);
+    }
+
+    // CSV options
+    if (sourceFileFormat.equals(FileFormat.CSV)) {
+      builder.delimiter(delimiter);
+      if (!StringUtils.isBlank(customHeaderRow)) {
+        builder.customHeaderRow(customHeaderRow);
+      }
+    }
+    return builder.build();
+  }
 }
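For context, this commit defines only the import subcommand itself; the CLI entry point that registers it is not part of the diff. Below is a minimal sketch of how picocli could dispatch to ImportCommand, assuming a hypothetical DataLoaderCli root command (its name and package are placeholders, not taken from this commit):

package com.scalar.db.dataloader.cli;

import com.scalar.db.dataloader.cli.command.dataimport.ImportCommand;
import picocli.CommandLine;

// Hypothetical root command; only ImportCommand is confirmed by this diff.
@CommandLine.Command(
    name = "scalardb-data-loader",
    subcommands = {ImportCommand.class})
public class DataLoaderCli {
  public static void main(String[] args) {
    // picocli matches the "import" subcommand name declared in
    // @CommandLine.Command on ImportCommand and binds the remaining
    // arguments onto the options inherited from ImportCommandOptions.
    int exitCode = new CommandLine(new DataLoaderCli()).execute(args);
    System.exit(exitCode);
  }
}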

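The control file consumed by parseControlFileFromPath is plain JSON deserialized with Jackson. As a rough usage sketch, the snippet below reads one back using only the accessors that appear in this diff (getTables(), getNamespace(), getTable()); the file name control.json and the surrounding class are illustrative assumptions:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import java.io.File;
import java.io.IOException;

public class ControlFileInspector {
  public static void main(String[] args) throws IOException {
    // Deserialize the control file the same way ImportCommand does.
    ObjectMapper objectMapper = new ObjectMapper();
    ControlFile controlFile =
        objectMapper.readValue(new File("control.json"), ControlFile.class);

    // ImportCommand iterates these entries to build its table metadata map,
    // one namespace/table pair per control file table.
    for (ControlFileTable table : controlFile.getTables()) {
      System.out.printf("target table: %s.%s%n", table.getNamespace(), table.getTable());
    }
  }
}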