Skip to content

Commit 620195c

Browse files
committed
first version of r2dbc
1 parent 2988a42 commit 620195c

22 files changed

+2904
-0
lines changed

clickhouse-r2dbc/pom.xml

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>clickhouse-java</artifactId>
7+
<groupId>com.clickhouse</groupId>
8+
<version>0.3.2-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>clickhouse-r2dbc</artifactId>
13+
14+
<properties>
15+
<hikari-cp.version>4.0.3</hikari-cp.version>
16+
<maven.compiler.source>1.8</maven.compiler.source>
17+
<maven.compiler.target>1.8</maven.compiler.target>
18+
<r2dbc-spi.version>0.9.1.RELEASE</r2dbc-spi.version>
19+
<commons-lang3.version>3.12.0</commons-lang3.version>
20+
<testcontainers.elasticsearch.version>1.16.3</testcontainers.elasticsearch.version>
21+
<testcontainers.clickhouse.version>1.16.3</testcontainers.clickhouse.version>
22+
<docker-java-api.version>3.2.13</docker-java-api.version>
23+
<docker-java-transport-zerodep.version>3.2.13</docker-java-transport-zerodep.version>
24+
<surefire-plugin.version>3.0.0-M5</surefire-plugin.version>
25+
</properties>
26+
27+
<dependencyManagement>
28+
<dependencies>
29+
<dependency>
30+
<groupId>io.projectreactor</groupId>
31+
<artifactId>reactor-bom</artifactId>
32+
<version>2020.0.16</version>
33+
<type>pom</type>
34+
<scope>import</scope>
35+
</dependency>
36+
</dependencies>
37+
</dependencyManagement>
38+
39+
<dependencies>
40+
<dependency>
41+
<groupId>io.projectreactor</groupId>
42+
<artifactId>reactor-core</artifactId>
43+
</dependency>
44+
<dependency>
45+
<groupId>io.r2dbc</groupId>
46+
<artifactId>r2dbc-spi</artifactId>
47+
<version>${r2dbc-spi.version}</version>
48+
</dependency>
49+
<dependency>
50+
<groupId>io.r2dbc</groupId>
51+
<artifactId>r2dbc-spi-test</artifactId>
52+
<version>${r2dbc-spi.version}</version>
53+
</dependency>
54+
<dependency>
55+
<groupId>com.clickhouse</groupId>
56+
<artifactId>clickhouse-client</artifactId>
57+
<version>${project.version}</version>
58+
</dependency>
59+
<dependency>
60+
<groupId>com.clickhouse</groupId>
61+
<artifactId>clickhouse-grpc-client</artifactId>
62+
<version>${project.version}</version>
63+
</dependency>
64+
<dependency>
65+
<groupId>com.clickhouse</groupId>
66+
<artifactId>clickhouse-http-client</artifactId>
67+
<version>${project.version}</version>
68+
</dependency>
69+
<dependency>
70+
<groupId>com.clickhouse</groupId>
71+
<artifactId>clickhouse-jdbc</artifactId>
72+
<version>${project.version}</version>
73+
</dependency>
74+
<dependency>
75+
<groupId>org.apache.commons</groupId>
76+
<artifactId>commons-lang3</artifactId>
77+
<version>${commons-lang3.version}</version>
78+
</dependency>
79+
<dependency>
80+
<groupId>org.testcontainers</groupId>
81+
<artifactId>elasticsearch</artifactId>
82+
<version>${testcontainers.elasticsearch.version}</version>
83+
<scope>test</scope>
84+
</dependency>
85+
<dependency>
86+
<groupId>org.testcontainers</groupId>
87+
<artifactId>clickhouse</artifactId>
88+
<version>${testcontainers.clickhouse.version}</version>
89+
<scope>test</scope>
90+
</dependency>
91+
<dependency>
92+
<groupId>com.github.docker-java</groupId>
93+
<artifactId>docker-java-api</artifactId>
94+
<version>${docker-java-api.version}</version>
95+
<scope>test</scope>
96+
</dependency>
97+
<dependency>
98+
<groupId>com.github.docker-java</groupId>
99+
<artifactId>docker-java-transport-zerodep</artifactId>
100+
<version>${docker-java-transport-zerodep.version}</version>
101+
<scope>test</scope>
102+
</dependency>
103+
<dependency>
104+
<groupId>com.zaxxer</groupId>
105+
<artifactId>HikariCP</artifactId>
106+
<version>${hikari-cp.version}</version>
107+
<scope>test</scope>
108+
<exclusions>
109+
<exclusion>
110+
<groupId>org.slf4j</groupId>
111+
<artifactId>slf4j-api</artifactId>
112+
</exclusion>
113+
</exclusions>
114+
</dependency>
115+
116+
</dependencies>
117+
118+
<build>
119+
<plugins>
120+
<plugin>
121+
<groupId>org.apache.maven.plugins</groupId>
122+
<artifactId>maven-surefire-plugin</artifactId>
123+
<version>${surefire-plugin.version}</version>
124+
<configuration>
125+
<includes>
126+
<include>%regex[.*Test*.*]</include>
127+
</includes>
128+
</configuration>
129+
</plugin>
130+
</plugins>
131+
</build>
132+
</project>
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import com.clickhouse.client.ClickHouseFormat;
4+
import com.clickhouse.client.ClickHouseRequest;
5+
import io.r2dbc.spi.Batch;
6+
import io.r2dbc.spi.Result;
7+
import org.reactivestreams.Publisher;
8+
import reactor.core.publisher.Flux;
9+
import reactor.core.publisher.Mono;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public class ClickHouseBatch implements Batch {
15+
16+
private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.RowBinaryWithNamesAndTypes;
17+
private ClickHouseRequest<?> request;
18+
List<String> sqlList = new ArrayList<>();
19+
20+
public ClickHouseBatch(ClickHouseRequest request) {
21+
this.request = request;
22+
}
23+
24+
@Override
25+
public Batch add(String sql) {
26+
sqlList.add(sql);
27+
return this;
28+
}
29+
30+
@Override
31+
public Publisher<? extends Result> execute() {
32+
return Flux.fromStream(sqlList.stream().map(sql -> {
33+
request.query(sql).format(PREFERRED_FORMAT);
34+
return Mono.fromFuture(request.execute());
35+
}).map(ClickHouseResult::new));
36+
}
37+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import com.clickhouse.client.ClickHouseColumn;
4+
import com.clickhouse.r2dbc.types.TypeMapper;
5+
import io.r2dbc.spi.ColumnMetadata;
6+
import io.r2dbc.spi.Type;
7+
8+
public class ClickHouseColumnMetadata implements ColumnMetadata {
9+
10+
final Type type;
11+
final String name;
12+
13+
ClickHouseColumnMetadata(ClickHouseColumn col) {
14+
this.name = col.getColumnName(); // TODO :check alias handling.
15+
this.type = TypeMapper.getType(col.getDataType());
16+
}
17+
18+
@Override
19+
public Type getType() {
20+
return type;
21+
}
22+
23+
@Override
24+
public String getName() {
25+
return name;
26+
}
27+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import com.clickhouse.client.ClickHouseResponse;
4+
import com.clickhouse.client.ClickHouseResponseSummary;
5+
import com.clickhouse.client.logging.Logger;
6+
import com.clickhouse.client.logging.LoggerFactory;
7+
import io.r2dbc.spi.Result;
8+
import io.r2dbc.spi.Row;
9+
import io.r2dbc.spi.RowMetadata;
10+
import org.apache.commons.lang3.tuple.Pair;
11+
import org.reactivestreams.Publisher;
12+
import reactor.core.publisher.Flux;
13+
import reactor.core.publisher.Mono;
14+
15+
import java.util.function.BiFunction;
16+
import java.util.function.Function;
17+
import java.util.function.Predicate;
18+
import java.util.stream.StreamSupport;
19+
20+
public class ClickHouseResult implements Result {
21+
22+
private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class);
23+
24+
private final Flux<? extends Result.Segment> rowSegments;
25+
private final Mono<? extends Result.Segment> updatedCount;
26+
private final Flux<? extends Result.Segment> segments;
27+
28+
ClickHouseResult(Mono<ClickHouseResponse> response) {
29+
this.rowSegments = response.flux()
30+
.flatMap(resp -> Flux
31+
.fromStream(StreamSupport.stream(resp.records().spliterator(), false)
32+
.map(rec -> Pair.of(resp.getColumns(), rec))))
33+
.map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft()))
34+
.map(RowSegment::new);
35+
this.updatedCount = response.map(clickHouseResponse -> clickHouseResponse.getSummary())
36+
.map(ClickHouseResponseSummary::getProgress)
37+
.map(ClickHouseResponseSummary.Progress::getWrittenRows)
38+
.map(UpdateCount::new);
39+
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
40+
}
41+
42+
ClickHouseResult(Flux<? extends Result.Segment> rowSegments, Mono<? extends Result.Segment> updatedCount) {
43+
this.rowSegments = rowSegments;
44+
this.updatedCount = updatedCount;
45+
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
46+
}
47+
48+
/**
49+
* Returns updated count(written rows from summary of {@link ClickHouseResponse}).Important! if writtenRows is greater than MAX_INT then it will return MAX_INT.
50+
* @return
51+
*/
52+
@Override
53+
public Mono<Integer> getRowsUpdated() {
54+
55+
return updatedCount.map(val -> {
56+
UpdateCount updateCount = (UpdateCount) val;
57+
if (updateCount.value() > Integer.MAX_VALUE)
58+
return Integer.MAX_VALUE;
59+
else return Long.valueOf(updateCount.value()).intValue();
60+
});
61+
}
62+
63+
@Override
64+
public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction) {
65+
return rowSegments.cast(RowSegment.class)
66+
.map(RowSegment::row).handle((row, sink) -> {
67+
try {
68+
sink.next(biFunction.apply(row, row.getMetadata()));
69+
} catch (Exception e) {
70+
log.error("Provided function caused exception:", e);
71+
}
72+
});
73+
}
74+
75+
@Override
76+
public Result filter(Predicate<Segment> predicate) {
77+
return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate));
78+
}
79+
80+
@Override
81+
public <T> Publisher<T> flatMap(Function<Segment, ? extends Publisher<? extends T>> function) {
82+
return segments.flatMap(segment -> {
83+
try {
84+
Publisher<? extends T> retValue = function.apply(segment);
85+
if (retValue == null) {
86+
return Mono.error(new IllegalStateException("flatmap function returned null value"));
87+
}
88+
return retValue;
89+
} catch (Exception e) {
90+
log.error("Provided function caused exception:", e);
91+
return Mono.error(e);
92+
}
93+
});
94+
}
95+
96+
97+
class RowSegment implements Result.RowSegment {
98+
99+
final ClickHouseRow row;
100+
101+
RowSegment(ClickHouseRow row) {
102+
this.row = row;
103+
}
104+
105+
@Override
106+
public Row row() {
107+
return row;
108+
}
109+
}
110+
111+
class UpdateCount implements Result.UpdateCount {
112+
113+
final long updateCount;
114+
115+
UpdateCount(long updateCount) {
116+
this.updateCount = updateCount;
117+
}
118+
119+
@Override
120+
public long value() {
121+
return updateCount;
122+
}
123+
}
124+
}

0 commit comments

Comments
 (0)