From 4d8aa264045a0c210a63dafd4b06a4352d69340c Mon Sep 17 00:00:00 2001 From: He-Pin Date: Tue, 23 Jan 2024 16:29:05 +0800 Subject: [PATCH] chore: Add PekkoManagedBlocker to reduce memory. --- .../pekko/dispatch/ThreadPoolBuilder.scala | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala index c292be9de87..b11f9aa42a0 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala @@ -173,19 +173,23 @@ object MonitorableThreadFactory { val doNothing: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable) = () } + private class PekkoManagedBlocker[T](thunk: => T) + extends AtomicReference[Option[T]](None) with ForkJoinPool.ManagedBlocker { + final override def block(): Boolean = { + set(Some(thunk)) + true + } + + final override def isReleasable: Boolean = get().isDefined // Exception intended if None + } + private[pekko] class PekkoForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext { - override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { - val result = new AtomicReference[Option[T]](None) - ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { - def block(): Boolean = { - result.set(Some(thunk)) - true - } - def isReleasable = result.get.isDefined - }) - result.get.get // Exception intended if None + final override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = { + val blocker = new PekkoManagedBlocker(thunk) + ForkJoinPool.managedBlock(blocker) + blocker.get.get // Exception intended if None } } }