Skip to content

Commit 1795272

Browse files
authored
Merge pull request #787 from zhicwu/enhance-jdbc
Fix minor issues found after 0.3.2 pre-release
2 parents 08058fe + 70358b1 commit 1795272

File tree

13 files changed

+395
-130
lines changed

13 files changed

+395
-130
lines changed

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseColumn.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public final class ClickHouseColumn implements Serializable {
1919
private static final String KEYWORD_NULLABLE = "Nullable";
2020
private static final String KEYWORD_LOW_CARDINALITY = "LowCardinality";
2121
private static final String KEYWORD_AGGREGATE_FUNCTION = ClickHouseDataType.AggregateFunction.name();
22+
private static final String KEYWORD_SIMPLE_AGGREGATE_FUNCTION = ClickHouseDataType.SimpleAggregateFunction.name();
2223
private static final String KEYWORD_ARRAY = ClickHouseDataType.Array.name();
2324
private static final String KEYWORD_TUPLE = ClickHouseDataType.Tuple.name();
2425
private static final String KEYWORD_MAP = ClickHouseDataType.Map.name();
@@ -140,8 +141,10 @@ protected static int readColumn(String args, int startIndex, int len, String nam
140141
brackets++;
141142
}
142143

143-
if (args.startsWith(KEYWORD_AGGREGATE_FUNCTION, i)) {
144-
int index = args.indexOf('(', i + KEYWORD_AGGREGATE_FUNCTION.length());
144+
String matchedKeyword;
145+
if (args.startsWith((matchedKeyword = KEYWORD_AGGREGATE_FUNCTION), i)
146+
|| args.startsWith((matchedKeyword = KEYWORD_SIMPLE_AGGREGATE_FUNCTION), i)) {
147+
int index = args.indexOf('(', i + matchedKeyword.length());
145148
if (index < i) {
146149
throw new IllegalArgumentException("Missing function parameters");
147150
}
@@ -160,8 +163,8 @@ protected static int readColumn(String args, int startIndex, int len, String nam
160163
nestedColumns.add(ClickHouseColumn.of("", p));
161164
}
162165
}
163-
column = new ClickHouseColumn(ClickHouseDataType.AggregateFunction, name, args.substring(startIndex, i),
164-
nullable, lowCardinality, params, nestedColumns);
166+
column = new ClickHouseColumn(ClickHouseDataType.valueOf(matchedKeyword), name,
167+
args.substring(startIndex, i), nullable, lowCardinality, params, nestedColumns);
165168
column.aggFuncType = aggFunc;
166169
} else if (args.startsWith(KEYWORD_ARRAY, i)) {
167170
int index = args.indexOf('(', i + KEYWORD_ARRAY.length());
@@ -395,7 +398,9 @@ private ClickHouseColumn(ClickHouseDataType dataType, String columnName, String
395398
}
396399

397400
public boolean isAggregateFunction() {
401+
// || dataType == ClickHouseDataType.SimpleAggregateFunction;
398402
return dataType == ClickHouseDataType.AggregateFunction;
403+
399404
}
400405

401406
public boolean isArray() {

clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseDataType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public enum ClickHouseDataType {
7676
"NATIONAL CHARACTER", "NATIONAL CHARACTER LARGE OBJECT", "NATIONAL CHARACTER VARYING", "NCHAR",
7777
"NCHAR LARGE OBJECT", "NCHAR VARYING", "NVARCHAR", "TEXT", "TINYBLOB", "TINYTEXT", "VARCHAR", "VARCHAR2"),
7878
AggregateFunction(String.class, true, true, false, 0, 0, 0, 0, 0), // implementation-defined intermediate state
79+
SimpleAggregateFunction(String.class, true, true, false, 0, 0, 0, 0, 0),
7980
Array(Object.class, true, true, false, 0, 0, 0, 0, 0), Map(Map.class, true, true, false, 0, 0, 0, 0, 0),
8081
Nested(Object.class, true, true, false, 0, 0, 0, 0, 0), Tuple(List.class, true, true, false, 0, 0, 0, 0, 0),
8182
Point(Object.class, false, true, true, 33, 0, 0, 0, 0), // same as Tuple(Float64, Float64)

clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseDateTimeValue.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.time.LocalDateTime;
1111
import java.time.LocalTime;
1212
import java.time.OffsetDateTime;
13-
import java.time.ZoneOffset;
1413
import java.time.ZonedDateTime;
1514
import java.time.format.DateTimeFormatter;
1615
import java.util.TimeZone;
@@ -126,40 +125,41 @@ public ClickHouseDateTimeValue copy(boolean deep) {
126125

127126
@Override
128127
public byte asByte() {
129-
return isNullOrEmpty() ? (byte) 0 : (byte) getValue().toEpochSecond(ZoneOffset.UTC);
128+
return isNullOrEmpty() ? (byte) 0 : (byte) getValue().atZone(tz.toZoneId()).toEpochSecond();
130129
}
131130

132131
@Override
133132
public short asShort() {
134-
return isNullOrEmpty() ? (short) 0 : (short) getValue().toEpochSecond(ZoneOffset.UTC);
133+
return isNullOrEmpty() ? (short) 0 : (short) getValue().atZone(tz.toZoneId()).toEpochSecond();
135134
}
136135

137136
@Override
138137
public int asInteger() {
139-
return isNullOrEmpty() ? 0 : (int) getValue().toEpochSecond(ZoneOffset.UTC);
138+
return isNullOrEmpty() ? 0 : (int) getValue().atZone(tz.toZoneId()).toEpochSecond();
140139
}
141140

142141
@Override
143142
public long asLong() {
144-
return isNullOrEmpty() ? 0L : getValue().toEpochSecond(ZoneOffset.UTC);
143+
return isNullOrEmpty() ? 0L : getValue().atZone(tz.toZoneId()).toEpochSecond();
145144
}
146145

147146
@Override
148147
public float asFloat() {
149148
return isNullOrEmpty() ? 0F
150-
: getValue().toEpochSecond(ZoneOffset.UTC) + getValue().getNano() / ClickHouseValues.NANOS.floatValue();
149+
: getValue().atZone(tz.toZoneId()).toEpochSecond()
150+
+ getValue().getNano() / ClickHouseValues.NANOS.floatValue();
151151
}
152152

153153
@Override
154154
public double asDouble() {
155155
return isNullOrEmpty() ? 0D
156-
: getValue().toEpochSecond(ZoneOffset.UTC)
156+
: getValue().atZone(tz.toZoneId()).toEpochSecond()
157157
+ getValue().getNano() / ClickHouseValues.NANOS.doubleValue();
158158
}
159159

160160
@Override
161161
public BigInteger asBigInteger() {
162-
return isNullOrEmpty() ? null : BigInteger.valueOf(getValue().toEpochSecond(ZoneOffset.UTC));
162+
return isNullOrEmpty() ? null : BigInteger.valueOf(getValue().atZone(tz.toZoneId()).toEpochSecond());
163163
}
164164

165165
@Override
@@ -168,7 +168,7 @@ public BigDecimal asBigDecimal(int scale) {
168168
BigDecimal v = null;
169169
if (value != null) {
170170
int nanoSeconds = value.getNano();
171-
v = new BigDecimal(BigInteger.valueOf(value.toEpochSecond(ZoneOffset.UTC)), scale);
171+
v = new BigDecimal(BigInteger.valueOf(value.atZone(tz.toZoneId()).toEpochSecond()), scale);
172172
if (scale != 0 && nanoSeconds != 0) {
173173
v = v.add(BigDecimal.valueOf(nanoSeconds).divide(ClickHouseValues.NANOS).setScale(scale,
174174
RoundingMode.HALF_UP));

clickhouse-jdbc/legacy.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,21 @@
121121
<artifactId>testng</artifactId>
122122
<scope>test</scope>
123123
</dependency>
124+
<dependency>
125+
<groupId>mysql</groupId>
126+
<artifactId>mysql-connector-java</artifactId>
127+
<scope>test</scope>
128+
</dependency>
129+
<dependency>
130+
<groupId>org.mariadb.jdbc</groupId>
131+
<artifactId>mariadb-java-client</artifactId>
132+
<scope>test</scope>
133+
</dependency>
134+
<dependency>
135+
<groupId>org.postgresql</groupId>
136+
<artifactId>postgresql</artifactId>
137+
<scope>test</scope>
138+
</dependency>
124139
</dependencies>
125140

126141
<build>

clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -568,9 +568,8 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res
568568
if (parsedStmt.hasTempTable()) {
569569
// non-insert queries using temp table
570570
ps = new TableBasedPreparedStatement(this,
571-
clientRequest.write().query(parsedStmt.getSQL(), newQueryId()),
572-
parsedStmt.getTempTables(), resultSetType,
573-
resultSetConcurrency, resultSetHoldability);
571+
clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), parsedStmt,
572+
resultSetType, resultSetConcurrency, resultSetHoldability);
574573
} else if (parsedStmt.getStatementType() == StatementType.INSERT) {
575574
if (!ClickHouseChecker.isNullOrBlank(parsedStmt.getInput())) {
576575
// insert query using input function

clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.Map.Entry;
1414

1515
import com.clickhouse.client.ClickHouseChecker;
16+
import com.clickhouse.client.ClickHouseClient;
1617
import com.clickhouse.client.ClickHouseConfig;
1718
import com.clickhouse.client.ClickHouseFormat;
1819
import com.clickhouse.client.ClickHouseRequest;
@@ -30,6 +31,7 @@
3031
import com.clickhouse.jdbc.SqlExceptionUtils;
3132
import com.clickhouse.jdbc.JdbcWrapper;
3233
import com.clickhouse.jdbc.parser.ClickHouseSqlStatement;
34+
import com.clickhouse.jdbc.parser.StatementType;
3335

3436
public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseStatement {
3537
private static final Logger log = LoggerFactory.getLogger(ClickHouseStatementImpl.class);
@@ -50,7 +52,7 @@ public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseSt
5052
private int maxFieldSize;
5153
private int maxRows;
5254
private boolean poolable;
53-
private String queryId;
55+
private volatile String queryId;
5456
private int queryTimeout;
5557

5658
private ClickHouseResultSet currentResult;
@@ -149,6 +151,7 @@ protected int executeInsert(String sql, InputStream input) throws SQLException {
149151
try (ClickHouseResponse resp = request.write().query(sql, queryId = connection.newQueryId())
150152
.format(ClickHouseFormat.RowBinary).data(input).execute()
151153
.get()) {
154+
updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp);
152155
summary = resp.getSummary();
153156
} catch (InterruptedException e) {
154157
log.error("can not close stream: %s", e.getMessage());
@@ -197,6 +200,9 @@ protected ResultSet updateResult(ClickHouseSqlStatement stmt, ClickHouseResponse
197200
rs = currentResult;
198201
} else {
199202
currentUpdateCount = response.getSummary().getUpdateCount();
203+
if (currentUpdateCount <= 0) {
204+
currentUpdateCount = 1;
205+
}
200206
response.close();
201207
}
202208

@@ -362,11 +368,19 @@ public void setQueryTimeout(int seconds) throws SQLException {
362368

363369
@Override
364370
public void cancel() throws SQLException {
365-
if (this.queryId == null || isClosed()) {
371+
final String qid;
372+
if ((qid = this.queryId) == null || isClosed()) {
366373
return;
367374
}
368375

369-
executeQuery(String.format("KILL QUERY WHERE query_id='%s'", queryId));
376+
ClickHouseClient.send(request.getServer(), String.format("KILL QUERY WHERE query_id='%s'", qid))
377+
.whenComplete((summary, exception) -> {
378+
if (exception != null) {
379+
log.warn("Failed to kill query [%s] due to: %s", qid, exception.getMessage());
380+
} else if (summary != null) {
381+
log.debug("Killed query [%s]", qid);
382+
}
383+
});
370384
}
371385

372386
@Override
@@ -500,8 +514,10 @@ public int[] executeBatch() throws SQLException {
500514
int len = batchStmts.size();
501515
int[] results = new int[len];
502516
for (int i = 0; i < len; i++) {
503-
try (ClickHouseResponse r = executeStatement(batchStmts.get(i), null, null, null)) {
504-
results[i] = (int) r.getSummary().getWrittenRows();
517+
ClickHouseSqlStatement s = batchStmts.get(i);
518+
try (ClickHouseResponse r = executeStatement(s, null, null, null)) {
519+
updateResult(s, r);
520+
results[i] = currentUpdateCount <= 0 ? 0 : currentUpdateCount;
505521
} catch (Exception e) {
506522
results[i] = EXECUTE_FAILED;
507523
log.error("Faled to execute task %d of %d", i + 1, len, e);

clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ public int executeUpdate() throws SQLException {
107107
ensureParams();
108108

109109
addBatch();
110-
return executeBatch()[0];
110+
int row = getUpdateCount();
111+
return row > 0 ? row : 0;
111112
}
112113

113114
@Override

0 commit comments

Comments
 (0)