
Add the import process implementation for data loader #2462

Open · wants to merge 122 commits into base: master
Conversation

@inv-jishnu (Contributor) commented Jan 10, 2025

Description

This PR adds import processors that process the import file based on its file format, along with the related DTOs and utility classes.

Related issues and/or PRs

Please review this PR after the PRs below have been reviewed and merged, and after master has been merged into this branch with those changes.

Some more information on data chunk and transaction size
The data chunk size and transaction size are introduced in the new changes. The data chunk size is used to split the input file into data chunks of the specified size. If the ScalarDB mode is transaction, the records in each data chunk are processed as transactions: they are further split based on the transaction size, and each group is processed together as a single transaction.
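The two-level split described above can be sketched as follows. This is an illustrative helper, not the PR's actual implementation; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the two-level split described above: the input is
// first split into data chunks, and each chunk is further split into
// transaction batches. Names here are hypothetical, not the PR's code.
public class ChunkSplitSketch {

  // Split a list into consecutive sublists of at most `size` elements.
  static <T> List<List<T>> split(List<T> records, int size) {
    List<List<T>> result = new ArrayList<>();
    for (int i = 0; i < records.size(); i += size) {
      result.add(new ArrayList<>(records.subList(i, Math.min(i + size, records.size()))));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Integer> records = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      records.add(i);
    }
    // Data chunk size 4: 10 records -> chunks of 4, 4, and 2 records.
    List<List<Integer>> chunks = split(records, 4);
    // Transaction batch size 2: the first chunk (4 records) -> 2 batches,
    // each processed together as a single transaction in transaction mode.
    List<List<Integer>> batches = split(chunks.get(0), 2);
    System.out.println(chunks.size() + " chunks, " + batches.size() + " batches");
  }
}
```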

Changes made

Added classes to process the import source file based on the file format, along with related DTOs and utility classes

Checklist

The following is a best-effort checklist. If any items in this checklist are not applicable to this PR or are dependent on other, unmerged PRs, please still mark the checkboxes after you have read and understood each item.

  • I have commented my code, particularly in hard-to-understand areas.
  • I have updated the documentation to reflect the changes.
  • Any remaining open issues linked to this PR are documented and up-to-date (Jira, GitHub, etc.).
  • Tests (unit, integration, etc.) have been added for the changes.
  • My changes generate no new warnings.
  • Any dependent changes in other PRs have been merged and published.

Additional notes (optional)

Roadmap to merge the remaining data loader core files: current status

Release notes

NA

@ypeckstadt ypeckstadt marked this pull request as ready for review March 14, 2025 09:00
@ypeckstadt ypeckstadt changed the title Add import processes Add the import process implementation for data loader Mar 14, 2025
@Override
public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
  synchronized (importDataChunkStatusList) {
    for (int i = 0; i < importDataChunkStatusList.size(); i++) {
Contributor:

If the size of importDataChunkStatusList is large, this loop might take tens or hundreds of milliseconds while holding the lock, which could affect performance.

It seems the order of the items is not important. How about using a Set or Map instead of a List so an item can be accessed in O(1)?
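The suggestion above can be sketched as follows; the class and field names are illustrative stand-ins, not the PR's code.

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the suggestion above: keying statuses by data chunk ID in a
// ConcurrentHashMap gives O(1) add-or-update without holding a lock while
// scanning a list. All names here are illustrative, not the PR's code.
public class DataChunkStatusStore {

  // Hypothetical status object identified by its data chunk ID.
  static class ImportDataChunkStatus {
    final int dataChunkId;
    final String state;

    ImportDataChunkStatus(int dataChunkId, String state) {
      this.dataChunkId = dataChunkId;
      this.state = state;
    }
  }

  private final ConcurrentHashMap<Integer, ImportDataChunkStatus> statuses =
      new ConcurrentHashMap<>();

  // put() atomically inserts or replaces the entry for this ID, so no
  // synchronized loop over a list is needed.
  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
    statuses.put(status.dataChunkId, status);
  }

  public ImportDataChunkStatus get(int dataChunkId) {
    return statuses.get(dataChunkId);
  }

  public static void main(String[] args) {
    DataChunkStatusStore store = new DataChunkStatusStore();
    store.addOrUpdateDataChunkStatus(new ImportDataChunkStatus(1, "IN_PROGRESS"));
    store.addOrUpdateDataChunkStatus(new ImportDataChunkStatus(1, "COMPLETE"));
    System.out.println(store.get(1).state);
  }
}
```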

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(
CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(
Contributor:

Don't need to handle other Exception types?

Contributor Author:

Currently, only an InterruptedException may be thrown here, by dataChunkQueue.poll.
Please let me know if you feel any other exception needs to be handled here.
Thank you.

Contributor:

I see. That sounds basically reasonable, since processDataChunk is called asynchronously. (An OutOfMemoryError could still be thrown, though.)

* @return a list of {@link ImportDataChunkStatus} objects indicating the processing status of
* each data chunk
*/
public List<ImportDataChunkStatus> process(
Contributor:

It looks like this is expected to always be overridden. How about making this an abstract method?
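The suggestion above can be sketched like this; the class names and return type are illustrative, not the PR's actual signatures.

```java
import java.util.Collections;
import java.util.List;

// Sketch of the suggestion above: declaring process(...) abstract forces
// every file-format processor to provide its own implementation, instead
// of overriding a concrete base method. Names here are illustrative.
public abstract class ImportProcessorSketch {

  // Abstract: each subclass (CSV, JSON, ...) must implement this.
  abstract List<String> process(int dataChunkSize, int transactionBatchSize);

  static class CsvProcessor extends ImportProcessorSketch {
    @Override
    List<String> process(int dataChunkSize, int transactionBatchSize) {
      return Collections.singletonList("processed as CSV");
    }
  }

  public static void main(String[] args) {
    ImportProcessorSketch processor = new CsvProcessor();
    System.out.println(processor.process(100, 10).get(0));
  }
}
```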

}

} catch (TransactionException e) {
isSuccess = false;
Contributor:

I think the transaction should be aborted, since a failure could have occurred before the commit.
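The abort-on-failure pattern suggested above can be sketched as follows. The Transaction interface here is a minimal stand-in for illustration, not ScalarDB's actual API.

```java
// Sketch of the abort-on-failure pattern suggested above. The Transaction
// interface is a stand-in for illustration, not ScalarDB's actual API.
public class AbortOnFailureSketch {

  interface Transaction {
    void commit() throws Exception;

    void abort();
  }

  // Returns true on success; aborts the transaction on any failure so it
  // is not left open when an error occurs before or during the commit.
  static boolean runBatch(Transaction tx, Runnable work) {
    try {
      work.run();
      tx.commit();
      return true;
    } catch (Exception e) {
      tx.abort();
      return false;
    }
  }

  public static void main(String[] args) {
    final boolean[] aborted = {false};
    Transaction failing =
        new Transaction() {
          @Override
          public void commit() throws Exception {
            throw new Exception("simulated commit failure");
          }

          @Override
          public void abort() {
            aborted[0] = true;
          }
        };
    boolean success = runBatch(failing, () -> {});
    System.out.println("success=" + success + ", aborted=" + aborted[0]);
  }
}
```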

List<ImportTransactionBatch> transactionBatches =
splitIntoTransactionBatches(dataChunk, transactionBatchSize);
ExecutorService transactionBatchExecutor =
Executors.newFixedThreadPool(Math.min(transactionBatches.size(), numCores));
Contributor:

If tasks are computation-dominant, setting the concurrency to the number of CPU cores is reasonable. But the tasks invoked in this method are IO-dominant and involve waiting for responses from the underlying database, so I think higher concurrency would give better throughput.
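The point above can be sketched like this; the oversubscription factor of 4 is an assumed starting point to be tuned by measurement, not a value from the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the point above: IO-bound tasks spend most of their time
// waiting on the database, so a pool larger than the CPU core count can
// improve throughput. The multiplier is an assumption for illustration.
public class IoBoundPoolSketch {

  static int ioBoundPoolSize(int numCores, int oversubscriptionFactor) {
    return numCores * oversubscriptionFactor;
  }

  public static void main(String[] args) throws InterruptedException {
    int numCores = Runtime.getRuntime().availableProcessors();
    ExecutorService executor =
        Executors.newFixedThreadPool(ioBoundPoolSize(numCores, 4));
    // Submit IO-like tasks that mostly sleep (simulating DB round-trips);
    // with oversubscription, more of them can wait concurrently.
    for (int i = 0; i < 8; i++) {
      executor.submit(
          () -> {
            try {
              Thread.sleep(10);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
    }
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println("done");
  }
}
```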

.importAction(importAction)
.status(ImportTargetResultStatus.DATA_ALREADY_EXISTS)
.errors(
Collections.singletonList(CoreError.DATA_LOADER_DATA_ALREADY_EXISTS.buildMessage()))
Contributor:

How about adding key information in the error message for investigation?

Contributor Author:

@komamitsu san,

The error message is part of the ImportTargetResult object, which is then added to an ImportTaskResult object. The ImportTargetResult has the necessary details regarding the failure; I have added a sample below. It also includes the actual data that we attempted to import into the table.

{
      "namespace" : "testn",
      "tableName" : "all_columns",
      "importAction" : "INSERT",
      "errors" : [ "DB-CORE-10182: Record was not found" ],
      "dataMapped" : false,
      "importedRecord" : {
        "col1" : 9111111111,
        "col2" : 1,
        "col3" : 1,
        "col4" : 1.4E-45,
        "col5" : 4.9E-324,
        "col6" : "VALUE!!s",
        "col7" : "0x626C6F6220746573742076616C7565",
        "col8" : "2000-01-01",
        "col9" : "01:01:01.000000",
        "col10" : "2000-01-01T01:01:00",
        "col11" : "1970-01-21T03:20:41.740Z",
        "id" : 9111111111,
        "name" : "sample1",
        "email" : "[email protected]",
        "tx_id" : "6361bc3a-9c44-4de9-8829-b028fb48a749",
        "tx_state" : 3,
        "tx_version" : 1,
        "tx_prepared_at" : 1732508362517,
        "tx_committed_at" : 1732508364532
      },
      "status" : "DATA_NOT_FOUND"
    }

Could you please let me know whether the error message (in errors) needs to be clearer or needs any more details? The error message was kept simple because we provide all the details of the failure in the failure log.

Contributor:

Thanks for the explanation! Sounds good.

How about adding a comment at https://github.com/scalar-labs/scalardb/pull/2462/files#diff-fd06988f4ced6a0d6d0b1049c69ab07df11c372c3ccb868a78afbe4d24e6f663R827 like // These error messages will be output with key information ?

.importedRecord(mutableSourceRecord)
.importAction(importAction)
.status(ImportTargetResultStatus.DATA_NOT_FOUND)
.errors(Collections.singletonList(CoreError.DATA_LOADER_DATA_NOT_FOUND.buildMessage()))
Contributor:

Same as above

return ImportTargetResult.builder()
.namespace(namespace)
.tableName(table)
.status(ImportTargetResultStatus.VALIDATION_FAILED)
Contributor:

Probably the same as above? Could you take a look at the error messages added by this PR from the viewpoint of users who need to investigate failures?

* @throws ColumnParsingException if an error occurs while base64 parsing the column
*/
private static Column<?> getColumn(
Result scalarDBResult,
Contributor:

Suggested change
Result scalarDBResult,
@Nullable Result scalarDBResult,

import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public abstract class ImportProcessor {
Contributor:

ImportProcessor and its child classes are a bit complicated. Don't they need unit tests?

@inv-jishnu (Contributor Author):

@komamitsu san,

I have made the changes as suggested, including adding some basic unit test cases, apart from the feedback related to error message clarity (I have added some more details on that as a comment).
Please take another look and let me know if any further changes are required.
Thank you.

@inv-jishnu inv-jishnu requested a review from komamitsu March 25, 2025 09:56
@brfrn169 brfrn169 added the enhancement New feature or request label Mar 25, 2025
@brfrn169 (Collaborator) left a comment:

Left several comments, but I haven’t reviewed all the code yet. I’ll continue reviewing this PR. Thanks.

* @throws InterruptedException if the import process is interrupted
*/
public ConcurrentHashMap<Integer, ImportDataChunkStatus> startImport()
throws ExecutionException, InterruptedException {
Collaborator:

Suggested change
throws ExecutionException, InterruptedException {
{

public ConcurrentHashMap<Integer, ImportDataChunkStatus> process(
int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
int numCores = Runtime.getRuntime().availableProcessors();
ExecutorService dataChunkExecutor = Executors.newFixedThreadPool(numCores);
@brfrn169 (Collaborator), Mar 25, 2025:

I'm not 100% sure, but it looks like we can replace this with Executors.newSingleThreadExecutor()?

Suggested change
ExecutorService dataChunkExecutor = Executors.newFixedThreadPool(numCores);
ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();

@NonNull private final ImportOptions importOptions;
private final ImportProcessorFactory importProcessorFactory;
private final List<ImportEventListener> listeners = new ArrayList<>();
private final ScalarDBMode scalarDBMode;
Collaborator:

It looks like I missed this in the previous PR review, but we should rename this to ScalarDbMode. We should also rename ScalarDBDao and ScalarDBDaoException accordingly.

Could you please handle this in a separate PR?

Comment on lines +334 to +335
ExecutorService transactionBatchExecutor =
Executors.newFixedThreadPool(Math.min(transactionBatches.size(), numCores * 2));
Collaborator:

Could you please explain why we use Math.min(transactionBatches.size(), numCores * 2) here?

@@ -45,6 +44,7 @@ public class JsonImportProcessor extends ImportProcessor {

private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);
private static final int MAX_QUEUE_SIZE = 10;
@komamitsu (Contributor), Mar 26, 2025:

This queue size might be too small when using a large number of cores, such as 32. I guess this limitation is to prevent OOM; if so, 256 or 512 should be okay. It would also be great if this were configurable.

I think you can decide whether to change this after conducting some performance tests with various numbers of cores.
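Making the bound configurable, as suggested above, could look like the sketch below. The system property name and the default of 256 are assumptions for illustration, not the PR's actual configuration API.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of making the queue bound configurable, as suggested above. The
// system property name and default value are assumptions for illustration.
public class QueueSizeSketch {

  // Reads the bound from a system property, falling back to 256.
  static int maxQueueSize() {
    return Integer.getInteger("data.loader.max.queue.size", 256);
  }

  public static void main(String[] args) {
    // A bounded queue caps memory use: producers block once it is full,
    // which is what prevents OOM when the reader outpaces the workers.
    LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(maxQueueSize());
    System.out.println(queue.remainingCapacity());
  }
}
```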

import org.mockito.Mock;
import org.mockito.Mockito;

public class CsvImportProcessorTest {
Contributor:

[minor] This can be package-private.

import org.mockito.Mockito;

public class CsvImportProcessorTest {
@Mock ImportProcessorParams params;
Contributor:

[minor] This can be private.

Labels
enhancement New feature or request

4 participants