
Possible way of inserting a lot of CSV files #1563

Closed
emelyanovkr opened this issue Mar 16, 2024 · 21 comments

@emelyanovkr commented Mar 16, 2024

Hello, my use case is this:
I have a directory with about 1.2k CSV files; each of them contains roughly 10k to 10-20 million rows.
I want to insert data from these files into my ClickHouse database as fast as possible using the Java client (I've tried JDBC and only reached about 50-70k rows/sec, which means inserting 30 million rows takes about 10 minutes).

Of course, I can read this directory with many threads at once, each thread taking one chunk of files, but performance with JDBC is still quite slow: the CLI client is able to load the same directory of 1.2k files with 30 million rows in total in 57 seconds.

I am aware that with the Java client I could pass my data as a binary stream, which might improve performance. However, after some research on the web and in this issue tracker, I'm still getting an error that I'm not quite sure how to fix:

  public void insertClient(List<String> data) {
    try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) {
      ClickHouseRequest.Mutation request =
          client
              .read(server)
              .write()
              .table("tickets_data_db.tickets_data")
              .format(ClickHouseFormat.RowBinary);
      ClickHouseConfig config = request.getConfig();
      try (ClickHouseResponse response =
          request
              .write()
              .format(ClickHouseFormat.RowBinary)
              .query(
                  "INSERT INTO tickets_data SELECT * FROM input('c1 String, c2 UInt64, c3 Float64, "
                      + "c4 Float64, c5 Float64, c6 Float64, "
                      + "c7 Float64, c8 Float64, c9 DateTime')")
              .data(
                  output -> {
                    List<ClickHouseColumn> columns =
                        ClickHouseColumn.parse(
                            "ticketName String, sequence UInt64, price Float64, size Float64, bestAsk Float64, bestAskSize Float64, bestBid Float64, bestBidSize Float64, transactionTime DateTime");
                    ClickHouseValue[] values = ClickHouseValues.newValues(config, columns);
                    ClickHouseDataProcessor processor =
                        ClickHouseDataStreamFactory.getInstance()
                            .getProcessor(config, null, output, null, columns);
                    ClickHouseSerializer[] serializers = processor.getSerializers(config, columns);

                    for (String str : data) {
                      String[] line_values = str.split(",");

                      LocalDateTime transaction_time =
                          LocalDateTime.ofInstant(
                              Instant.ofEpochMilli(Long.parseLong(line_values[8])), ZoneOffset.UTC);

                      values[0].update(line_values[0]);
                      values[1].update(Long.parseLong(line_values[1]));
                      values[2].update(Float.parseFloat(line_values[2]));
                      values[3].update(Float.parseFloat(line_values[3]));
                      values[4].update(Float.parseFloat(line_values[4]));
                      values[5].update(Float.parseFloat(line_values[5]));
                      values[6].update(Float.parseFloat(line_values[6]));
                      values[7].update(Float.parseFloat(line_values[7]));
                      values[8].update(transaction_time);

                      for (int i = 0; i < line_values.length; ++i) {
                        serializers[i].serialize(values[i], output);
                      }
                    }
                  })
              .executeAndWait()) {
        ClickHouseResponseSummary summary = response.getSummary();
        System.out.println(summary.getWrittenRows());
      } catch (ClickHouseException e) {
        throw new RuntimeException(e);
      }
    }
  }

The error is: ClickHouseException: Code: 159. Execution timed out, server ClickHouseNode
So I only managed to insert 700k rows out of a possible 50 million.

addOption(ClickHouseClientOption.SOCKET_TIMEOUT.getKey(), "60000") doesn't help much; I still get a timeout after one minute.

I've also tried something like this ClickHouse example, but still got a timeout exception and no data was loaded at all.

This is sample code where List<String> data contains all the lines from a CSV file, so each String in this list is one record.

I suspect there may be some error with the request and response variables, especially with their initialization.
Also, as you can see, my table contains columns of different types.

@den-crane (Collaborator) commented Mar 17, 2024

I would pass CSV files as a CSV stream.

see examples #909
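
Something along these lines, as a rough sketch (insertCsvFile is just an illustrative name; the table name is taken from your snippet, and the client classes are the same ones you already use):

  // sketch: stream one CSV file to the server in CSV format and let the
  // server parse it, instead of serializing row by row in Java
  public long insertCsvFile(ClickHouseNode server, String csvPath) throws ClickHouseException {
    try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol());
         ClickHouseResponse response = client
             .write(server)
             .query("INSERT INTO tickets_data_db.tickets_data")
             .format(ClickHouseFormat.CSV)
             .data(csvPath)
             .executeAndWait()) {
      return response.getSummary().getWrittenRows();
    }
  }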

@emelyanovkr (Author)

I would pass CSV files as a CSV stream.

see examples #909

Thanks, I'll try that in a few days!

@emelyanovkr (Author) commented Mar 18, 2024

@den-crane, thanks a lot! That works reasonably well. However, if I try to insert a really big CSV file with 12 million rows (about 1 GB), I end up inserting only 1 million rows (which takes less than a minute, I presume) and then after 5 minutes I get an execution timeout. Do you have any idea how to fix this? I'm thinking about opening the file and splitting it into several separate files to load with multiple threads.

@den-crane (Collaborator) commented Mar 18, 2024

First, check the time with clickhouse-client to get a baseline for such a file.

@emelyanovkr (Author) commented Mar 18, 2024

Yes, I've done that before, sorry for not mentioning it: with the CLI, a 1 GB file with 12 million rows takes about 60 seconds.

With JDBC, for example, it takes about 4 minutes to insert such a file.

@den-crane (Collaborator)

It's probably related to the network. Check how fast you can transfer such a file from the client to the server.
BTW, you can compress the CSV stream using any compressor; I suggest Zstd. If your CSV files are already compressed, that's even better.

@emelyanovkr (Author) commented Mar 18, 2024

I'm inserting the same file with clickhouse-client connected to the same server on clickhouse.cloud, which is why I presume it's not a network problem. Or could it be one even though I'm connecting from the same machine, just with different clients (CLI vs. Java)?

I'll try compressing. Is there any documentation on how to use it? My CSV files are not compressed.

UPD: I'm enabling compression for my connection like this:

        .addOption(ClickHouseClientOption.COMPRESS.getKey(), "true")
        .addOption(ClickHouseClientOption.COMPRESS_ALGORITHM.getKey(), "ZSTD")

But I get an exception saying that ZSTD is somehow not supported. I couldn't find anything related on Google, so I tried LZ4 instead, but it's still really slow, about 40k rows/sec.

So far, here are all my connection settings:

public static ClickHouseNode initJavaClientConnection() {
    Properties properties = PropertiesLoader.loadJDBCProp();

    return ClickHouseNode.builder()
        .host(properties.getProperty("HOST"))
        .port(ClickHouseProtocol.HTTP, Integer.valueOf(properties.getProperty("PORT")))
        .database(properties.getProperty("DATABASE"))
        .credentials(
            ClickHouseCredentials.fromUserAndPassword(
                properties.getProperty("USERNAME"), properties.getProperty("PASSWORD")))
        .addOption(ClickHouseClientOption.SSL.getKey(), properties.getProperty("SSL"))
        .addOption(
            ClickHouseHttpOption.CUSTOM_PARAMS.getKey(), "async_insert=1, wait_for_async_insert=1")
        .addOption(ClickHouseClientOption.SOCKET_TIMEOUT.getKey(), "300000")
        .addOption(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), "300")
        .addOption(ClickHouseClientOption.COMPRESS.getKey(), "true")
        .addOption(ClickHouseClientOption.COMPRESS_ALGORITHM.getKey(), "ZSTD")
        .build();
  }

It makes no difference whether I use the LZ4 compression settings or not; the insertion speed stays the same.

This is my insertion code. Please excuse the lack of refactoring; my goal is to get it working first and clean it up later:

public void readExecutor() {
   List<String> ticketNames = getFilesInDirectory();

   List<List<String>> ticketParts =
       Lists.partition(ticketNames, ticketNames.size() / PARTS_QUANTITY);

   ClickHouseNode server = ConnectionHandler.initJavaClientConnection();
   ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol());

   try (ExecutorService service = Executors.newFixedThreadPool(THREADS_COUNT)) {

     clickHouseDAO = new ClickHouseDAO(server);
     clickHouseDAO.truncateTable();

     for (List<String> ticketPartition : ticketParts) {
       service.execute(
           () -> {
             for (String fileName : ticketPartition) {
               Path filePath = Paths.get(SOURCE_PATH + "/" + fileName);
               try (ClickHouseResponse response =
                   client
                       .write(server)
                       .query("INSERT INTO tickets_data_db.tickets_data")
                       .format(ClickHouseFormat.CSV)
                       .data(filePath.toString())
                       .executeAndWait()) {
               } catch (ClickHouseException e) {
                 throw new RuntimeException(e);
               }
             }
           });
     }
   } finally {
     client.close();
   }
 }

I did it this way to make sure all my threads use the same connection. When I tried opening a connection for every thread, I got errors like SSL peer shut down incorrectly or An established connection was aborted by the software in your host machine.

@den-crane (Collaborator) commented Mar 18, 2024

clickhouse-connect uses LZ4 for communication with remote servers (not localhost).

There are two different compressions here. You have set LZ4 compression for the session, which covers query results and batched inserts.

But you need to set the compression option for the particular .write call.

With .data(filePath.toString()) you have to compress the stream yourself.
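
To illustrate (a rough sketch only; the wrapper method name is made up, and the file at gzippedCsvPath must already be gzip-compressed, e.g. by command-line gzip):

  // session-level COMPRESS / COMPRESS_ALGORITHM options only affect the
  // client<->server transport; the insert payload is declared per request
  public void insertPreCompressedCsv(ClickHouseClient client, ClickHouseNode server,
                                     String gzippedCsvPath) throws ClickHouseException {
    try (ClickHouseResponse response = client
            .write(server)
            .query("INSERT INTO tickets_data_db.tickets_data")
            .format(ClickHouseFormat.CSV)
            // tell the server the uploaded bytes are gzip-compressed CSV
            .data(gzippedCsvPath, ClickHouseCompression.GZIP)
            .executeAndWait()) {
      System.out.println(response.getSummary().getWrittenRows());
    }
  }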

@emelyanovkr (Author)

So you're advising me to implement some stream that compresses the CSV files when passing them to the data function? Should it be a class derived from FileInputStream?

@den-crane (Collaborator) commented Mar 19, 2024

Try to compress that file using command-line gzip,

and then check whether it helps or not: .data("test.csv.gz", ClickHouseCompression.GZIP)

@emelyanovkr (Author)

All my files are stored as plain files without any extension. Regarding test.csv.gz: should I compress them somehow first? Or pass them as before with filePath.toString()?

@den-crane (Collaborator)

You have one big file xxxx, any file.
Execute gzip xxxx,
then try (for the sake of the test) .data("xxxx.gz", ClickHouseCompression.GZIP) and check the time.

@emelyanovkr (Author) commented Mar 19, 2024

I'm sorry for the inconvenience. Have I got it right: you want me to compress my big file with gzip using any tool, upload it to the server with the Java client while also passing ClickHouseCompression.GZIP, and check the time?

I'm not aware of all the possibilities of the Java client for ClickHouse, because I couldn't find any docs about its classes and objects beyond how to use the Java client for simple queries.

@emelyanovkr (Author)

All my files were compressed to LZ4 with Apache Commons Compress, as it was reasonably fast to compress all of them.
However, this code throws an exception:

service.execute(
            () -> {
              for (String fileName : ticketPartition) {
                try (ClickHouseResponse response =
                    client
                        .write(server)
                        .query("INSERT INTO tickets_data_db.tickets_data")
                        .format(ClickHouseFormat.CSV)
                        .data(fileName, ClickHouseCompression.LZ4)
                        .executeAndWait()) {
                } catch (ClickHouseException e) {
                  throw new RuntimeException(e);
                }
              }
            });

Caused by: com.clickhouse.client.ClickHouseException: Code: 432. DB::Exception: Unknown codec family code: 57. (UNKNOWN_CODEC) (version 24.1.2.10900 (official build))

I'm going to try compressing with ZSTD for now, but this still seems strange.

@den-crane (Collaborator)

this still seems strange.

ClickHouse uses its own format for LZ4 blocks.

@emelyanovkr (Author)

ClickHouse uses its own format for LZ4 blocks.

So it's not possible to load files compressed with LZ4? I'll try another format then.

@emelyanovkr (Author)

Compressing files with GZIPOutputStream and passing them to ClickHouse like this: .data(fileName, ClickHouseCompression.GZIP) results in a speed of about 110k rows/sec, which is really nice. Thanks for your advice, den-crane.
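
For reference, my compression step looks roughly like this (a simplified sketch; writing the .gz file next to the original is just my choice here; GZIPOutputStream is from java.util.zip, Files/Paths from java.nio.file):

  // compress one raw CSV file to <name>.gz so it can be passed to
  // .data(gzFile.toString(), ClickHouseCompression.GZIP)
  public static Path gzipCsvFile(Path csvFile) throws IOException {
    Path gzFile = Paths.get(csvFile.toString() + ".gz");
    try (OutputStream out = Files.newOutputStream(gzFile);
         GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      Files.copy(csvFile, gzip);
    }
    return gzFile;
  }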

However, is there any way to reach a speed of about 200-250k rows/sec with the Java client, like with the ClickHouse CLI? Maybe there are some more optimizations to apply?

.addOption(ClickHouseHttpOption.CUSTOM_PARAMS.getKey(), "async_insert=1, wait_for_async_insert=1")
.addOption(ClickHouseClientOption.SOCKET_TIMEOUT.getKey(), "300000")
.addOption(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), "300")

Async insert is enabled, but I think it's not necessary, because I'm using a single connection instance; that gives a more stable connection and no exceptions about read timeouts or SSL peer shut down incorrectly.

@emelyanovkr (Author) commented Mar 23, 2024

I want to do something like this:

  public void insertFromCompressedFileStream(PipedInputStream pin) {
    try (ClickHouseResponse response =
        client
            .write(server)
            .query("INSERT INTO tickets_data_db.tickets_data")
            .format(ClickHouseFormat.CSV)
            .data(pin, ClickHouseCompression.GZIP)
            .executeAndWait()) {
    } catch (ClickHouseException e) {
      throw new RuntimeException(e);
    }
  }

However, as you know, there is no overload of the data function that accepts a stream as the first parameter and a compression as the second, so with code like data(pin) I get an exception like this:

 com.clickhouse.client.ClickHouseException: Code: 27. DB::Exception: Cannot parse input: expected ',' before: '#?r????V\f�*�??+??�?\'?D???O?�?�?-??��~w`??~}???�?@?�W?l�EH�~?��????/�??a?<@r???G??�?????4�:?8T?�T7?�?� ??(?�?3=?|?????=NS/{??P?Z?Hy??��??�)???V]*,?x\b??�M': (at row 1)

Removing .format(ClickHouseFormat.CSV) results in this: ERROR: Line feed found where tab is expected. It's like your file has less columns than expected.
So is there any way to pass a piped stream to ClickHouse?
I want to compress my data with one stream and have another stream take that data straight to ClickHouse. I thought piped streams would be a great fit for that.

@den-crane (Collaborator)

I want to do something like this:

It seems it's not possible without changing the code of the HTTP client, though the needed change is fairly simple.
Though I would use the old version of JDBC and the extended API: https://github.com/ClickHouse/clickhouse-java/tree/v0.2.5?tab=readme-ov-file#extended-api

But anyway, you clearly misunderstood me. I just wanted to pinpoint the bottleneck, and clearly it's the network. It may be easier to investigate the issue with simple curl rather than Java. Take your biggest CSV file, try to load it using clickhouse-client, then compress it using ZSTD and try to load it using curl, following the https://clickhouse.com/docs/en/interfaces/http example. Then compare the results.

@emelyanovkr (Author) commented Mar 23, 2024

It seems it's not possible without changing the code of the HTTP client, though the needed change is fairly simple.
So probably I could override this method so that it accepts a ClickHouseCompression?

I'd rather not use JDBC because I think it would be much slower than the Java client. I've tried loading uncompressed CSV files like this: #1402 (comment)
However, I only got about 45k rows/sec, versus 200-700k rows/sec with clickhouse-client and 110k rows/sec with the Java client. So my question is: could the JDBC extended API be faster than the Java client?

Also, I'm really curious about the network issue, because I'm simply loading data from two different applications (different ports and clients). That's why I wonder whether it can really be a network problem: the clients are different, but the network is the same, and both connect to the same clickhouse.cloud endpoint.

With a curl query like this: curl 'https://username:password@host:8443/' -d 'INSERT INTO tickets_data_db.tickets_data FORMAT CSV' --data-binary @DATA-5 it was even slower, about 33k rows/sec (2 million rows in 1 minute), while the same file with clickhouse-client gets about 200k rows/sec (5-10 seconds).

@emelyanovkr (Author)

I've managed to do something like this:

public void insertFromCompressedFileStream(PipedInputStream pin) {
    try (ClickHouseResponse response =
        client
            .write(server)
            .query("INSERT INTO tickets_data_db.tickets_data")
            // pass the already-gzipped CSV bytes through as-is, declaring
            // their compression and format instead of re-encoding them
            .data(ClickHousePassThruStream.of(pin, ClickHouseCompression.GZIP, ClickHouseFormat.CSV))
            .executeAndWait()) {
    } catch (ClickHouseException e) {
      try {
        pin.close();
      } catch (IOException ex) {
        // TODO: add logging
      }
      throw new RuntimeException(e);
    }
  }

It reaches a speed of about 350k rows/sec, so almost like clickhouse-client. Data is compressed by a GZIPOutputStream straight into a PipedOutputStream, and the piped streams work together in parallel threads to compress the data and pass it to the server. I guess my question is solved. Thank you, den-crane, for your participation.
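
Roughly, the wiring between the two threads looks like this (a simplified sketch; insertFileViaPipe and the pipe buffer size are just for illustration; GZIPOutputStream is from java.util.zip, the piped streams and UncheckedIOException from java.io, Files/Paths from java.nio.file):

  // the producer thread gzips the raw CSV file into a PipedOutputStream,
  // while the calling thread streams the connected PipedInputStream to ClickHouse
  public void insertFileViaPipe(String fileName) throws IOException, InterruptedException {
    PipedOutputStream pout = new PipedOutputStream();
    PipedInputStream pin = new PipedInputStream(pout, 1 << 16); // 64 KiB pipe buffer

    Thread compressor = new Thread(() -> {
      try (GZIPOutputStream gzip = new GZIPOutputStream(pout)) {
        Files.copy(Paths.get(fileName), gzip); // compress straight into the pipe
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
    compressor.start();

    insertFromCompressedFileStream(pin); // the method above consumes the pipe
    compressor.join();
  }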

Also, I think the docs should mention how to use an input stream with the compression and format specified. Maybe we could add that someday.
