diff --git a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala index 9935027c..b21816dc 100644 --- a/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala +++ b/jmh/src/test/scala/magnolify/jmh/MagnolifyBench.scala @@ -16,16 +16,13 @@ package magnolify.jmh -import magnolify.parquet.ParquetType.WriteSupport -import magnolify.parquet.{MagnolifyParquetProperties, ParquetType} - import java.util.concurrent.TimeUnit import magnolify.scalacheck.auto._ import magnolify.test.Simple._ -import org.apache.hadoop.conf.Configuration import org.scalacheck._ import org.openjdk.jmh.annotations._ +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ object MagnolifyBench { @@ -92,103 +89,6 @@ class AvroBench { @Benchmark def avroSchema: Schema = AvroType[Nested].schema } -@State(Scope.Benchmark) -class ParquetReadState(pt: ParquetType[Nested]) { - import org.apache.parquet.io._ - import org.apache.parquet.column.impl.ColumnWriteStoreV1 - import org.apache.parquet.column.ParquetProperties - import org.apache.parquet.hadoop.api.InitContext - - var reader: RecordReader[Nested] = null - - @Setup(Level.Invocation) - def setup(): Unit = { - // Write page - val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema) - val memPageStore = new ParquetInMemoryPageStore(1) - val columns = new ColumnWriteStoreV1( - pt.schema, - memPageStore, - ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build - ) - val writeSupport = pt.writeSupport - val recordWriter = columnIO.getRecordWriter(columns) - writeSupport.prepareForWrite(recordWriter) - writeSupport.write(MagnolifyBench.nested) - recordWriter.flush() - columns.flush() - - // Read and convert page - val conf = new Configuration() - val readSupport = pt.readSupport - reader = columnIO.getRecordReader( - memPageStore, - readSupport.prepareForRead( - conf, - new java.util.HashMap, - pt.schema, - readSupport.init(new InitContext(conf, new java.util.HashMap, pt.schema))) - ) - } -} - -@State(Scope.Benchmark) -class ParquetWriteState(pt: ParquetType[Nested]) { - import org.apache.parquet.io._ - import org.apache.parquet.column.impl.ColumnWriteStoreV1 - import org.apache.parquet.column.ParquetProperties - - var writer: WriteSupport[Nested] = null - - @Setup(Level.Invocation) - def setup(): Unit = { - val columnIO = new ColumnIOFactory(true).getColumnIO(pt.schema) - val memPageStore = new ParquetInMemoryPageStore(1) - val columns = new ColumnWriteStoreV1( - pt.schema, - memPageStore, - ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build - ) - val writeSupport = pt.writeSupport - val recordWriter = columnIO.getRecordWriter(columns) - writeSupport.prepareForWrite(recordWriter) - this.writer = writeSupport - } -} - -object ParquetStates { - def confWithGroupedArraysProp(propValue: Boolean): Configuration = { - val conf = new Configuration() - conf.setBoolean(MagnolifyParquetProperties.WriteGroupedArrays, propValue) - conf - } - class DefaultParquetReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(false))) - class DefaultParquetWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(false))) - - class ParquetAvroCompatReadState extends ParquetReadState(ParquetType[Nested](confWithGroupedArraysProp(true))) - class ParquetAvroCompatWriteState extends ParquetWriteState(ParquetType[Nested](confWithGroupedArraysProp(true))) -} - -@BenchmarkMode(Array(Mode.AverageTime)) 
-@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Thread) -class ParquetBench { - import MagnolifyBench._ - - @Benchmark def parquetWrite(state: ParquetStates.DefaultParquetWriteState): Unit = state.writer.write(nested) - @Benchmark def parquetRead(state: ParquetStates.DefaultParquetReadState): Nested = state.reader.read() -} - -@BenchmarkMode(Array(Mode.AverageTime)) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Thread) -class ParquetAvroCompatBench { - import MagnolifyBench._ - - @Benchmark def parquetWrite(state: ParquetStates.ParquetAvroCompatWriteState): Unit = state.writer.write(nested) - @Benchmark def parquetRead(state: ParquetStates.ParquetAvroCompatReadState): Nested = state.reader.read() -} - @BenchmarkMode(Array(Mode.AverageTime)) @OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Thread) @@ -259,7 +159,149 @@ class ExampleBench { private val exampleNested = implicitly[Arbitrary[ExampleNested]].arbitrary(prms, seed).get private val example = exampleType.to(exampleNested).build() @Benchmark def exampleTo: Example.Builder = exampleType.to(exampleNested) - @Benchmark def exampleFrom: ExampleNested = exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap) + @Benchmark def exampleFrom: ExampleNested = + exampleType.from(example.getFeatures.getFeatureMap.asScala.toMap) +} + +@BenchmarkMode(Array(Mode.AverageTime)) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Thread) +class ParquetBench { + import MagnolifyBench._ + import ParquetStates._ + import magnolify.avro._ + import org.apache.avro.generic.GenericRecord + + private val genericRecord = AvroType[Nested].to(nested) + + @Benchmark def parquetWriteMagnolify(state: ParquetCaseClassWriteState): Unit = + state.writer.write(nested) + @Benchmark def parquetWriteAvro(state: ParquetAvroWriteState): Unit = + state.writer.write(genericRecord) + + @Benchmark def parquetReadMagnolify(state: ParquetCaseClassReadState): Nested = + state.reader.read() + @Benchmark def parquetReadAvro(state: ParquetAvroReadState): GenericRecord = state.reader.read() +} + +@nowarn("cat=deprecation") +object ParquetStates { + import MagnolifyBench._ + import magnolify.avro._ + import magnolify.parquet._ + import magnolify.parquet.ParquetArray.AvroCompat._ + import org.apache.avro.generic.{GenericData, GenericRecord} + import org.apache.hadoop.conf.Configuration + import org.apache.parquet.conf.PlainParquetConfiguration + import org.apache.parquet.avro.{AvroReadSupport, AvroWriteSupport} + import org.apache.parquet.column.ParquetProperties + import org.apache.parquet.hadoop.api.{ReadSupport, WriteSupport} + import org.apache.parquet.schema.MessageType + import org.apache.parquet.io._ + import org.apache.parquet.io.api.{Binary, RecordConsumer} + import org.apache.parquet.column.impl.ColumnWriteStoreV1 + + @State(Scope.Benchmark) + class ReadState[T]( + schema: MessageType, + writeSupport: WriteSupport[T], + readSupport: ReadSupport[T], + record: T + ) { + import org.apache.parquet.hadoop.api.InitContext + + var reader: RecordReader[T] = null + + @Setup(Level.Trial) + def setup(): Unit = { + // Write page + val columnIO = new ColumnIOFactory(true).getColumnIO(schema) + val pageStore = new ParquetInMemoryPageStore(1) + val columnWriteStore = new ColumnWriteStoreV1( + schema, + pageStore, + ParquetProperties.builder.withPageSize(800).withDictionaryEncoding(false).build + ) + val recordConsumer = columnIO.getRecordWriter(columnWriteStore) + writeSupport.init(new PlainParquetConfiguration()) + writeSupport.prepareForWrite(recordConsumer) + 
writeSupport.write(record)
+      recordConsumer.flush()
+      columnWriteStore.flush()
+
+      // Set up reader
+      val conf = new Configuration()
+      reader = columnIO.getRecordReader(
+        pageStore,
+        readSupport.prepareForRead(
+          conf,
+          new java.util.HashMap,
+          schema,
+          readSupport.init(new InitContext(conf, new java.util.HashMap, schema))
+        )
+      ): @nowarn("cat=deprecation")
+    }
+  }
+
+  @State(Scope.Benchmark)
+  class WriteState[T](writeSupport: WriteSupport[T]) {
+    val writer = writeSupport
+
+    @Setup(Level.Trial)
+    def setup(): Unit = {
+      writeSupport.init(new PlainParquetConfiguration())
+      // Use a no-op RecordConsumer; we want to measure only the record -> group conversion, and not pollute the
+      // benchmark with background tasks like flushing pages/blocks or validating records
+      writeSupport.prepareForWrite(new RecordConsumer {
+        override def startMessage(): Unit = {}
+        override def endMessage(): Unit = {}
+        override def startField(field: String, index: Int): Unit = {}
+        override def endField(field: String, index: Int): Unit = {}
+        override def startGroup(): Unit = {}
+        override def endGroup(): Unit = {}
+        override def addInteger(value: Int): Unit = {}
+        override def addLong(value: Long): Unit = {}
+        override def addBoolean(value: Boolean): Unit = {}
+        override def addBinary(value: Binary): Unit = {}
+        override def addFloat(value: Float): Unit = {}
+        override def addDouble(value: Double): Unit = {}
+      })
+    }
+  }
+
+  // R/W support for Group <-> Case Class Conversion (magnolify-parquet)
+  private val parquetType = ParquetType[Nested]
+  class ParquetCaseClassReadState
+      extends ParquetStates.ReadState[Nested](
+        parquetType.schema,
+        parquetType.writeSupport,
+        parquetType.readSupport,
+        nested
+      )
+  class ParquetCaseClassWriteState
+      extends ParquetStates.WriteState[Nested](parquetType.writeSupport)
+
+  // R/W support for Group <-> Avro Conversion (parquet-avro)
+  private val avroType = AvroType[Nested]
+  class ParquetAvroReadState
+      extends ParquetStates.ReadState[GenericRecord](
+        parquetType.schema,
+        new AvroWriteSupport[GenericRecord](
+          parquetType.schema,
+          parquetType.avroSchema,
+          GenericData.get()
+        ),
+        new AvroReadSupport[GenericRecord](GenericData.get()),
+        avroType.to(nested)
+      )
+  class ParquetAvroWriteState
+      extends ParquetStates.WriteState[GenericRecord](
+        new AvroWriteSupport[GenericRecord](
+          parquetType.schema,
+          parquetType.avroSchema,
+          GenericData.get()
+        )
+      )
 }
 
 // Collections are not supported
diff --git a/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala b/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
index 5bb596d4..dbbc0cff 100644
--- a/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
+++ b/jmh/src/test/scala/magnolify/jmh/ParquetInMemoryPageStore.scala
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2024 Spotify AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + package magnolify.jmh import org.apache.parquet.bytes.{ByteBufferReleaser, BytesInput, HeapByteBufferAllocator} @@ -16,62 +32,108 @@ class ParquetInMemoryPageStore(rowCount: Long) extends PageReadStore with PageWr lazy val readers = new mutable.HashMap[ColumnDescriptor, ParquetInMemoryReader]() override def getPageReader(path: ColumnDescriptor): PageReader = - readers.getOrElseUpdate(path, { - val writer = writers(path) - new ParquetInMemoryReader(writer.numValues, writer.pages.toList, writer.dictionaryPage) - }) + readers.getOrElseUpdate( + path, { + val writer = writers(path) + new ParquetInMemoryReader(writer.pages.toList, writer.dictionaryPage) + } + ) override def getPageWriter(path: ColumnDescriptor): PageWriter = - writers.getOrElseUpdate(path, new ParquetInMemoryWriter()) + writers.getOrElseUpdate(path, new ParquetInMemoryWriter) override def getRowCount: Long = rowCount } -class ParquetInMemoryReader(valueCount: Long, pages: List[DataPage], dictionaryPage: DictionaryPage) extends PageReader { - lazy val pagesIt = pages.iterator +class ParquetInMemoryReader(pages: List[DataPageV1], dictionaryPage: DictionaryPage) + extends PageReader { + // Infinitely return the first page; for the purposes of benchmarking, we don't care about the data itself + private val page = pages.head + override def readDictionaryPage(): DictionaryPage = dictionaryPage - override def getTotalValueCount: Long = valueCount - override def readPage(): DataPage = pagesIt.next() + override def getTotalValueCount: Long = Long.MaxValue + override def readPage(): DataPage = new DataPageV1( + page.getBytes.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)), + page.getValueCount, + page.getUncompressedSize, + page.getStatistics, + page.getRlEncoding, + page.getDlEncoding, + page.getValueEncoding + ) } class ParquetInMemoryWriter extends PageWriter { var numRows = 0 var numValues: Long = 0 var memSize: Long = 0 - val pages = new mutable.ListBuffer[DataPage]() + val pages = new mutable.ListBuffer[DataPageV1]() var dictionaryPage: DictionaryPage = null - override def writePage(bytesInput: BytesInput, valueCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { + override def writePage( + bytesInput: BytesInput, + valueCount: Int, + statistics: Statistics[_], + rlEncoding: Encoding, + dlEncoding: Encoding, + valuesEncoding: Encoding + ): Unit = writePage(bytesInput, valueCount, 1, statistics, rlEncoding, dlEncoding, valuesEncoding) - } - override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], sizeStatistics: SizeStatistics, rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { + override def writePage( + bytesInput: BytesInput, + valueCount: Int, + rowCount: Int, + statistics: Statistics[_], + sizeStatistics: SizeStatistics, + rlEncoding: Encoding, + dlEncoding: Encoding, + valuesEncoding: Encoding + ): Unit = writePage(bytesInput, valueCount, rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding) - } - override def writePage(bytesInput: BytesInput, valueCount: Int, rowCount: Int, statistics: Statistics[_], rlEncoding: Encoding, dlEncoding: Encoding, valuesEncoding: Encoding): Unit = { - pages.addOne(new DataPageV1( - bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)), - valueCount, - bytesInput.size().toInt, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding)) + override def writePage( + bytesInput: BytesInput, + valueCount: Int, + 
rowCount: Int, + statistics: Statistics[_], + rlEncoding: Encoding, + dlEncoding: Encoding, + valuesEncoding: Encoding + ): Unit = { + pages.addOne( + new DataPageV1( + bytesInput.copy(new ByteBufferReleaser(new HeapByteBufferAllocator)), + valueCount, + bytesInput.size().toInt, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding + ) + ) memSize += bytesInput.size() numRows += rowCount numValues += valueCount } - override def writePageV2(rowCount: Int, nullCount: Int, valueCount: Int, repetitionLevels: BytesInput, definitionLevels: BytesInput, dataEncoding: Encoding, data: BytesInput, statistics: Statistics[_]): Unit = ??? + override def writePageV2( + rowCount: Int, + nullCount: Int, + valueCount: Int, + repetitionLevels: BytesInput, + definitionLevels: BytesInput, + dataEncoding: Encoding, + data: BytesInput, + statistics: Statistics[_] + ): Unit = ??? override def getMemSize: Long = memSize override def allocatedSize(): Long = memSize - override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit = { + override def writeDictionaryPage(dictionaryPage: DictionaryPage): Unit = this.dictionaryPage = dictionaryPage - } override def memUsageString(prefix: String): String = s"$prefix $memSize bytes" } diff --git a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala index 34a51a91..a5123a47 100644 --- a/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala +++ b/parquet/src/main/scala/magnolify/parquet/MagnolifyParquetProperties.scala @@ -31,11 +31,4 @@ object MagnolifyParquetProperties { val ReadTypeKey = "parquet.type.read.type" val WriteTypeKey = "parquet.type.write.type" - - // Hash any Configuration values that might affect schema creation to use as part of Schema cache key - private[parquet] def hashValues(conf: Configuration): Int = - Option(conf.get(WriteGroupedArrays)) - .map(_.toBoolean) - .getOrElse(WriteGroupedArraysDefault) - .hashCode() }
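
For reference, a minimal sketch of how the ReadState/WriteState plumbing added above can be driven by hand, outside the JMH harness. It assumes the ParquetStates, MagnolifyBench, and ParquetInMemoryPageStore definitions from this change; the ParquetBenchSmokeTest object is hypothetical and exists only for illustration.

package magnolify.jmh

import magnolify.test.Simple._

// Hypothetical driver: exercises the benchmark states directly instead of via @Benchmark methods.
object ParquetBenchSmokeTest {
  def main(args: Array[String]): Unit = {
    // Write path: only the case class -> Parquet group conversion runs, because the
    // WriteState wires its WriteSupport to a no-op RecordConsumer (no page flushes or validation).
    val writeState = new ParquetStates.ParquetCaseClassWriteState
    writeState.setup()
    writeState.writer.write(MagnolifyBench.nested)

    // Read path: setup() writes a single record into the in-memory page store and prepares a
    // RecordReader; each read() then materializes a Nested from that page.
    val readState = new ParquetStates.ParquetCaseClassReadState
    readState.setup()
    val roundTripped: Nested = readState.reader.read()
    println(roundTripped)
  }
}

Because ParquetInMemoryReader re-serves its first page indefinitely, repeated read() calls keep producing records without exhausting the store, which is what lets the read benchmarks run with a Level.Trial setup.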