4
4
import java .io .InputStream ;
5
5
import java .io .OutputStream ;
6
6
import java .io .Serializable ;
7
- import java .io .UncheckedIOException ;
8
7
import java .util .ArrayList ;
9
8
import java .util .Collection ;
10
9
import java .util .Collections ;
17
16
import java .util .Map ;
18
17
import java .util .Objects ;
19
18
import java .util .Map .Entry ;
20
- import java .util .concurrent .CancellationException ;
21
19
import java .util .concurrent .CompletableFuture ;
22
20
import java .util .concurrent .CompletionException ;
23
- import java .util .concurrent .ExecutionException ;
24
- import java .util .concurrent .TimeUnit ;
25
- import java .util .concurrent .TimeoutException ;
26
21
import java .util .concurrent .atomic .AtomicReference ;
27
22
import java .util .Optional ;
28
23
import java .util .Properties ;
31
26
import java .util .function .Function ;
32
27
33
28
import com .clickhouse .client .config .ClickHouseClientOption ;
34
- import com .clickhouse .config .ClickHouseBufferingMode ;
35
29
import com .clickhouse .config .ClickHouseConfigChangeListener ;
36
30
import com .clickhouse .config .ClickHouseOption ;
37
31
import com .clickhouse .data .ClickHouseChecker ;
38
32
import com .clickhouse .data .ClickHouseCompression ;
39
- import com .clickhouse .data .ClickHouseDataStreamFactory ;
40
33
import com .clickhouse .data .ClickHouseDeferredValue ;
41
34
import com .clickhouse .data .ClickHouseExternalTable ;
42
35
import com .clickhouse .data .ClickHouseFile ;
43
36
import com .clickhouse .data .ClickHouseFormat ;
44
37
import com .clickhouse .data .ClickHouseInputStream ;
45
38
import com .clickhouse .data .ClickHouseOutputStream ;
46
39
import com .clickhouse .data .ClickHousePassThruStream ;
47
- import com .clickhouse .data .ClickHousePipedOutputStream ;
48
40
import com .clickhouse .data .ClickHouseUtils ;
49
41
import com .clickhouse .data .ClickHouseValue ;
50
42
import com .clickhouse .data .ClickHouseValues ;
@@ -69,12 +61,26 @@ public class ClickHouseRequest<SelfT extends ClickHouseRequest<SelfT>> implement
69
61
SPECIAL_SETTINGS = Collections .unmodifiableSet (set );
70
62
}
71
63
64
+ static class PipedWriter implements ClickHouseWriter {
65
+ private final ClickHouseDeferredValue <ClickHouseInputStream > input ;
66
+
67
+ PipedWriter (ClickHouseDeferredValue <ClickHouseInputStream > input ) {
68
+ this .input = input ;
69
+ }
70
+
71
+ @ Override
72
+ public void write (ClickHouseOutputStream output ) throws IOException {
73
+ ClickHouseInputStream in = input .get ();
74
+ if (in != null ) {
75
+ in .pipe (output );
76
+ }
77
+ }
78
+ }
79
+
72
80
/**
73
81
* Mutation request.
74
82
*/
75
83
public static class Mutation extends ClickHouseRequest <Mutation > {
76
- private transient ClickHouseWriter writer ;
77
-
78
84
protected Mutation (ClickHouseRequest <?> request , boolean sealed ) {
79
85
super (request .getClient (), request .server , request .serverRef , request .options , sealed );
80
86
@@ -151,16 +157,18 @@ public Mutation format(ClickHouseFormat format) {
151
157
}
152
158
153
159
/**
154
- * Sets custom writer for streaming. This will create a piped stream between the
155
- * writer and ClickHouse server .
160
+ * Sets custom writer for streaming. This will remove input stream set by other
161
+ * {@code data()} methods .
156
162
*
157
163
* @param writer writer
158
164
* @return mutation request
159
165
*/
160
166
public Mutation data (ClickHouseWriter writer ) {
161
167
checkSealed ();
162
168
169
+ this .input = changeProperty (PROP_DATA , this .input , null );
163
170
this .writer = changeProperty (PROP_WRITER , this .writer , writer );
171
+
164
172
return this ;
165
173
}
166
174
@@ -191,6 +199,7 @@ public Mutation data(ClickHousePassThruStream stream) {
191
199
final int bufferSize = c .getReadBufferSize ();
192
200
this .input = changeProperty (PROP_DATA , this .input , ClickHouseDeferredValue
193
201
.of (() -> ClickHouseInputStream .of (stream , bufferSize , null )));
202
+ this .writer = changeProperty (PROP_WRITER , this .writer , null );
194
203
return this ;
195
204
}
196
205
@@ -259,6 +268,7 @@ public Mutation data(ClickHouseInputStream input) {
259
268
260
269
this .input = changeProperty (PROP_DATA , this .input ,
261
270
ClickHouseDeferredValue .of (input , ClickHouseInputStream .class ));
271
+ this .writer = changeProperty (PROP_WRITER , this .writer , null );
262
272
return this ;
263
273
}
264
274
@@ -272,83 +282,11 @@ public Mutation data(ClickHouseDeferredValue<ClickHouseInputStream> input) {
272
282
checkSealed ();
273
283
274
284
this .input = changeProperty (PROP_DATA , this .input , input );
285
+ this .writer = changeProperty (PROP_WRITER , this .writer , null );
275
286
276
287
return this ;
277
288
}
278
289
279
- @ Override
280
- public CompletableFuture <ClickHouseResponse > execute () {
281
- if (writer != null ) {
282
- final ClickHouseConfig c = getConfig ();
283
- final boolean perfMode = c .getResponseBuffering () == ClickHouseBufferingMode .PERFORMANCE ;
284
- final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ()
285
- .createPipedOutputStream (c , null );
286
- data (stream .getInputStream ());
287
- CompletableFuture <ClickHouseResponse > future = null ;
288
- if (c .isAsync ()) {
289
- future = getClient ().execute (this );
290
- }
291
-
292
- if (perfMode ) {
293
- ClickHouseClient .submit (() -> {
294
-
295
- });
296
- } else {
297
- try (ClickHouseOutputStream out = stream ) {
298
- writer .write (out );
299
- } catch (IOException e ) {
300
- throw new CompletionException (e );
301
- }
302
- }
303
-
304
- if (future != null ) {
305
- return future ;
306
- }
307
- }
308
-
309
- return getClient ().execute (this );
310
- }
311
-
312
- @ Override
313
- public ClickHouseResponse executeAndWait () throws ClickHouseException {
314
- if (writer != null ) {
315
- ClickHouseConfig c = getConfig ();
316
- ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ()
317
- .createPipedOutputStream (c , null );
318
- data (stream .getInputStream ());
319
- CompletableFuture <ClickHouseResponse > future = null ;
320
- if (c .isAsync ()) {
321
- future = getClient ().execute (this );
322
- }
323
- try (ClickHouseOutputStream out = stream ) {
324
- writer .write (out );
325
- } catch (IOException e ) {
326
- throw ClickHouseException .of (e , getServer ());
327
- }
328
- if (future != null ) {
329
- try {
330
- return future .get (c .getSocketTimeout (), TimeUnit .MILLISECONDS );
331
- } catch (InterruptedException e ) {
332
- Thread .currentThread ().interrupt ();
333
- throw ClickHouseException .forCancellation (e , getServer ());
334
- } catch (CancellationException e ) {
335
- throw ClickHouseException .forCancellation (e , getServer ());
336
- } catch (ExecutionException | TimeoutException | UncheckedIOException e ) {
337
- Throwable cause = e .getCause ();
338
- if (cause == null ) {
339
- cause = e ;
340
- }
341
- throw cause instanceof ClickHouseException ? (ClickHouseException ) cause
342
- : ClickHouseException .of (cause , getServer ());
343
- } catch (RuntimeException e ) { // unexpected
344
- throw ClickHouseException .of (e , getServer ());
345
- }
346
- }
347
- }
348
-
349
- return getClient ().executeAndWait (this );
350
- }
351
-
352
290
@ Override
353
291
public Mutation table (String table , String queryId ) {
354
292
checkSealed ();
@@ -369,6 +307,7 @@ public Mutation seal() {
369
307
req .namedParameters .putAll (namedParameters );
370
308
371
309
req .input = input ;
310
+ req .writer = writer ;
372
311
req .queryId = queryId ;
373
312
req .sql = sql ;
374
313
@@ -407,6 +346,7 @@ public Mutation seal() {
407
346
408
347
protected final Map <String , String > namedParameters ;
409
348
349
+ protected transient ClickHouseWriter writer ;
410
350
protected transient ClickHouseDeferredValue <ClickHouseInputStream > input ;
411
351
protected transient ClickHouseDeferredValue <ClickHouseOutputStream > output ;
412
352
protected String queryId ;
@@ -508,6 +448,7 @@ public ClickHouseRequest<SelfT> copy() {
508
448
req .settings .putAll (settings );
509
449
req .namedParameters .putAll (namedParameters );
510
450
req .input = input ;
451
+ req .writer = writer ;
511
452
req .output = output ;
512
453
req .queryId = queryId ;
513
454
req .sql = sql ;
@@ -555,12 +496,12 @@ public boolean isTransactional() {
555
496
}
556
497
557
498
/**
558
- * Checks if the request contains any input stream.
499
+ * Checks if the request contains any input stream or custom writer .
559
500
*
560
- * @return true if there's input stream; false otherwise
501
+ * @return true if there's input stream or customer writer ; false otherwise
561
502
*/
562
503
public boolean hasInputStream () {
563
- return this .input != null || !this .externalTables .isEmpty ();
504
+ return this .input != null || this . writer != null || !this .externalTables .isEmpty ();
564
505
}
565
506
566
507
/**
@@ -632,9 +573,27 @@ public final BiConsumer<ClickHouseNode, ClickHouseNode> getServerListener() {
632
573
* @return input stream
633
574
*/
634
575
public Optional <ClickHouseInputStream > getInputStream () {
576
+ if (this .input == null && this .writer != null ) {
577
+ final ClickHouseConfig c = getConfig ();
578
+ final ClickHouseWriter w = this .writer ;
579
+ this .input = changeProperty (PROP_DATA , this .input ,
580
+ ClickHouseDeferredValue .of (() -> ClickHouseInputStream .of (c , w )));
581
+ }
635
582
return input != null ? input .getOptional () : Optional .empty ();
636
583
}
637
584
585
+ /**
586
+ * Gets custom writer for writing raw request.
587
+ *
588
+ * @return custom writer
589
+ */
590
+ public Optional <ClickHouseWriter > getWriter () {
591
+ if (this .writer == null && this .input != null ) {
592
+ this .writer = changeProperty (PROP_WRITER , this .writer , new PipedWriter (input ));
593
+ }
594
+ return Optional .ofNullable (this .writer );
595
+ }
596
+
638
597
/**
639
598
* Gets output stream.
640
599
*
@@ -2026,6 +1985,7 @@ public SelfT reset() {
2026
1985
}
2027
1986
}
2028
1987
this .input = changeProperty (PROP_DATA , this .input , null );
1988
+ this .writer = changeProperty (PROP_WRITER , this .writer , null );
2029
1989
this .output = changeProperty (PROP_OUTPUT , this .output , null );
2030
1990
this .sql = changeProperty (PROP_QUERY , this .sql , null );
2031
1991
this .preparedQuery = changeProperty (PROP_PREPARED_QUERY , this .preparedQuery , null );
@@ -2065,6 +2025,7 @@ public ClickHouseRequest<SelfT> seal() {
2065
2025
req .namedParameters .putAll (namedParameters );
2066
2026
2067
2027
req .input = input ;
2028
+ req .writer = writer ;
2068
2029
req .output = output ;
2069
2030
req .queryId = queryId ;
2070
2031
req .sql = sql ;
0 commit comments