Skip to content

Commit b8d46c7

Browse files
committed
Add import log classes and utils (#2591)
1 parent d134a0a commit b8d46c7

12 files changed

+1559
-0
lines changed
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
package com.scalar.db.dataloader.core.dataimport.log;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.scalar.db.dataloader.core.Constants;
5+
import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
6+
import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
7+
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
8+
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
9+
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
10+
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
11+
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
12+
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
13+
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
14+
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
15+
import java.io.IOException;
16+
import java.util.ArrayList;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import lombok.RequiredArgsConstructor;
20+
21+
/**
22+
* An abstract base class for logging import events during data loading operations. This class
23+
* implements the {@link ImportEventListener} interface and provides common functionality for
24+
* logging transaction batch results and managing event listeners. Concrete implementations should
25+
* define how to log transaction batches and handle errors.
26+
*/
27+
@RequiredArgsConstructor
28+
public abstract class AbstractImportLogger implements ImportEventListener {
29+
30+
/** Object mapper used for JSON serialization/deserialization. */
31+
protected static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
32+
33+
/** Configuration for the import logger. */
34+
protected final ImportLoggerConfig config;
35+
36+
/** Factory for creating log writers. */
37+
protected final LogWriterFactory logWriterFactory;
38+
39+
/** List of event listeners to be notified of import events. */
40+
protected final List<ImportEventListener> listeners = new ArrayList<>();
41+
42+
/**
43+
* Called when a data chunk import is started. Currently, this implementation does not log the
44+
* start of a data chunk.
45+
*
46+
* @param importDataChunkStatus the status of the data chunk being imported
47+
*/
48+
@Override
49+
public void onDataChunkStarted(ImportDataChunkStatus importDataChunkStatus) {
50+
// Currently we are not logging the start of a data chunk
51+
}
52+
53+
/**
54+
* Called when a transaction batch is started. Currently, this implementation does not log the
55+
* start of a transaction batch, but it notifies all registered listeners.
56+
*
57+
* @param batchStatus the status of the transaction batch being started
58+
*/
59+
@Override
60+
public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
61+
// Currently we are not logging the start of a transaction batch
62+
notifyTransactionBatchStarted(batchStatus);
63+
}
64+
65+
/**
66+
* Called when a transaction batch is completed. This method logs the transaction batch result if
67+
* it should be logged based on the configuration, and notifies all registered listeners.
68+
*
69+
* @param batchResult the result of the completed transaction batch
70+
*/
71+
@Override
72+
public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
73+
// skip logging success records if the configuration is set to skip
74+
if (shouldSkipLoggingSuccess(batchResult)) {
75+
return;
76+
}
77+
78+
logTransactionBatch(batchResult);
79+
notifyTransactionBatchCompleted(batchResult);
80+
}
81+
82+
/**
83+
* Logs a transaction batch result. This method should be implemented by concrete subclasses to
84+
* define how to log transaction batch results.
85+
*
86+
* @param batchResult the transaction batch result to log
87+
*/
88+
protected abstract void logTransactionBatch(ImportTransactionBatchResult batchResult);
89+
90+
/**
91+
* Determines whether logging of a successful transaction batch should be skipped. Logging is
92+
* skipped if the batch was successful and the configuration specifies not to log success records.
93+
*
94+
* @param batchResult the transaction batch result to check
95+
* @return true if logging should be skipped, false otherwise
96+
*/
97+
protected boolean shouldSkipLoggingSuccess(ImportTransactionBatchResult batchResult) {
98+
return batchResult.isSuccess() && !config.isLogSuccessRecordsEnabled();
99+
}
100+
101+
/**
102+
* Creates a filtered JSON representation of a transaction batch result. This method filters out
103+
* raw record data if the configuration specifies not to log raw source records.
104+
*
105+
* @param batchResult the transaction batch result to convert to JSON
106+
* @return a JsonNode representing the filtered transaction batch result
107+
*/
108+
protected JsonNode createFilteredTransactionBatchLogJsonNode(
109+
ImportTransactionBatchResult batchResult) {
110+
111+
// If the batch result does not contain any records, return the batch result as is
112+
if (batchResult.getRecords() == null) {
113+
return OBJECT_MAPPER.valueToTree(batchResult);
114+
}
115+
116+
// Create a new list to store the modified import task results
117+
List<ImportTaskResult> modifiedRecords = new ArrayList<>();
118+
119+
// Loop over the records in the batchResult
120+
for (ImportTaskResult taskResult : batchResult.getRecords()) {
121+
// Create a new ImportTaskResult and not add the raw record yet
122+
List<ImportTargetResult> targetResults =
123+
batchResult.isSuccess()
124+
? taskResult.getTargets()
125+
: updateTargetStatusForAbortedTransactionBatch(taskResult.getTargets());
126+
ImportTaskResult.ImportTaskResultBuilder builder =
127+
ImportTaskResult.builder()
128+
.rowNumber(taskResult.getRowNumber())
129+
.targets(targetResults)
130+
.dataChunkId(taskResult.getDataChunkId());
131+
132+
// Adds the raw record if the configuration is set to log raw source data
133+
if (config.isLogRawSourceRecordsEnabled()) {
134+
builder.rawRecord(taskResult.getRawRecord());
135+
}
136+
ImportTaskResult modifiedTaskResult = builder.build();
137+
138+
// Add the modified task result to the list
139+
modifiedRecords.add(modifiedTaskResult);
140+
}
141+
142+
// Create a new transaction batch result with the modified import task results
143+
ImportTransactionBatchResult modifiedBatchResult =
144+
ImportTransactionBatchResult.builder()
145+
.dataChunkId(batchResult.getDataChunkId())
146+
.transactionBatchId(batchResult.getTransactionBatchId())
147+
.transactionId(batchResult.getTransactionId())
148+
.records(modifiedRecords)
149+
.errors(batchResult.getErrors())
150+
.success(batchResult.isSuccess())
151+
.build();
152+
153+
// Convert the modified batch result to a JsonNode
154+
return OBJECT_MAPPER.valueToTree(modifiedBatchResult);
155+
}
156+
157+
/**
158+
* Safely closes a log writer. If an IOException occurs during closing, it logs the error using
159+
* the {@link #logError} method.
160+
*
161+
* @param logWriter the log writer to close, may be null
162+
*/
163+
protected void closeLogWriter(LogWriter logWriter) {
164+
if (logWriter != null) {
165+
try {
166+
logWriter.close();
167+
} catch (IOException e) {
168+
logError("Failed to close a log writer", e);
169+
}
170+
}
171+
}
172+
173+
/**
174+
* Logs an error message and exception. This method should be implemented by concrete subclasses
175+
* to define how to log errors.
176+
*
177+
* @param errorMessage the error message to log
178+
* @param e the exception that caused the error
179+
*/
180+
protected abstract void logError(String errorMessage, Exception e);
181+
182+
/**
183+
* Creates a log writer for the specified log file path.
184+
*
185+
* @param logFilePath the path to the log file
186+
* @return a new log writer
187+
* @throws IOException if an I/O error occurs while creating the log writer
188+
*/
189+
protected LogWriter createLogWriter(String logFilePath) throws IOException {
190+
return logWriterFactory.createLogWriter(logFilePath);
191+
}
192+
193+
/**
194+
* Notifies all registered listeners that a transaction batch has started.
195+
*
196+
* @param status the status of the transaction batch that has started
197+
*/
198+
private void notifyTransactionBatchStarted(ImportTransactionBatchStatus status) {
199+
for (ImportEventListener listener : listeners) {
200+
listener.onTransactionBatchStarted(status);
201+
}
202+
}
203+
204+
/**
205+
* Notifies all registered listeners that a transaction batch has completed.
206+
*
207+
* @param batchResult the result of the completed transaction batch
208+
*/
209+
private void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
210+
for (ImportEventListener listener : listeners) {
211+
listener.onTransactionBatchCompleted(batchResult);
212+
}
213+
}
214+
215+
/**
216+
* Updates the status of target results for an aborted transaction batch. For each target with a
217+
* status of SAVED, changes the status to ABORTED and adds an error message.
218+
*
219+
* @param targetResults the list of target results to update
220+
* @return the updated list of target results
221+
*/
222+
private List<ImportTargetResult> updateTargetStatusForAbortedTransactionBatch(
223+
List<ImportTargetResult> targetResults) {
224+
for (int i = 0; i < targetResults.size(); i++) {
225+
ImportTargetResult target = targetResults.get(i);
226+
if (target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
227+
ImportTargetResult newTarget =
228+
ImportTargetResult.builder()
229+
.importAction(target.getImportAction())
230+
.status(ImportTargetResultStatus.ABORTED)
231+
.importedRecord(target.getImportedRecord())
232+
.namespace(target.getNamespace())
233+
.tableName(target.getTableName())
234+
.dataMapped(target.isDataMapped())
235+
.errors(Collections.singletonList(Constants.ABORT_TRANSACTION_STATUS))
236+
.build();
237+
targetResults.set(i, newTarget);
238+
}
239+
}
240+
return targetResults;
241+
}
242+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.scalar.db.dataloader.core.dataimport.log;
2+
3+
import lombok.Builder;
4+
import lombok.Value;
5+
6+
/**
7+
* Configuration class for import loggers. This class uses Lombok's {@code @Value} annotation to
8+
* create an immutable class and {@code @Builder} annotation to provide a builder pattern for
9+
* creating instances.
10+
*/
11+
@Value
12+
@Builder
13+
public class ImportLoggerConfig {
14+
/**
15+
* The directory path where log files will be stored. This path should end with a directory
16+
* separator (e.g., "/").
17+
*/
18+
String logDirectoryPath;
19+
20+
/**
21+
* Whether to log records that were successfully imported. If true, successful import operations
22+
* will be logged to success log files.
23+
*/
24+
boolean isLogSuccessRecordsEnabled;
25+
26+
/**
27+
* Whether to log raw source records that failed to be imported. If true, failed import operations
28+
* will be logged to failure log files.
29+
*/
30+
boolean isLogRawSourceRecordsEnabled;
31+
32+
/**
33+
* Whether to format the logs with pretty printing. If true, the JSON logs will be formatted with
34+
* indentation for better readability.
35+
*/
36+
boolean prettyPrint;
37+
}

0 commit comments

Comments
 (0)