(fix #766) Deprecate AvroCompat, replace automatic schema detection on read + Configurable write #996
Changes from all commits
MagnolifyParquetProperties.scala (new file)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2025 Spotify AB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package magnolify.parquet
+
+trait MagnolifyParquetProperties extends Serializable {
+  def WriteAvroCompatibleArrays: Boolean = false
+  def writeAvroSchemaToMetadata: Boolean = true
+
+  private[parquet] final def schemaUniquenessKey: Int = WriteAvroCompatibleArrays.hashCode()
+}
+
+/**
+ * Properties set in your core-site.xml, or in an explicit Configuration object passed to
+ * ParquetType, will be parsed into MagnolifyParquetProperties
+ */
+object MagnolifyParquetProperties {
+  val Default: MagnolifyParquetProperties = new MagnolifyParquetProperties {}
+
+  val WriteAvroCompatibleArrays: String = "magnolify.parquet.write-grouped-arrays"
+  val WriteAvroSchemaToMetadata: String = "magnolify.parquet.write-avro-schema"
+}
Large diffs are not rendered by default.
ParquetType.scala
@@ -30,18 +30,27 @@
 }
 import org.apache.parquet.io.api._
 import org.apache.parquet.io.{InputFile, OutputFile}
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{MessageType, Type}
 import org.slf4j.LoggerFactory
 import org.typelevel.scalaccompat.annotation.nowarn

 sealed trait ParquetArray

 /**
- * Add `import magnolify.parquet.ParquetArray.AvroCompat._` to generate AVRO schema on write
+ * Add `import magnolify.parquet.ParquetArray.AvroCompat._` to generate Avro-compatible
+ * array schemas. This import is DEPRECATED. Instead, pass the following option to your Parquet
+ * Configuration:
+ *
+ *   magnolify.parquet.write-grouped-arrays: true
  */
 object ParquetArray {
   implicit case object default extends ParquetArray

+  @deprecated(
+    message =
+      "AvroCompat import is deprecated; set Parquet Configuration option `magnolify.parquet.write-grouped-arrays: true` instead",
+    since = "0.8.0"
+  )
   object AvroCompat {
     implicit case object avroCompat extends ParquetArray
   }
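A migration sketch (reusing the hypothetical `MyRecord` from above) contrasting the deprecated import with the new configuration key:

  // Before (now deprecated): array encoding switched globally by an import
  import magnolify.parquet.ParquetArray.AvroCompat._
  val legacyType = ParquetType[MyRecord]

  // After: request grouped arrays per instance through the Configuration
  val conf = new Configuration()
  conf.set(MagnolifyParquetProperties.WriteAvroCompatibleArrays, "true")
  val configuredType = ParquetType[MyRecord](conf)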
@@ -52,7 +61,6 @@
   def schema: MessageType
   def avroSchema: AvroSchema
-  def avroCompat: Boolean

   def setupInput(job: Job): Unit = {
     job.setInputFormatClass(classOf[ParquetInputFormat[T]])
@@ -72,33 +80,69 @@
   def readBuilder(file: InputFile): ReadBuilder[T] = new ReadBuilder(file, readSupport)
   def writeBuilder(file: OutputFile): WriteBuilder[T] = new WriteBuilder(file, writeSupport)

-  def write(c: RecordConsumer, v: T): Unit
-  def newConverter(): TypeConverter[T]
+  private[parquet] def properties: MagnolifyParquetProperties
+
+  private[parquet] def write(c: RecordConsumer, v: T): Unit = ()
+  private[parquet] def newConverter(writerSchema: Type): TypeConverter[T] = null
 }

 object ParquetType {
   private val logger = LoggerFactory.getLogger(this.getClass)

   implicit def apply[T](implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] =
-    ParquetType(CaseMapper.identity)
+    ParquetType(CaseMapper.identity, MagnolifyParquetProperties.Default)

   def apply[T](
     cm: CaseMapper
-  )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = f match {
+  )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] =
+    ParquetType[T](cm, MagnolifyParquetProperties.Default)(f, pa)
+
+  def apply[T](
+    conf: Configuration
+  )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] =
+    ParquetType[T](
+      CaseMapper.identity, {
+        val WriteAvroCompatibleArraysOpt =
+          Option(conf.get(MagnolifyParquetProperties.WriteAvroCompatibleArrays)).map(_.toBoolean)
+        val writeMetadataOpt = Option(
+          conf.get(MagnolifyParquetProperties.WriteAvroSchemaToMetadata)
+        ).map(_.toBoolean)
+
+        new MagnolifyParquetProperties {
+          override def WriteAvroCompatibleArrays: Boolean =
+            WriteAvroCompatibleArraysOpt.getOrElse(super.WriteAvroCompatibleArrays)
+          override def writeAvroSchemaToMetadata: Boolean =
+            writeMetadataOpt.getOrElse(super.writeAvroSchemaToMetadata)
+        }
+      }
+    )(f, pa)
+
+  def apply[T](
+    properties: MagnolifyParquetProperties
+  )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] =
+    ParquetType[T](CaseMapper.identity, properties)(f, pa)
+
+  def apply[T](
+    cm: CaseMapper,
+    props: MagnolifyParquetProperties
+  )(implicit f: ParquetField[T], pa: ParquetArray): ParquetType[T] = f match {
     case r: ParquetField.Record[_] =>
       new ParquetType[T] {
-        @transient override lazy val schema: MessageType = Schema.message(r.schema(cm))
-        @transient override lazy val avroSchema: AvroSchema = {
+        @transient override def schema: MessageType =
+          Schema.message(r.schema(cm, properties))
+
+        @transient override def avroSchema: AvroSchema = {
           val s = new AvroSchemaConverter().convert(schema)
           // add doc to avro schema
           val fieldDocs = f.fieldDocs(cm)
           SchemaUtil.deepCopy(s, f.typeDoc, fieldDocs.get)
         }

-        override val avroCompat: Boolean =
-          pa == ParquetArray.AvroCompat.avroCompat || f.hasAvroArray
-        override def write(c: RecordConsumer, v: T): Unit = r.write(c, v)(cm)
-        override def newConverter(): TypeConverter[T] = r.newConverter()
+        override private[parquet] def properties: MagnolifyParquetProperties = props
+        override def write(c: RecordConsumer, v: T): Unit =
+          r.write(c, v)(cm, properties)
+        override private[parquet] def newConverter(writerSchema: Type): TypeConverter[T] =
+          r.newConverter(writerSchema)
       }
     case _ =>
       throw new IllegalArgumentException(s"ParquetType can only be created from Record. Got $f")
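For the overload that takes properties directly, a sketch (not from the diff) of overriding the defaults in code rather than via Hadoop configuration keys, again with the hypothetical `MyRecord`:

  val props = new MagnolifyParquetProperties {
    override def WriteAvroCompatibleArrays: Boolean = true
    override def writeAvroSchemaToMetadata: Boolean = false
  }
  val pt = ParquetType[MyRecord](props)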
@@ -120,7 +164,6 @@
   // From AvroReadSupport
   private val AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema"
-  private val OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema"

   class ReadSupport[T](private var parquetType: ParquetType[T]) extends hadoop.ReadSupport[T] {
     def this() = this(null)
@@ -133,25 +176,17 @@
       parquetType = SerializationUtils.fromBase64[ParquetType[T]](readKeyType)
     }

-    val metadata = context.getKeyValueMetadata
-    val model = metadata.get(ParquetWriter.OBJECT_MODEL_NAME_PROP)
-    val isAvroFile = (model != null && model.contains("avro")) ||
-      metadata.containsKey(AVRO_SCHEMA_METADATA_KEY) ||
-      metadata.containsKey(OLD_AVRO_SCHEMA_METADATA_KEY)
-    if (isAvroFile && !parquetType.avroCompat) {
-      logger.warn(
-        "Parquet file was written from Avro records, " +
-          "`import magnolify.parquet.ParquetArray.AvroCompat._` to read correctly"
-      )
-    }
-    if (!isAvroFile && parquetType.avroCompat) {
-      logger.warn(
-        "Parquet file was not written from Avro records, " +
-          "remove `import magnolify.parquet.ParquetArray.AvroCompat._` to read correctly"
-      )
-    }
-
-    val requestedSchema = Schema.message(parquetType.schema)
+    val requestedSchema = {
+      val s = Schema.message(parquetType.schema)
+      // If reading Avro, roundtrip schema using parquet-avro converter to ensure array compatibility;
+      // magnolify-parquet does not automatically wrap repeated fields into a group like parquet-avro does
+      if (Schema.hasGroupedArray(context.getFileSchema)) {
+        val converter = new AvroSchemaConverter()
+        converter.convert(converter.convert(s))
+      } else {
+        s
+      }
+    }
+
     Schema.checkCompatibility(context.getFileSchema, requestedSchema)
     new hadoop.ReadSupport.ReadContext(requestedSchema, java.util.Collections.emptyMap())
   }
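To illustrate what the grouped-array check distinguishes, here is a sketch of the two encodings using parquet's MessageTypeParser; the record and field names (`MyRecord`, `tags`) are hypothetical:

  import org.apache.parquet.schema.MessageTypeParser

  // magnolify-parquet's default encoding: a bare repeated field
  val ungrouped = MessageTypeParser.parseMessageType(
    """message MyRecord {
      |  repeated binary tags (UTF8);
      |}""".stripMargin
  )

  // parquet-avro's encoding: the repeated element wrapped in a LIST-annotated group
  val grouped = MessageTypeParser.parseMessageType(
    """message MyRecord {
      |  required group tags (LIST) {
      |    repeated binary array (UTF8);
      |  }
      |}""".stripMargin
  )

When the file's writer schema uses the grouped form, roundtripping the requested schema through AvroSchemaConverter produces matching group wrappers and element names, so the two schemas stay compatible.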
@@ -163,7 +198,7 @@
       readContext: hadoop.ReadSupport.ReadContext
     ): RecordMaterializer[T] =
       new RecordMaterializer[T] {
-        private val root = parquetType.newConverter()
+        private val root = parquetType.newConverter(fileSchema)
         override def getCurrentRecord: T = root.get
         override def getRootConverter: GroupConverter = root.asGroupConverter()
       }
@@ -183,16 +218,23 @@
     val schema = Schema.message(parquetType.schema)
     val metadata = new java.util.HashMap[String, String]()
-    if (parquetType.avroCompat) {
-      // This overrides `WriteSupport#getName`
-      metadata.put(ParquetWriter.OBJECT_MODEL_NAME_PROP, "avro")
-      metadata.put(AVRO_SCHEMA_METADATA_KEY, parquetType.avroSchema.toString())
-    } else {
-      logger.warn(
-        "Parquet file is being written with no avro compatibility, this mode is not " +
-          "producing schema. Add `import magnolify.parquet.ParquetArray.AvroCompat._` to " +
-          "generate schema"
-      )
-    }
+
+    if (parquetType.properties.writeAvroSchemaToMetadata) {
+      try {
+        metadata.put(
+          AVRO_SCHEMA_METADATA_KEY,
+          parquetType.avroSchema.toString()
+        )
+      } catch {
+        // parquet-avro has greater schema restrictions than magnolify-parquet, e.g., parquet-avro does not
+        // support Maps with non-Binary key types
+        case e: IllegalArgumentException =>
+          logger.warn(
+            s"Writer schema `$schema` contains a type not supported by Avro schemas; will not write " +
+              s"key $AVRO_SCHEMA_METADATA_KEY to file metadata",
+            e
+          )
+      }
+    }

     new hadoop.WriteSupport.WriteContext(schema, metadata)

Review comment on this hunk: I did drop the behavior of writing …
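To verify what the write path produced, a sketch (hypothetical file path) that reads the footer key-value metadata back with parquet's file reader:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.hadoop.ParquetFileReader
  import org.apache.parquet.hadoop.util.HadoopInputFile

  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path("/tmp/my-records.parquet"), new Configuration())
  )
  // "parquet.avro.schema" is present unless disabled via properties or unsupported by parquet-avro
  val kvMetadata = reader.getFileMetaData.getKeyValueMetaData
  val embeddedAvroSchema = Option(kvMetadata.get("parquet.avro.schema"))
  reader.close()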
Review comment: I'm on the fence about relying so heavily on the hadoop Configuration class, since it pulls in the hadoop-common artifact and links us more tightly with Hadoop. Parquet is trying to move away from Configuration and onto their own ParquetConfiguration class, which we could use instead. However, it might be confusing for Scio users, since Scio is heavily dependent on Configuration and we don't have immediate plans to offboard from it.

Follow-up: Actually, I might pull this out into a separate PR. Will update shortly.