Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: chore: use Hub #1705

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,7 @@ import pekko.stream.ActorAttributes
import pekko.stream.impl.ActorSubscriberMessage.{ OnComplete, OnError, OnNext, OnSubscribe }
import pekko.util.unused

import org.reactivestreams.{ Processor, Subscriber, Subscription }

/**
* INTERNAL API
*/
@InternalApi private[pekko] object ActorProcessor {

def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
val p = new ActorProcessor[I, O](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
p
}
}

/**
* INTERNAL API
*/
@InternalApi private[pekko] class ActorProcessor[I, O](impl: ActorRef)
extends ActorPublisher[O](impl)
with Processor[I, O] {
override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s)
impl ! OnSubscribe(s)
}
override def onError(t: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(t)
impl ! OnError(t)
}
override def onComplete(): Unit = impl ! OnComplete
override def onNext(elem: I): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
impl ! OnNext(elem)
}
}
import org.reactivestreams.{ Subscriber, Subscription }

/**
* INTERNAL API
Expand Down

This file was deleted.

18 changes: 0 additions & 18 deletions stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,6 @@ import org.reactivestreams.Subscriber
new PublisherSink[In](attr, amendShape(attr))
}

/**
* INTERNAL API
*/
@InternalApi private[pekko] final class FanoutPublisherSink[In](val attributes: Attributes, shape: SinkShape[In])
extends SinkModule[In, Publisher[In]](shape) {

override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
val impl = context.materializer.actorOf(context, FanoutProcessorImpl.props(context.effectiveAttributes))
val fanoutProcessor = new ActorProcessor[In, In](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
(fanoutProcessor, fanoutProcessor)
}

override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] =
new FanoutPublisherSink[In](attr, amendShape(attr))
}

/**
* INTERNAL API
* Attaches a subscriber to this stream.
Expand Down
Loading
Loading