Skip to content

Commit b1b4cc1

Browse files
authored
Merge pull request #1000 from zhicwu/develop
Use unbounded queue for batch insert
2 parents 4fd3358 + 6a11320 commit b1b4cc1

File tree

2 files changed

+49
-2
lines changed

2 files changed

+49
-2
lines changed

clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ protected InputBasedPreparedStatement(ClickHouseConnectionImpl connection, Click
7777

7878
counter = 0;
7979
// it's important to make sure the queue has unlimited length
80-
stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, null);
80+
stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config.getWriteBufferSize(), 0,
81+
config.getSocketTimeout(), null);
8182
}
8283

8384
protected void ensureParams() throws SQLException {
@@ -350,7 +351,10 @@ public void clearBatch() throws SQLException {
350351
// ignore
351352
}
352353
counter = 0;
353-
stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(getConfig(), null);
354+
355+
ClickHouseConfig config = getConfig();
356+
stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config.getWriteBufferSize(), 0,
357+
config.getSocketTimeout(), null);
354358
}
355359

356360
@Override

clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java

+43
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,49 @@ public void testBatchInsert() throws SQLException {
710710
}
711711
}
712712

713+
@Test(groups = "integration")
714+
public void testBatchInsertWithoutUnboundedQueue() throws Exception {
715+
Properties props = new Properties();
716+
props.setProperty(ClickHouseClientOption.WRITE_BUFFER_SIZE.getKey(), "1");
717+
props.setProperty(ClickHouseClientOption.MAX_QUEUED_BUFFERS.getKey(), "1");
718+
try (ClickHouseConnection conn = newConnection(new Properties());
719+
Statement s = conn.createStatement()) {
720+
s.execute("drop table if exists test_insert_buffer_size; "
721+
+ "CREATE TABLE test_insert_buffer_size(value String) ENGINE=Memory");
722+
try (PreparedStatement ps = conn.prepareStatement(
723+
"INSERT INTO test_insert_buffer_size")) {
724+
ps.setString(1, "1");
725+
ps.addBatch();
726+
ps.setString(1, "2");
727+
ps.addBatch();
728+
ps.setString(1, "3");
729+
ps.addBatch();
730+
ps.executeBatch();
731+
732+
ps.setString(1, "4");
733+
ps.addBatch();
734+
ps.executeBatch();
735+
736+
ps.setString(1, "4");
737+
ps.addBatch();
738+
ps.clearBatch();
739+
ps.setString(1, "5");
740+
ps.addBatch();
741+
ps.setString(1, "6");
742+
ps.addBatch();
743+
ps.executeBatch();
744+
}
745+
746+
try (ResultSet rs = s.executeQuery("select * from test_insert_buffer_size order by value")) {
747+
int count = 1;
748+
while (rs.next()) {
749+
Assert.assertEquals(rs.getInt(1), count++);
750+
}
751+
Assert.assertEquals(count, 7);
752+
}
753+
}
754+
}
755+
713756
@Test(groups = "integration")
714757
public void testQueryWithDateTime() throws SQLException {
715758
try (ClickHouseConnection conn = newConnection(new Properties());

0 commit comments

Comments
 (0)