Skip to content

Commit f540b83

Browse files
committed
chore: Add support for controlling the NettyTransport's byteBuf allocator type. (#1707)
* chore: Add support for controlling the NettyTransport's byteBuf allocator type. * chore: extract deriveByteBufAllocator method (cherry picked from commit dbc9ed3)
1 parent 9d8b12f commit f540b83

File tree

5 files changed

+73
-12
lines changed

5 files changed

+73
-12
lines changed

remote/src/main/resources/reference.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,14 @@ pekko {
621621
# "off-for-windows" of course means that it's "on" for all other platforms
622622
tcp-reuse-addr = off-for-windows
623623

624+
# Used to control the Netty 4's ByteBufAllocator. The default is "pooled".
625+
# pooled : use a PooledByteBufAllocator.DEFAULT
626+
# unpooled : use an UnpooledByteBufAllocator.DEFAULT
627+
# unpooled-heap : use an UnpooledByteBufAllocator with prefer direct `false`
628+
# adaptive : use an AdaptiveByteBufAllocator
629+
# adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct `false`
630+
bytebuf-allocator-type = "pooled"
631+
624632
# Used to configure the number of I/O worker threads on server sockets
625633
server-socket-worker-pool {
626634
# Min number of threads to cap factor-based number to

remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.util.Try
2424
import scala.util.control.{ NoStackTrace, NonFatal }
2525

2626
import com.typesafe.config.Config
27+
2728
import org.apache.pekko
2829
import pekko.ConfigurationException
2930
import pekko.OnlyCauseStackTrace
@@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring
3839
import pekko.util.{ Helpers, OptionVal }
3940

4041
import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
41-
import io.netty.buffer.Unpooled
42+
import io.netty.buffer.{
43+
AdaptiveByteBufAllocator,
44+
ByteBufAllocator,
45+
PooledByteBufAllocator,
46+
Unpooled,
47+
UnpooledByteBufAllocator
48+
}
4249
import io.netty.channel.{
4350
Channel,
4451
ChannelFuture,
@@ -160,6 +167,8 @@ class NettyTransportSettings(config: Config) {
160167
case _ => getBoolean("tcp-reuse-addr")
161168
}
162169

170+
val ByteBufAllocator: ByteBufAllocator = NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type"))
171+
163172
val Hostname: String = getString("hostname") match {
164173
case "" => InetAddress.getLocalHost.getHostAddress
165174
case value => value
@@ -318,6 +327,17 @@ private[transport] object NettyTransport {
318327
systemName: String,
319328
hostName: Option[String]): Option[Address] =
320329
addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None)
330+
331+
def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator = allocatorType match {
332+
case "pooled" => PooledByteBufAllocator.DEFAULT
333+
case "unpooled" => UnpooledByteBufAllocator.DEFAULT
334+
case "unpooled-heap" => new UnpooledByteBufAllocator(false)
335+
case "adaptive" => new AdaptiveByteBufAllocator()
336+
case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
337+
case other => throw new IllegalArgumentException(
338+
"Unknown 'bytebuf-allocator-type' [" + other + "]," +
339+
" supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
340+
}
321341
}
322342

323343
@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
@@ -442,6 +462,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
442462
bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay)
443463
bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive)
444464

465+
// Use the same allocator for inbound and outbound buffers
466+
bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
467+
bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
468+
445469
settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz))
446470
settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz))
447471
settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz =>

remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313

1414
package org.apache.pekko.remote
1515

16-
import scala.concurrent.duration._
16+
import io.netty.buffer.PooledByteBufAllocator
1717

18+
import scala.concurrent.duration._
1819
import scala.annotation.nowarn
20+
1921
import language.postfixOps
2022

2123
import org.apache.pekko
@@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec("""
103105
TcpNodelay should ===(true)
104106
TcpKeepalive should ===(true)
105107
TcpReuseAddr should ===(!Helpers.isWindows)
108+
ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT)
106109
c.getString("hostname") should ===("")
107110
c.getString("bind-hostname") should ===("")
108111
c.getString("bind-port") should ===("")

remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec
2424

2525
import org.apache.pekko
2626
import pekko.actor.{ ActorSystem, Address }
27-
import pekko.remote.classic.transport.netty.NettyTransportSpec._
27+
import pekko.remote.transport.NettyTransportSpec._
2828
import pekko.testkit.SocketUtil
2929

3030
trait BindCanonicalAddressBehaviors {
Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,25 @@
1111
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
1212
*/
1313

14-
package org.apache.pekko.remote.classic.transport.netty
15-
16-
import java.net.{ InetAddress, InetSocketAddress }
17-
import java.nio.channels.ServerSocketChannel
18-
19-
import scala.concurrent.Await
20-
import scala.concurrent.duration.Duration
14+
package org.apache.pekko.remote.transport
2115

2216
import com.typesafe.config.ConfigFactory
23-
import org.scalatest.matchers.should.Matchers
24-
import org.scalatest.wordspec.AnyWordSpec
17+
import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator, UnpooledByteBufAllocator }
2518

2619
import org.apache.pekko
2720
import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem }
2821
import pekko.remote.BoundAddressesExtension
22+
import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator
2923
import pekko.testkit.SocketUtil
3024

25+
import org.scalatest.matchers.should.Matchers
26+
import org.scalatest.wordspec.AnyWordSpec
27+
28+
import java.net.{ InetAddress, InetSocketAddress }
29+
import java.nio.channels.ServerSocketChannel
30+
import scala.concurrent.Await
31+
import scala.concurrent.duration.Duration
32+
3133
object NettyTransportSpec {
3234
val commonConfig = ConfigFactory.parseString("""
3335
pekko.actor.provider = remote
@@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior {
132134

133135
Await.result(sys.terminate(), Duration.Inf)
134136
}
137+
138+
"be able to specify byte buffer allocator" in {
139+
deriveByteBufAllocator("pooled") should ===(PooledByteBufAllocator.DEFAULT)
140+
deriveByteBufAllocator("unpooled") should ===(UnpooledByteBufAllocator.DEFAULT)
141+
142+
{
143+
val allocator = deriveByteBufAllocator("unpooled-heap")
144+
allocator shouldBe a[UnpooledByteBufAllocator]
145+
allocator.toString.contains("directByDefault: false") should ===(true)
146+
}
147+
148+
{
149+
val allocator = deriveByteBufAllocator("adaptive")
150+
allocator shouldBe a[AdaptiveByteBufAllocator]
151+
allocator.toString.contains("directByDefault: true") should ===(true)
152+
}
153+
154+
{
155+
val allocator = deriveByteBufAllocator("adaptive-heap")
156+
allocator shouldBe a[AdaptiveByteBufAllocator]
157+
allocator.toString.contains("directByDefault: false") should ===(true)
158+
}
159+
160+
}
135161
}
136162
}
137163

0 commit comments

Comments
 (0)