Skip to content

Commit 7eed6f7

Browse files
committed
feat: Add LoadMetrics support for virtual thread executor.
1 parent e5d766b commit 7eed6f7

File tree

4 files changed

+154
-4
lines changed

4 files changed

+154
-4
lines changed

actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala

+10-3
Original file line numberDiff line numberDiff line change
@@ -426,11 +426,18 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis
426426
vt
427427
}
428428
}
429-
case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name + "-" + id);
429+
case _ => newVirtualThreadFactory(prerequisites.settings.name + "-" + id);
430430
}
431431
new ExecutorServiceFactory {
432-
import VirtualThreadSupport._
433-
override def createExecutorService: ExecutorService = newThreadPerTaskExecutor(tf)
432+
override def createExecutorService: ExecutorService with LoadMetrics = {
433+
val pool = getVirtualThreadDefaultScheduler // the default scheduler of virtual thread
434+
new VirtualizedExecutorService(
435+
tf,
436+
pool, // the default scheduler of virtual thread
437+
loadMetricsProvider = _ => pool.getActiveThreadCount >= pool.getParallelism,
438+
cascadeShutdown = false // we don't want to cascade shutdown the default virtual thread scheduler
439+
)
440+
}
434441
}
435442
}
436443
}

actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala

+18-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.pekko.annotation.InternalApi
2121
import org.apache.pekko.util.JavaVersion
2222

2323
import java.lang.invoke.{ MethodHandles, MethodType }
24-
import java.util.concurrent.{ ExecutorService, ThreadFactory }
24+
import java.util.concurrent.{ ExecutorService, ForkJoinPool, ThreadFactory }
2525
import scala.util.control.NonFatal
2626

2727
@InternalApi
@@ -73,4 +73,21 @@ private[dispatch] object VirtualThreadSupport {
7373
}
7474
}
7575

76+
/**
77+
* Try to get the default scheduler of virtual thread.
78+
*/
79+
def getVirtualThreadDefaultScheduler: ForkJoinPool =
80+
try {
81+
require(isSupported, "Virtual thread is not supported.")
82+
val clazz = Class.forName("java.lang.VirtualThread")
83+
val fieldName = "DEFAULT_SCHEDULER"
84+
val field = clazz.getDeclaredField(fieldName)
85+
field.setAccessible(true)
86+
field.get(null).asInstanceOf[ForkJoinPool]
87+
} catch {
88+
case NonFatal(e) =>
89+
// --add-opens java.base/java.lang=ALL-UNNAMED
90+
throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e)
91+
}
92+
7693
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.dispatch
19+
20+
import org.apache.pekko.annotation.InternalApi
21+
22+
import java.util
23+
import java.util.concurrent.{ Callable, Executor, ExecutorService, Future, ThreadFactory, TimeUnit }
24+
25+
/**
26+
* A virtualized executor service that creates a new virtual thread for each task.
27+
* Will shut down the underlying executor service when this executor is being shutdown.
28+
*
29+
* INTERNAL API
30+
*/
31+
@InternalApi
32+
final class VirtualizedExecutorService(
33+
vtFactory: ThreadFactory,
34+
underlying: ExecutorService,
35+
loadMetricsProvider: Executor => Boolean,
36+
cascadeShutdown: Boolean)
37+
extends ExecutorService with LoadMetrics {
38+
require(VirtualThreadSupport.isSupported, "Virtual thread is not supported.")
39+
require(underlying != null, "Underlying executor service must not be null")
40+
require(loadMetricsProvider != null, "Load metrics provider must not be null")
41+
42+
def this(prefix: String,
43+
underlying: ExecutorService,
44+
loadMetricsProvider: Executor => Boolean,
45+
cascadeShutdown: Boolean) = {
46+
this(VirtualThreadSupport.newVirtualThreadFactory(prefix), underlying, loadMetricsProvider, cascadeShutdown)
47+
}
48+
49+
private val executor = VirtualThreadSupport.newThreadPerTaskExecutor(vtFactory)
50+
51+
override def atFullThrottle(): Boolean = loadMetricsProvider(this)
52+
53+
override def shutdown(): Unit = {
54+
executor.shutdown()
55+
if (cascadeShutdown) {
56+
underlying.shutdown()
57+
}
58+
}
59+
60+
override def shutdownNow(): util.List[Runnable] = {
61+
val r = executor.shutdownNow()
62+
if (cascadeShutdown) {
63+
underlying.shutdownNow()
64+
}
65+
r
66+
}
67+
68+
override def isShutdown: Boolean = {
69+
if (cascadeShutdown) {
70+
executor.isShutdown || underlying.isShutdown
71+
} else {
72+
executor.isShutdown
73+
}
74+
}
75+
76+
override def isTerminated: Boolean = {
77+
if (cascadeShutdown) {
78+
executor.isTerminated && underlying.isTerminated
79+
} else {
80+
executor.isTerminated
81+
}
82+
}
83+
84+
override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = {
85+
if (cascadeShutdown) {
86+
executor.awaitTermination(timeout, unit) && underlying.awaitTermination(timeout, unit)
87+
} else {
88+
executor.awaitTermination(timeout, unit)
89+
}
90+
}
91+
92+
override def submit[T](task: Callable[T]): Future[T] = {
93+
executor.submit(task)
94+
}
95+
96+
override def submit[T](task: Runnable, result: T): Future[T] = {
97+
executor.submit(task, result)
98+
}
99+
100+
override def submit(task: Runnable): Future[_] = {
101+
executor.submit(task)
102+
}
103+
104+
override def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = {
105+
executor.invokeAll(tasks)
106+
}
107+
108+
override def invokeAll[T](
109+
tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = {
110+
executor.invokeAll(tasks, timeout, unit)
111+
}
112+
113+
override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = {
114+
executor.invokeAny(tasks)
115+
}
116+
117+
override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = {
118+
executor.invokeAny(tasks, timeout, unit)
119+
}
120+
121+
override def execute(command: Runnable): Unit = {
122+
executor.execute(command)
123+
}
124+
}

project/JdkOptions.scala

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ object JdkOptions extends AutoPlugin {
4949

5050
lazy val versionSpecificJavaOptions =
5151
if (isJdk17orHigher) {
52+
// for virtual threads
53+
"--add-opens=java.base/java.lang=ALL-UNNAMED" ::
5254
// for aeron
5355
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" ::
5456
// for LevelDB

0 commit comments

Comments
 (0)