3
3
import java .io .Serializable ;
4
4
import java .io .UncheckedIOException ;
5
5
import java .net .ConnectException ;
6
+ import java .util .Collections ;
6
7
import java .util .HashMap ;
7
8
import java .util .Map ;
8
9
import java .util .Objects ;
@@ -32,25 +33,33 @@ public class ClickHouseClientBuilder {
32
33
* Dummy client which is only used by {@link Agent}.
33
34
*/
34
35
static class DummyClient implements ClickHouseClient {
35
- static final ClickHouseConfig CONFIG = new ClickHouseConfig ();
36
- static final DummyClient INSTANCE = new DummyClient ();
36
+ static final ClickHouseConfig DEFAULT_CONFIG = new ClickHouseConfig ();
37
+
38
+ private final ClickHouseConfig config ;
39
+
40
+ DummyClient () {
41
+ this (null );
42
+ }
43
+
44
+ DummyClient (ClickHouseConfig config ) {
45
+ this .config = config != null ? config : DEFAULT_CONFIG ;
46
+ }
37
47
38
48
@ Override
39
49
public boolean accept (ClickHouseProtocol protocol ) {
40
- return true ;
50
+ return false ;
41
51
}
42
52
43
53
@ Override
44
54
public CompletableFuture <ClickHouseResponse > execute (ClickHouseRequest <?> request ) {
45
55
CompletableFuture <ClickHouseResponse > future = new CompletableFuture <>();
46
- future .completeExceptionally (
47
- new ConnectException ("No client available for connecting to: " + request .getServer ()));
56
+ future .completeExceptionally (new ConnectException ("No client available" ));
48
57
return future ;
49
58
}
50
59
51
60
@ Override
52
61
public ClickHouseConfig getConfig () {
53
- return CONFIG ;
62
+ return config ;
54
63
}
55
64
56
65
@ Override
@@ -73,8 +82,8 @@ static final class Agent implements ClickHouseClient {
73
82
74
83
private final AtomicReference <ClickHouseClient > client ;
75
84
76
- Agent (ClickHouseClient client ) {
77
- this .client = new AtomicReference <>(client != null ? client : DummyClient . INSTANCE );
85
+ Agent (ClickHouseClient client , ClickHouseConfig config ) {
86
+ this .client = new AtomicReference <>(client != null ? client : new DummyClient ( config ) );
78
87
}
79
88
80
89
ClickHouseClient getClient () {
@@ -95,25 +104,27 @@ boolean changeClient(ClickHouseClient currentClient, ClickHouseClient newClient)
95
104
return changed ;
96
105
}
97
106
98
- ClickHouseResponse failover (ClickHouseRequest <?> sealedRequest , Throwable cause , int times ) {
107
+ ClickHouseResponse failover (ClickHouseRequest <?> sealedRequest , ClickHouseException exception , int times ) {
99
108
for (int i = 1 ; i <= times ; i ++) {
100
- log .debug ("Failover %d of %d due to: %s" , i , times , cause . getMessage () );
109
+ log .debug ("Failover %d of %d due to: %s" , i , times , exception . getCause (), null );
101
110
ClickHouseNode current = sealedRequest .getServer ();
102
111
ClickHouseNodeManager manager = current .manager .get ();
103
112
if (manager == null ) {
104
113
break ;
105
114
}
106
- ClickHouseNode next = manager .suggestNode (current , cause );
115
+ ClickHouseNode next = manager .suggestNode (current , exception );
107
116
if (next == current ) {
117
+ log .debug ("Cancel failover for same node returned from %s" , manager .getPolicy ());
108
118
break ;
109
119
}
110
120
current .update (Status .FAULTY );
111
121
next = sealedRequest .changeServer (current , next );
112
122
if (next == current ) {
123
+ log .debug ("Cancel failover for no alternative of %s" , current );
113
124
break ;
114
125
}
115
126
116
- log .info ("Switching node from %s to %s due to: %s" , current , next , cause . getMessage () );
127
+ log .info ("Switching node from %s to %s due to: %s" , current , next , exception . getCause (), null );
117
128
final ClickHouseProtocol protocol = next .getProtocol ();
118
129
final ClickHouseClient currentClient = client .get ();
119
130
if (!currentClient .accept (protocol )) {
@@ -123,51 +134,50 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, Throwable cause,
123
134
.config (new ClickHouseConfig (currentClient .getConfig (), next .config ))
124
135
.nodeSelector (ClickHouseNodeSelector .of (protocol )).build ();
125
136
} catch (Exception e ) {
126
- cause = e ;
127
- continue ;
137
+ exception = ClickHouseException . of ( new ConnectException ( "No client available for " + next ),
138
+ sealedRequest . getServer ()) ;
128
139
} finally {
129
140
if (newClient != null ) {
130
141
boolean changed = changeClient (currentClient , newClient );
131
- log .debug ("Switching client from %s to %s: %s" , currentClient , newClient , changed );
142
+ log .info ("Switching client from %s to %s: %s" , currentClient , newClient , changed );
132
143
if (changed ) {
133
144
sealedRequest .resetCache ();
134
145
}
135
146
}
136
147
}
148
+
149
+ if (newClient == null ) {
150
+ continue ;
151
+ }
137
152
}
138
153
139
154
try {
140
155
return sendOnce (sealedRequest );
141
156
} catch (Exception exp ) {
142
- cause = exp .getCause ();
143
- if (cause == null ) {
144
- cause = exp ;
145
- }
157
+ exception = ClickHouseException .of (exp .getCause () != null ? exp .getCause () : exp ,
158
+ sealedRequest .getServer ());
146
159
}
147
160
}
148
161
149
- throw new CompletionException (cause );
162
+ throw new CompletionException (exception );
150
163
}
151
164
152
- ClickHouseResponse retry (ClickHouseRequest <?> sealedRequest , Throwable cause , int times ) {
165
+ ClickHouseResponse retry (ClickHouseRequest <?> sealedRequest , ClickHouseException exception , int times ) {
153
166
for (int i = 1 ; i <= times ; i ++) {
154
- log .debug ("Retry %d of %d due to: %s" , i , times , cause .getMessage ());
167
+ log .debug ("Retry %d of %d due to: %s" , i , times , exception .getMessage ());
155
168
// TODO retry idempotent query
156
- if (cause instanceof ClickHouseException
157
- && ((ClickHouseException ) cause ).getErrorCode () == ClickHouseException .ERROR_NETWORK ) {
169
+ if (exception .getErrorCode () == ClickHouseException .ERROR_NETWORK ) {
158
170
log .info ("Retry request on %s due to connection issue" , sealedRequest .getServer ());
159
171
try {
160
172
return sendOnce (sealedRequest );
161
173
} catch (Exception exp ) {
162
- cause = exp .getCause ();
163
- if (cause == null ) {
164
- cause = exp ;
165
- }
174
+ exception = ClickHouseException .of (exp .getCause () != null ? exp .getCause () : exp ,
175
+ sealedRequest .getServer ());
166
176
}
167
177
}
168
178
}
169
179
170
- throw new CompletionException (cause );
180
+ throw new CompletionException (exception );
171
181
}
172
182
173
183
ClickHouseResponse handle (ClickHouseRequest <?> sealedRequest , Throwable cause ) {
@@ -176,16 +186,18 @@ ClickHouseResponse handle(ClickHouseRequest<?> sealedRequest, Throwable cause) {
176
186
cause = ((UncheckedIOException ) cause ).getCause ();
177
187
}
178
188
189
+ log .debug ("Handling %s(failover=%d, retry=%d)" , cause , sealedRequest .getConfig ().getFailover (),
190
+ sealedRequest .getConfig ().getRetry ());
179
191
try {
180
192
int times = sealedRequest .getConfig ().getFailover ();
181
193
if (times > 0 ) {
182
- return failover (sealedRequest , cause , times );
194
+ return failover (sealedRequest , ClickHouseException . of ( cause , sealedRequest . getServer ()) , times );
183
195
}
184
196
185
197
// different from failover: 1) retry on the same node; 2) never retry on timeout
186
198
times = sealedRequest .getConfig ().getRetry ();
187
199
if (times > 0 ) {
188
- return retry (sealedRequest , cause , times );
200
+ return retry (sealedRequest , ClickHouseException . of ( cause , sealedRequest . getServer ()) , times );
189
201
}
190
202
191
203
throw new CompletionException (cause );
@@ -210,8 +222,8 @@ ClickHouseResponse sendOnce(ClickHouseRequest<?> sealedRequest) {
210
222
ClickHouseResponse send (ClickHouseRequest <?> sealedRequest ) {
211
223
try {
212
224
return sendOnce (sealedRequest );
213
- } catch (CompletionException e ) {
214
- return handle (sealedRequest , e .getCause ());
225
+ } catch (Exception e ) {
226
+ return handle (sealedRequest , e .getCause () != null ? e . getCause () : e );
215
227
}
216
228
}
217
229
@@ -238,9 +250,32 @@ public boolean ping(ClickHouseNode server, int timeout) {
238
250
@ Override
239
251
public CompletableFuture <ClickHouseResponse > execute (ClickHouseRequest <?> request ) {
240
252
final ClickHouseRequest <?> sealedRequest = request .seal ();
253
+ final ClickHouseNode server = sealedRequest .getServer ();
254
+ final ClickHouseProtocol protocol = server .getProtocol ();
255
+ final ClickHouseClient currentClient = client .get ();
256
+ if (!currentClient .accept (protocol )) {
257
+ ClickHouseClient newClient = null ;
258
+ try {
259
+ newClient = ClickHouseClient .builder ().agent (false )
260
+ .config (new ClickHouseConfig (currentClient .getConfig (), server .config ))
261
+ .nodeSelector (ClickHouseNodeSelector .of (protocol )).build ();
262
+ } catch (IllegalStateException e ) {
263
+ // let it fail on execution phase
264
+ log .debug ("Failed to find client for %s" , server );
265
+ } finally {
266
+ if (newClient != null ) {
267
+ boolean changed = changeClient (currentClient , newClient );
268
+ log .debug ("Switching client from %s to %s: %s" , currentClient , newClient , changed );
269
+ if (changed ) {
270
+ sealedRequest .resetCache ();
271
+ }
272
+ }
273
+ }
274
+ }
241
275
return sealedRequest .getConfig ().isAsync ()
242
276
? getClient ().execute (sealedRequest )
243
- .handle ((r , t ) -> t == null ? r : handle (sealedRequest , t .getCause ()))
277
+ .handle ((r , t ) -> t == null ? r
278
+ : handle (sealedRequest , t .getCause () != null ? t .getCause () : t ))
244
279
: CompletableFuture .completedFuture (send (sealedRequest ));
245
280
}
246
281
@@ -349,26 +384,28 @@ public ClickHouseConfig getConfig() {
349
384
public ClickHouseClient build () {
350
385
ClickHouseClient client = null ;
351
386
352
- boolean noSelector = nodeSelector == null || nodeSelector == ClickHouseNodeSelector .EMPTY ;
353
- int counter = 0 ;
354
387
ClickHouseConfig conf = getConfig ();
355
- for (ClickHouseClient c : loadClients ()) {
356
- c .init (conf );
388
+ int counter = 0 ;
389
+ if (nodeSelector != null ) {
390
+ for (ClickHouseClient c : loadClients ()) {
391
+ c .init (conf );
357
392
358
- counter ++;
359
- if (noSelector || nodeSelector .match (c )) {
360
- client = c ;
361
- break ;
393
+ counter ++;
394
+ if (nodeSelector == ClickHouseNodeSelector .EMPTY || nodeSelector .match (c )) {
395
+ client = c ;
396
+ break ;
397
+ }
362
398
}
363
399
}
364
400
365
- if (client == null && !agent ) {
401
+ if (agent ) {
402
+ return new Agent (client , conf );
403
+ } else if (client == null ) {
366
404
throw new IllegalStateException (
367
405
ClickHouseUtils .format ("No suitable ClickHouse client(out of %d) found in classpath for %s." ,
368
406
counter , nodeSelector ));
369
407
}
370
-
371
- return agent ? new Agent (client ) : client ;
408
+ return client ;
372
409
}
373
410
374
411
/**
@@ -485,7 +522,11 @@ public ClickHouseClientBuilder defaultCredentials(ClickHouseCredentials credenti
485
522
*/
486
523
public ClickHouseClientBuilder nodeSelector (ClickHouseNodeSelector nodeSelector ) {
487
524
if (!ClickHouseChecker .nonNull (nodeSelector , "nodeSelector" ).equals (this .nodeSelector )) {
488
- this .nodeSelector = nodeSelector ;
525
+ this .nodeSelector = (nodeSelector .getPreferredProtocols ().isEmpty () || nodeSelector .getPreferredProtocols ()
526
+ .equals (Collections .singletonList (ClickHouseProtocol .ANY )))
527
+ && nodeSelector .getPreferredTags ().isEmpty ()
528
+ ? ClickHouseNodeSelector .EMPTY
529
+ : nodeSelector ;
489
530
resetConfig ();
490
531
}
491
532
0 commit comments