Skip to content

Commit 179991b

Browse files
Add export tasks (#2450)
Co-authored-by: Peckstadt Yves <[email protected]>
1 parent 9b951f9 commit 179991b

21 files changed

+1607
-5
lines changed

core/src/main/java/com/scalar/db/common/error/CoreError.java

+8
Original file line number | Diff line number | Diff line change
@@ -816,6 +816,14 @@ public enum CoreError implements ScalarDbError {
816816
""),
817817
DATA_LOADER_MISSING_COLUMN(
818818
Category.USER_ERROR, "0176", "Missing field or column mapping for %s", "", ""),
819+
DATA_LOADER_VALUE_TO_STRING_CONVERSION_FAILED(
820+
Category.USER_ERROR,
821+
"0177",
822+
"Something went wrong while converting the ScalarDB values to strings. The table metadata and Value datatype probably do not match. Details: %s",
823+
"",
824+
""),
825+
DATA_LOADER_FILE_FORMAT_NOT_SUPPORTED(
826+
Category.USER_ERROR, "0178", "The provided file format is not supported : %s", "", ""),
819827

820828
//
821829
// Errors for the concurrency error category
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,14 @@
1+
package com.scalar.db.dataloader.core;
2+
3+
import com.fasterxml.jackson.annotation.JsonInclude;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
6+
7+
public class DataLoaderObjectMapper extends ObjectMapper {
8+
9+
public DataLoaderObjectMapper() {
10+
super();
11+
this.setSerializationInclusion(JsonInclude.Include.NON_NULL);
12+
this.registerModule(new JavaTimeModule());
13+
}
14+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,94 @@
1+
package com.scalar.db.dataloader.core.dataexport;
2+
3+
import com.scalar.db.api.DistributedStorage;
4+
import com.scalar.db.api.TableMetadata;
5+
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
6+
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
7+
import com.scalar.db.dataloader.core.util.CsvUtil;
8+
import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
9+
import java.io.IOException;
10+
import java.io.Writer;
11+
import java.util.Iterator;
12+
import java.util.List;
13+
14+
public class CsvExportManager extends ExportManager {
15+
public CsvExportManager(
16+
DistributedStorage storage, ScalarDBDao dao, ProducerTaskFactory producerTaskFactory) {
17+
super(storage, dao, producerTaskFactory);
18+
}
19+
20+
/**
21+
* Create and add header part for the export file
22+
*
23+
* @param exportOptions Export options for the data export
24+
* @param tableMetadata Metadata of the table to export
25+
* @param writer File writer object
26+
* @throws IOException If any IO exception occurs
27+
*/
28+
@Override
29+
void processHeader(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
30+
throws IOException {
31+
String header = createCsvHeaderRow(exportOptions, tableMetadata);
32+
writer.append(header);
33+
writer.flush();
34+
}
35+
36+
/**
37+
* Create and add footer part for the export file
38+
*
39+
* @param exportOptions Export options for the data export
40+
* @param tableMetadata Metadata of the table to export
41+
* @param writer File writer object
42+
* @throws IOException If any IO exception occurs
43+
*/
44+
@Override
45+
void processFooter(ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer)
46+
throws IOException {}
47+
48+
/**
49+
* To generate the header row of CSV export file
50+
*
51+
* @param exportOptions export options
52+
* @param tableMetadata metadata of the table
53+
* @return generated CSV header row
54+
*/
55+
private String createCsvHeaderRow(ExportOptions exportOptions, TableMetadata tableMetadata) {
56+
StringBuilder headerRow = new StringBuilder();
57+
List<String> projections = exportOptions.getProjectionColumns();
58+
Iterator<String> iterator = tableMetadata.getColumnNames().iterator();
59+
while (iterator.hasNext()) {
60+
String columnName = iterator.next();
61+
if (shouldIgnoreColumn(
62+
exportOptions.isIncludeTransactionMetadata(), columnName, tableMetadata, projections)) {
63+
continue;
64+
}
65+
headerRow.append(columnName);
66+
if (iterator.hasNext()) {
67+
headerRow.append(exportOptions.getDelimiter());
68+
}
69+
}
70+
CsvUtil.removeTrailingDelimiter(headerRow, exportOptions.getDelimiter());
71+
headerRow.append("\n");
72+
return headerRow.toString();
73+
}
74+
75+
/**
76+
* To ignore a column or not based on conditions such as if it is a metadata column or if it is
77+
* not include in selected projections
78+
*
79+
* @param isIncludeTransactionMetadata to include transaction metadata or not
80+
* @param columnName column name
81+
* @param tableMetadata table metadata
82+
* @param projections selected columns for projection
83+
* @return ignore the column or not
84+
*/
85+
private boolean shouldIgnoreColumn(
86+
boolean isIncludeTransactionMetadata,
87+
String columnName,
88+
TableMetadata tableMetadata,
89+
List<String> projections) {
90+
return (!isIncludeTransactionMetadata
91+
&& ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata))
92+
|| (!projections.isEmpty() && !projections.contains(columnName));
93+
}
94+
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,243 @@
1+
package com.scalar.db.dataloader.core.dataexport;
2+
3+
import com.scalar.db.api.DistributedStorage;
4+
import com.scalar.db.api.Result;
5+
import com.scalar.db.api.Scanner;
6+
import com.scalar.db.api.TableMetadata;
7+
import com.scalar.db.dataloader.core.FileFormat;
8+
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTask;
9+
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
10+
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidationException;
11+
import com.scalar.db.dataloader.core.dataexport.validation.ExportOptionsValidator;
12+
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
13+
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDaoException;
14+
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
15+
import com.scalar.db.io.DataType;
16+
import java.io.BufferedWriter;
17+
import java.io.IOException;
18+
import java.io.Writer;
19+
import java.util.ArrayList;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import lombok.RequiredArgsConstructor;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
@RequiredArgsConstructor
32+
public abstract class ExportManager {
33+
private static final Logger logger = LoggerFactory.getLogger(ExportManager.class);
34+
35+
private final DistributedStorage storage;
36+
private final ScalarDBDao dao;
37+
private final ProducerTaskFactory producerTaskFactory;
38+
private final Object lock = new Object();
39+
40+
/**
41+
* Create and add header part for the export file
42+
*
43+
* @param exportOptions Export options for the data export
44+
* @param tableMetadata Metadata of the table to export
45+
* @param writer File writer object
46+
* @throws IOException If any IO exception occurs
47+
*/
48+
abstract void processHeader(
49+
ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) throws IOException;
50+
51+
/**
52+
* Create and add footer part for the export file
53+
*
54+
* @param exportOptions Export options for the data export
55+
* @param tableMetadata Metadata of the table to export
56+
* @param writer File writer object
57+
* @throws IOException If any IO exception occurs
58+
*/
59+
abstract void processFooter(
60+
ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) throws IOException;
61+
/**
62+
* Starts the export process
63+
*
64+
* @param exportOptions Export options
65+
* @param tableMetadata Metadata for a single ScalarDB table
66+
* @param writer Writer to write the exported data
67+
*/
68+
public ExportReport startExport(
69+
ExportOptions exportOptions, TableMetadata tableMetadata, Writer writer) {
70+
ExportReport exportReport = new ExportReport();
71+
try {
72+
validateExportOptions(exportOptions, tableMetadata);
73+
Map<String, DataType> dataTypeByColumnName = tableMetadata.getColumnDataTypes();
74+
handleTransactionMetadata(exportOptions, tableMetadata);
75+
processHeader(exportOptions, tableMetadata, writer);
76+
77+
int maxThreadCount =
78+
exportOptions.getMaxThreadCount() == 0
79+
? Runtime.getRuntime().availableProcessors()
80+
: exportOptions.getMaxThreadCount();
81+
ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);
82+
83+
BufferedWriter bufferedWriter = new BufferedWriter(writer);
84+
boolean isJson = exportOptions.getOutputFileFormat() == FileFormat.JSON;
85+
86+
try (Scanner scanner = createScanner(exportOptions, dao, storage)) {
87+
88+
Iterator<Result> iterator = scanner.iterator();
89+
AtomicBoolean isFirstBatch = new AtomicBoolean(true);
90+
91+
while (iterator.hasNext()) {
92+
List<Result> dataChunk = fetchDataChunk(iterator, exportOptions.getDataChunkSize());
93+
executorService.submit(
94+
() ->
95+
processDataChunk(
96+
exportOptions,
97+
tableMetadata,
98+
dataTypeByColumnName,
99+
dataChunk,
100+
bufferedWriter,
101+
isJson,
102+
isFirstBatch,
103+
exportReport));
104+
}
105+
executorService.shutdown();
106+
if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
107+
logger.info("All tasks completed");
108+
} else {
109+
logger.error("Timeout occurred while waiting for tasks to complete");
110+
// TODO: handle this
111+
}
112+
processFooter(exportOptions, tableMetadata, bufferedWriter);
113+
} catch (InterruptedException | IOException e) {
114+
logger.error("Error during export: {}", e.getMessage());
115+
} finally {
116+
bufferedWriter.flush();
117+
}
118+
} catch (ExportOptionsValidationException | IOException | ScalarDBDaoException e) {
119+
logger.error("Error during export: {}", e.getMessage());
120+
}
121+
return exportReport;
122+
}
123+
124+
/**
125+
* To process result data chunk
126+
*
127+
* @param exportOptions export options
128+
* @param tableMetadata metadata of the table
129+
* @param dataTypeByColumnName map of columns and their data types
130+
* @param dataChunk a list with result data
131+
* @param bufferedWriter writer object
132+
* @param isJson if data format is json or not
133+
* @param isFirstBatch is the data going to be process is the first batch or not
134+
* @param exportReport export report which will be updated once the data chunk is processed
135+
*/
136+
private void processDataChunk(
137+
ExportOptions exportOptions,
138+
TableMetadata tableMetadata,
139+
Map<String, DataType> dataTypeByColumnName,
140+
List<Result> dataChunk,
141+
BufferedWriter bufferedWriter,
142+
boolean isJson,
143+
AtomicBoolean isFirstBatch,
144+
ExportReport exportReport) {
145+
ProducerTask producerTask =
146+
producerTaskFactory.createProducerTask(
147+
exportOptions.getOutputFileFormat(),
148+
exportOptions.getProjectionColumns(),
149+
tableMetadata,
150+
dataTypeByColumnName);
151+
String dataChunkContent = producerTask.process(dataChunk);
152+
153+
try {
154+
synchronized (lock) {
155+
if (isJson && !isFirstBatch.getAndSet(false)) {
156+
bufferedWriter.write(",");
157+
}
158+
bufferedWriter.write(dataChunkContent);
159+
exportReport.updateExportedRowCount(dataChunk.size());
160+
}
161+
} catch (IOException e) {
162+
logger.error("Error while writing data chunk: {}", e.getMessage());
163+
}
164+
}
165+
166+
/**
167+
* To split result into batches
168+
*
169+
* @param iterator iterator which parse results
170+
* @param batchSize size of batch
171+
* @return a list of results split to batches
172+
*/
173+
private List<Result> fetchDataChunk(Iterator<Result> iterator, int batchSize) {
174+
List<Result> batch = new ArrayList<>();
175+
int count = 0;
176+
while (iterator.hasNext() && count < batchSize) {
177+
batch.add(iterator.next());
178+
count++;
179+
}
180+
return batch;
181+
}
182+
183+
/**
184+
* * To validate export options
185+
*
186+
* @param exportOptions export options
187+
* @param tableMetadata metadata of the table
188+
* @throws ExportOptionsValidationException thrown if any of the export option validation fails
189+
*/
190+
private void validateExportOptions(ExportOptions exportOptions, TableMetadata tableMetadata)
191+
throws ExportOptionsValidationException {
192+
ExportOptionsValidator.validate(exportOptions, tableMetadata);
193+
}
194+
195+
/**
196+
* To update projection columns of export options if include metadata options is enabled
197+
*
198+
* @param exportOptions export options
199+
* @param tableMetadata metadata of the table
200+
*/
201+
private void handleTransactionMetadata(ExportOptions exportOptions, TableMetadata tableMetadata) {
202+
if (exportOptions.isIncludeTransactionMetadata()
203+
&& !exportOptions.getProjectionColumns().isEmpty()) {
204+
List<String> projectionMetadata =
205+
TableMetadataUtil.populateProjectionsWithMetadata(
206+
tableMetadata, exportOptions.getProjectionColumns());
207+
exportOptions.setProjectionColumns(projectionMetadata);
208+
}
209+
}
210+
211+
/**
212+
* To create a scanner object
213+
*
214+
* @param exportOptions export options
215+
* @param dao ScalarDB dao object
216+
* @param storage distributed storage object
217+
* @return created scanner
218+
* @throws ScalarDBDaoException throws if any issue occurs in creating scanner object
219+
*/
220+
private Scanner createScanner(
221+
ExportOptions exportOptions, ScalarDBDao dao, DistributedStorage storage)
222+
throws ScalarDBDaoException {
223+
boolean isScanAll = exportOptions.getScanPartitionKey() == null;
224+
if (isScanAll) {
225+
return dao.createScanner(
226+
exportOptions.getNamespace(),
227+
exportOptions.getTableName(),
228+
exportOptions.getProjectionColumns(),
229+
exportOptions.getLimit(),
230+
storage);
231+
} else {
232+
return dao.createScanner(
233+
exportOptions.getNamespace(),
234+
exportOptions.getTableName(),
235+
exportOptions.getScanPartitionKey(),
236+
exportOptions.getScanRange(),
237+
exportOptions.getSortOrders(),
238+
exportOptions.getProjectionColumns(),
239+
exportOptions.getLimit(),
240+
storage);
241+
}
242+
}
243+
}

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/ExportReport.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ public long getExportedRowCount() {
2020
return exportedRowCount.sum();
2121
}
2222

23-
public void increaseExportedRowCount() {
24-
this.exportedRowCount.increment();
23+
public void updateExportedRowCount(long count) {
24+
this.exportedRowCount.add(count);
2525
}
2626
}

0 commit comments

Comments
 (0)