@@ -272,98 +272,121 @@ ChannelInitializer<Channel> initializer(
272
272
DriverChannelOptions options ,
273
273
NodeMetricUpdater nodeMetricUpdater ,
274
274
CompletableFuture <DriverChannel > resultFuture ) {
275
- return new ChannelInitializer <Channel >() {
276
- @ Override
277
- protected void initChannel (Channel channel ) {
278
- try {
279
- DriverExecutionProfile defaultConfig = context .getConfig ().getDefaultProfile ();
280
-
281
- long setKeyspaceTimeoutMillis =
282
- defaultConfig
283
- .getDuration (DefaultDriverOption .CONNECTION_SET_KEYSPACE_TIMEOUT )
284
- .toMillis ();
285
- int maxFrameLength =
286
- (int ) defaultConfig .getBytes (DefaultDriverOption .PROTOCOL_MAX_FRAME_LENGTH );
287
- int maxRequestsPerConnection =
288
- defaultConfig .getInt (DefaultDriverOption .CONNECTION_MAX_REQUESTS );
289
- int maxOrphanRequests =
290
- defaultConfig .getInt (DefaultDriverOption .CONNECTION_MAX_ORPHAN_REQUESTS );
291
- if (maxOrphanRequests >= maxRequestsPerConnection ) {
292
- if (LOGGED_ORPHAN_WARNING .compareAndSet (false , true )) {
293
- LOG .warn (
294
- "[{}] Invalid value for {}: {}. It must be lower than {}. "
295
- + "Defaulting to {} (1/4 of max-requests) instead." ,
296
- logPrefix ,
297
- DefaultDriverOption .CONNECTION_MAX_ORPHAN_REQUESTS .getPath (),
298
- maxOrphanRequests ,
299
- DefaultDriverOption .CONNECTION_MAX_REQUESTS .getPath (),
300
- maxRequestsPerConnection / 4 );
301
- }
302
- maxOrphanRequests = maxRequestsPerConnection / 4 ;
303
- }
275
+ return new ChannelFactoryInitializer (
276
+ endPoint , protocolVersion , options , nodeMetricUpdater , resultFuture );
277
+ };
278
+
279
+ class ChannelFactoryInitializer extends ChannelInitializer <Channel > {
280
+
281
+ private final EndPoint endPoint ;
282
+ private final ProtocolVersion protocolVersion ;
283
+ private final DriverChannelOptions options ;
284
+ private final NodeMetricUpdater nodeMetricUpdater ;
285
+ private final CompletableFuture <DriverChannel > resultFuture ;
286
+
287
+ ChannelFactoryInitializer (
288
+ EndPoint endPoint ,
289
+ ProtocolVersion protocolVersion ,
290
+ DriverChannelOptions options ,
291
+ NodeMetricUpdater nodeMetricUpdater ,
292
+ CompletableFuture <DriverChannel > resultFuture ) {
293
+
294
+ this .endPoint = endPoint ;
295
+ this .protocolVersion = protocolVersion ;
296
+ this .options = options ;
297
+ this .nodeMetricUpdater = nodeMetricUpdater ;
298
+ this .resultFuture = resultFuture ;
299
+ }
304
300
305
- InFlightHandler inFlightHandler =
306
- new InFlightHandler (
307
- protocolVersion ,
308
- new StreamIdGenerator (maxRequestsPerConnection ),
309
- maxOrphanRequests ,
310
- setKeyspaceTimeoutMillis ,
311
- channel .newPromise (),
312
- options .eventCallback ,
313
- options .ownerLogPrefix );
314
- HeartbeatHandler heartbeatHandler = new HeartbeatHandler (defaultConfig );
315
- ProtocolInitHandler initHandler =
316
- new ProtocolInitHandler (
317
- context ,
318
- protocolVersion ,
319
- clusterName ,
320
- endPoint ,
321
- options ,
322
- heartbeatHandler ,
323
- productType == null );
324
-
325
- ChannelPipeline pipeline = channel .pipeline ();
326
- context
327
- .getSslHandlerFactory ()
328
- .map (f -> f .newSslHandler (channel , endPoint ))
329
- .map (h -> pipeline .addLast (SSL_HANDLER_NAME , h ));
330
-
331
- // Only add meter handlers on the pipeline if metrics are enabled.
332
- SessionMetricUpdater sessionMetricUpdater =
333
- context .getMetricsFactory ().getSessionUpdater ();
334
- if (nodeMetricUpdater .isEnabled (DefaultNodeMetric .BYTES_RECEIVED , null )
335
- || sessionMetricUpdater .isEnabled (DefaultSessionMetric .BYTES_RECEIVED , null )) {
336
- pipeline .addLast (
337
- INBOUND_TRAFFIC_METER_NAME ,
338
- new InboundTrafficMeter (nodeMetricUpdater , sessionMetricUpdater ));
301
+ @ Override
302
+ protected void initChannel (Channel channel ) {
303
+ try {
304
+ DriverExecutionProfile defaultConfig = context .getConfig ().getDefaultProfile ();
305
+
306
+ long setKeyspaceTimeoutMillis =
307
+ defaultConfig
308
+ .getDuration (DefaultDriverOption .CONNECTION_SET_KEYSPACE_TIMEOUT )
309
+ .toMillis ();
310
+ int maxFrameLength =
311
+ (int ) defaultConfig .getBytes (DefaultDriverOption .PROTOCOL_MAX_FRAME_LENGTH );
312
+ int maxRequestsPerConnection =
313
+ defaultConfig .getInt (DefaultDriverOption .CONNECTION_MAX_REQUESTS );
314
+ int maxOrphanRequests =
315
+ defaultConfig .getInt (DefaultDriverOption .CONNECTION_MAX_ORPHAN_REQUESTS );
316
+ if (maxOrphanRequests >= maxRequestsPerConnection ) {
317
+ if (LOGGED_ORPHAN_WARNING .compareAndSet (false , true )) {
318
+ LOG .warn (
319
+ "[{}] Invalid value for {}: {}. It must be lower than {}. "
320
+ + "Defaulting to {} (1/4 of max-requests) instead." ,
321
+ logPrefix ,
322
+ DefaultDriverOption .CONNECTION_MAX_ORPHAN_REQUESTS .getPath (),
323
+ maxOrphanRequests ,
324
+ DefaultDriverOption .CONNECTION_MAX_REQUESTS .getPath (),
325
+ maxRequestsPerConnection / 4 );
339
326
}
327
+ maxOrphanRequests = maxRequestsPerConnection / 4 ;
328
+ }
340
329
341
- if (nodeMetricUpdater .isEnabled (DefaultNodeMetric .BYTES_SENT , null )
342
- || sessionMetricUpdater .isEnabled (DefaultSessionMetric .BYTES_SENT , null )) {
343
- pipeline .addLast (
344
- OUTBOUND_TRAFFIC_METER_NAME ,
345
- new OutboundTrafficMeter (nodeMetricUpdater , sessionMetricUpdater ));
346
- }
330
+ InFlightHandler inFlightHandler =
331
+ new InFlightHandler (
332
+ protocolVersion ,
333
+ new StreamIdGenerator (maxRequestsPerConnection ),
334
+ maxOrphanRequests ,
335
+ setKeyspaceTimeoutMillis ,
336
+ channel .newPromise (),
337
+ options .eventCallback ,
338
+ options .ownerLogPrefix );
339
+ HeartbeatHandler heartbeatHandler = new HeartbeatHandler (defaultConfig );
340
+ ProtocolInitHandler initHandler =
341
+ new ProtocolInitHandler (
342
+ context ,
343
+ protocolVersion ,
344
+ clusterName ,
345
+ endPoint ,
346
+ options ,
347
+ heartbeatHandler ,
348
+ productType == null );
349
+
350
+ ChannelPipeline pipeline = channel .pipeline ();
351
+ context
352
+ .getSslHandlerFactory ()
353
+ .map (f -> f .newSslHandler (channel , endPoint ))
354
+ .map (h -> pipeline .addLast (SSL_HANDLER_NAME , h ));
355
+
356
+ // Only add meter handlers on the pipeline if metrics are enabled.
357
+ SessionMetricUpdater sessionMetricUpdater = context .getMetricsFactory ().getSessionUpdater ();
358
+ if (nodeMetricUpdater .isEnabled (DefaultNodeMetric .BYTES_RECEIVED , null )
359
+ || sessionMetricUpdater .isEnabled (DefaultSessionMetric .BYTES_RECEIVED , null )) {
360
+ pipeline .addLast (
361
+ INBOUND_TRAFFIC_METER_NAME ,
362
+ new InboundTrafficMeter (nodeMetricUpdater , sessionMetricUpdater ));
363
+ }
347
364
348
- pipeline
349
- .addLast (
350
- FRAME_TO_BYTES_ENCODER_NAME ,
351
- new FrameEncoder (context .getFrameCodec (), maxFrameLength ))
352
- .addLast (
353
- BYTES_TO_FRAME_DECODER_NAME ,
354
- new FrameDecoder (context .getFrameCodec (), maxFrameLength ))
355
- // Note: HeartbeatHandler is inserted here once init completes
356
- .addLast (INFLIGHT_HANDLER_NAME , inFlightHandler )
357
- .addLast (INIT_HANDLER_NAME , initHandler );
358
-
359
- context .getNettyOptions ().afterChannelInitialized (channel );
360
- } catch (Throwable t ) {
361
- // If the init handler throws an exception, Netty swallows it and closes the channel. We
362
- // want to propagate it instead, so fail the outer future (the result of connect()).
363
- resultFuture .completeExceptionally (t );
364
- throw t ;
365
+ if (nodeMetricUpdater .isEnabled (DefaultNodeMetric .BYTES_SENT , null )
366
+ || sessionMetricUpdater .isEnabled (DefaultSessionMetric .BYTES_SENT , null )) {
367
+ pipeline .addLast (
368
+ OUTBOUND_TRAFFIC_METER_NAME ,
369
+ new OutboundTrafficMeter (nodeMetricUpdater , sessionMetricUpdater ));
365
370
}
371
+
372
+ pipeline
373
+ .addLast (
374
+ FRAME_TO_BYTES_ENCODER_NAME ,
375
+ new FrameEncoder (context .getFrameCodec (), maxFrameLength ))
376
+ .addLast (
377
+ BYTES_TO_FRAME_DECODER_NAME ,
378
+ new FrameDecoder (context .getFrameCodec (), maxFrameLength ))
379
+ // Note: HeartbeatHandler is inserted here once init completes
380
+ .addLast (INFLIGHT_HANDLER_NAME , inFlightHandler )
381
+ .addLast (INIT_HANDLER_NAME , initHandler );
382
+
383
+ context .getNettyOptions ().afterChannelInitialized (channel );
384
+ } catch (Throwable t ) {
385
+ // If the init handler throws an exception, Netty swallows it and closes the channel. We
386
+ // want to propagate it instead, so fail the outer future (the result of connect()).
387
+ resultFuture .completeExceptionally (t );
388
+ throw t ;
366
389
}
367
- };
390
+ }
368
391
}
369
392
}
0 commit comments