
Commit 9f2c25a

Add DTOs and other classes for task (#2446)
Co-authored-by: Peckstadt Yves <[email protected]>
1 parent 90cbc4d commit 9f2c25a

8 files changed: +386 −0 lines changed


core/src/main/java/com/scalar/db/common/error/CoreError.java

Lines changed: 14 additions & 0 deletions
@@ -742,6 +742,20 @@ public enum CoreError implements ScalarDbError {
      "Duplicated data mappings found for column '%s' in table '%s'",
      "",
      ""),
  DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN(
      Category.USER_ERROR,
      "0174",
      "Missing required field or column mapping for clustering key %s",
      "",
      ""),
  DATA_LOADER_MISSING_PARTITION_KEY_COLUMN(
      Category.USER_ERROR,
      "0175",
      "Missing required field or column mapping for partition key %s",
      "",
      ""),
  DATA_LOADER_MISSING_COLUMN(
      Category.USER_ERROR, "0176", "Missing field or column mapping for %s", "", ""),

  //
  // Errors for the concurrency error category
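
The three new entries follow the existing CoreError pattern: each carries a category, a code, and a printf-style message format whose %s placeholder is filled via buildMessage, as the validator added later in this commit does. A minimal sketch (the column name here is a made-up example):

// buildMessage substitutes the column name into the "0175" message format
String message =
    CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN.buildMessage("customer_id");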
com/scalar/db/dataloader/core/dataimport/ImportOptions.java

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
import lombok.Builder;
import lombok.Data;

/** Import options to import data into one or more ScalarDB tables. */
@Builder
@Data
public class ImportOptions {

  @Builder.Default private final ImportMode importMode = ImportMode.UPSERT;
  @Builder.Default private final boolean requireAllColumns = false;
  @Builder.Default private final FileFormat fileFormat = FileFormat.JSON;
  @Builder.Default private final boolean prettyPrint = false;
  @Builder.Default private final boolean ignoreNullValues = false;
  @Builder.Default private final LogMode logMode = LogMode.SPLIT_BY_DATA_CHUNK;

  @Builder.Default
  private final ControlFileValidationLevel controlFileValidationLevel =
      ControlFileValidationLevel.MAPPED;

  @Builder.Default private final char delimiter = ',';

  @Builder.Default private final boolean logSuccessRecords = false;
  @Builder.Default private final boolean logRawRecord = false;

  private final int dataChunkSize;
  private final int transactionBatchSize;
  private final ControlFile controlFile;
  private final String namespace;
  private final String tableName;
  private final int maxThreads;
  private final String customHeaderRow;
}
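
Because the class is annotated with Lombok's @Builder, callers construct options fluently and only override what they need; the @Builder.Default fields keep their defaults otherwise. A minimal sketch, with made-up namespace, table, and sizing values:

// Hedged sketch: the namespace/table names and sizes below are illustrative only.
ImportOptions options =
    ImportOptions.builder()
        .namespace("sample_ns")
        .tableName("customers")
        .dataChunkSize(500)
        .transactionBatchSize(50)
        .maxThreads(8)
        .logMode(LogMode.SINGLE_FILE) // overrides the SPLIT_BY_DATA_CHUNK default
        .build();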
com/scalar/db/dataloader/core/dataimport/log/LogMode.java

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
package com.scalar.db.dataloader.core.dataimport.log;

/** Log modes available for import logging. */
public enum LogMode {
  SINGLE_FILE,
  SPLIT_BY_DATA_CHUNK
}
com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMapping.java

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
package com.scalar.db.dataloader.core.dataimport.task.mapping;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;

public class ImportDataMapping {

  /**
   * Updates the source data, replacing each source column name with the target column name
   * according to the control file table mappings.
   *
   * @param source source data
   * @param controlFileTable control file table used to map the source data
   */
  public static void apply(ObjectNode source, ControlFileTable controlFileTable) {
    // Copy the source field data to the target column if the target is not already present
    for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) {
      String sourceField = mapping.getSourceField();
      String targetColumn = mapping.getTargetColumn();

      if (source.has(sourceField) && !source.has(targetColumn)) {
        source.set(targetColumn, source.get(sourceField));
        source.remove(sourceField);
      }
    }
  }
}
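
For illustration, a minimal sketch of the renaming behavior with made-up field names; a source field is only moved when the target column is not already present, and the source field is removed afterwards:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

ObjectMapper mapper = new ObjectMapper();
ObjectNode record = mapper.createObjectNode();
record.put("source_id", "111"); // illustrative field name

ControlFileTable table = new ControlFileTable("ns", "tbl");
table.getMappings().add(new ControlFileTableFieldMapping("source_id", "target_id"));

ImportDataMapping.apply(record, table);
// record is now {"target_id":"111"}; "source_id" has been removed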
com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
package com.scalar.db.dataloader.core.dataimport.task.validation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.concurrent.Immutable;

/** The validation result for a data source record. */
@Immutable
public final class ImportSourceRecordValidationResult {

  private final List<String> errorMessages;
  private final Set<String> columnsWithErrors;

  /** Constructor */
  public ImportSourceRecordValidationResult() {
    this.errorMessages = new ArrayList<>();
    this.columnsWithErrors = new HashSet<>();
  }

  /**
   * Adds a validation error message for a column and marks the column as containing an error.
   *
   * @param columnName column name
   * @param errorMessage error message
   */
  public void addErrorMessage(String columnName, String errorMessage) {
    this.columnsWithErrors.add(columnName);
    this.errorMessages.add(errorMessage);
  }

  /** @return Immutable list of validation error messages */
  public List<String> getErrorMessages() {
    return Collections.unmodifiableList(this.errorMessages);
  }

  /** @return Immutable set of columns that had errors */
  public Set<String> getColumnsWithErrors() {
    return Collections.unmodifiableSet(this.columnsWithErrors);
  }

  /** @return {@code true} if no validation errors were recorded */
  public boolean isValid() {
    return this.errorMessages.isEmpty();
  }
}
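
A quick sketch of the result API in isolation (the column name and message below are made up):

ImportSourceRecordValidationResult result = new ImportSourceRecordValidationResult();
result.addErrorMessage("email", "Missing field or column mapping for email");

result.isValid();              // false
result.getColumnsWithErrors(); // contains "email"
// The getters return unmodifiable views, so mutation attempts throw:
// result.getErrorMessages().add("x") -> UnsupportedOperationException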
com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java

Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
package com.scalar.db.dataloader.core.dataimport.task.validation;

import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.DatabaseKeyType;
import com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils;
import java.util.Set;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ImportSourceRecordValidator {

  /**
   * Validates a source record and collects all validation error messages. Validation runs to
   * completion instead of returning at the first error, so a single run reports every problem at
   * once and avoids trial-and-error imports where each attempt surfaces only one new error.
   *
   * @param partitionKeyNames List of partition keys in the table
   * @param clusteringKeyNames List of clustering keys in the table
   * @param columnNames List of all column names in the table
   * @param sourceRecord source data
   * @param allColumnsRequired if true, treat missing columns as an error
   * @param tableMetadata metadata of the target table
   * @return Source record validation result
   */
  public static ImportSourceRecordValidationResult validateSourceRecord(
      Set<String> partitionKeyNames,
      Set<String> clusteringKeyNames,
      Set<String> columnNames,
      JsonNode sourceRecord,
      boolean allColumnsRequired,
      TableMetadata tableMetadata) {
    ImportSourceRecordValidationResult validationResult = new ImportSourceRecordValidationResult();

    // Check that all partition keys are present
    checkMissingKeys(DatabaseKeyType.PARTITION, partitionKeyNames, sourceRecord, validationResult);

    // Check that all clustering keys are present
    checkMissingKeys(
        DatabaseKeyType.CLUSTERING, clusteringKeyNames, sourceRecord, validationResult);

    // Check if the record is missing any columns
    if (allColumnsRequired) {
      checkMissingColumns(
          sourceRecord,
          columnNames,
          validationResult,
          validationResult.getColumnsWithErrors(),
          tableMetadata);
    }

    return validationResult;
  }

  /**
   * Checks that the required keys are present in the source record.
   *
   * @param keyType Type of key to validate
   * @param keyColumnNames List of required column names
   * @param sourceRecord source data
   * @param validationResult Source record validation result
   */
  public static void checkMissingKeys(
      DatabaseKeyType keyType,
      Set<String> keyColumnNames,
      JsonNode sourceRecord,
      ImportSourceRecordValidationResult validationResult) {
    for (String columnName : keyColumnNames) {
      if (!sourceRecord.has(columnName)) {
        String errorMessage =
            keyType == DatabaseKeyType.PARTITION
                ? CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN.buildMessage(columnName)
                : CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage(columnName);
        validationResult.addErrorMessage(columnName, errorMessage);
      }
    }
  }

  /**
   * Checks that the JSON object is not missing any columns. Errors are added to the validation
   * result.
   *
   * @param sourceRecord Source JSON object
   * @param columnNames List of column names for the table
   * @param validationResult Source record validation result
   * @param ignoreColumns Columns that can be ignored in the check
   * @param tableMetadata metadata of the target table
   */
  public static void checkMissingColumns(
      JsonNode sourceRecord,
      Set<String> columnNames,
      ImportSourceRecordValidationResult validationResult,
      Set<String> ignoreColumns,
      TableMetadata tableMetadata) {
    for (String columnName : columnNames) {
      // Report the column if it is not a transaction metadata column, is not ignored, and is
      // missing from the source record
      if ((ignoreColumns == null || !ignoreColumns.contains(columnName))
          && !ConsensusCommitUtils.isTransactionMetaColumn(columnName, tableMetadata)
          && !sourceRecord.has(columnName)) {
        validationResult.addErrorMessage(
            columnName, CoreError.DATA_LOADER_MISSING_COLUMN.buildMessage(columnName));
      }
    }
  }

  /**
   * Checks that the JSON object is not missing any columns. Errors are added to the validation
   * result.
   *
   * @param sourceRecord Source JSON object
   * @param columnNames List of column names for the table
   * @param validationResult Source record validation result
   * @param tableMetadata metadata of the target table
   */
  public static void checkMissingColumns(
      JsonNode sourceRecord,
      Set<String> columnNames,
      ImportSourceRecordValidationResult validationResult,
      TableMetadata tableMetadata) {
    ImportSourceRecordValidator.checkMissingColumns(
        sourceRecord, columnNames, validationResult, null, tableMetadata);
  }
}
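
A minimal usage sketch, assuming a TableMetadata instance for the target table has already been obtained elsewhere; the record contents are made up:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

ObjectMapper mapper = new ObjectMapper();
ObjectNode sourceRecord = mapper.createObjectNode();
sourceRecord.put("id", 1); // illustrative field only

ImportSourceRecordValidationResult result =
    ImportSourceRecordValidator.validateSourceRecord(
        tableMetadata.getPartitionKeyNames(),
        tableMetadata.getClusteringKeyNames(),
        tableMetadata.getColumnNames(),
        sourceRecord,
        /* allColumnsRequired= */ true,
        tableMetadata);

if (!result.isValid()) {
  // One message per missing key or column, built from the new CoreError codes 0174-0176
  result.getErrorMessages().forEach(System.out::println);
}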
com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMappingTest.java

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
package com.scalar.db.dataloader.core.dataimport.task.mapping;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
import java.util.ArrayList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ImportDataMappingTest {

  ControlFileTable controlFileTable;

  @BeforeEach
  void setup() {
    controlFileTable = new ControlFileTable("namespace", "table");
    ControlFileTableFieldMapping m1 = new ControlFileTableFieldMapping("source_id", "target_id");
    ControlFileTableFieldMapping m2 =
        new ControlFileTableFieldMapping("source_name", "target_name");
    ControlFileTableFieldMapping m3 =
        new ControlFileTableFieldMapping("source_email", "target_email");
    ArrayList<ControlFileTableFieldMapping> mappingArrayList = new ArrayList<>();
    mappingArrayList.add(m1);
    mappingArrayList.add(m2);
    mappingArrayList.add(m3);
    controlFileTable.getMappings().addAll(mappingArrayList);
  }

  @Test
  void apply_withValidData_shouldUpdateSourceData() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    ObjectNode source = objectMapper.createObjectNode();
    source.put("source_id", "111");
    source.put("source_name", "abc");
    source.put("source_email", "[email protected]");
    ImportDataMapping.apply(source, controlFileTable);
    // Assert changes
    Assertions.assertEquals("111", source.get("target_id").asText());
    Assertions.assertEquals("abc", source.get("target_name").asText());
    Assertions.assertEquals("[email protected]", source.get("target_email").asText());
  }
}
com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidatorTest.java

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
package com.scalar.db.dataloader.core.dataimport.task.validation;

import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.UnitTestUtils;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class ImportSourceRecordValidatorTest {

  TableMetadata mockMetadata = UnitTestUtils.createTestTableMetadata();

  @Test
  void
      validateSourceRecord_withValidData_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() {
    Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
    Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
    Set<String> columnNames = mockMetadata.getColumnNames();
    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
    ImportSourceRecordValidationResult result =
        ImportSourceRecordValidator.validateSourceRecord(
            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
    Assertions.assertTrue(result.getColumnsWithErrors().isEmpty());
  }

  @Test
  void
      validateSourceRecord_withValidDataWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithoutErrors() {
    Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
    Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
    Set<String> columnNames = mockMetadata.getColumnNames();
    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
    ImportSourceRecordValidationResult result =
        ImportSourceRecordValidator.validateSourceRecord(
            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata);
    Assertions.assertTrue(result.getColumnsWithErrors().isEmpty());
  }

  @Test
  void
      validateSourceRecord_withInvalidPartitionKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
    Set<String> partitionKeyNames = new HashSet<>();
    partitionKeyNames.add("id1");
    Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
    Set<String> columnNames = mockMetadata.getColumnNames();
    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
    ImportSourceRecordValidationResult result =
        ImportSourceRecordValidator.validateSourceRecord(
            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
    Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
  }

  @Test
  void
      validateSourceRecord_withInvalidPartitionKeyWithAllColumnsRequired_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
    Set<String> partitionKeyNames = new HashSet<>();
    partitionKeyNames.add("id1");
    Set<String> clusteringKeyNames = mockMetadata.getClusteringKeyNames();
    Set<String> columnNames = mockMetadata.getColumnNames();
    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
    ImportSourceRecordValidationResult result =
        ImportSourceRecordValidator.validateSourceRecord(
            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, true, mockMetadata);
    Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
    Assertions.assertEquals(1, result.getErrorMessages().size());
  }

  @Test
  void
      validateSourceRecord_withInvalidClusteringKey_shouldReturnValidImportSourceRecordValidationResultWithErrors() {
    Set<String> partitionKeyNames = mockMetadata.getPartitionKeyNames();
    Set<String> clusteringKeyNames = new HashSet<>();
    clusteringKeyNames.add("id1");
    Set<String> columnNames = mockMetadata.getColumnNames();
    JsonNode sourceRecord = UnitTestUtils.getOutputDataWithoutMetadata();
    ImportSourceRecordValidationResult result =
        ImportSourceRecordValidator.validateSourceRecord(
            partitionKeyNames, clusteringKeyNames, columnNames, sourceRecord, false, mockMetadata);
    Assertions.assertFalse(result.getColumnsWithErrors().isEmpty());
    Assertions.assertEquals(
        CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage("id1"),
        result.getErrorMessages().get(0));
  }
}
