Skip to content

Commit d5ee8a6

Browse files
authored
feat: Add close method on ActorSystem (apache#2486)
1 parent 6d7d2d2 commit d5ee8a6

File tree

37 files changed

+198
-106
lines changed

37 files changed

+198
-106
lines changed

actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/ActorSystemStub.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,10 @@ import com.typesafe.config.{ Config, ConfigFactory }
113113
override def terminate(): Unit = terminationPromise.trySuccess(Done)
114114
override def whenTerminated: Future[Done] = terminationPromise.future
115115
override def getWhenTerminated: CompletionStage[Done] = whenTerminated.asJava
116+
override def close(): Unit = {
117+
terminate()
118+
Await.result(whenTerminated, scala.concurrent.duration.Duration.Inf)
119+
}
116120
override val startTime: Long = System.currentTimeMillis()
117121
override def uptime: Long = System.currentTimeMillis() - startTime
118122
override def threadFactory: java.util.concurrent.ThreadFactory = new ThreadFactory {

actor-tests/src/test/java/org/apache/pekko/actor/ActorSystemTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static java.util.concurrent.TimeUnit.SECONDS;
1717
import static org.junit.Assert.assertFalse;
18+
import static org.junit.Assert.assertTrue;
1819

1920
import java.util.concurrent.CompletionStage;
2021
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
@@ -47,4 +48,14 @@ public void testGetWhenTerminated() throws Exception {
4748
public void testGetWhenTerminatedWithoutTermination() {
4849
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
4950
}
51+
52+
@Test
53+
public void testTryWithResources() throws Exception {
54+
ActorSystem system = null;
55+
try (ActorSystem actorSystem = ActorSystem.create()) {
56+
system = actorSystem;
57+
}
58+
final CompletionStage<Terminated> cs = system.getWhenTerminated();
59+
assertTrue(cs.toCompletableFuture().isDone());
60+
}
5061
}

actor-tests/src/test/scala/org/apache/pekko/actor/ActorSystemSpec.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger
1919
import scala.annotation.nowarn
2020
import scala.concurrent.{ Await, Future }
2121
import scala.concurrent.duration._
22+
import scala.util.Using
2223

2324
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
2425

@@ -263,7 +264,7 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen
263264

264265
"throw RejectedExecutionException when shutdown" in {
265266
val system2 = ActorSystem("RejectedExecution-1", PekkoSpec.testConf)
266-
Await.ready(system2.terminate(), 10.seconds)
267+
system2.close()
267268

268269
intercept[RejectedExecutionException] {
269270
system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") }
@@ -334,6 +335,20 @@ class ActorSystemSpec extends PekkoSpec(ActorSystemSpec.config) with ImplicitSen
334335
}
335336
}
336337

338+
"close method terminates ActorSystem" in {
339+
val system = ActorSystem()
340+
system.close()
341+
system.whenTerminated.isCompleted should ===(true)
342+
}
343+
344+
"Scala's Using automatically terminates ActorSystem" in {
345+
var currentSystem: ActorSystem = null
346+
Using(ActorSystem()) { system =>
347+
currentSystem = system
348+
}
349+
currentSystem.whenTerminated.isCompleted should ===(true)
350+
}
351+
337352
"allow configuration of guardian supervisor strategy" in {
338353
implicit val system =
339354
ActorSystem(

actor-tests/src/test/scala/org/apache/pekko/dispatch/DispatcherShutdownSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class DispatcherShutdownSpec extends AnyWordSpec with Matchers {
4545
val system = ActorSystem("DispatcherShutdownSpec")
4646
threadCount should be > 0
4747

48-
Await.ready(system.terminate(), 1.second)
48+
system.close()
4949
Await.ready(Future(pekko.Done)(system.dispatcher), 1.second)
5050

5151
TestKit.awaitCond(threadCount == 0, 3.second)

actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/ActorSystemTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static java.util.concurrent.TimeUnit.SECONDS;
1717
import static org.junit.Assert.assertFalse;
18+
import static org.junit.Assert.assertTrue;
1819

1920
import java.util.concurrent.CompletionStage;
2021
import org.apache.pekko.Done;
@@ -39,4 +40,15 @@ public void testGetWhenTerminatedWithoutTermination() {
3940
ActorSystem.create(Behaviors.empty(), "GetWhenTerminatedWithoutTermination");
4041
assertFalse(system.getWhenTerminated().toCompletableFuture().isDone());
4142
}
43+
44+
@Test
45+
public void testTryWithResources() throws Exception {
46+
ActorSystem<Void> system = null;
47+
try (ActorSystem<Void> actorSystem =
48+
ActorSystem.create(Behaviors.empty(), "TryWithResourcesSystem")) {
49+
system = actorSystem;
50+
}
51+
final CompletionStage<Done> cs = system.getWhenTerminated();
52+
assertTrue(cs.toCompletableFuture().isDone());
53+
}
4254
}

actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/ActorSystemSpec.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.pekko.actor.typed
1919

2020
import scala.annotation.nowarn
21+
import scala.util.Using
2122

2223
import org.apache.pekko
2324
import pekko.actor.typed.scaladsl.Behaviors
@@ -45,5 +46,19 @@ class ActorSystemSpec extends PekkoSpec {
4546
system.terminate()
4647
}
4748
}
49+
50+
"close method terminates ActorSystem" in {
51+
val system = ActorSystem(Behaviors.empty[String], "close-terminates-system")
52+
system.close()
53+
system.whenTerminated.isCompleted should ===(true)
54+
}
55+
56+
"Scala's Using automatically terminates ActorSystem" in {
57+
var currentSystem: ActorSystem[Nothing] = null
58+
Using(ActorSystem(Behaviors.empty[String], "using-terminates-system")) { system =>
59+
currentSystem = system
60+
}
61+
currentSystem.whenTerminated.isCompleted should ===(true)
62+
}
4863
}
4964
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Add close to ActorSystem
19+
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.typed.ActorSystem.close")
20+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.typed.ActorSystem.close")

actor-typed/src/main/scala/org/apache/pekko/actor/typed/ActorSystem.scala

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package org.apache.pekko.actor.typed
1515

16-
import java.util.concurrent.{ CompletionStage, ThreadFactory }
16+
import java.util.concurrent.{ CompletionStage, ThreadFactory, TimeoutException }
1717

1818
import scala.concurrent.{ ExecutionContextExecutor, Future }
1919

@@ -42,7 +42,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
4242
* Not for user extension.
4343
*/
4444
@DoNotInherit
45-
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicActorSystemProvider {
45+
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicActorSystemProvider with AutoCloseable {
4646
this: InternalRecipientRef[T] =>
4747

4848
/**
@@ -147,6 +147,26 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions with ClassicA
147147
*/
148148
def getWhenTerminated: CompletionStage[Done]
149149

150+
/**
151+
* Terminates this actor system by running [[pekko.actor.CoordinatedShutdown]] with reason
152+
* [[pekko.actor.CoordinatedShutdown.ActorSystemTerminateReason]]. This method will block
153+
* until either the actor system is terminated or
154+
* `pekko.coordinated-shutdown.close-actor-system-timeout` timeout duration is
155+
* passed, in which case a [[TimeoutException]] is thrown.
156+
*
157+
* If `pekko.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off`
158+
* it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors
159+
* will still be terminated.
160+
*
161+
* This will stop the guardian actor, which in turn
162+
* will recursively stop all its child actors, and finally the system guardian
163+
* (below which the logging actors reside) and then execute all registered
164+
* termination handlers (see [[pekko.actor.ActorSystem.registerOnTermination]]).
165+
* @since 1.3.0
166+
*/
167+
@throws(classOf[TimeoutException])
168+
override def close(): Unit
169+
150170
/**
151171
* The deadLetter address is a destination that will accept (and discard)
152172
* every message sent to it.

actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ import org.slf4j.{ Logger, LoggerFactory }
122122
override lazy val getWhenTerminated: CompletionStage[pekko.Done] =
123123
whenTerminated.asJava
124124

125+
override def close(): Unit = system.close()
126+
125127
override def systemActorOf[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = {
126128
val ref = system.systemActorOf(
127129
PropsAdapter(
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Add close to ActorSystem
19+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ActorSystem.close")
20+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.pekko.actor.ExtendedActorSystem.close")
21+
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.actor.ActorSystem.close")

0 commit comments

Comments
 (0)