Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@
files="StorageSinkConnectorConfig.java"
/>

<suppress
checks="(CyclomaticComplexity)"
files="SchemaProjector.java"
/>

</suppressions>
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-data</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-protobuf-converter</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.storage.schema;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.SchemaProjectorException;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.protobuf.ProtobufData;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;


/**
 * Projects values written with one Connect {@link Schema} onto a compatible target
 * {@link Schema}, widening numeric types where safe and recursing through structs,
 * arrays, and maps. Unlike Kafka's built-in projector, this variant treats
 * Avro/Protobuf enum schemas (represented as STRING schemas with marker parameters)
 * as compatible when the source's symbols are a subset of the target's.
 */
public class SchemaProjector {

  // All (source, target) pairs of numeric Schema.Types for which a value of the
  // source type may be safely widened to the target type, e.g. (INT8, INT64) or
  // (INT32, FLOAT64). Identity pairs such as (INT64, INT64) are included.
  private static final Set<AbstractMap.SimpleImmutableEntry<Schema.Type, Schema.Type>> promotable =
      new HashSet<>();

  static {
    // Types are listed in widening order; every type promotes to itself and to
    // every type that appears after it in this array.
    Schema.Type[] promotableTypes = {Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32,
        Schema.Type.INT64, Schema.Type.FLOAT32, Schema.Type.FLOAT64};
    for (int i = 0; i < promotableTypes.length; ++i) {
      for (int j = i; j < promotableTypes.length; ++j) {
        promotable.add(new AbstractMap.SimpleImmutableEntry<>(
            promotableTypes[i],
            promotableTypes[j]));
      }
    }
  }

  /**
   * Projects {@code record}, written with the {@code source} schema, onto the
   * {@code target} schema.
   *
   * @param source the schema the record was written with
   * @param record the value to project; may be null when {@code source} is optional
   * @param target the schema to project the value onto
   * @return the projected value; the target's default value when {@code record} is
   *         null and {@code target} is required; null when {@code record} is null
   *         and {@code target} is optional
   * @throws SchemaProjectorException if the schemas are incompatible or a required
   *         value is missing without a default
   */
  public static Object project(Schema source, Object record, Schema target)
      throws SchemaProjectorException {
    checkMaybeCompatible(source, target);
    if (source.isOptional() && !target.isOptional()) {
      // An optional source may produce nulls; a required target can only absorb
      // them by substituting its default value.
      if (target.defaultValue() != null) {
        if (record != null) {
          return projectRequiredSchema(source, record, target);
        } else {
          return target.defaultValue();
        }
      } else {
        throw new SchemaProjectorException("Writer schema is optional, "
            + "however, target schema does not provide a default value.");
      }
    } else {
      if (record != null) {
        return projectRequiredSchema(source, record, target);
      } else {
        return null;
      }
    }
  }

  // Dispatches projection of a non-null value on the target's type. Unknown
  // types fall through to null rather than throwing.
  private static Object projectRequiredSchema(Schema source, Object record, Schema target)
      throws SchemaProjectorException {
    switch (target.type()) {
      case INT8:
      case INT16:
      case INT32:
      case INT64:
      case FLOAT32:
      case FLOAT64:
      case BOOLEAN:
      case BYTES:
      case STRING:
        return projectPrimitive(source, record, target);
      case STRUCT:
        return projectStruct(source, (Struct) record, target);
      case ARRAY:
        return projectArray(source, record, target);
      case MAP:
        return projectMap(source, record, target);
      default:
        return null;
    }
  }

  // Projects a struct field by field. Fields missing from the source fall back
  // to the target field's default, are skipped when optional, and are an error
  // otherwise.
  private static Object projectStruct(Schema source, Struct sourceStruct, Schema target)
      throws SchemaProjectorException {
    Struct targetStruct = new Struct(target);
    for (Field targetField : target.fields()) {
      String fieldName = targetField.name();
      Field sourceField = source.field(fieldName);
      if (sourceField != null) {
        Object sourceFieldValue = sourceStruct.get(fieldName);
        try {
          Object targetFieldValue = project(
              sourceField.schema(),
              sourceFieldValue,
              targetField.schema());
          targetStruct.put(fieldName, targetFieldValue);
        } catch (SchemaProjectorException e) {
          // Wrap to record which field failed while preserving the cause.
          throw new SchemaProjectorException("Error projecting " + sourceField.name(), e);
        }
      } else if (targetField.schema().isOptional()) {
        // Ignore missing field
      } else if (targetField.schema().defaultValue() != null) {
        targetStruct.put(fieldName, targetField.schema().defaultValue());
      } else {
        throw new SchemaProjectorException("Required field `" + fieldName
            + "` is missing from source schema: " + source);
      }
    }
    return targetStruct;
  }

  /**
   * Performs a cheap structural compatibility check between the two schemas:
   * types must match (or be promotable), names must be equal, and parameters
   * must be equal — except for enum schemas, where the source's parameters
   * (symbols) need only be a subset of the target's, so removing a symbol is a
   * compatible evolution while adding one is not.
   *
   * @throws SchemaProjectorException on any mismatch
   */
  private static void checkMaybeCompatible(Schema source, Schema target) {
    if (source.type() != target.type() && !isPromotable(source.type(), target.type())) {
      throw new SchemaProjectorException("Schema type mismatch. source type: " + source.type()
          + " and target type: " + target.type());
    } else if (!Objects.equals(source.name(), target.name())) {
      throw new SchemaProjectorException("Schema name mismatch. source name: " + source.name()
          + " and target name: " + target.name());
    } else if (source.parameters() != null || target.parameters() != null) {
      // Fix: previously this branch required BOTH parameter maps to be non-null,
      // silently accepting the case where exactly one schema carried parameters
      // (e.g. an enum-backed string projected onto a plain string of the same
      // name). Enter the check when either side has parameters; isEnumSchema
      // null-guards and Objects.equals handles null-vs-non-null.
      if (isEnumSchema(source) && isEnumSchema(target)) {
        if (!target.parameters().entrySet().containsAll(source.parameters().entrySet())) {
          throw new SchemaProjectorException("Schema parameters mismatch. Source parameter: "
              + source.parameters()
              + " is not a subset of target parameters: " + target.parameters());
        }
      } else if (!Objects.equals(source.parameters(), target.parameters())) {
        throw new SchemaProjectorException("Schema parameters not equal. source parameters: "
            + source.parameters() + " and target parameters: " + target.parameters());
      }
    }
  }

  /**
   * Returns true if the schema is an unwrapped Avro or Protobuf enum, i.e. it
   * carries the converter's enum marker parameter.
   */
  static boolean isEnumSchema(Schema schema) {
    return schema.parameters() != null
        && (schema.parameters().containsKey(AvroData.AVRO_TYPE_ENUM)
        || schema.parameters().containsKey(ProtobufData.PROTOBUF_TYPE_ENUM));
  }

  // Projects each element of a list with the schemas' value schemas.
  private static Object projectArray(Schema source, Object record, Schema target)
      throws SchemaProjectorException {
    List<?> array = (List<?>) record;
    List<Object> retArray = new ArrayList<>();
    for (Object entry : array) {
      retArray.add(project(source.valueSchema(), entry, target.valueSchema()));
    }
    return retArray;
  }

  // Projects each map entry, recursing into both the key and value schemas.
  private static Object projectMap(Schema source, Object record, Schema target)
      throws SchemaProjectorException {
    Map<?, ?> map = (Map<?, ?>) record;
    Map<Object, Object> retMap = new HashMap<>();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      Object key = entry.getKey();
      Object value = entry.getValue();
      Object retKey = project(source.keySchema(), key, target.keySchema());
      Object retValue = project(source.valueSchema(), value, target.valueSchema());
      retMap.put(retKey, retValue);
    }
    return retMap;
  }

  // Widens a numeric value to the target type when promotable; all other
  // primitives (and non-promotable cases) pass through unchanged.
  private static Object projectPrimitive(Schema source, Object record, Schema target)
      throws SchemaProjectorException {
    assert source.type().isPrimitive();
    assert target.type().isPrimitive();
    Object result;
    if (isPromotable(source.type(), target.type()) && record instanceof Number) {
      Number numberRecord = (Number) record;
      switch (target.type()) {
        case INT8:
          result = numberRecord.byteValue();
          break;
        case INT16:
          result = numberRecord.shortValue();
          break;
        case INT32:
          result = numberRecord.intValue();
          break;
        case INT64:
          result = numberRecord.longValue();
          break;
        case FLOAT32:
          result = numberRecord.floatValue();
          break;
        case FLOAT64:
          result = numberRecord.doubleValue();
          break;
        default:
          throw new SchemaProjectorException("Not promotable type.");
      }
    } else {
      result = record;
    }
    return result;
  }

  // True when a source-type value may be widened to the target type.
  private static boolean isPromotable(Schema.Type sourceType, Schema.Type targetType) {
    return promotable.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType, targetType));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaProjector;
import org.apache.kafka.connect.errors.SchemaProjectorException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -273,7 +272,14 @@ protected boolean checkSchemaParameters(
Schema originalSchema,
Schema currentSchema
) {
return !Objects.equals(originalSchema.parameters(), currentSchema.parameters());
if (SchemaProjector.isEnumSchema(originalSchema)
&& SchemaProjector.isEnumSchema(currentSchema)) {
Map<String, String> originalParams = originalSchema.parameters();
Map<String, String> currentParams = currentSchema.parameters();
return !currentParams.entrySet().containsAll(originalParams.entrySet());
} else {
return !Objects.equals(originalSchema.parameters(), currentSchema.parameters());
}
}

protected boolean isPromotable(Schema.Type sourceType, Schema.Type targetType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.confluent.connect.storage.schema;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.protobuf.ProtobufData;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.SchemaProjectorException;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

/**
 * Tests for {@link SchemaProjector}'s enum-aware compatibility check: shrinking an
 * enum's symbol set is a compatible evolution, growing it is not, and plain string
 * schemas keep strict name equality.
 */
public class SchemaProjectorTest {

  /**
   * Builds an Avro-style enum schema: AvroData unwraps enums to STRING schemas,
   * carrying the enum name and its symbols as {@code avro.enum}-prefixed parameters.
   */
  private static SchemaBuilder buildAvroEnumSchema(String name, int version, String... values) {
    SchemaBuilder enumSchema = SchemaBuilder.string()
        .version(version)
        .name(name);
    enumSchema.parameter(AvroData.AVRO_TYPE_ENUM, name);
    for (String value : values) {
      enumSchema.parameter(AvroData.AVRO_TYPE_ENUM + "." + value, value);
    }
    return enumSchema;
  }

  /**
   * Builds a Protobuf-style enum schema: symbols are carried as
   * {@code protobuf.type.enum}-prefixed parameters on a STRING schema.
   */
  private static SchemaBuilder buildProtobufEnumSchema(String name, int version, String... values) {
    SchemaBuilder enumSchema = SchemaBuilder.string()
        .version(version)
        .name(name);
    enumSchema.parameter(ProtobufData.PROTOBUF_TYPE_ENUM, name);
    for (String value : values) {
      enumSchema.parameter(ProtobufData.PROTOBUF_TYPE_ENUM + "." + value, value);
    }
    return enumSchema;
  }

  /** Builds a plain named STRING schema with no enum parameters. */
  private static SchemaBuilder buildStringSchema(String name, int version) {
    return SchemaBuilder.string()
        .version(version)
        .name(name);
  }

  private static final Schema ENUM_SCHEMA_A =
      buildAvroEnumSchema("e1", 1, "RED", "GREEN", "BLUE").build();
  private static final Schema ENUM_SCHEMA_A2 =
      buildAvroEnumSchema("e1", 2, "RED", "GREEN").build();
  private static final Schema ENUM_SCHEMA_B =
      buildAvroEnumSchema("e1", 1, "RED", "GREEN", "BLUE", "YELLOW").build();
  private static final Schema ENUM_SCHEMA_C =
      buildProtobufEnumSchema("e1", 1, "RED", "GREEN", "BLUE").build();
  private static final Schema ENUM_SCHEMA_C2 =
      buildProtobufEnumSchema("e1", 2, "RED", "GREEN").build();
  private static final Schema ENUM_SCHEMA_D =
      buildProtobufEnumSchema("e1", 1, "RED", "GREEN", "BLUE", "YELLOW").build();
  private static final Schema STRING_SCHEMA_A =
      buildStringSchema("schema1", 1).build();
  private static final Schema STRING_SCHEMA_B =
      buildStringSchema("schema2", 1).build();

  @Test
  public void testCheckMaybeCompatibleWithStringSchema() {
    String value = "test";

    // Plain (non-enum) string schemas keep the strict check: a name mismatch is rejected.
    assertThrows(SchemaProjectorException.class,
        () -> SchemaProjector.project(STRING_SCHEMA_A, value, STRING_SCHEMA_B));
  }

  @Test
  public void testCheckMaybeCompatibleWithAvroEnumSchema() {
    String value = "RED";

    // Source carries a symbol (YELLOW) the target lacks: projection must fail.
    assertThrows(SchemaProjectorException.class,
        () -> SchemaProjector.project(ENUM_SCHEMA_B, value, ENUM_SCHEMA_A));

    // Removing a symbol is compatible, and the value must pass through unchanged.
    assertEquals(value, SchemaProjector.project(ENUM_SCHEMA_A2, value, ENUM_SCHEMA_A));
  }

  @Test
  public void testCheckMaybeCompatibleWithProtobufEnumSchema() {
    String value = "RED";

    // Source carries a symbol (YELLOW) the target lacks: projection must fail.
    assertThrows(SchemaProjectorException.class,
        () -> SchemaProjector.project(ENUM_SCHEMA_D, value, ENUM_SCHEMA_C));

    // Removing a symbol is compatible, and the value must pass through unchanged.
    assertEquals(value, SchemaProjector.project(ENUM_SCHEMA_C2, value, ENUM_SCHEMA_C));
  }
}
Loading