Skip to content

Commit 46042b8

Browse files
committed
perf: avoid boxing in zipWithIndex
1 parent dad6b9f commit 46042b8

File tree

10 files changed

+153
-20
lines changed

10 files changed

+153
-20
lines changed

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package org.apache.pekko.stream.javadsl;
1515

16+
import com.google.common.collect.Sets;
1617
import org.apache.pekko.Done;
1718
import org.apache.pekko.NotUsed;
1819
import org.apache.pekko.actor.ActorRef;
@@ -1663,4 +1664,42 @@ public void mustBeAbleToConvertToJavaInJava() {
16631664
org.apache.pekko.stream.scaladsl.Flow.apply();
16641665
Flow<Integer, Integer, NotUsed> javaFlow = scalaFlow.asJava();
16651666
}
1667+
1668+
@Test
1669+
public void zipWithIndex() {
1670+
final List<Integer> input = Arrays.asList(1, 2, 3);
1671+
final List<Pair<Integer, Long>> expected =
1672+
Arrays.asList(new Pair<>(1, 0L), new Pair<>(2, 1L), new Pair<>(3, 2L));
1673+
1674+
final List<Pair<Integer, Long>> result =
1675+
Source.from(input)
1676+
.via(Flow.of(Integer.class).zipWithIndex())
1677+
.runWith(Sink.seq(), system)
1678+
.toCompletableFuture()
1679+
.join();
1680+
1681+
assertEquals(expected, result);
1682+
}
1683+
1684+
@Test
1685+
public void zipWithIndexInSubFlow() {
1686+
1687+
final Set<Pair<Integer, Long>> resultSet =
1688+
new HashSet<>(
1689+
Source.range(1, 5)
1690+
.via(Flow.of(Integer.class).groupBy(2, i -> i % 2).zipWithIndex().mergeSubstreams())
1691+
.runWith(Sink.seq(), system)
1692+
.toCompletableFuture()
1693+
.join());
1694+
1695+
Assert.assertEquals(
1696+
new HashSet<>(
1697+
Arrays.asList(
1698+
Pair.create(1, 0L),
1699+
Pair.create(3, 1L),
1700+
Pair.create(5, 2L),
1701+
Pair.create(2, 0L),
1702+
Pair.create(4, 1L))),
1703+
resultSet);
1704+
}
16661705
}

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,4 +1504,34 @@ public void flattenOptionalOptional() throws Exception {
15041504
.get(3, TimeUnit.SECONDS);
15051505
Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList);
15061506
}
1507+
1508+
@Test
1509+
public void zipWithIndex() {
1510+
final List<Pair<Integer, Long>> resultList =
1511+
Source.range(1, 3).zipWithIndex().runWith(Sink.seq(), system).toCompletableFuture().join();
1512+
assertEquals(
1513+
Arrays.asList(Pair.create(1, 0L), Pair.create(2, 1L), Pair.create(3, 2L)), resultList);
1514+
}
1515+
1516+
@Test
1517+
public void zipWithIndexOnSubSource() {
1518+
final Set<Pair<Integer, Long>> resultSet =
1519+
new HashSet<>(
1520+
Source.range(1, 5)
1521+
.groupBy(2, i -> i % 2)
1522+
.zipWithIndex()
1523+
.mergeSubstreams()
1524+
.runWith(Sink.seq(), system)
1525+
.toCompletableFuture()
1526+
.join());
1527+
Assert.assertEquals(
1528+
new HashSet<>(
1529+
Arrays.asList(
1530+
Pair.create(1, 0L),
1531+
Pair.create(3, 1L),
1532+
Pair.create(5, 2L),
1533+
Pair.create(2, 0L),
1534+
Pair.create(4, 1L))),
1535+
resultSet);
1536+
}
15071537
}

stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313

1414
package org.apache.pekko.stream.impl
1515

16+
import java.util.Spliterator
17+
import java.util.function.Consumer
18+
1619
import org.apache.pekko
1720
import pekko.annotation.InternalApi
1821
import pekko.stream._
1922
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
2023

21-
import java.util.Spliterator
22-
import java.util.function.Consumer
23-
2424
/** INTERNAL API */
2525
@InternalApi private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]](
2626
open: () => java.util.stream.BaseStream[T, S])
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.stream.impl.fusing
19+
20+
import org.apache.pekko
21+
import pekko.annotation.InternalApi
22+
import pekko.japi.Pair
23+
import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
24+
import pekko.stream.impl.Stages.DefaultAttributes
25+
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
26+
27+
/**
28+
* INTERNAL API
29+
*/
30+
@InternalApi private[pekko] object ZipWithIndex extends GraphStage[FlowShape[Any, (Any, Long)]] {
31+
val in = Inlet[Any]("ZipWithIndex.in")
32+
val out = Outlet[(Any, Long)]("ZipWithIndex.out")
33+
override val shape = FlowShape(in, out)
34+
override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex
35+
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
36+
new GraphStageLogic(shape) with InHandler with OutHandler {
37+
private var index = 0L
38+
override def onPush(): Unit = {
39+
push(out, (grab(in), index))
40+
index += 1
41+
}
42+
43+
override def onPull(): Unit = pull(in)
44+
setHandlers(in, out, this)
45+
}
46+
}
47+
48+
/**
49+
* INTERNAL API
50+
*/
51+
@InternalApi private[pekko] object ZipWithIndexJava extends GraphStage[FlowShape[Any, Pair[Any, Long]]] {
52+
val in = Inlet[Any]("ZipWithIndex.in")
53+
val out = Outlet[Pair[Any, Long]]("ZipWithIndex.out")
54+
override val shape = FlowShape(in, out)
55+
override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex
56+
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
57+
new GraphStageLogic(shape) with InHandler with OutHandler {
58+
private var index = 0L
59+
override def onPush(): Unit = {
60+
push(out, new Pair(grab(in), index))
61+
index += 1
62+
}
63+
64+
override def onPull(): Unit = pull(in)
65+
setHandlers(in, out, this)
66+
}
67+
}

stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import scala.concurrent.duration.FiniteDuration
2626
import scala.reflect.ClassTag
2727

2828
import org.apache.pekko
29+
import org.apache.pekko.stream.impl.fusing.ZipWithIndexJava
2930
import pekko.Done
3031
import pekko.NotUsed
3132
import pekko.actor.ActorRef
@@ -3691,7 +3692,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
36913692
* '''Cancels when''' downstream cancels
36923693
*/
36933694
def zipWithIndex: Flow[In, Pair[Out, java.lang.Long], Mat] =
3694-
new Flow(delegate.zipWithIndex.map { case (elem, index) => Pair[Out, java.lang.Long](elem, index) })
3695+
via(ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])
36953696

36963697
/**
36973698
* If the first element has not passed through this operator before the provided timeout, the stream is failed

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import pekko.japi.{ function, JavaPartialFunction, Pair }
3535
import pekko.japi.function.Creator
3636
import pekko.stream._
3737
import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
38-
import pekko.stream.impl.fusing.ArraySource
38+
import pekko.stream.impl.fusing.{ ArraySource, ZipWithIndexJava }
3939
import pekko.util.{ unused, _ }
4040
import pekko.util.FutureConverters._
4141
import pekko.util.JavaDurationConverters._
@@ -2173,7 +2173,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
21732173
* '''Cancels when''' downstream cancels
21742174
*/
21752175
def zipWithIndex: javadsl.Source[Pair[Out @uncheckedVariance, java.lang.Long], Mat] =
2176-
new Source(delegate.zipWithIndex.map { case (elem, index) => Pair[Out, java.lang.Long](elem, index) })
2176+
new Source(delegate.via(
2177+
ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]]))
21772178

21782179
/**
21792180
* Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import pekko.annotation.ApiMayChange
2929
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
3030
import pekko.japi.{ function, Pair }
3131
import pekko.stream._
32+
import pekko.stream.impl.fusing.ZipWithIndexJava
3233
import pekko.util.ConstantFun
3334
import pekko.util.FutureConverters._
3435
import pekko.util.JavaDurationConverters._
@@ -2272,7 +2273,8 @@ class SubFlow[In, Out, Mat](
22722273
* '''Cancels when''' downstream cancels
22732274
*/
22742275
def zipWithIndex: SubFlow[In, pekko.japi.Pair[Out @uncheckedVariance, java.lang.Long], Mat] =
2275-
new SubFlow(delegate.zipWithIndex.map { case (elem, index) => pekko.japi.Pair[Out, java.lang.Long](elem, index) })
2276+
new SubFlow(delegate.via(
2277+
ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]]))
22762278

22772279
/**
22782280
* If the first element has not passed through this operator before the provided timeout, the stream is failed

stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import pekko.annotation.ApiMayChange
2929
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
3030
import pekko.japi.{ function, Pair }
3131
import pekko.stream._
32+
import pekko.stream.impl.fusing.ZipWithIndexJava
3233
import pekko.util.ConstantFun
3334
import pekko.util.FutureConverters._
3435
import pekko.util.JavaDurationConverters._
@@ -2246,8 +2247,9 @@ class SubSource[Out, Mat](
22462247
*
22472248
* '''Cancels when''' downstream cancels
22482249
*/
2249-
def zipWithIndex: javadsl.SubSource[pekko.japi.Pair[Out @uncheckedVariance, Long], Mat] =
2250-
new SubSource(delegate.zipWithIndex.map { case (elem, index) => pekko.japi.Pair(elem, index) })
2250+
def zipWithIndex: javadsl.SubSource[pekko.japi.Pair[Out @uncheckedVariance, java.lang.Long], Mat] =
2251+
new SubSource(delegate.via(
2252+
ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]]))
22512253

22522254
/**
22532255
* If the first element has not passed through this operator before the provided timeout, the stream is failed

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3304,16 +3304,7 @@ trait FlowOps[+Out, +Mat] {
33043304
*
33053305
* '''Cancels when''' downstream cancels
33063306
*/
3307-
def zipWithIndex: Repr[(Out, Long)] = {
3308-
statefulMapConcat[(Out, Long)] { () =>
3309-
var index: Long = 0L
3310-
elem => {
3311-
val zipped = (elem, index)
3312-
index += 1
3313-
immutable.Iterable[(Out, Long)](zipped)
3314-
}
3315-
}
3316-
}
3307+
def zipWithIndex: Repr[(Out, Long)] = via(ZipWithIndex.asInstanceOf[Graph[FlowShape[Out, (Out, Long)], NotUsed]])
33173308

33183309
/**
33193310
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].

stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ package org.apache.pekko.stream.scaladsl
1616
import scala.annotation.unchecked.uncheckedVariance
1717

1818
import org.apache.pekko
19-
import pekko.annotation.ApiMayChange
2019
import pekko.NotUsed
20+
import pekko.annotation.ApiMayChange
2121
import pekko.japi.Pair
2222
import pekko.stream._
2323

0 commit comments

Comments
 (0)