Skip to content

Commit aeb5878

Browse files
committed
Use unbounded queue for batch insert
1 parent 4fd3358 commit aeb5878

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ protected InputBasedPreparedStatement(ClickHouseConnectionImpl connection, Click
7777

7878
counter = 0;
7979
// it's important to make sure the queue has unlimited length
80-
stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, null);
80+
stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config.getWriteBufferSize(), 0,
81+
config.getSocketTimeout(), null);
8182
}
8283

8384
protected void ensureParams() throws SQLException {
@@ -350,7 +351,10 @@ public void clearBatch() throws SQLException {
350351
// ignore
351352
}
352353
counter = 0;
353-
stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(getConfig(), null);
354+
355+
ClickHouseConfig config = getConfig();
356+
stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config.getWriteBufferSize(), 0,
357+
config.getSocketTimeout(), null);
354358
}
355359

356360
@Override

clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -1368,6 +1368,36 @@ public void testQueryWithNamedParameter() throws SQLException {
13681368
}
13691369
}
13701370

1371+
@Test(groups = "integration")
1372+
public void testInsertBufferSize() throws Exception {
1373+
Properties props = new Properties();
1374+
props.setProperty(ClickHouseClientOption.WRITE_BUFFER_SIZE.getKey(), "1");
1375+
props.setProperty(ClickHouseClientOption.MAX_QUEUED_BUFFERS.getKey(), "1");
1376+
try (ClickHouseConnection conn = newConnection(new Properties());
1377+
Statement s = conn.createStatement()) {
1378+
s.execute("drop table if exists test_insert_buffer_size; "
1379+
+ "CREATE TABLE test_insert_buffer_size(value String) ENGINE=Memory");
1380+
try (PreparedStatement ps = conn.prepareStatement(
1381+
"INSERT INTO test_insert_buffer_size")) {
1382+
ps.setString(1, "1");
1383+
ps.addBatch();
1384+
ps.setString(1, "2");
1385+
ps.addBatch();
1386+
ps.setString(1, "3");
1387+
ps.addBatch();
1388+
ps.executeBatch();
1389+
}
1390+
1391+
try (ResultSet rs = s.executeQuery("select * from test_insert_buffer_size order by value")) {
1392+
int count = 1;
1393+
while (rs.next()) {
1394+
Assert.assertEquals(rs.getInt(1), count++);
1395+
}
1396+
Assert.assertEquals(count, 4);
1397+
}
1398+
}
1399+
}
1400+
13711401
@Test(groups = "integration")
13721402
public void testInsertWithAndSelect() throws Exception {
13731403
try (ClickHouseConnection conn = newConnection(new Properties());

0 commit comments

Comments
 (0)