diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 1ec8172bc..550797ae6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -26,4 +26,9 @@
files="StorageSinkConnectorConfig.java"
/>
+
+
diff --git a/core/pom.xml b/core/pom.xml
index a2d9c2dc5..752773200 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -36,6 +36,10 @@
io.confluent
kafka-connect-avro-data
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-protobuf-converter</artifactId>
+    </dependency>
joda-time
joda-time
diff --git a/core/src/main/java/io/confluent/connect/storage/schema/SchemaProjector.java b/core/src/main/java/io/confluent/connect/storage/schema/SchemaProjector.java
new file mode 100644
index 000000000..11c266e2d
--- /dev/null
+++ b/core/src/main/java/io/confluent/connect/storage/schema/SchemaProjector.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.storage.schema;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.SchemaProjectorException;
+import io.confluent.connect.avro.AvroData;
+import io.confluent.connect.protobuf.ProtobufData;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+
+public class SchemaProjector {
+
+  private static final Set<AbstractMap.SimpleImmutableEntry<Schema.Type, Schema.Type>> promotable =
+      new HashSet<>();
+
+ static {
+ Schema.Type[] promotableTypes = {Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32,
+ Schema.Type.INT64, Schema.Type.FLOAT32, Schema.Type.FLOAT64};
+ for (int i = 0; i < promotableTypes.length; ++i) {
+ for (int j = i; j < promotableTypes.length; ++j) {
+ promotable.add(new AbstractMap.SimpleImmutableEntry<>(
+ promotableTypes[i],
+ promotableTypes[j]));
+ }
+ }
+ }
+
+ public static Object project(Schema source, Object record, Schema target)
+ throws SchemaProjectorException {
+ checkMaybeCompatible(source, target);
+ if (source.isOptional() && !target.isOptional()) {
+ if (target.defaultValue() != null) {
+ if (record != null) {
+ return projectRequiredSchema(source, record, target);
+ } else {
+ return target.defaultValue();
+ }
+ } else {
+ throw new SchemaProjectorException("Writer schema is optional, "
+ + "however, target schema does not provide a default value.");
+ }
+ } else {
+ if (record != null) {
+ return projectRequiredSchema(source, record, target);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static Object projectRequiredSchema(Schema source, Object record, Schema target)
+ throws SchemaProjectorException {
+ switch (target.type()) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT32:
+ case FLOAT64:
+ case BOOLEAN:
+ case BYTES:
+ case STRING:
+ return projectPrimitive(source, record, target);
+ case STRUCT:
+ return projectStruct(source, (Struct) record, target);
+ case ARRAY:
+ return projectArray(source, record, target);
+ case MAP:
+ return projectMap(source, record, target);
+ default:
+ return null;
+ }
+ }
+
+ private static Object projectStruct(Schema source, Struct sourceStruct, Schema target)
+ throws SchemaProjectorException {
+ Struct targetStruct = new Struct(target);
+ for (Field targetField : target.fields()) {
+ String fieldName = targetField.name();
+ Field sourceField = source.field(fieldName);
+ if (sourceField != null) {
+ Object sourceFieldValue = sourceStruct.get(fieldName);
+ try {
+ Object targetFieldValue = project(
+ sourceField.schema(),
+ sourceFieldValue,
+ targetField.schema());
+ targetStruct.put(fieldName, targetFieldValue);
+ } catch (SchemaProjectorException e) {
+ throw new SchemaProjectorException("Error projecting " + sourceField.name(), e);
+ }
+ } else if (targetField.schema().isOptional()) {
+ // Ignore missing field
+ } else if (targetField.schema().defaultValue() != null) {
+ targetStruct.put(fieldName, targetField.schema().defaultValue());
+ } else {
+ throw new SchemaProjectorException("Required field `" + fieldName
+ + "` is missing from source schema: " + source);
+ }
+ }
+ return targetStruct;
+ }
+
+
+ private static void checkMaybeCompatible(Schema source, Schema target) {
+ if (source.type() != target.type() && !isPromotable(source.type(), target.type())) {
+ throw new SchemaProjectorException("Schema type mismatch. source type: " + source.type()
+ + " and target type: " + target.type());
+ } else if (!Objects.equals(source.name(), target.name())) {
+ throw new SchemaProjectorException("Schema name mismatch. source name: " + source.name()
+ + " and target name: " + target.name());
+ } else if (source.parameters() != null && target.parameters() != null) {
+ if (isEnumSchema(source) && isEnumSchema(target)) {
+ if (!target.parameters().entrySet().containsAll(source.parameters().entrySet())) {
+ throw new SchemaProjectorException("Schema parameters mismatch. Source parameter: "
+ + source.parameters()
+ + " is not a subset of target parameters: " + target.parameters());
+ }
+ } else if (!Objects.equals(source.parameters(), target.parameters())) {
+ throw new SchemaProjectorException("Schema parameters not equal. source parameters: "
+ + source.parameters() + " and target parameters: " + target.parameters());
+ }
+ }
+ }
+
+ static boolean isEnumSchema(Schema schema) {
+ return schema.parameters() != null
+ && (schema.parameters().containsKey(AvroData.AVRO_TYPE_ENUM)
+ || schema.parameters().containsKey(ProtobufData.PROTOBUF_TYPE_ENUM));
+ }
+
+ private static Object projectArray(Schema source, Object record, Schema target)
+ throws SchemaProjectorException {
+    List<Object> array = (List<Object>) record;
+ List
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-protobuf-converter</artifactId>
+      <version>${confluent.version.range}</version>
+    </dependency>
org.apache.parquet
parquet-column