Skip to content

Commit

Permalink
chore: Add PekkoManagedBlocker to reduce memory.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 23, 2024
1 parent ae210b1 commit 4d8aa26
Showing 1 changed file with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,23 @@ object MonitorableThreadFactory {
val doNothing: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable) = () }

private class PekkoManagedBlocker[T](thunk: => T)
extends AtomicReference[Option[T]](None) with ForkJoinPool.ManagedBlocker {
final override def block(): Boolean = {
set(Some(thunk))
true
}

final override def isReleasable: Boolean = get().isDefined // Exception intended if None
}

private[pekko] class PekkoForkJoinWorkerThread(_pool: ForkJoinPool)
extends ForkJoinWorkerThread(_pool)
with BlockContext {
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
val result = new AtomicReference[Option[T]](None)
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
def block(): Boolean = {
result.set(Some(thunk))
true
}
def isReleasable = result.get.isDefined
})
result.get.get // Exception intended if None
final override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
val blocker = new PekkoManagedBlocker(thunk)
ForkJoinPool.managedBlock(blocker)
blocker.get.get // Exception intended if None
}
}
}
Expand Down

0 comments on commit 4d8aa26

Please sign in to comment.