From 68f05878bb770bb13872bf059d1255b6d59eab40 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Fri, 17 Jan 2025 21:32:16 +0800 Subject: [PATCH] feat: Add support for virtualize fork join pool --- .../dispatch/ForkJoinPoolVirtualzedSpec.scala | 65 ++++++++++++++ .../VirtualThreadPoolDispatcherSpec.scala | 2 - actor/src/main/resources/reference.conf | 5 ++ .../ForkJoinExecutorConfigurator.scala | 32 +++++-- .../pekko/dispatch/ThreadPoolBuilder.scala | 4 + .../dispatch/VirtualizedExecutorService.scala | 89 +++++++++++++++++++ 6 files changed, 186 insertions(+), 11 deletions(-) create mode 100644 actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualzedSpec.scala create mode 100644 actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualzedSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualzedSpec.scala new file mode 100644 index 00000000000..33bfad654ac --- /dev/null +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualzedSpec.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2018-2022 Lightbend Inc. + */ + +package org.apache.pekko.dispatch + +import com.typesafe.config.ConfigFactory + +import org.apache.pekko +import pekko.actor.{ Actor, Props } +import pekko.testkit.{ ImplicitSender, PekkoSpec } +import pekko.util.JavaVersion + +object ForkJoinPoolVirtualzedSpec { + val config = ConfigFactory.parseString(""" + |virtual { + | task-dispatcher { + | mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" + | throughput = 5 + | fork-join-executor { + | parallelism-factor = 2 + | parallelism-max = 2 + | parallelism-min = 2 + | virtualize = on + | } + | } + |} + """.stripMargin) + + class ThreadNameActor extends Actor { + + override def receive = { + case "ping" => + sender() ! Thread.currentThread().getName + } + } + +} + +class ForkJoinPoolVirtualzedSpec extends PekkoSpec(ForkJoinPoolVirtualzedSpec.config) with ImplicitSender { + import ForkJoinPoolVirtualzedSpec._ + + "PekkoForkJoinPool" must { + + "support virtualization with Virtual Thread" in { + val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher")) + + for (_ <- 1 to Iterations) { + // External task submission via the default dispatcher + actor ! "ping" + expectMsgPF() { case name: String => name should contains("virtual-thread") } + } + } + + } +} diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala index 739bf26a5e3..4071bf22964 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -43,8 +43,6 @@ object VirtualThreadPoolDispatcherSpec { class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender { import VirtualThreadPoolDispatcherSpec._ - val Iterations = 1000 - "VirtualThreadPool support" must { "handle simple dispatch" in { diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 6831cf056db..df105bfb8b9 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -487,6 +487,11 @@ pekko { # This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above. # Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff. maximum-pool-size = 32767 + + # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above. + # Virtualize this dispatcher as a virtual-thread-executor + # Valid values are: `on`, `off` + virtualize = off } # This will be used if you have set "executor = "thread-pool-executor"" diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index 661dfb8dd70..a50f9615dd0 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -89,12 +89,18 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int, val asyncMode: Boolean, - val maxPoolSize: Int) + val maxPoolSize: Int, + val virtualize: Boolean) extends ExecutorServiceFactory { def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int, - asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap) + asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false) + + def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + parallelism: Int, + asyncMode: Boolean, + maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false) private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] = Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption @@ -116,12 +122,19 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true) - def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match { - case Some(handle) => - handle.invoke(parallelism, threadFactory, maxPoolSize, - MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService] - case _ => - new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode) + def createExecutorService: ExecutorService = { + val forkJoinPool = pekkoJdk9ForkJoinPoolHandleOpt match { + case Some(handle) => + handle.invoke(parallelism, threadFactory, maxPoolSize, + MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService] + case _ => + new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode) + } + if (virtualize) { + new VirtualizedExecutorService("pekko", forkJoinPool) + } else { + forkJoinPool + } } } @@ -149,6 +162,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer config.getDouble("parallelism-factor"), config.getInt("parallelism-max")), asyncMode, - config.getInt("maximum-pool-size")) + config.getInt("maximum-pool-size"), + config.getBoolean("virtualize")) } } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala index 205c2e4ac77..a18a04795ba 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala @@ -64,6 +64,10 @@ object ThreadPoolConfig { * Function0 without the fun stuff (mostly for the sake of the Java API side of things) */ trait ExecutorServiceFactory { + + /** + * Create a new ExecutorService + */ def createExecutorService: ExecutorService } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala new file mode 100644 index 00000000000..7818aabedba --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import org.apache.pekko.annotation.InternalApi + +import java.util +import java.util.concurrent.{ Callable, ExecutorService, Future, TimeUnit } + +/** + * A virtualized executor service that creates a new virtual thread for each task. + * Will shut down the underlying executor service when this executor is being shutdown. + * + * INTERNAL API + */ +@InternalApi +final class VirtualizedExecutorService(prefix: String, underlying: ExecutorService) extends ExecutorService { + private val executor = VirtualThreadSupport.newThreadPerTaskExecutor(prefix, underlying) + + override def shutdown(): Unit = { + executor.shutdown() + underlying.shutdown() + } + + override def shutdownNow(): util.List[Runnable] = { + executor.shutdownNow() + underlying.shutdownNow() + } + + override def isShutdown: Boolean = { + executor.isShutdown || underlying.isShutdown + } + + override def isTerminated: Boolean = { + executor.isTerminated && underlying.isTerminated + } + + override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = { + executor.awaitTermination(timeout, unit) && underlying.awaitTermination(timeout, unit) + } + + override def submit[T](task: Callable[T]): Future[T] = { + executor.submit(task) + } + + override def submit[T](task: Runnable, result: T): Future[T] = { + executor.submit(task, result) + } + + override def submit(task: Runnable): Future[_] = { + executor.submit(task) + } + + override def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = { + executor.invokeAll(tasks) + } + + override def invokeAll[T]( + tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = { + executor.invokeAll(tasks, timeout, unit) + } + + override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = { + executor.invokeAny(tasks) + } + + override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = { + executor.invokeAny(tasks, timeout, unit) + } + + override def execute(command: Runnable): Unit = { + executor.execute(command) + } +}