Skip to content

Add import log classes and utils #2591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
be9ee23
Initial commit
inv-jishnu Apr 10, 2025
89b9f05
Spotless applied again
inv-jishnu Apr 11, 2025
49c83b6
Removed unused code
inv-jishnu Apr 11, 2025
b2871fb
Merge branch 'master' into feat/data-loader/import-log-2
ypeckstadt Apr 15, 2025
c5c9c0a
Removed unused classes and references
inv-jishnu Apr 15, 2025
4964e8d
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu Apr 15, 2025
ff81f5f
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu Apr 15, 2025
3934c2a
Improve Javadocs
ypeckstadt Apr 16, 2025
9958f95
Changes
inv-jishnu Apr 21, 2025
1afbc21
Renamed parameters
inv-jishnu Apr 21, 2025
8c5114d
logging changes
inv-jishnu Apr 21, 2025
ffab395
removed repeated code
inv-jishnu Apr 22, 2025
79df1ed
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu Apr 22, 2025
cf31672
Merge branch 'master' into feat/data-loader/import-log-2
brfrn169 Apr 23, 2025
6dd213e
Added excetpion throw
inv-jishnu Apr 23, 2025
6542177
Synchronisation changes
inv-jishnu Apr 25, 2025
603e46e
Added volatile back to fix spotbugs issue
inv-jishnu Apr 25, 2025
eaf9d88
Removed unused variable
inv-jishnu Apr 25, 2025
502034e
Chanaged LOGGER to logger
inv-jishnu Apr 30, 2025
6b22d1a
logger name change in test
inv-jishnu Apr 30, 2025
b393b62
Edge case fix
inv-jishnu Apr 30, 2025
cacdd86
Remove unused param
inv-jishnu Apr 30, 2025
c26cc6e
Revert "Remove unused param"
inv-jishnu Apr 30, 2025
9a4bfa8
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu Apr 30, 2025
415805b
Removed null assignment
inv-jishnu May 2, 2025
0584f5e
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu May 2, 2025
7c72397
comment change
inv-jishnu May 2, 2025
0f34395
renamed params to make it more clear
inv-jishnu May 2, 2025
a59f317
Merge branch 'master' into feat/data-loader/import-log-2
ypeckstadt May 7, 2025
d807387
Merge branch 'master' into feat/data-loader/import-log-2
feeblefakie May 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package com.scalar.db.dataloader.core.dataimport.log;

import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.dataloader.core.Constants;
import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.RequiredArgsConstructor;

/**
* An abstract base class for logging import events during data loading operations. This class
* implements the {@link ImportEventListener} interface and provides common functionality for
* logging transaction batch results and managing event listeners. Concrete implementations should
* define how to log transaction batches and handle errors.
*/
@RequiredArgsConstructor
public abstract class AbstractImportLogger implements ImportEventListener {

/** Object mapper used for JSON serialization/deserialization. */
protected static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();

/** Configuration for the import logger. */
protected final ImportLoggerConfig config;

/** Factory for creating log writers. */
protected final LogWriterFactory logWriterFactory;

/** List of event listeners to be notified of import events. */
protected final List<ImportEventListener> listeners = new ArrayList<>();

/**
* Called when a data chunk import is started. Currently, this implementation does not log the
* start of a data chunk.
*
* @param importDataChunkStatus the status of the data chunk being imported
*/
@Override
public void onDataChunkStarted(ImportDataChunkStatus importDataChunkStatus) {
// Currently we are not logging the start of a data chunk
}

/**
* Called when a transaction batch is started. Currently, this implementation does not log the
* start of a transaction batch, but it notifies all registered listeners.
*
* @param batchStatus the status of the transaction batch being started
*/
@Override
public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
// Currently we are not logging the start of a transaction batch
notifyTransactionBatchStarted(batchStatus);
}

/**
* Called when a transaction batch is completed. This method logs the transaction batch result if
* it should be logged based on the configuration, and notifies all registered listeners.
*
* @param batchResult the result of the completed transaction batch
*/
@Override
public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
// skip logging success records if the configuration is set to skip
if (shouldSkipLoggingSuccess(batchResult)) {
return;
}

logTransactionBatch(batchResult);
notifyTransactionBatchCompleted(batchResult);
}

/**
* Logs a transaction batch result. This method should be implemented by concrete subclasses to
* define how to log transaction batch results.
*
* @param batchResult the transaction batch result to log
*/
protected abstract void logTransactionBatch(ImportTransactionBatchResult batchResult);

/**
* Determines whether logging of a successful transaction batch should be skipped. Logging is
* skipped if the batch was successful and the configuration specifies not to log success records.
*
* @param batchResult the transaction batch result to check
* @return true if logging should be skipped, false otherwise
*/
protected boolean shouldSkipLoggingSuccess(ImportTransactionBatchResult batchResult) {
return batchResult.isSuccess() && !config.isLogSuccessRecordsEnabled();
}

/**
* Creates a filtered JSON representation of a transaction batch result. This method filters out
* raw record data if the configuration specifies not to log raw source records.
*
* @param batchResult the transaction batch result to convert to JSON
* @return a JsonNode representing the filtered transaction batch result
*/
protected JsonNode createFilteredTransactionBatchLogJsonNode(
ImportTransactionBatchResult batchResult) {

// If the batch result does not contain any records, return the batch result as is
if (batchResult.getRecords() == null) {
return OBJECT_MAPPER.valueToTree(batchResult);
}

// Create a new list to store the modified import task results
List<ImportTaskResult> modifiedRecords = new ArrayList<>();

// Loop over the records in the batchResult
for (ImportTaskResult taskResult : batchResult.getRecords()) {
// Create a new ImportTaskResult and not add the raw record yet
List<ImportTargetResult> targetResults =
batchResult.isSuccess()
? taskResult.getTargets()
: updateTargetStatusForAbortedTransactionBatch(taskResult.getTargets());
ImportTaskResult.ImportTaskResultBuilder builder =
ImportTaskResult.builder()
.rowNumber(taskResult.getRowNumber())
.targets(targetResults)
.dataChunkId(taskResult.getDataChunkId());

// Adds the raw record if the configuration is set to log raw source data
if (config.isLogRawSourceRecordsEnabled()) {
builder.rawRecord(taskResult.getRawRecord());
}
ImportTaskResult modifiedTaskResult = builder.build();

// Add the modified task result to the list
modifiedRecords.add(modifiedTaskResult);
}

// Create a new transaction batch result with the modified import task results
ImportTransactionBatchResult modifiedBatchResult =
ImportTransactionBatchResult.builder()
.dataChunkId(batchResult.getDataChunkId())
.transactionBatchId(batchResult.getTransactionBatchId())
.transactionId(batchResult.getTransactionId())
.records(modifiedRecords)
.errors(batchResult.getErrors())
.success(batchResult.isSuccess())
.build();

// Convert the modified batch result to a JsonNode
return OBJECT_MAPPER.valueToTree(modifiedBatchResult);
}

/**
* Safely closes a log writer. If an IOException occurs during closing, it logs the error using
* the {@link #logError} method.
*
* @param logWriter the log writer to close, may be null
*/
protected void closeLogWriter(LogWriter logWriter) {
if (logWriter != null) {
try {
logWriter.close();
} catch (IOException e) {
logError("Failed to close a log writer", e);
}
}
}

/**
* Logs an error message and exception. This method should be implemented by concrete subclasses
* to define how to log errors.
*
* @param errorMessage the error message to log
* @param e the exception that caused the error
*/
protected abstract void logError(String errorMessage, Exception e);

/**
* Creates a log writer for the specified log file path.
*
* @param logFilePath the path to the log file
* @return a new log writer
* @throws IOException if an I/O error occurs while creating the log writer
*/
protected LogWriter createLogWriter(String logFilePath) throws IOException {
return logWriterFactory.createLogWriter(logFilePath);
}

/**
* Notifies all registered listeners that a transaction batch has started.
*
* @param status the status of the transaction batch that has started
*/
private void notifyTransactionBatchStarted(ImportTransactionBatchStatus status) {
for (ImportEventListener listener : listeners) {
listener.onTransactionBatchStarted(status);
}
}

/**
* Notifies all registered listeners that a transaction batch has completed.
*
* @param batchResult the result of the completed transaction batch
*/
private void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
for (ImportEventListener listener : listeners) {
listener.onTransactionBatchCompleted(batchResult);
}
}

/**
* Updates the status of target results for an aborted transaction batch. For each target with a
* status of SAVED, changes the status to ABORTED and adds an error message.
*
* @param targetResults the list of target results to update
* @return the updated list of target results
*/
private List<ImportTargetResult> updateTargetStatusForAbortedTransactionBatch(
List<ImportTargetResult> targetResults) {
for (int i = 0; i < targetResults.size(); i++) {
ImportTargetResult target = targetResults.get(i);
if (target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
ImportTargetResult newTarget =
ImportTargetResult.builder()
.importAction(target.getImportAction())
.status(ImportTargetResultStatus.ABORTED)
.importedRecord(target.getImportedRecord())
.namespace(target.getNamespace())
.tableName(target.getTableName())
.dataMapped(target.isDataMapped())
.errors(Collections.singletonList(Constants.ABORT_TRANSACTION_STATUS))
.build();
targetResults.set(i, newTarget);
}
}
return targetResults;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.scalar.db.dataloader.core.dataimport.log;

import lombok.Builder;
import lombok.Value;

/**
* Configuration class for import loggers. This class uses Lombok's {@code @Value} annotation to
* create an immutable class and {@code @Builder} annotation to provide a builder pattern for
* creating instances.
*/
@Value
@Builder
public class ImportLoggerConfig {
/**
* The directory path where log files will be stored. This path should end with a directory
* separator (e.g., "/").
*/
String logDirectoryPath;

/**
* Whether to log records that were successfully imported. If true, successful import operations
* will be logged to success log files.
*/
boolean isLogSuccessRecordsEnabled;

/**
* Whether to log raw source records that failed to be imported. If true, failed import operations
* will be logged to failure log files.
*/
boolean isLogRawSourceRecordsEnabled;

/**
* Whether to format the logs with pretty printing. If true, the JSON logs will be formatted with
* indentation for better readability.
*/
boolean prettyPrint;
}
Loading