From ef628ea3206001a1bd13eb3e68c0ffced9764cc1 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 5 Feb 2024 16:09:56 +0100 Subject: [PATCH] add asInputStream to ByteString (#1085) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add getInputStream to ByteString * scala 2 updates * rename as asInputStream * make methods final * use different asInputStream implementations on different subclasses Co-Authored-By: João Ferreira <943051+jtjeferreira@users.noreply.github.com> * Update ByteBufferBackedInputStream.scala scalafmt * remove default asInputStream impl * add some tests - more needed * Create bytestring-inputstream.excludes * scalafmt * Update bytestring-inputstream.excludes * Update ByteStringInputStreamSpec.scala * Update ByteStringInputStreamSpec.scala * Update ByteStringInputStreamSpec.scala * add benchmark * Update ByteString_asInputStream_Benchmark.scala * change to iterator earlier in chain --------- Co-authored-by: João Ferreira <943051+jtjeferreira@users.noreply.github.com> --- .../util/ByteStringInputStreamSpec.scala | 81 +++++++++++++++ .../bytestring-inputstream.excludes | 19 ++++ .../org/apache/pekko/util/ByteString.scala | 21 +++- .../org/apache/pekko/util/ByteString.scala | 24 ++++- .../org/apache/pekko/util/ByteString.scala | 24 ++++- bench-jmh/README.md | 2 +- .../ByteString_asInputStream_Benchmark.scala | 98 +++++++++++++++++++ 7 files changed, 256 insertions(+), 13 deletions(-) create mode 100644 actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala create mode 100644 actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes create mode 100644 bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala diff --git a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala new file mode 100644 index 
00000000000..b49f4de017c --- /dev/null +++ b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.util + +import java.io.{ ByteArrayOutputStream, InputStream, OutputStream } +import java.nio.charset.StandardCharsets + +import org.apache.pekko +import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteStrings } + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class ByteStringInputStreamSpec extends AnyWordSpec with Matchers { + "ByteString1" must { + "support asInputStream" in { + ByteString1.empty.asInputStream.read() shouldEqual -1 + ByteString1.empty.asInputStream.read(Array.empty) shouldEqual -1 + toUtf8String(ByteString1.empty.asInputStream) shouldEqual "" + toUtf8String(ByteString1.fromString("abc").asInputStream) shouldEqual "abc" + } + } + "ByteString1C" must { + "support asInputStream" in { + toUtf8String(ByteString1C.fromString("").asInputStream) shouldEqual "" + toUtf8String(ByteString1C.fromString("abc").asInputStream) shouldEqual "abc" + val bytes = "abcdef".getBytes(StandardCharsets.US_ASCII) + toUtf8String(ByteString.fromArray(bytes, 
1, 3).asInputStream) shouldEqual "bcd" + } + } + "ByteStrings" must { + "support asInputStream" in { + val empty = ByteStrings(ByteString1.fromString(""), ByteString1.fromString("")) + empty.asInputStream.read() shouldEqual -1 + empty.asInputStream.read(Array.empty) shouldEqual -1 + toUtf8String(empty.asInputStream) shouldEqual "" + val abc = ByteStrings(ByteString1.fromString("a"), ByteString1.fromString("bc")) + toUtf8String(abc.asInputStream) shouldEqual "abc" + } + } + + private def toUtf8String(input: InputStream): String = + new String(toByteArray(input), StandardCharsets.UTF_8) + + private def toByteArray(input: InputStream): Array[Byte] = { + val output = new ByteArrayOutputStream + try { + copy(input, output) + output.toByteArray + } finally { + output.close() + } + } + + private def copy(input: InputStream, output: OutputStream): Int = { + val buffer = new Array[Byte](4096) + var count = 0 + var n = input.read(buffer) + while (n != -1) { + output.write(buffer, 0, n) + count += n + n = input.read(buffer) + } + count + } +} diff --git a/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes b/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes new file mode 100644 index 00000000000..3f071efce57 --- /dev/null +++ b/actor/src/main/mima-filters/1.0.x.backwards.excludes/bytestring-inputstream.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Add ByteString.asInputStream +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.util.ByteString.asInputStream") diff --git a/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala b/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala index 0b06483b383..ac9d6ead762 100644 --- a/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala +++ b/actor/src/main/scala-2.12/org/apache/pekko/util/ByteString.scala @@ -13,7 +13,7 @@ package org.apache.pekko.util -import java.io.{ ObjectInputStream, ObjectOutputStream } +import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream, ObjectOutputStream, SequenceInputStream } import java.lang.{ Iterable => JIterable } import java.nio.{ ByteBuffer, ByteOrder } import java.nio.charset.{ Charset, StandardCharsets } @@ -21,6 +21,7 @@ import java.util.Base64 import scala.annotation.{ tailrec, varargs } import scala.collection.IndexedSeqOptimized +import scala.collection.JavaConverters._ import scala.collection.generic.CanBuildFrom import scala.collection.immutable import scala.collection.immutable.{ IndexedSeq, VectorBuilder } @@ -271,6 +272,8 @@ object ByteString { } override def toArrayUnsafe(): Array[Byte] = bytes + + override def asInputStream: InputStream = new ByteArrayInputStream(bytes) } /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */ @@ -436,6 +439,9 @@ object ByteString { if (startIndex == 0 && length == bytes.length) bytes else toArray } + + override def asInputStream: InputStream = + new ByteArrayInputStream(bytes, 
startIndex, length) } private[pekko] object ByteStrings extends Companion { @@ -566,6 +572,9 @@ object ByteString { def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = bytestrings.map { _.asByteBuffer } + override def asInputStream: InputStream = + new SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration) + def decodeString(charset: String): String = compact.decodeString(charset) def decodeString(charset: Charset): String = compact.decodeString(charset) @@ -827,6 +836,15 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz */ def toArrayUnsafe(): Array[Byte] = toArray + /** + * Return the bytes in this ByteString as an InputStream. + * + * @return the bytes in this ByteString accessible as an InputStream + * @see [[asByteBuffer]] + * @since 1.1.0 + */ + def asInputStream: InputStream + override def foreach[@specialized U](f: Byte => U): Unit = iterator.foreach(f) private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit @@ -886,7 +904,6 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz * all fragments. Will always have at least one entry. 
*/ def getByteBuffers(): JIterable[ByteBuffer] = { - import scala.collection.JavaConverters.asJavaIterableConverter asByteBuffers.asJava } diff --git a/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala b/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala index bf5be5a69a2..ca3ee818d84 100644 --- a/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala +++ b/actor/src/main/scala-2.13/org/apache/pekko/util/ByteString.scala @@ -13,17 +13,17 @@ package org.apache.pekko.util -import java.io.{ ObjectInputStream, ObjectOutputStream } +import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream, ObjectOutputStream, SequenceInputStream } import java.lang.{ Iterable => JIterable } import java.nio.{ ByteBuffer, ByteOrder } import java.nio.charset.{ Charset, StandardCharsets } import java.util.Base64 -import scala.annotation.{ tailrec, varargs } +import scala.annotation.{ nowarn, tailrec, varargs } import scala.collection.{ immutable, mutable } import scala.collection.immutable.{ IndexedSeq, IndexedSeqOps, StrictOptimizedSeqOps, VectorBuilder } import scala.collection.mutable.{ Builder, WrappedArray } +import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag -import scala.annotation.nowarn object ByteString { @@ -278,6 +278,7 @@ object ByteString { override def toArrayUnsafe(): Array[Byte] = bytes + override def asInputStream: InputStream = new ByteArrayInputStream(bytes) } /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */ @@ -448,6 +449,9 @@ object ByteString { if (startIndex == 0 && length == bytes.length) bytes else toArray } + + override def asInputStream: InputStream = + new ByteArrayInputStream(bytes, startIndex, length) } private[pekko] object ByteStrings extends Companion { @@ -578,6 +582,9 @@ object ByteString { def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = bytestrings.map { _.asByteBuffer } + override def asInputStream: InputStream = + new 
SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration) + def decodeString(charset: String): String = compact.decodeString(charset) def decodeString(charset: Charset): String = compact.decodeString(charset) @@ -876,6 +883,15 @@ sealed abstract class ByteString */ def toArrayUnsafe(): Array[Byte] = toArray + /** + * Return the bytes in this ByteString as an InputStream. + * + * @return the bytes in this ByteString accessible as an InputStream + * @see [[asByteBuffer]] + * @since 1.1.0 + */ + def asInputStream: InputStream + override def foreach[@specialized U](f: Byte => U): Unit = iterator.foreach(f) private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit @@ -931,9 +947,7 @@ sealed abstract class ByteString * Java API: Returns an Iterable of read-only ByteBuffers that directly wraps this ByteStrings * all fragments. Will always have at least one entry. */ - @nowarn def getByteBuffers(): JIterable[ByteBuffer] = { - import scala.collection.JavaConverters.asJavaIterableConverter asByteBuffers.asJava } diff --git a/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala b/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala index bcd4c3284c5..e8e64af848a 100644 --- a/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala +++ b/actor/src/main/scala-3/org/apache/pekko/util/ByteString.scala @@ -13,7 +13,7 @@ package org.apache.pekko.util -import java.io.{ ObjectInputStream, ObjectOutputStream } +import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream, ObjectOutputStream, SequenceInputStream } import java.lang.{ Iterable => JIterable } import java.nio.{ ByteBuffer, ByteOrder } import java.nio.charset.{ Charset, StandardCharsets } @@ -23,10 +23,9 @@ import scala.annotation.{ tailrec, varargs } import scala.collection.{ immutable, mutable } import scala.collection.immutable.{ IndexedSeq, IndexedSeqOps, StrictOptimizedSeqOps, VectorBuilder } import scala.collection.mutable.{ Builder, WrappedArray } 
+import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag -import scala.annotation.nowarn - object ByteString { /** @@ -279,6 +278,8 @@ object ByteString { } override def toArrayUnsafe(): Array[Byte] = bytes + + override def asInputStream: InputStream = new ByteArrayInputStream(bytes) } /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */ @@ -449,6 +450,9 @@ object ByteString { if (startIndex == 0 && length == bytes.length) bytes else toArray } + + override def asInputStream: InputStream = + new ByteArrayInputStream(bytes, startIndex, length) } private[pekko] object ByteStrings extends Companion { @@ -579,6 +583,9 @@ object ByteString { def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = bytestrings.map { _.asByteBuffer } + override def asInputStream: InputStream = + new SequenceInputStream(bytestrings.iterator.map(_.asInputStream).asJavaEnumeration) + def decodeString(charset: String): String = compact.decodeString(charset) def decodeString(charset: Charset): String = compact.decodeString(charset) @@ -876,6 +883,15 @@ sealed abstract class ByteString */ def toArrayUnsafe(): Array[Byte] = toArray + /** + * Return the bytes in this ByteString as an InputStream. + * + * @return the bytes in this ByteString accessible as an InputStream + * @see [[asByteBuffer]] + * @since 1.1.0 + */ + def asInputStream: InputStream + override def foreach[@specialized U](f: Byte => U): Unit = iterator.foreach(f) private[pekko] def writeToOutputStream(os: ObjectOutputStream): Unit @@ -931,9 +947,7 @@ sealed abstract class ByteString * Java API: Returns an Iterable of read-only ByteBuffers that directly wraps this ByteStrings * all fragments. Will always have at least one entry. 
*/ - @nowarn def getByteBuffers(): JIterable[ByteBuffer] = { - import scala.collection.JavaConverters.asJavaIterableConverter asByteBuffers.asJava } diff --git a/bench-jmh/README.md b/bench-jmh/README.md index df4709fcaf7..08aa061eb7d 100644 --- a/bench-jmh/README.md +++ b/bench-jmh/README.md @@ -9,7 +9,7 @@ Pekko uses [sbt-jmh](https://github.com/sbt/sbt-jmh) to integrate [Java Microben ```shell sbt shell pekko > project bench-jmh -sbt:bench-jmh> Jmh/run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark +sbt:pekko-bench-jmh> Jmh/run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark ``` or execute in one-line command diff --git a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala new file mode 100644 index 00000000000..36fb2b9e2d5 --- /dev/null +++ b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.util + +import java.io.{ ByteArrayInputStream, InputStream } +import java.util.concurrent.TimeUnit + +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.infra.Blackhole + +/** + * Compares ByteString.asInputStream and new ByteArrayInputStream(ByteString.toArray). + */ +@State(Scope.Benchmark) +@Measurement(timeUnit = TimeUnit.MILLISECONDS) +class ByteString_asInputStream_Benchmark { + + var bs: ByteString = _ + + var composed: ByteString = _ + + @Param(Array("10", "100", "1000")) + var kb = 0 + + /* + bench-jmh/jmh:run -f 1 -wi 3 -i 3 .*ByteString_asInputStream_Benchmark.* + + [info] Benchmark (kb) Mode Cnt Score Error Units + [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream 10 thrpt 3 32398.229 ± 26714.266 ops/s + [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream 100 thrpt 3 3642.487 ± 576.459 ops/s + [info] ByteString_asInputStream_Benchmark.composed_bs_as_input_stream 1000 thrpt 3 285.910 ± 40.463 ops/s + [info] ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream 10 thrpt 3 6182.509 ± 933.899 ops/s + [info] ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream 100 thrpt 3 474.634 ± 84.763 ops/s + [info] ByteString_asInputStream_Benchmark.composed_bs_bytes_to_input_stream 1000 thrpt 3 38.764 ± 49.698 ops/s + [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream 10 thrpt 3 2436952.866 ± 1253216.244 ops/s + [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream 100 thrpt 3 339116.689 ± 297756.892 ops/s + [info] ByteString_asInputStream_Benchmark.single_bs_as_input_stream 1000 thrpt 3 32592.451 ± 12465.507 ops/s + [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream 10 thrpt 3 619077.237 ± 200242.708 ops/s + [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream 100 thrpt 3 50481.984 ± 78485.741 ops/s + [info] ByteString_asInputStream_Benchmark.single_bs_bytes_to_input_stream 1000 thrpt 3
4271.984 ± 1061.978 ops/s + */ + + @Setup + def setup(): Unit = { + val bytes = Array.ofDim[Byte](1024 * kb) + bs = ByteString(bytes) + composed = ByteString.empty + for (_ <- 0 to 100) { + composed = composed ++ bs + } + } + + @Benchmark + def single_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = { + blackhole.consume(countBytes(new ByteArrayInputStream(bs.toArray))) + } + + @Benchmark + def composed_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = { + blackhole.consume(countBytes(new ByteArrayInputStream(composed.toArray))) + } + + @Benchmark + def single_bs_as_input_stream(blackhole: Blackhole): Unit = { + blackhole.consume(countBytes(bs.asInputStream)) + } + + @Benchmark + def composed_bs_as_input_stream(blackhole: Blackhole): Unit = { + blackhole.consume(countBytes(composed.asInputStream)) + } + + private def countBytes(stream: InputStream): Int = { + val buffer = new Array[Byte](1024) + var count = 0 + var read = stream.read(buffer) + while (read != -1) { + count += read + read = stream.read(buffer) + } + count + } +}