Skip to content

Commit 05ca192

Browse files
authored
feat: Add loom Support. (#159)
Motivation: Add Loom support. refs: #90 , again Modification: 1. add `handlerExecutor()` and some helper methods for virtual threads. 2. some documents. Result: Virtual threads supported. `wrk` is needed to run the benchmark - miniAppWithSleep 100ms : ↑2300% - todoDb: ~↓6% - staticFiles: ~↓6% ```shell ./mill --no-build-lock benchmark.runBenchmarks ``` Results of same 4 Carrier/Platform threads: ```scala [1] staticFilesWithLoom result with (platform threads): [1] Running 30s test @ http://localhost:8080/ [1] 4 threads and 100 connections [1] Thread Stats Avg Stdev Max +/- Stdev [1] Latency 1.23ms 436.88us 28.64ms 98.91% [1] Req/Sec 20.54k 1.08k 22.69k 93.44% [1] 2461250 requests in 30.10s, 342.70MB read [1] Requests/sec: 81766.27 [1] Transfer/sec: 11.38MB [1] [1] staticFilesWithLoom result with (virtual threads): [1] Running 30s test @ http://localhost:8080/ [1] 4 threads and 100 connections [1] Thread Stats Avg Stdev Max +/- Stdev [1] Latency 4.41ms 14.69ms 157.91ms 95.15% [1] Req/Sec 19.00k 4.29k 22.09k 88.63% [1] 2266289 requests in 30.02s, 315.55MB read [1] Requests/sec: 75488.32 [1] Transfer/sec: 10.51MB [1] [1] todoDbWithLoom result with (platform threads): [1] Running 30s test @ http://localhost:8080/ [1] 4 threads and 100 connections [1] Thread Stats Avg Stdev Max +/- Stdev [1] Latency 1.22ms 172.42us 11.67ms 96.46% [1] Req/Sec 20.62k 1.29k 42.72k 93.01% [1] 2466143 requests in 30.10s, 395.12MB read [1] Non-2xx or 3xx responses: 2466143 [1] Requests/sec: 81929.40 [1] Transfer/sec: 13.13MB [1] [1] todoDbWithLoom result with (virtual threads): [1] Running 30s test @ http://localhost:8080/ [1] 4 threads and 100 connections [1] Thread Stats Avg Stdev Max +/- Stdev [1] Latency 3.97ms 13.20ms 160.29ms 95.28% [1] Req/Sec 19.21k 3.68k 22.32k 89.41% [1] 2284539 requests in 30.03s, 366.02MB read [1] Non-2xx or 3xx responses: 2284539 [1] Requests/sec: 76072.80 [1] Transfer/sec: 12.19MB [1] [1] minimalApplicationWithLoom result with (platform threads): [1] Running 30s 
test @ http://localhost:8080/ [1] 4 threads and 100 connections [1] Thread Stats Avg Stdev Max +/- Stdev [1] Latency 1.05s 575.57ms 1.99s 57.89% [1] Req/Sec 10.05 3.94 30.00 81.66% [1] 1152 requests in 30.03s, 172.12KB read [1] Socket errors: connect 0, read 0, write 0, timeout 1076 [1] Requests/sec: 38.36 [1] Transfer/sec: 5.73KB [1] [1] [1] minimalApplicationWithLoom result with (virtual threads): [1] Running 30s test @ http://localhost:8080/ [1] 4 threads and 100 connections [1] Thread Stats Avg Stdev Max +/- Stdev [1] Latency 106.11ms 2.74ms 126.59ms 77.47% [1] Req/Sec 239.19 36.74 252.00 92.68% [1] 28100 requests in 30.03s, 4.10MB read [1] Requests/sec: 935.74 [1] Transfer/sec: 139.81KB ``` Some design choices: 1. Using MethodHandle/Reflect to make it compile on Java 8 too. 2. Using MethodHandle to name the virtual threads that are needed, JPMS code can be added to open the can by default, but that is a little over-killed, so better with an explicitly `--add-opens java.base/java.lang=ALL-UNNAMED` 3. Add a virtualize or screen method to create a virtual thread executor from a **platform** thread pool, this is useful, especially if you want to limit the underlying queue size, FJP is unbounded. 4. Users can override the `handleExecutor` directly too.
1 parent 5257192 commit 05ca192

File tree

18 files changed

+780
-42
lines changed

18 files changed

+780
-42
lines changed

.github/workflows/actions.yml

+14-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
strategy:
1717
matrix:
1818
java: [ '11', '17', '21' ]
19-
name: Tests for Java ${{ matrix.Java }}
19+
name: Tests local for Java ${{ matrix.Java }}
2020
steps:
2121
- uses: actions/checkout@v3
2222
- name: Setup java
@@ -27,14 +27,18 @@ jobs:
2727
- name: Run tests
2828
run: |
2929
set -eux
30-
./mill -ikj1 --disable-ticker __.testLocal
30+
if [ "${{ matrix.java }}" == "21" ]; then
31+
JAVA_OPTS='--add-opens java.base/java.lang=ALL-UNNAMED -Dcask.virtual-threads.enabled=true' ./mill -ikj1 --disable-ticker __.testLocal
32+
else
33+
./mill -ikj1 --disable-ticker __.testLocal
34+
fi
3135
3236
test-examples:
3337
runs-on: ubuntu-latest
3438
strategy:
3539
matrix:
3640
java: [ '11', '17', '21' ]
37-
name: Tests for Java ${{ matrix.Java }}
41+
name: Tests examples for Java ${{ matrix.Java }}
3842
steps:
3943
- uses: actions/checkout@v3
4044
- name: Setup java
@@ -45,8 +49,13 @@ jobs:
4549
- name: Run tests
4650
run: |
4751
set -eux
48-
./mill __.publishLocal
49-
./mill -ikj1 --disable-ticker testExamples
52+
if [ "${{ matrix.java }}" == "21" ]; then
53+
./mill __.publishLocal
54+
JAVA_OPTS='--add-opens java.base/java.lang=ALL-UNNAMED -Dcask.virtual-threads.enabled=true' ./mill -ikj1 --disable-ticker testExamples
55+
else
56+
./mill __.publishLocal
57+
./mill -ikj1 --disable-ticker testExamples
58+
fi
5059
5160
publish-sonatype:
5261
if: github.repository == 'com-lihaoyi/cask' && contains(github.ref, 'refs/tags/')

build.mill

+86
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,89 @@ object cask extends Cross[CaskMainModule](scalaVersions) {
8585
}
8686
}
8787

88+
/**
 * Cross-built module used by the benchmark harness.
 *
 * It depends on the `cask` module built for the same Scala version and declares
 * no additional ivy dependencies.
 */
trait BenchmarkModule extends CrossScalaModule {
  def moduleDeps = Seq(cask(crossScalaVersion))

  // Intentionally empty: the benchmarks need nothing beyond cask itself.
  def ivyDeps = Agg[Dep]()
}
93+
94+
/**
 * Benchmark harness: starts each Loom example server through `mill`, drives it with `wrk`,
 * and prints the `wrk` report, once with `vt = false` and once with `vt = true`.
 */
object benchmark extends Cross[BenchmarkModule](build.scalaVersions) with RunModule {

  /**
   * Polls `url` with `curl` until one request completes or `maxAttempts` is exhausted.
   *
   * Every attempt is preceded by a 3 second pause. Success means `curl` exited with
   * code 0; the HTTP status code itself is not inspected.
   *
   * @param url         address to probe
   * @param maxAttempts maximum number of probes before giving up
   * @return `true` as soon as one probe succeeds, `false` if all of them fail
   */
  def waitForServer(url: String, maxAttempts: Int = 120): Boolean = {
    (1 to maxAttempts).exists { attempt =>
      try {
        Thread.sleep(3000)
        println("Checking server... Attempt " + attempt)
        os.proc("curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", url)
          .call(check = false)
          .exitCode == 0
      } catch {
        // Only retry on ordinary failures (e.g. curl could not be launched). Interrupts and
        // fatal VM errors must propagate instead of being retried up to `maxAttempts` times.
        case _root_.scala.util.control.NonFatal(_) =>
          Thread.sleep(3000)
          false
      }
    }
  }

  /**
   * Runs one benchmark: spawns the example server, waits for it to answer, runs `wrk`
   * against it and prints the report.
   *
   * @param projectRoot working directory for the spawned `mill` process
   * @param example     name of the example module to run
   * @param vt          forwarded to the server process as the `CASK_VIRTUAL_THREAD`
   *                    environment variable (presumably toggles virtual threads in the
   *                    example app; verify against the example sources)
   */
  def runBenchMark(projectRoot: os.Path, example: String, vt: Boolean) = {
    def runMillBackground(example: String, vt: Boolean) = {
      println(s"Running $example with vt: $vt")
      println("projectRoot: " + projectRoot)
      os.proc(
        "mill",
        s"example.$example.app[$scala213].run")
        .spawn(
          cwd = projectRoot,
          env = Map("CASK_VIRTUAL_THREAD" -> vt.toString),
          stdout = os.Inherit,
          stderr = os.Inherit)
    }

    val duration = "30s"
    val threads = "4"
    val connections = "100"
    val url = "http://localhost:8080/"
    val serverApp = runMillBackground(example, vt)

    val results =
      try {
        println(s"Waiting for server to start..., vt:$vt")
        if (!waitForServer(url)) {
          serverApp.destroy()
          println("Failed to start server")
          sys.exit(1)
        }

        os.proc("wrk",
          "-t", threads,
          "-c", connections,
          "-d", duration,
          url
        ).call(stderr = os.Pipe)
      } finally {
        // Always tear the server down, even when `wrk` fails or an exception escapes;
        // otherwise the spawned process would be leaked.
        serverApp.destroyForcibly()
        Thread.sleep(1000)
      }

    println(s"""\n$example result with ${if (vt) "(virtual threads)" else "(platform threads)"}:""")
    println(results.out.text())
  }

  /**
   * Mill command that benchmarks every Loom example, first with `vt = false` and then
   * with `vt = true`. Exits with status 1 when `wrk` cannot be found via `which`.
   */
  def runBenchmarks() = T.command {
    val projectRoot = T.workspace
    if (os.proc("which", "wrk").call(check = false).exitCode != 0) {
      println("Error: wrk is not installed. Please install wrk first.")
      sys.exit(1)
    }
    for (example <- Seq(
      "staticFilesWithLoom",
      "todoDbWithLoom",
      "minimalApplicationWithLoom")) {
      println(s"target server started, starting run benchmark with wrk for :$example with VT:false")
      runBenchMark(projectRoot, example, vt = false)
      println(s"target server started, starting run benchmark with wrk for :$example with VT:true")
      runBenchMark(projectRoot, example, vt = true)
    }

  }
}
170+
88171
trait LocalModule extends CrossScalaModule{
89172
override def millSourcePath = super.millSourcePath / "app"
90173
def moduleDeps = Seq(cask(crossScalaVersion))
@@ -111,13 +194,16 @@ def zippedExamples = T {
111194
build.example.httpMethods.millSourcePath,
112195
build.example.minimalApplication.millSourcePath,
113196
build.example.minimalApplication2.millSourcePath,
197+
build.example.minimalApplicationWithLoom.millSourcePath,
114198
build.example.redirectAbort.millSourcePath,
115199
build.example.scalatags.millSourcePath,
116200
build.example.staticFiles.millSourcePath,
201+
build.example.staticFilesWithLoom.millSourcePath,
117202
build.example.staticFiles2.millSourcePath,
118203
build.example.todo.millSourcePath,
119204
build.example.todoApi.millSourcePath,
120205
build.example.todoDb.millSourcePath,
206+
build.example.todoDbWithLoom.millSourcePath,
121207
build.example.twirl.millSourcePath,
122208
build.example.variableRoutes.millSourcePath,
123209
build.example.queryParams.millSourcePath,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package cask.internal
2+
3+
import io.undertow.server.{HttpHandler, HttpServerExchange}
4+
5+
import java.util.concurrent.Executor
6+
7+
/**
8+
* A handler that dispatches the request to the given handler using the given executor.
9+
* */
10+
final class ThreadBlockingHandler(executor: Executor, handler: HttpHandler) extends HttpHandler {
  // Validate collaborators eagerly so a misconfiguration fails at construction time
  // rather than on the first request.
  require(executor ne null, "Executor should not be null")
  require(handler ne null, "Handler should not be null")

  /** Starts blocking mode on the exchange, then dispatches it to `handler` on `executor`. */
  override def handleRequest(exchange: HttpServerExchange): Unit = {
    exchange.startBlocking()
    exchange.dispatch(executor, handler)
  }
}

cask/src/cask/internal/Util.scala

+120-20
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,121 @@
11
package cask.internal
22

33
import java.io.{InputStream, PrintWriter, StringWriter}
4-
54
import scala.collection.generic.CanBuildFrom
65
import scala.collection.mutable
76
import java.io.OutputStream
8-
7+
import java.lang.invoke.{MethodHandles, MethodType}
8+
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ThreadFactory}
99
import scala.annotation.switch
1010
import scala.concurrent.{ExecutionContext, Future, Promise}
11+
import scala.util.Try
12+
import scala.util.control.NonFatal
1113

1214
object Util {
15+
private val lookup = MethodHandles.lookup()
16+
17+
import cask.util.Logger.Console.globalLogger
18+
19+
/**
20+
* Create a virtual thread executor with the given executor as the scheduler.
21+
* */
22+
def createVirtualThreadExecutor(executor: Executor): Option[ExecutorService] = {
  // Either step may throw when virtual threads are unavailable; Try folds any such
  // non-fatal failure into None.
  Try {
    val threadFactory = createVirtualThreadFactory("cask-handler-executor", executor)
    createNewThreadPerTaskExecutor(threadFactory)
  }.toOption
}
28+
29+
/**
30+
* Create a default cask virtual thread executor if possible.
31+
* */
32+
def createDefaultCaskVirtualThreadExecutor: Option[ExecutorService] = {
  // None if the default scheduler cannot be obtained, or if building the executor
  // on top of it fails.
  getDefaultVirtualThreadScheduler.flatMap { scheduler =>
    createVirtualThreadExecutor(scheduler)
  }
}
38+
39+
/**
40+
* Try to get the default virtual thread scheduler; returns None if it is unavailable or inaccessible.
41+
* */
42+
def getDefaultVirtualThreadScheduler: Option[ForkJoinPool] = {
  try {
    val vtClass = Class.forName("java.lang.VirtualThread")
    // Reads the static DEFAULT_SCHEDULER field through a private lookup on VirtualThread.
    val schedulerHandle = MethodHandles
      .privateLookupIn(vtClass, lookup)
      .findStaticVarHandle(vtClass, "DEFAULT_SCHEDULER", classOf[ForkJoinPool])
    Option(schedulerHandle.get().asInstanceOf[ForkJoinPool])
  } catch {
    case NonFatal(cause) =>
      // Hint: --add-opens java.base/java.lang=ALL-UNNAMED
      globalLogger.exception(cause)
      None
  }
}
55+
56+
/**
 * Calls the static `java.util.concurrent.Executors.newThreadPerTaskExecutor(ThreadFactory)`
 * through a method handle, so this code does not reference the method at compile time.
 *
 * Any non-fatal failure is logged and rethrown as an `UnsupportedOperationException`
 * carrying the original cause.
 */
def createNewThreadPerTaskExecutor(threadFactory: ThreadFactory): ExecutorService = {
  try {
    val executorsClass = ClassLoader.getSystemClassLoader.loadClass("java.util.concurrent.Executors")
    val signature = MethodType.methodType(classOf[ExecutorService], classOf[ThreadFactory])
    lookup
      .findStatic(executorsClass, "newThreadPerTaskExecutor", signature)
      .invoke(threadFactory)
      .asInstanceOf[ExecutorService]
  } catch {
    case NonFatal(cause) =>
      globalLogger.exception(cause)
      throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", cause)
  }
}
71+
72+
/**
73+
* Create a virtual thread factory with an executor; the executor will be used as the scheduler of
74+
* virtual thread.
75+
*
76+
* The executor should run task on platform threads.
77+
*
78+
* Throws UnsupportedOperationException if the factory cannot be created (e.g. virtual threads are not supported).
79+
*/
80+
def createVirtualThreadFactory(prefix: String,
                               executor: Executor): ThreadFactory =
  try {
    val systemLoader = ClassLoader.getSystemClassLoader
    val builderClass = systemLoader.loadClass("java.lang.Thread$Builder")
    val ofVirtualClass = systemLoader.loadClass("java.lang.Thread$Builder$OfVirtual")

    // Equivalent of `Thread.ofVirtual()`, resolved through a method handle.
    val baseBuilder = lookup
      .findStatic(classOf[Thread], "ofVirtual", MethodType.methodType(ofVirtualClass))
      .invoke()

    if (executor != null) {
      // Write the builder's private `scheduler` field so the custom executor is used.
      val builderImplClass = baseBuilder.getClass
      val schedulerSetter = MethodHandles
        .privateLookupIn(builderImplClass, lookup)
        .findSetter(builderImplClass, "scheduler", classOf[Executor])
      schedulerSetter.invoke(baseBuilder, executor)
    }

    val nameMethod = lookup.findVirtual(
      ofVirtualClass,
      "name",
      MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
    val factoryMethod = lookup.findVirtual(
      builderClass,
      "factory",
      MethodType.methodType(classOf[ThreadFactory]))

    // Threads are named "<prefix>-virtual-thread-<n>", with the counter starting at 0.
    val namedBuilder = nameMethod.invoke(baseBuilder, prefix + "-virtual-thread-", 0L)
    factoryMethod.invoke(namedBuilder).asInstanceOf[ThreadFactory]
  } catch {
    case NonFatal(cause) =>
      globalLogger.exception(cause)
      // Hint: --add-opens java.base/java.lang=ALL-UNNAMED
      throw new UnsupportedOperationException("Failed to create virtual thread factory.", cause)
  }
108+
13109
/**
 * Returns a future completed with the value of whichever of `futures` succeeds first.
 *
 * Failed futures are ignored; if none of the inputs ever succeeds, the returned
 * future never completes.
 */
def firstFutureOf[T](futures: Seq[Future[T]])(implicit ec: ExecutionContext) = {
  val winner = Promise[T]
  // trySuccess is a no-op once the promise is completed, so later successes are dropped.
  for (candidate <- futures) candidate.foreach(value => winner.trySuccess(value))
  winner.future
}
114+
18115
/**
19-
* Convert a string to a C&P-able literal. Basically
20-
* copied verbatim from the uPickle source code.
21-
*/
116+
* Convert a string to a C&P-able literal. Basically
117+
* copied verbatim from the uPickle source code.
118+
*/
22119
def literalize(s: IndexedSeq[Char], unicode: Boolean = true) = {
23120
val sb = new StringBuilder
24121
sb.append('"')
@@ -47,29 +144,30 @@ object Util {
47144
/**
 * Copies everything readable from `in` to `out` in chunks of up to 8192 bytes.
 * Neither stream is closed or flushed.
 */
def transferTo(in: InputStream, out: OutputStream) = {
  val chunk = new Array[Byte](8192)

  // read returns -1 at end of stream; any other count is written through as-is.
  var count = in.read(chunk)
  while (count != -1) {
    out.write(chunk, 0, count)
    count = in.read(chunk)
  }
}
156+
59157
/** Appends "s" to `s` unless `n` is exactly 1. */
def pluralize(s: String, n: Int) = n match {
  case 1 => s
  case _ => s + "s"
}
62160

63161
/**
64-
* Splits a string into path segments; automatically removes all
65-
* leading/trailing slashes, and ignores empty path segments.
66-
*
67-
* Written imperatively for performance since it's used all over the place.
68-
*/
162+
* Splits a string into path segments; automatically removes all
163+
* leading/trailing slashes, and ignores empty path segments.
164+
*
165+
* Written imperatively for performance since it's used all over the place.
166+
*/
69167
def splitPath(p: String): collection.IndexedSeq[String] = {
70168
val pLength = p.length
71169
var i = 0
72-
while(i < pLength && p(i) == '/') i += 1
170+
while (i < pLength && p(i) == '/') i += 1
73171
var segmentStart = i
74172
val out = mutable.ArrayBuffer.empty[String]
75173

@@ -81,7 +179,7 @@ object Util {
81179
segmentStart = i + 1
82180
}
83181

84-
while(i < pLength){
182+
while (i < pLength) {
85183
if (p(i) == '/') complete()
86184
i += 1
87185
}
@@ -96,33 +194,35 @@ object Util {
96194
pw.flush()
97195
trace.toString
98196
}
197+
99198
/**
 * Re-flows `s` onto lines of at most `maxWidth` characters where possible.
 *
 * Line breaks in the input are replaced by spaces, the text is split on single
 * spaces, and the chunks are joined greedily. Continuation lines are prefixed with
 * `leftOffset` spaces; that indent is not counted towards `maxWidth`. A chunk longer
 * than `maxWidth` is kept intact on its own line.
 *
 * @param s          text to wrap
 * @param leftOffset number of spaces to indent every line after the first
 * @param maxWidth   target width for each line, excluding the indent
 * @return the wrapped text; an empty string when `s` has no chunks at all
 *         (for example when it consists only of spaces)
 */
def softWrap(s: String, leftOffset: Int, maxWidth: Int) = {
  val oneLine = s.linesIterator.mkString(" ").split(' ')

  lazy val indent = " " * leftOffset

  // split(' ') drops trailing empty chunks, so a spaces-only input yields an empty
  // array; headOption avoids the NoSuchElementException that `.head` would raise.
  oneLine.headOption match {
    case None => ""
    case Some(first) =>
      val output = new StringBuilder(first)
      var currentLineWidth = first.length
      for (chunk <- oneLine.drop(1)) {
        val addedWidth = currentLineWidth + chunk.length + 1
        if (addedWidth > maxWidth) {
          output.append("\n" + indent)
          output.append(chunk)
          currentLineWidth = chunk.length
        } else {
          currentLineWidth = addedWidth
          output.append(' ')
          output.append(chunk)
        }
      }
      output.mkString
  }
}
219+
120220
/**
 * Turns a collection of `Either`s into an `Either` of a collection.
 *
 * Yields `Right` with all right-hand values, in order, when every element is a
 * `Right`; otherwise yields the first `Left` encountered.
 */
def sequenceEither[A, B, M[X] <: TraversableOnce[X]](in: M[Either[A, B]])(
  implicit cbf: CanBuildFrom[M[Either[A, B]], B, M[B]]): Either[A, M[B]] = {
  in.foldLeft[Either[A, mutable.Builder[B, M[B]]]](Right(cbf(in))) {
    // Once a Left has been seen it is carried through unchanged.
    case (Left(failure), _) => Left(failure)
    case (Right(_), Left(failure)) => Left(failure)
    case (Right(builder), Right(value)) => Right(builder += value)
  }.map(_.result())
}
128228
}

0 commit comments

Comments
 (0)