Skip to content

Commit bcb7039

Browse files
committed
perf: Add dedicated collectFirst implementation.
1 parent e597a70 commit bcb7039

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.stream.impl.fusing
19+
20+
import org.apache.pekko
21+
import pekko.annotation.InternalApi
22+
import pekko.stream.ActorAttributes.SupervisionStrategy
23+
import pekko.stream.Attributes.SourceLocation
24+
import pekko.stream.impl.Stages.DefaultAttributes
25+
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
26+
import pekko.stream._
27+
28+
import scala.util.control.NonFatal
29+
30+
/**
31+
* INTERNAL API
32+
*/
33+
@InternalApi
34+
private[pekko] final class CollectFirst[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
35+
private val in = Inlet[In]("CollectFirst.in")
36+
private val out = Outlet[Out]("CollectFirst.out")
37+
override val shape = FlowShape(in, out)
38+
39+
override def initialAttributes: Attributes = DefaultAttributes.collectFirst and SourceLocation.forLambda(pf)
40+
41+
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
42+
new GraphStageLogic(shape) with InHandler with OutHandler {
43+
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
44+
import Collect.NotApplied
45+
46+
override final def onPush(): Unit =
47+
try {
48+
// 1. `applyOrElse` is faster than (`pf.isDefinedAt` and then `pf.apply`)
49+
// 2. using reference comparing here instead of pattern matching can generate less and quicker bytecode,
50+
// eg: just a simple `IF_ACMPNE`, and you can find the same trick in `CollectWhile` operator.
51+
// If you interest, you can check the associated PR for this change and the
52+
// current implementation of `scala.collection.IterableOnceOps.collectFirst`.
53+
val result = pf.applyOrElse(grab(in), NotApplied)
54+
if (result.asInstanceOf[AnyRef] eq NotApplied) {
55+
pull(in)
56+
} else {
57+
push(out, result.asInstanceOf[Out])
58+
completeStage()
59+
}
60+
} catch {
61+
case NonFatal(ex) =>
62+
decider(ex) match {
63+
case Supervision.Stop => failStage(ex)
64+
case _ =>
65+
// The !hasBeenPulled(in) check is not required here since it
66+
// isn't possible to do an additional pull(in) due to the nature
67+
// of how collect works
68+
pull(in)
69+
}
70+
}
71+
72+
override final def onPull(): Unit = pull(in)
73+
74+
setHandlers(in, out, this)
75+
}
76+
77+
override def toString: String = "CollectFirst"
78+
}

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1561,9 +1561,7 @@ trait FlowOps[+Out, +Mat] {
15611561
*
15621562
* '''Cancels when''' downstream cancels
15631563
*/
1564-
def collectFirst[T](pf: PartialFunction[Out, T]): Repr[T] =
1565-
via(Flow[Out].collect(pf).take(1)
1566-
.withAttributes(DefaultAttributes.collectFirst and SourceLocation.forLambda(pf)))
1564+
def collectFirst[T](pf: PartialFunction[Out, T]): Repr[T] = via(new CollectFirst(pf))
15671565

15681566
/**
15691567
* Transform this stream by applying the given partial function to each of the elements

0 commit comments

Comments
 (0)