diff --git a/stream/src/main/resources/reference.conf b/stream/src/main/resources/reference.conf index 2f376ec5ae5..17be879104d 100644 --- a/stream/src/main/resources/reference.conf +++ b/stream/src/main/resources/reference.conf @@ -21,6 +21,14 @@ pekko { # or full dispatcher configuration to be used by ActorMaterializer when creating Actors. dispatcher = "pekko.actor.default-dispatcher" + # FQCN of the MailboxType. The Class of the FQCN must have a public + # constructor with + # (org.apache.pekko.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. + # defaults to the single consumer mailbox for better performance. + mailbox { + mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" + } + # Fully qualified config path which holds the dispatcher configuration # or full dispatcher configuration to be used by stream operators that # perform blocking operations diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala index 06391f899f5..aaa4d83f17f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorMaterializerImpl.scala @@ -63,6 +63,7 @@ import pekko.util.OptionVal case Dispatchers.DefaultDispatcherId => // the caller said to use the default dispatcher, but that can been trumped by the dispatcher attribute props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) + .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName) case _ => props } @@ -195,10 +196,13 @@ private[pekko] class SubFusingActorMaterializerImpl( * INTERNAL API */ @InternalApi private[pekko] object StreamSupervisor { - def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props = + def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props = { Props(new StreamSupervisor(haveShutDown)) .withDeploy(Deploy.local) .withDispatcher(attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) + .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName) + } + private[stream] val baseName = "StreamSupervisor" private val actorName = SeqActorName(baseName) def nextName(): String = actorName.next() diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala index 0e58fef9764..0bd9f06705b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala @@ -62,6 +62,8 @@ import pekko.util.OptionVal val Debug = false + val MailboxConfigName: String = "pekko.stream.materializer.mailbox" + val DefaultPhase: Phase[Any] = new Phase[Any] { override def apply( settings: ActorMaterializerSettings, @@ -116,7 +118,10 @@ import pekko.util.OptionVal val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher val supervisorProps = - StreamSupervisor.props(attributes, haveShutDown).withDispatcher(dispatcher).withDeploy(Deploy.local) + StreamSupervisor.props(attributes, haveShutDown) + .withDispatcher(dispatcher) + .withMailbox(MailboxConfigName) + .withDeploy(Deploy.local) // FIXME why do we need a global unique name for the child? val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName()) @@ -625,6 +630,7 @@ private final case class SavedIslandData( val effectiveProps = props.dispatcher match { case Dispatchers.DefaultDispatcherId => props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) + .withMailbox(MailboxConfigName) case _ => props } @@ -819,6 +825,7 @@ private final case class SavedIslandData( val props = ActorGraphInterpreter .props(shell) .withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher) + .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName) val actorName = fullIslandName match { case OptionVal.Some(n) => n @@ -974,7 +981,10 @@ private final case class SavedIslandData( val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max val props = - TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(dispatcher) + TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing) + .withDispatcher(dispatcher) + .withMailbox(PhasedFusingActorMaterializer.MailboxConfigName) + tlsActor = materializer.actorOf(props, "TLS-for-" + islandName) def factory(id: Int) = new ActorPublisher[Any](tlsActor) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)