Skip to content

Commit 19ab569

Browse files
authored
Merge pull request #914 from rernas35/r2dbc_feature
r2dbc module is added.
2 parents 4219d8f + dc23b90 commit 19ab569

37 files changed

+3396
-0
lines changed

clickhouse-r2dbc/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# clickhouse-r2dbc
2+
3+
This module provides r2dbc support to clickhouse-jdbc driver.
4+
5+
r2dbc link : https://r2dbc.io/
6+
7+
Sample code:
8+
```java
9+
ConnectionFactory connectionFactory = ConnectionFactories
10+
.get("r2dbc:clickhouse:http://{username}:{password}@{host}:{port}/{database}");
11+
12+
Mono.from(connectionFactory.create())
13+
.flatMapMany(connection -> connection
14+
.createStatement("select domain, path, toDate(cdate) as d, count(1) as count from clickdb.clicks where domain = :domain group by domain, path, d")
15+
.bind("domain", domain)
16+
.execute())
17+
.flatMap(result -> result
18+
.map((row, rowMetadata) -> String.format("%s%s[%s]:%d", row.get("domain", String.class),
19+
row.get("path", String.class),
20+
row.get("d", LocalDate.class),
21+
row.get("count", Long.class)) ))
22+
.doOnNext(System.out::println)
23+
.subscribe();
24+
```
25+
26+
for full example please check clickhouse-jdbc/examples/clickhouse-r2dbc-samples/clickhouse-r2dbc-spring-webflux-sample .

clickhouse-r2dbc/pom.xml

Lines changed: 307 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import com.clickhouse.client.ClickHouseFormat;
4+
import com.clickhouse.client.ClickHouseRequest;
5+
import io.r2dbc.spi.Batch;
6+
import io.r2dbc.spi.Result;
7+
import org.reactivestreams.Publisher;
8+
import reactor.core.publisher.Flux;
9+
import reactor.core.publisher.Mono;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public class ClickHouseBatch implements Batch {
15+
16+
private static final ClickHouseFormat PREFERRED_FORMAT = ClickHouseFormat.TabSeparatedWithNamesAndTypes;
17+
private ClickHouseRequest<?> request;
18+
final List<String> sqlList;
19+
20+
public ClickHouseBatch(ClickHouseRequest<?> request) {
21+
this.request = request;
22+
this.sqlList = new ArrayList<>();
23+
}
24+
25+
@Override
26+
public Batch add(String sql) {
27+
sqlList.add(sql);
28+
return this;
29+
}
30+
31+
@Override
32+
public Publisher<? extends Result> execute() {
33+
return Flux.fromStream(sqlList.stream().map(sql -> {
34+
request.query(sql).format(PREFERRED_FORMAT);
35+
return Mono.fromFuture(request::execute); }))
36+
.flatMap(Mono::flux)
37+
.map(ClickHouseResult::new);
38+
}
39+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import com.clickhouse.client.ClickHouseColumn;
4+
import com.clickhouse.r2dbc.types.ClickHouseDataTypeWrapper;
5+
import io.r2dbc.spi.ColumnMetadata;
6+
import io.r2dbc.spi.Type;
7+
8+
public class ClickHouseColumnMetadata implements ColumnMetadata {
9+
10+
final Type type;
11+
final String name;
12+
13+
ClickHouseColumnMetadata(ClickHouseColumn col) {
14+
this.name = col.getColumnName(); // TODO :check alias handling.
15+
this.type = ClickHouseDataTypeWrapper.of(col.getDataType());
16+
}
17+
18+
@Override
19+
public Type getType() {
20+
return type;
21+
}
22+
23+
@Override
24+
public String getName() {
25+
return name;
26+
}
27+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import com.clickhouse.client.ClickHouseResponse;
4+
import com.clickhouse.client.ClickHouseResponseSummary;
5+
import com.clickhouse.client.logging.Logger;
6+
import com.clickhouse.client.logging.LoggerFactory;
7+
import io.r2dbc.spi.Result;
8+
import io.r2dbc.spi.Row;
9+
import io.r2dbc.spi.RowMetadata;
10+
import org.apache.commons.lang3.tuple.Pair;
11+
import org.reactivestreams.Publisher;
12+
import reactor.core.publisher.Flux;
13+
import reactor.core.publisher.Mono;
14+
15+
import java.util.function.BiFunction;
16+
import java.util.function.Function;
17+
import java.util.function.Predicate;
18+
import java.util.stream.StreamSupport;
19+
20+
public class ClickHouseResult implements Result {
21+
22+
private static final Logger log = LoggerFactory.getLogger(ClickHouseResult.class);
23+
24+
private final Flux<? extends Result.Segment> rowSegments;
25+
private final Mono<? extends Result.Segment> updatedCount;
26+
private final Flux<? extends Result.Segment> segments;
27+
28+
ClickHouseResult(ClickHouseResponse response) {
29+
this.rowSegments = Mono.just(response)
30+
.flatMapMany(resp -> Flux
31+
.fromStream(StreamSupport.stream(resp.records().spliterator(), false)
32+
.map(rec -> Pair.of(resp.getColumns(), rec))))
33+
.map(pair -> new ClickHouseRow(pair.getRight(), pair.getLeft()))
34+
.map(RowSegment::new);
35+
this.updatedCount = Mono.just(response).map(ClickHouseResponse::getSummary)
36+
.map(ClickHouseResponseSummary::getProgress)
37+
.map(ClickHouseResponseSummary.Progress::getWrittenRows)
38+
.map(UpdateCount::new);
39+
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
40+
}
41+
42+
ClickHouseResult(Flux<? extends Result.Segment> rowSegments, Mono<? extends Result.Segment> updatedCount) {
43+
this.rowSegments = rowSegments;
44+
this.updatedCount = updatedCount;
45+
this.segments = Flux.concat(this.updatedCount, this.rowSegments);
46+
}
47+
48+
/**
49+
* Returns updated count(written rows from summary of {@link ClickHouseResponse}).Important! if writtenRows is greater than MAX_INT then it will return MAX_INT.
50+
*
51+
* @return updated count
52+
*/
53+
@Override
54+
public Mono<Long> getRowsUpdated() {
55+
return updatedCount.map(val -> ((UpdateCount) val).value());
56+
}
57+
58+
@Override
59+
public <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> biFunction) {
60+
return rowSegments.cast(RowSegment.class)
61+
.map(RowSegment::row).handle((row, sink) -> {
62+
try {
63+
sink.next(biFunction.apply(row, row.getMetadata()));
64+
} catch (Exception e) {
65+
log.error("Provided function caused exception:", e);
66+
}
67+
});
68+
}
69+
70+
@Override
71+
public Result filter(Predicate<Segment> predicate) {
72+
return new ClickHouseResult(segments.filter(predicate), updatedCount.filter(predicate));
73+
}
74+
75+
@Override
76+
public <T> Publisher<T> flatMap(Function<Segment, ? extends Publisher<? extends T>> function) {
77+
return segments.flatMap(segment -> {
78+
try {
79+
Publisher<? extends T> retValue = function.apply(segment);
80+
if (retValue == null) {
81+
return Mono.error(new IllegalStateException("flatmap function returned null value"));
82+
}
83+
return retValue;
84+
} catch (Exception e) {
85+
log.error("Provided function caused exception:", e);
86+
return Mono.error(e);
87+
}
88+
});
89+
}
90+
91+
92+
class RowSegment implements Result.RowSegment {
93+
94+
final ClickHouseRow row;
95+
96+
RowSegment(ClickHouseRow row) {
97+
this.row = row;
98+
}
99+
100+
@Override
101+
public Row row() {
102+
return row;
103+
}
104+
}
105+
106+
class UpdateCount implements Result.UpdateCount {
107+
108+
final long updateCount;
109+
110+
UpdateCount(long updateCount) {
111+
this.updateCount = updateCount;
112+
}
113+
114+
@Override
115+
public long value() {
116+
return updateCount;
117+
}
118+
}
119+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import com.clickhouse.client.ClickHouseColumn;
4+
import com.clickhouse.client.ClickHouseRecord;
5+
import io.r2dbc.spi.Row;
6+
import io.r2dbc.spi.RowMetadata;
7+
8+
import java.util.LinkedHashMap;
9+
import java.util.List;
10+
import java.util.NoSuchElementException;
11+
import java.util.function.Function;
12+
import java.util.stream.Collectors;
13+
14+
public class ClickHouseRow implements Row {
15+
16+
final ClickHouseRecord record;
17+
final ClickHouseRowMetadata rowMetadata;
18+
19+
ClickHouseRow(ClickHouseRecord record, List<ClickHouseColumn> columnList) {
20+
this.record = record;
21+
this.rowMetadata = new ClickHouseRowMetadata(columnList.stream()
22+
.map(ClickHouseColumnMetadata::new)
23+
.collect(Collectors
24+
.toMap(ClickHouseColumnMetadata::getName,
25+
Function.identity(),
26+
(v1,v2) -> v2, // since every key will be unique, won't need to merge so just overwrite with the latest one.
27+
LinkedHashMap::new)));
28+
}
29+
30+
@Override
31+
public RowMetadata getMetadata() {
32+
return rowMetadata;
33+
}
34+
35+
@Override
36+
public <T> T get(int i, Class<T> aClass) {
37+
return aClass.cast(record.getValue(i).asObject(aClass));
38+
}
39+
40+
@Override
41+
public <T> T get(String name, Class<T> aClass) {
42+
try {
43+
return aClass.cast(record.getValue(name).asObject(aClass));
44+
} catch (IllegalArgumentException e) {
45+
throw new NoSuchElementException(String.format("Unknown element with a name %s", name));
46+
}
47+
}
48+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.clickhouse.r2dbc;
2+
3+
import io.r2dbc.spi.ColumnMetadata;
4+
import io.r2dbc.spi.RowMetadata;
5+
6+
import java.util.ArrayList;
7+
import java.util.Collections;
8+
import java.util.LinkedHashMap;
9+
import java.util.List;
10+
11+
public class ClickHouseRowMetadata implements RowMetadata {
12+
13+
LinkedHashMap<String, ClickHouseColumnMetadata> columnNameMetadataMap;
14+
15+
ClickHouseRowMetadata( LinkedHashMap<String, ClickHouseColumnMetadata> columnNameMetadataMap) {
16+
this.columnNameMetadataMap = columnNameMetadataMap;
17+
}
18+
19+
@Override
20+
public ColumnMetadata getColumnMetadata(int i) {
21+
if (i > columnNameMetadataMap.size())
22+
throw new IllegalArgumentException("Given index is greater than size column metadata array.");
23+
return columnNameMetadataMap.entrySet().stream().skip(i-1).findFirst().get().getValue();
24+
}
25+
26+
@Override
27+
public ColumnMetadata getColumnMetadata(String columnName) {
28+
return columnNameMetadataMap.get(columnName);
29+
}
30+
31+
@Override
32+
public List<? extends ColumnMetadata> getColumnMetadatas() {
33+
return Collections.unmodifiableList(new ArrayList<>(columnNameMetadataMap.values()));
34+
}
35+
}

0 commit comments

Comments
 (0)