From 6c4642728ed3dc307884f6d51834c7cf5d5c574a Mon Sep 17 00:00:00 2001 From: dcotfr <118921595+dcotfr@users.noreply.github.com> Date: Fri, 6 Jan 2023 15:36:48 +0100 Subject: [PATCH] Fix HeaderValueMatches & dependencies upgrades (#4) * Fix HeaderValueMatches, move to Java 11, upgrade lib dependencies, refresh TimestampMicrosConverter implementation * Upgrade Java version * Upgrade Java version --- .github/workflows/on_push_master.yml | 2 +- .github/workflows/on_push_tag.yml | 2 +- build.gradle | 12 +- settings.gradle | 2 +- .../providers/AES256ConfigProvider.java | 58 +-- .../transforms/TimestampMicrosConverter.java | 416 ++++++++++-------- .../predicates/HeaderValueMatches.java | 62 ++- .../providers/AES256ConfigProviderTest.java | 121 ++--- .../transforms/TimestampConverterTest.java | 50 +-- .../predicates/HeaderValueMatchesTest.java | 110 ++--- 10 files changed, 441 insertions(+), 394 deletions(-) diff --git a/.github/workflows/on_push_master.yml b/.github/workflows/on_push_master.yml index 488f2cc..0ccc9de 100644 --- a/.github/workflows/on_push_master.yml +++ b/.github/workflows/on_push_master.yml @@ -15,7 +15,7 @@ jobs: - uses: actions/setup-java@v2 with: distribution: 'adopt' - java-version: '8' + java-version: '11' - name: Cache Gradle packages uses: actions/cache@v1 with: diff --git a/.github/workflows/on_push_tag.yml b/.github/workflows/on_push_tag.yml index 4840961..ed409cf 100644 --- a/.github/workflows/on_push_tag.yml +++ b/.github/workflows/on_push_tag.yml @@ -13,7 +13,7 @@ jobs: - uses: actions/setup-java@v2 with: distribution: 'adopt' - java-version: '8' + java-version: '11' - name: Cache Gradle packages uses: actions/cache@v1 with: diff --git a/build.gradle b/build.gradle index 9a9778d..3f75a4f 100644 --- a/build.gradle +++ b/build.gradle @@ -18,16 +18,16 @@ repositories { dependencies { - implementation 'org.apache.kafka:connect-api:2.8.0' - implementation 'org.apache.kafka:connect-transforms:2.8.0' - implementation 'org.apache.kafka:kafka_2.13:2.8.0' + implementation 'org.apache.kafka:connect-api:3.1.2' + implementation 'org.apache.kafka:connect-transforms:3.1.2' + implementation 'org.apache.kafka:kafka_2.13:3.1.2' // Use JUnit Jupiter for testing. - testImplementation 'org.junit.jupiter:junit-jupiter:5.8.2' + testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' } java { - sourceCompatibility = '1.8' - targetCompatibility = '1.8' + sourceCompatibility = '11' + targetCompatibility = '11' } test { // Use JUnit Platform for unit tests. 
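As background for the AES256ConfigProvider changes further down: this patch deletes the AES256Helper encryption utility from the tests, so here is a minimal standalone sketch of producing a ciphertext the provider can decrypt. It mirrors the removed helper and the provider's own getCipher() parameters (PBKDF2WithHmacSHA256 key derivation, 65536 iterations, 256-bit key, AES/CBC/PKCS5Padding with an all-zero IV), just in ENCRYPT_MODE instead of DECRYPT_MODE. The class name is illustrative only, not part of the patch.

import javax.crypto.Cipher;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public final class AES256EncryptSketch {

    // Derives the AES key exactly as AES256ConfigProvider.getCipher() does,
    // then encrypts and Base64-encodes the plaintext.
    public static String encrypt(final String key, final String salt, final String plaintext) throws Exception {
        final var ivspec = new IvParameterSpec(new byte[16]); // all-zero IV, as in the provider
        final var factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
        final var spec = new PBEKeySpec(key.toCharArray(), salt.getBytes(StandardCharsets.UTF_8), 65536, 256);
        final var secretKey = new SecretKeySpec(factory.generateSecret(spec).getEncoded(), "AES");

        final var cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, secretKey, ivspec);
        return Base64.getEncoder().encodeToString(cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(final String[] args) throws Exception {
        // With the key/salt used in the tests, this should reproduce the
        // ciphertext asserted there for "hello !".
        System.out.println(encrypt("key-aaaabbbbccccdddd", "salt-aaaabbbbccccdddd", "hello !"));
    }
}

The printed Base64 string is what a connector configuration would reference through the provider, as sketched at the end of this patch.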
diff --git a/settings.gradle b/settings.gradle index 287022f..503bcb4 100644 --- a/settings.gradle +++ b/settings.gradle @@ -13,7 +13,7 @@ buildscript { } } dependencies { - classpath 'io.alcide:gradle-semantic-build-versioning:4.2.1' + classpath 'io.alcide:gradle-semantic-build-versioning:4.2.2' } } diff --git a/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java b/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java index 9914c10..4043cf5 100644 --- a/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java +++ b/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java @@ -9,31 +9,38 @@ import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.types.Password; +import javax.crypto.BadPaddingException; import javax.crypto.Cipher; -import javax.crypto.SecretKey; +import javax.crypto.IllegalBlockSizeException; import javax.crypto.SecretKeyFactory; import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.PBEKeySpec; import javax.crypto.spec.SecretKeySpec; -import java.security.spec.KeySpec; +import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashMap; import java.util.Map; import java.util.Set; +/** + * AES256 encrypted Kafka config provider. + * + * @author Michelin + */ public class AES256ConfigProvider implements ConfigProvider { - public static final String OVERVIEW_DOC = "A ConfigProvider to decode values encoded with AES256 key."; private static final String AES_KEY_CONFIG = "key"; private static final String SALT_CONFIG = "salt"; + + /** + * Definition of accepted parameters: key and salt. + */ public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(AES_KEY_CONFIG, ConfigDef.Type.PASSWORD, ConfigDef.NO_DEFAULT_VALUE, - new ConfigDef.NonNullValidator(), ConfigDef.Importance.HIGH, - "The AES256 key.") + new ConfigDef.NonNullValidator(), ConfigDef.Importance.HIGH, "The AES256 key.") .define(SALT_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, - "The AES256 salt.") + new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, "The AES256 salt."); /** * Represents the aes256 key */ @@ -46,26 +53,25 @@ public class AES256ConfigProvider implements ConfigProvider { private String salt; @Override - public void configure(Map<String, ?> configs) { - Map<String, Object> parsedConfigs = CONFIG_DEF.parse(configs); + public void configure(final Map<String, ?> pConfigs) { + final var parsedConfigs = CONFIG_DEF.parse(pConfigs); this.aesKey = (Password) parsedConfigs.get(AES_KEY_CONFIG); - this.salt = parsedConfigs.get(SALT_CONFIG).toString(); + this.salt = parsedConfigs.get(SALT_CONFIG).toString().trim(); } @Override - public ConfigData get(String path) { + public ConfigData get(final String pPath) { return new ConfigData(new HashMap<>()); } @Override - public ConfigData get(String path, Set<String> keys) { - Map<String, String> decoded = new HashMap<>(); - - final Cipher cipher = this.getCipher(); - keys.forEach(key -> { + public ConfigData get(final String pPath, final Set<String> pKeys) { + final var decoded = new HashMap<String, String>(); + final var cipher = this.getCipher(); + pKeys.forEach(key -> { try { - decoded.put(key, new String(cipher.doFinal(Base64.getDecoder().decode(key)))); - } catch (Exception e) { + decoded.put(key, new String(cipher.doFinal(Base64.getDecoder().decode(key)), StandardCharsets.UTF_8)); + } catch (IllegalArgumentException | IllegalBlockSizeException | BadPaddingException e) { throw new ConfigException("Error
while decrypting " + key, e); } }); @@ -85,20 +91,18 @@ public void close() { */ private Cipher getCipher() { try { - byte[] iv = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - final IvParameterSpec ivspec = new IvParameterSpec(iv); + final var ivspec = new IvParameterSpec(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}); - final SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256"); - final KeySpec spec = new PBEKeySpec(this.aesKey.value().toCharArray(), this.salt.getBytes(), 65536, 256); - final SecretKey tmp = factory.generateSecret(spec); - final SecretKeySpec secretKey = new SecretKeySpec(tmp.getEncoded(), "AES"); + final var factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256"); + final var spec = new PBEKeySpec(this.aesKey.value().toCharArray(), this.salt.getBytes(StandardCharsets.UTF_8), 65536, 256); + final var tmp = factory.generateSecret(spec); + final var secretKey = new SecretKeySpec(tmp.getEncoded(), "AES"); - final Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5PADDING"); + final var cipher = Cipher.getInstance("AES/CBC/PKCS5PADDING"); cipher.init(Cipher.DECRYPT_MODE, secretKey, ivspec); return cipher; - } catch (Exception e) { + } catch (final Exception e) { throw new ConfigException("Error during Cipher initialization", e); } } - } diff --git a/src/main/java/com/michelin/kafka/connect/transforms/TimestampMicrosConverter.java b/src/main/java/com/michelin/kafka/connect/transforms/TimestampMicrosConverter.java index 4bc75b4..13fd098 100644 --- a/src/main/java/com/michelin/kafka/connect/transforms/TimestampMicrosConverter.java +++ b/src/main/java/com/michelin/kafka/connect/transforms/TimestampMicrosConverter.java @@ -18,26 +18,26 @@ import java.text.SimpleDateFormat; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.Arrays; -import java.util.Calendar; import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; +import java.util.*; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; -// Reuse of https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java +/** + * Fork from https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java + * to support timestamps microseconds by default. + * + * @param Type of he record. + * @author Michelin + */ public abstract class TimestampMicrosConverter> implements Transformation { - public static final String OVERVIEW_DOC = "Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types." - + "Applies to individual fields or to the entire value." - + "

Use the concrete transformation type designed for the record key (" + TimestampMicrosConverter.Key.class.getName() + ") " - + "or value (" + TimestampMicrosConverter.Value.class.getName() + ")."; + + "Applies to individual fields or to the entire value." + + "

Use the concrete transformation type designed for the record key (" + TimestampMicrosConverter.Key.class.getName() + ") " + + "or value (" + TimestampMicrosConverter.Value.class.getName() + ")."; public static final String FIELD_CONFIG = "field"; private static final String FIELD_DEFAULT = ""; @@ -47,22 +47,21 @@ public abstract class TimestampMicrosConverter> imple public static final String FORMAT_CONFIG = "format"; private static final String FORMAT_DEFAULT = ""; - public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH, - "The field containing the timestamp, or empty if the entire value is a timestamp") - .define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, - "The desired timestamp representation: string, unix, Date, Time, or Timestamp") - .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM, - "A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string " - + "or used to parse the input if the input is a string."); + public static final String UNIX_PRECISION_CONFIG = "unix.precision"; + private static final String UNIX_PRECISION_DEFAULT = "milliseconds"; private static final String PURPOSE = "converting timestamp formats"; - private static final String TYPE_STRING = "string"; private static final String TYPE_UNIX = "unix"; private static final String TYPE_DATE = "Date"; private static final String TYPE_TIME = "Time"; private static final String TYPE_TIMESTAMP = "Timestamp"; + + private static final String UNIX_PRECISION_MILLIS = "milliseconds"; + private static final String UNIX_PRECISION_MICROS = "microseconds"; + private static final String UNIX_PRECISION_NANOS = "nanoseconds"; + private static final String UNIX_PRECISION_SECONDS = "seconds"; + private static final Set VALID_TYPES = new HashSet<>(Arrays.asList(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP)); private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); @@ -71,88 +70,125 @@ public abstract class TimestampMicrosConverter> imple public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); public static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema(); + /** + * Definition of accepted parameters: field, target.type, format and unix.precision. + */ + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELD_CONFIG, ConfigDef.Type.STRING, FIELD_DEFAULT, ConfigDef.Importance.HIGH, + "The field containing the timestamp, or empty if the entire value is a timestamp") + .define(TARGET_TYPE_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + ConfigDef.ValidString.in(TYPE_STRING, TYPE_UNIX, TYPE_DATE, TYPE_TIME, TYPE_TIMESTAMP), + ConfigDef.Importance.HIGH, + "The desired timestamp representation: string, unix, Date, Time, or Timestamp") + .define(FORMAT_CONFIG, ConfigDef.Type.STRING, FORMAT_DEFAULT, ConfigDef.Importance.MEDIUM, + "A SimpleDateFormat-compatible format for the timestamp. " + + "Used to generate the output when type=string or used to parse the input if the input is a string.") + .define(UNIX_PRECISION_CONFIG, ConfigDef.Type.STRING, UNIX_PRECISION_DEFAULT, + ConfigDef.ValidString.in( + UNIX_PRECISION_NANOS, UNIX_PRECISION_MICROS, + UNIX_PRECISION_MILLIS, UNIX_PRECISION_SECONDS), + ConfigDef.Importance.LOW, + "The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, or nanoseconds. 
" + + "Used to generate the output when type=unix or used to parse the input if the input is a Long." + + "Note: This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components."); + private interface TimestampTranslator { /** * Convert from the type-specific format to the universal java.util.Date format */ - Date toRaw(Config config, Object orig); + Date toRaw(final Config pConfig, final Object pOrig); /** * Get the schema for this format. */ - Schema typeSchema(boolean isOptional); + Schema typeSchema(final boolean pIsOptional); /** * Convert from the universal java.util.Date format to the type-specific format */ - Object toType(Config config, Date orig); + Object toType(final Config pConfig, final Date pOrig); } private static final Map TRANSLATORS = new HashMap<>(); + static { TRANSLATORS.put(TYPE_STRING, new TimestampTranslator() { @Override - public Date toRaw(Config config, Object orig) { - if (!(orig instanceof String)) - throw new DataException("Expected string timestamp to be a String, but found " + orig.getClass()); + public Date toRaw(final Config pConfig, final Object pOrig) { + if (!(pOrig instanceof String)) { + throw new DataException("Expected string timestamp to be a String, but found " + pOrig.getClass()); + } try { - return config.format.parse((String) orig); - } catch (ParseException e) { - throw new DataException("Could not parse timestamp: value (" + orig + ") does not match pattern (" - + config.format.toPattern() + ")", e); + return pConfig.format.parse((String) pOrig); + } catch (final ParseException e) { + throw new DataException("Could not parse timestamp: value (" + pOrig + ") does not match pattern (" + + pConfig.format.toPattern() + ")", e); } } @Override - public Schema typeSchema(boolean isOptional) { - return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; + public Schema typeSchema(final boolean pIsOptional) { + return pIsOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA; } @Override - public String toType(Config config, Date orig) { - synchronized (config.format) { - return config.format.format(orig); + public String toType(final Config pConfig, final Date pOrig) { + synchronized (pConfig.format) { + return pConfig.format.format(pOrig); } } }); TRANSLATORS.put(TYPE_UNIX, new TimestampTranslator() { @Override - public Date toRaw(Config config, Object orig) { - if (!(orig instanceof Long)) - throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass()); - return TimestampMicros.toLogical(TimestampMicros.SCHEMA, (Long) orig); + public Date toRaw(final Config pConfig, final Object pOrig) { + if (!(pOrig instanceof Long)) { + throw new DataException("Expected Unix timestamp to be a Long, but found " + pOrig.getClass()); + } + long unixTime = (Long) pOrig; + switch (pConfig.unixPrecision) { + case UNIX_PRECISION_SECONDS: + return TimestampMicros.toLogical(TimestampMicros.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime)); + case UNIX_PRECISION_MICROS: + return TimestampMicros.toLogical(TimestampMicros.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime)); + case UNIX_PRECISION_NANOS: + return TimestampMicros.toLogical(TimestampMicros.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime)); + case UNIX_PRECISION_MILLIS: + default: + return TimestampMicros.toLogical(TimestampMicros.SCHEMA, unixTime); + } } @Override - public Schema typeSchema(boolean isOptional) { - return isOptional ? 
Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; + public Schema typeSchema(final boolean pIsOptional) { + return pIsOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA; } @Override - public Long toType(Config config, Date orig) { - return TimestampMicros.fromLogical(TimestampMicros.SCHEMA, orig); + public Long toType(final Config pConfig, final Date pOrig) { + return TimestampMicros.fromLogical(TimestampMicros.SCHEMA, pOrig); } }); TRANSLATORS.put(TYPE_DATE, new TimestampTranslator() { @Override - public Date toRaw(Config config, Object orig) { - if (!(orig instanceof Date)) - throw new DataException("Expected Date to be a java.util.Date, but found " + orig.getClass()); + public Date toRaw(final Config pConfig, final Object pOrig) { + if (!(pOrig instanceof Date)) { + throw new DataException("Expected Date to be a java.util.Date, but found " + pOrig.getClass()); + } // Already represented as a java.util.Date and Connect Dates are a subset of valid java.util.Date values - return (Date) orig; + return (Date) pOrig; } @Override - public Schema typeSchema(boolean isOptional) { - return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; + public Schema typeSchema(final boolean pIsOptional) { + return pIsOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA; } @Override - public Date toType(Config config, Date orig) { - Calendar result = Calendar.getInstance(UTC); - result.setTime(orig); + public Date toType(final Config pConfig, final Date pOrig) { + final var result = Calendar.getInstance(UTC); + result.setTime(pOrig); result.set(Calendar.HOUR_OF_DAY, 0); result.set(Calendar.MINUTE, 0); result.set(Calendar.SECOND, 0); @@ -163,22 +199,23 @@ public Date toType(Config config, Date orig) { TRANSLATORS.put(TYPE_TIME, new TimestampTranslator() { @Override - public Date toRaw(Config config, Object orig) { - if (!(orig instanceof Date)) - throw new DataException("Expected Time to be a java.util.Date, but found " + orig.getClass()); + public Date toRaw(final Config pConfig, final Object pOrig) { + if (!(pOrig instanceof Date)) { + throw new DataException("Expected Time to be a java.util.Date, but found " + pOrig.getClass()); + } // Already represented as a java.util.Date and Connect Times are a subset of valid java.util.Date values - return (Date) orig; + return (Date) pOrig; } @Override - public Schema typeSchema(boolean isOptional) { - return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA; + public Schema typeSchema(final boolean pIsOptional) { + return pIsOptional ? 
OPTIONAL_TIME_SCHEMA : Time.SCHEMA; } @Override - public Date toType(Config config, Date orig) { - Calendar origCalendar = Calendar.getInstance(UTC); - origCalendar.setTime(orig); + public Date toType(final Config pConfig, final Date pOrig) { + final var origCalendar = Calendar.getInstance(UTC); + origCalendar.setTime(pOrig); Calendar result = Calendar.getInstance(UTC); result.setTimeInMillis(0L); result.set(Calendar.HOUR_OF_DAY, origCalendar.get(Calendar.HOUR_OF_DAY)); @@ -191,51 +228,58 @@ public Date toType(Config config, Date orig) { TRANSLATORS.put(TYPE_TIMESTAMP, new TimestampTranslator() { @Override - public Date toRaw(Config config, Object orig) { - if (!(orig instanceof Date)) - throw new DataException("Expected Timestamp to be a java.util.Date, but found " + orig.getClass()); - return (Date) orig; + public Date toRaw(final Config pConfig, final Object pOrig) { + if (!(pOrig instanceof Date)) { + throw new DataException("Expected Timestamp to be a java.util.Date, but found " + pOrig.getClass()); + } + return (Date) pOrig; } @Override - public Schema typeSchema(boolean isOptional) { - return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; + public Schema typeSchema(final boolean pIsOptional) { + return pIsOptional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA; } @Override - public Date toType(Config config, Date orig) { - return orig; + public Date toType(final Config pConfig, final Date pOrig) { + return pOrig; } }); } - // This is a bit unusual, but allows the transformation config to be passed to static anonymous classes to customize - // their behavior - private static class Config { - Config(String field, String type, SimpleDateFormat format) { - this.field = field; - this.type = type; - this.format = format; + /** + * This is a bit unusual, but allows the transformation config to be passed to static anonymous classes to customize + * their behavior + */ + private static final class Config { + private final String field; + private final String type; + private final SimpleDateFormat format; + private final String unixPrecision; + + private Config(final String pField, final String pType, final SimpleDateFormat pFormat, final String pUnixPrecision) { + field = pField; + type = pType; + format = pFormat; + unixPrecision = pUnixPrecision; } - String field; - String type; - SimpleDateFormat format; } + private Config config; private Cache<Schema, Schema> schemaUpdateCache; - @Override - public void configure(Map<String, ?> configs) { - final SimpleConfig simpleConfig = new SimpleConfig(CONFIG_DEF, configs); - final String field = simpleConfig.getString(FIELD_CONFIG); - final String type = simpleConfig.getString(TARGET_TYPE_CONFIG); - String formatPattern = simpleConfig.getString(FORMAT_CONFIG); + public void configure(final Map<String, ?> pConfigs) { + final var simpleConfig = new SimpleConfig(CONFIG_DEF, pConfigs); + final var field = simpleConfig.getString(FIELD_CONFIG); + final var type = simpleConfig.getString(TARGET_TYPE_CONFIG); + final var formatPattern = simpleConfig.getString(FORMAT_CONFIG); + final String unixPrecision = simpleConfig.getString(UNIX_PRECISION_CONFIG); schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16)); if (!VALID_TYPES.contains(type)) { throw new ConfigException("Unknown timestamp type in TimestampConverter: " + type + ". 
Valid values are " - + Utils.join(VALID_TYPES, ", ") + "."); + + Utils.join(VALID_TYPES, ", ") + "."); } if (type.equals(TYPE_STRING) && formatPattern.trim().isEmpty()) { throw new ConfigException("TimestampConverter requires format option to be specified when using string timestamps"); @@ -247,21 +291,25 @@ public void configure(Map configs) { format.setTimeZone(UTC); } catch (IllegalArgumentException e) { throw new ConfigException("TimestampConverter requires a SimpleDateFormat-compatible pattern for string timestamps: " - + formatPattern, e); + + formatPattern, e); } } - config = new Config(field, type, format); + config = new Config(field, type, format, unixPrecision); } @Override - public R apply(R record) { - if (operatingSchema(record) == null) { - return applySchemaless(record); - } else { - return applyWithSchema(record); + public R apply(final R pRecord) { + if (operatingSchema(pRecord) == null) { + return applySchemaless(pRecord); } + return applyWithSchema(pRecord); } + /** + * Returns the definition of configuration parameters accepted by this plugin. + * + * @return Configuration definition. + */ @Override public ConfigDef config() { return CONFIG_DEF; @@ -269,177 +317,178 @@ public ConfigDef config() { @Override public void close() { + // Nothing to clean } public static class Key> extends TimestampMicrosConverter { @Override - protected Schema operatingSchema(R record) { - return record.keySchema(); + protected Schema operatingSchema(final R pRecord) { + return pRecord.keySchema(); } @Override - protected Object operatingValue(R record) { - return record.key(); + protected Object operatingValue(final R pRecord) { + return pRecord.key(); } @Override - protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { - return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); + protected R newRecord(final R pRecord, final Schema pUpdatedSchema, final Object pUpdatedValue) { + return pRecord.newRecord(pRecord.topic(), pRecord.kafkaPartition(), pUpdatedSchema, pUpdatedValue, pRecord.valueSchema(), pRecord.value(), pRecord.timestamp()); } } public static class Value> extends TimestampMicrosConverter { @Override - protected Schema operatingSchema(R record) { - return record.valueSchema(); + protected Schema operatingSchema(final R pRecord) { + return pRecord.valueSchema(); } @Override - protected Object operatingValue(R record) { - return record.value(); + protected Object operatingValue(final R pRecord) { + return pRecord.value(); } @Override - protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { - return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); + protected R newRecord(final R pRecord, final Schema pUpdatedSchema, final Object pUpdatedValue) { + return pRecord.newRecord(pRecord.topic(), pRecord.kafkaPartition(), pRecord.keySchema(), pRecord.key(), pUpdatedSchema, pUpdatedValue, pRecord.timestamp()); } } - protected abstract Schema operatingSchema(R record); + protected abstract Schema operatingSchema(final R pRecord); - protected abstract Object operatingValue(R record); + protected abstract Object operatingValue(final R pRecord); - protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); + protected abstract R newRecord(final R pRecord, final Schema pUpdatedSchema, final Object pUpdatedValue); - private R applyWithSchema(R 
record) { - final Schema schema = operatingSchema(record); + private R applyWithSchema(final R pRecord) { + final var schema = operatingSchema(pRecord); if (config.field.isEmpty()) { - Object value = operatingValue(record); + final var value = operatingValue(pRecord); // New schema is determined by the requested target timestamp type - Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(schema.isOptional()); - return newRecord(record, updatedSchema, convertTimestamp(value, timestampTypeFromSchema(schema))); - } else { - final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); - Schema updatedSchema = schemaUpdateCache.get(schema); - if (updatedSchema == null) { - SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); - for (Field field : schema.fields()) { - if (field.name().equals(config.field)) { - builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional())); - } else { - builder.field(field.name(), field.schema()); - } - } - if (schema.isOptional()) - builder.optional(); - if (schema.defaultValue() != null) { - Struct updatedDefaultValue = applyValueWithSchema((Struct) schema.defaultValue(), builder); - builder.defaultValue(updatedDefaultValue); - } + final var updatedSchema = TRANSLATORS.get(config.type).typeSchema(schema.isOptional()); + return newRecord(pRecord, updatedSchema, convertTimestamp(value, timestampTypeFromSchema(schema))); + } - updatedSchema = builder.build(); - schemaUpdateCache.put(schema, updatedSchema); + final var value = requireStructOrNull(operatingValue(pRecord), PURPOSE); + var updatedSchema = schemaUpdateCache.get(schema); + if (updatedSchema == null) { + final var builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + for (final Field field : schema.fields()) { + if (field.name().equals(config.field)) { + builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional())); + } else { + builder.field(field.name(), field.schema()); + } + } + if (schema.isOptional()) { + builder.optional(); + } + if (schema.defaultValue() != null) { + final var updatedDefaultValue = applyValueWithSchema((Struct) schema.defaultValue(), builder); + builder.defaultValue(updatedDefaultValue); } - Struct updatedValue = applyValueWithSchema(value, updatedSchema); - return newRecord(record, updatedSchema, updatedValue); + updatedSchema = builder.build(); + schemaUpdateCache.put(schema, updatedSchema); } + + final var updatedValue = applyValueWithSchema(value, updatedSchema); + return newRecord(pRecord, updatedSchema, updatedValue); } - private Struct applyValueWithSchema(Struct value, Schema updatedSchema) { - if (value == null) { + private Struct applyValueWithSchema(final Struct pValue, final Schema pUpdatedSchema) { + if (pValue == null) { return null; } - Struct updatedValue = new Struct(updatedSchema); - for (Field field : value.schema().fields()) { + + final var updatedValue = new Struct(pUpdatedSchema); + for (final Field field : pValue.schema().fields()) { final Object updatedFieldValue; if (field.name().equals(config.field)) { - updatedFieldValue = convertTimestamp(value.get(field), timestampTypeFromSchema(field.schema())); + updatedFieldValue = convertTimestamp(pValue.get(field), timestampTypeFromSchema(field.schema())); } else { - updatedFieldValue = value.get(field); + updatedFieldValue = pValue.get(field); } updatedValue.put(field.name(), updatedFieldValue); } return updatedValue; } - private R applySchemaless(R record) { - Object 
rawValue = operatingValue(pRecord); if (rawValue == null || config.field.isEmpty()) { - return newRecord(record, null, convertTimestamp(rawValue)); - } else { - final Map<String, Object> value = requireMap(rawValue, PURPOSE); - final HashMap<String, Object> updatedValue = new HashMap<>(value); - updatedValue.put(config.field, convertTimestamp(value.get(config.field))); - return newRecord(record, null, updatedValue); + return newRecord(pRecord, null, convertTimestamp(rawValue)); } + + final var value = requireMap(rawValue, PURPOSE); + final var updatedValue = new HashMap<>(value); + updatedValue.put(config.field, convertTimestamp(value.get(config.field))); + return newRecord(pRecord, null, updatedValue); } /** * Determine the type/format of the timestamp based on the schema */ - private String timestampTypeFromSchema(Schema schema) { - if (Timestamp.LOGICAL_NAME.equals(schema.name())) { + private static String timestampTypeFromSchema(final Schema pSchema) { + if (Timestamp.LOGICAL_NAME.equals(pSchema.name())) { return TYPE_TIMESTAMP; - } else if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(schema.name())) { + } else if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(pSchema.name())) { return TYPE_DATE; - } else if (Time.LOGICAL_NAME.equals(schema.name())) { + } else if (Time.LOGICAL_NAME.equals(pSchema.name())) { return TYPE_TIME; - } else if (schema.type().equals(Schema.Type.STRING)) { + } else if (pSchema.type() == Schema.Type.STRING) { // If not otherwise specified, string == user-specified string format for timestamps return TYPE_STRING; - } else if (schema.type().equals(Schema.Type.INT64)) { + } else if (pSchema.type() == Schema.Type.INT64) { // If not otherwise specified, long == unix time return TYPE_UNIX; } - throw new ConnectException("Schema " + schema + " does not correspond to a known timestamp type format"); + throw new ConnectException("Schema " + pSchema + " does not correspond to a known timestamp type format"); } /** * Infer the type/format of the timestamp based on the raw Java type */ - private String inferTimestampType(Object timestamp) { + private static String inferTimestampType(final Object pTimestamp) { // Note that we can't infer all types, e.g. Date/Time/Timestamp all have the same runtime representation as a // java.util.Date - if (timestamp instanceof Date) { + if (pTimestamp instanceof Date) { return TYPE_TIMESTAMP; - } else if (timestamp instanceof Long) { + } else if (pTimestamp instanceof Long) { return TYPE_UNIX; - } else if (timestamp instanceof String) { + } else if (pTimestamp instanceof String) { return TYPE_STRING; } - throw new DataException("TimestampConverter does not support " + timestamp.getClass() + " objects as timestamps"); + throw new DataException("TimestampConverter does not support " + pTimestamp.getClass() + " objects as timestamps"); } /** * Convert the given timestamp to the target timestamp format. 
- * @param timestamp the input timestamp, may be null - * @param timestampFormat the format of the timestamp, or null if the format should be inferred + * + * @param pTimestamp the input timestamp, may be null + * @param pTimestampFormat the format of the timestamp, or null if the format should be inferred * @return the converted timestamp */ - private Object convertTimestamp(Object timestamp, String timestampFormat) { - if (timestamp == null) { + private Object convertTimestamp(final Object pTimestamp, final String pTimestampFormat) { + if (pTimestamp == null) { return null; } - if (timestampFormat == null) { - timestampFormat = inferTimestampType(timestamp); - } - - TimestampTranslator sourceTranslator = TRANSLATORS.get(timestampFormat); + final var timestampFormat = pTimestampFormat == null ? inferTimestampType(pTimestamp) : pTimestampFormat; + final var sourceTranslator = TRANSLATORS.get(timestampFormat); if (sourceTranslator == null) { throw new ConnectException("Unsupported timestamp type: " + timestampFormat); } - Date rawTimestamp = sourceTranslator.toRaw(config, timestamp); + final var rawTimestamp = sourceTranslator.toRaw(config, pTimestamp); - TimestampTranslator targetTranslator = TRANSLATORS.get(config.type); + final var targetTranslator = TRANSLATORS.get(config.type); if (targetTranslator == null) { throw new ConnectException("Unsupported timestamp type: " + config.type); } return targetTranslator.toType(config, rawTimestamp); } - private Object convertTimestamp(Object timestamp) { - return convertTimestamp(timestamp, null); + private Object convertTimestamp(final Object pTimestamp) { + return convertTimestamp(pTimestamp, null); } public static class TimestampMicros { @@ -447,26 +496,25 @@ public static class TimestampMicros { public static final Schema SCHEMA = builder().schema(); public TimestampMicros() { + // No specific } public static SchemaBuilder builder() { return SchemaBuilder.int64().name(LOGICAL_NAME).version(1); } - public static long fromLogical(Schema schema, java.util.Date value) { - if (!LOGICAL_NAME.equals(schema.name())) { + public static long fromLogical(final Schema pSchema, final java.util.Date pValue) { + if (!LOGICAL_NAME.equals(pSchema.name())) { throw new DataException("Requested conversion of TimestampMicros object but the schema does not match."); - } else { - return ChronoUnit.MILLIS.between(Instant.EPOCH, value.toInstant()); } + return ChronoUnit.MILLIS.between(Instant.EPOCH, pValue.toInstant()); } - public static java.util.Date toLogical(Schema schema, long value) { - if (!LOGICAL_NAME.equals(schema.name())) { + public static java.util.Date toLogical(final Schema pSchema, final long pValue) { + if (!LOGICAL_NAME.equals(pSchema.name())) { throw new DataException("Requested conversion of TimestampMicros object but the schema does not match."); - } else { - return Date.from(Instant.EPOCH.plus(value, ChronoUnit.MICROS)); } + return Date.from(Instant.EPOCH.plus(pValue, ChronoUnit.MICROS)); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatches.java b/src/main/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatches.java index 592a4cf..3e2da83 100644 --- a/src/main/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatches.java +++ b/src/main/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatches.java @@ -2,46 +2,56 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; 
-import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.transforms.predicates.Predicate; import org.apache.kafka.connect.transforms.util.RegexValidator; import org.apache.kafka.connect.transforms.util.SimpleConfig; -import java.util.Iterator; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.regex.Pattern; +/** + * Kafka Connect custom header value filter. + * + * @param <R> Type of Kafka record. + * @author Michelin + */ public class HeaderValueMatches<R extends ConnectRecord<R>> implements Predicate<R> { + public static final String OVERVIEW_DOC = "A predicate which is true for records with a header's value that matches the configured regular expression."; private static final String HEADER_CONFIG = "header.name"; private static final String PATTERN_CONFIG = "pattern"; private static final String MISSING_HEADER_CONFIG = "missing.header.behavior"; - public static final String OVERVIEW_DOC = "A predicate which is true for records with a header's value that matches the configured regular expression."; - + /** + * Definition of accepted parameters: header.name, pattern and missing.header.behavior. + */ public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(HEADER_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, - "The header name.") + new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "The header name.") .define(PATTERN_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.CompositeValidator.of(new ConfigDef.NonEmptyString(), new RegexValidator()), - ConfigDef.Importance.MEDIUM, - "A Java regular expression for matching against the value of a record's header.") + ConfigDef.Importance.MEDIUM, "A Java regular expression for matching against the value of a record's header.") .define(MISSING_HEADER_CONFIG, ConfigDef.Type.BOOLEAN, "false", ConfigDef.Importance.LOW, - "Predicate behavior when header is missing [true/false]. Default to false"); + .define(MISSING_HEADER_CONFIG, ConfigDef.Type.BOOLEAN, "false", + ConfigDef.Importance.LOW, "Predicate behavior when header is missing [true/false]. Default to false"); private String headerName; private Pattern pattern; private boolean missingHeaderBehavior; + /** + * Returns the definition of configuration parameters accepted by this plugin. + * + * @return Configuration definition. + */ @Override public ConfigDef config() { return CONFIG_DEF; } @Override - public void configure(Map<String, ?> configs) { - final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + public void configure(final Map<String, ?> pConfigs) { + final var config = new SimpleConfig(CONFIG_DEF, pConfigs); this.headerName = config.getString(HEADER_CONFIG); this.pattern = Pattern.compile(config.getString(PATTERN_CONFIG)); this.missingHeaderBehavior = config.getBoolean(MISSING_HEADER_CONFIG); @@ -49,16 +59,27 @@ public void configure(Map<String, ?> configs) { @Override - public boolean test(R record) { - Iterator<Header> headerIterator = record.headers().allWithName(headerName); + public boolean test(final R pRecord) { + final var headerIterator = pRecord.headers().allWithName(headerName); // Header not found if (headerIterator == null || !headerIterator.hasNext()) { return missingHeaderBehavior; } // Loop over headers (multiple with same name allowed) while (headerIterator.hasNext()) { - Header header = headerIterator.next(); - if (header.value() != null && pattern.matcher(header.value().toString()).matches()) { + final var header = headerIterator.next(); + final Object valueAsObject = header.value(); + final String valueAsString; + if (valueAsObject != null) { + if (valueAsObject instanceof byte[]) { + valueAsString = new String((byte[]) valueAsObject, StandardCharsets.UTF_8); + } else { + valueAsString = valueAsObject.toString(); + } + } else { + valueAsString = null; + } + if (valueAsString != null && pattern.matcher(valueAsString).matches()) { return true; } } @@ -68,15 +89,14 @@ public boolean test(R record) { @Override public void close() { - + // Nothing to clean } @Override public String toString() { return "HasHeader{" + - "headerName='" + headerName + "'," + - "pattern='" + pattern + "'," + - "missingHeaderBehavior='" + missingHeaderBehavior + "'" + - '}'; + "headerName='" + headerName + "'," + + "pattern='" + pattern + "'," + + "missingHeaderBehavior='" + missingHeaderBehavior + "'}"; } } diff --git a/src/test/java/com/michelin/kafka/config/providers/AES256ConfigProviderTest.java b/src/test/java/com/michelin/kafka/config/providers/AES256ConfigProviderTest.java index 66f25cd..8dad41f 100644 --- a/src/test/java/com/michelin/kafka/config/providers/AES256ConfigProviderTest.java +++ b/src/test/java/com/michelin/kafka/config/providers/AES256ConfigProviderTest.java @@ -6,107 +6,78 @@ import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; -import javax.crypto.Cipher; -import javax.crypto.SecretKey; -import javax.crypto.SecretKeyFactory; -import javax.crypto.spec.IvParameterSpec; -import javax.crypto.spec.PBEKeySpec; -import javax.crypto.spec.SecretKeySpec; -import java.nio.charset.StandardCharsets; -import java.security.spec.KeySpec; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; class AES256ConfigProviderTest { @Test void DecryptFailure_NotBase64() { - AES256ConfigProvider configProvider = new AES256ConfigProvider(); - - Map<String, String> configs = new HashMap<>(); - configs.put("key", "key-aaaabbbbccccdddd"); - configs.put("salt", "salt-aaaabbbbccccdddd"); - configProvider.configure(configs); - - Set<String> wrongKey = new HashSet<>(); - wrongKey.add("does_not_match"); // secret can't be decoded - assertThrows(ConfigException.class, () -> configProvider.get("", wrongKey)); + try (final var configProvider = new AES256ConfigProvider()) { + final var configs = new HashMap<String, String>(); + configs.put("key", "key-aaaabbbbccccdddd"); + configs.put("salt", "salt-aaaabbbbccccdddd"); + configProvider.configure(configs); + + final var wrongKey = new HashSet<String>(); + wrongKey.add("does_not_match"); // secret can't be decoded + assertThrows(ConfigException.class, () -> configProvider.get("", wrongKey)); + } } @Test void DecryptFailure_InvalidKey() { - AES256ConfigProvider configProvider = new AES256ConfigProvider(); - - Map<String, String> configs = new HashMap<>(); - configs.put("key", "key-aaaabbbbccccdddd"); - 
configs.put("salt", "salt-aaaabbbbccccdddd"); - configProvider.configure(configs); - - Set wrongKey = new HashSet<>(); - wrongKey.add("mfw43l96122yZiDhu2RevQ=="); // secret can't be decoded - assertThrows(ConfigException.class, () -> configProvider.get("", wrongKey)); + try (final var configProvider = new AES256ConfigProvider()) { + final var configs = new HashMap(); + configs.put("key", "key-aaaabbbbccccdddd"); + configs.put("salt", "salt-aaaabbbbccccdddd"); + configProvider.configure(configs); + + final var wrongKey = new HashSet(); + wrongKey.add("mfw43l96122yZiDhu2RevQ=="); // secret can't be decoded + assertThrows(ConfigException.class, () -> configProvider.get("", wrongKey)); + } } @Test void DecryptSuccess() { - String originalPassword = "hello !"; - String encodedPassword = "hgkWF2Gp3qPxcPnVifDgJA=="; - - AES256ConfigProvider configProvider = new AES256ConfigProvider(); - - Map configs = new HashMap<>(); - configs.put("key", "key-aaaabbbbccccdddd"); - configs.put("salt", "salt-aaaabbbbccccdddd"); - configProvider.configure(configs); + final var originalPassword = "hello !"; + final var encodedPassword = "hgkWF2Gp3qPxcPnVifDgJA=="; + try (final var configProvider = new AES256ConfigProvider()) { + final var configs = new HashMap(); + configs.put("key", "key-aaaabbbbccccdddd"); + configs.put("salt", "salt-aaaabbbbccccdddd"); + configProvider.configure(configs); - // String encoded = AES256Helper.encrypt("aaaabbbbccccdddd",AES256ConfigProvider.DEFAULT_SALT, originalPassword); - // System.out.println(encoded); + // String encoded = AES256Helper.encrypt("aaaabbbbccccdddd",AES256ConfigProvider.DEFAULT_SALT, originalPassword); + // System.out.println(encoded); - Set rightKeys = new HashSet<>(); - rightKeys.add(encodedPassword); + final var rightKeys = new HashSet(); + rightKeys.add(encodedPassword); - assertEquals(originalPassword, configProvider.get("", rightKeys).data().get(encodedPassword)); + assertEquals(originalPassword, configProvider.get("", rightKeys).data().get(encodedPassword)); + } } @Test void MissingConfig_key() { - AES256ConfigProvider configProvider = new AES256ConfigProvider(); - - Map configs = new HashMap<>(); - configs.put("salt", "salt-aaaabbbbccccdddd"); + try (final var configProvider = new AES256ConfigProvider()) { + final var configs = new HashMap(); + configs.put("salt", "salt-aaaabbbbccccdddd"); - assertThrows(ConfigException.class, () -> configProvider.configure(configs)); + assertThrows(ConfigException.class, () -> configProvider.configure(configs)); + } } @Test void MissingConfig_salt() { - AES256ConfigProvider configProvider = new AES256ConfigProvider(); - - Map configs = new HashMap<>(); - configs.put("key", "key-aaaabbbbccccdddd"); - - assertThrows(ConfigException.class, () -> configProvider.configure(configs)); - } - - static class AES256Helper { - public static String encrypt(String key, String salt, String strToEncrypt) { - try { - byte[] iv = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - IvParameterSpec ivspec = new IvParameterSpec(iv); - - SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256"); - KeySpec spec = new PBEKeySpec(key.toCharArray(), salt.getBytes(), 65536, 256); - SecretKey tmp = factory.generateSecret(spec); - SecretKeySpec secretKey = new SecretKeySpec(tmp.getEncoded(), "AES"); + try (final var configProvider = new AES256ConfigProvider()) { + final var configs = new HashMap(); + configs.put("key", "key-aaaabbbbccccdddd"); - Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding"); - 
cipher.init(Cipher.ENCRYPT_MODE, secretKey, ivspec); - return Base64.getEncoder() - .encodeToString(cipher.doFinal(strToEncrypt.getBytes(StandardCharsets.UTF_8))); - } catch (Exception e) { - System.out.println("Error while encrypting: " + e.toString()); - } - return null; + assertThrows(ConfigException.class, () -> configProvider.configure(configs)); } } } diff --git a/src/test/java/com/michelin/kafka/connect/transforms/TimestampConverterTest.java b/src/test/java/com/michelin/kafka/connect/transforms/TimestampConverterTest.java index 092bff2..4852eb9 100644 --- a/src/test/java/com/michelin/kafka/connect/transforms/TimestampConverterTest.java +++ b/src/test/java/com/michelin/kafka/connect/transforms/TimestampConverterTest.java @@ -1,10 +1,8 @@ package com.michelin.kafka.connect.transforms; -import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.transforms.TimestampConverter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -16,85 +14,85 @@ import java.util.Map; import java.util.TimeZone; -public class TimestampConverterTest { +class TimestampConverterTest { private final TimestampMicrosConverter.Value<SourceRecord> smt = new TimestampMicrosConverter.Value<>(); @Test - public void shouldConvertTimestampMicroSecondsWithSchema() { - Map<String, String> properties = new HashMap<>(); + void shouldConvertTimestampMicroSecondsWithSchema() { + final var properties = new HashMap<String, String>(); properties.put("field", "timestamp"); properties.put("target.type", "unix"); smt.configure(properties); - Schema schemaWithTimestampMicros = SchemaBuilder.struct() + final var schemaWithTimestampMicros = SchemaBuilder.struct() .field("id", SchemaBuilder.int8()) .field("timestamp", SchemaBuilder.int64().parameter("type", "long").parameter("logicalType", "timestamp-micros")) .optional() .build(); - Instant now = Instant.now(); + final var now = Instant.now(); - Struct recordWithTimestampMicros = new Struct(schemaWithTimestampMicros) + final var recordWithTimestampMicros = new Struct(schemaWithTimestampMicros) .put("id", (byte) 1) .put("timestamp", ChronoUnit.MICROS.between(Instant.EPOCH, now)); - SourceRecord record = new SourceRecord( + final var record = new SourceRecord( null, null, "test", 0, schemaWithTimestampMicros, recordWithTimestampMicros ); - SourceRecord transformedRecord = smt.apply(record); - Assertions.assertEquals(ChronoUnit.MILLIS.between(Instant.EPOCH, now), ((Struct)transformedRecord.value()).get("timestamp")); + final var transformedRecord = smt.apply(record); + Assertions.assertEquals(ChronoUnit.MILLIS.between(Instant.EPOCH, now), ((Struct) transformedRecord.value()).get("timestamp")); } @Test - public void shouldConvertTimestampToStringMicroSecondsWithSchema() { - String format = "yyyy-MM-dd HH:mm:ss.SSS"; - Map<String, String> properties = new HashMap<>(); + void shouldConvertTimestampToStringMicroSecondsWithSchema() { + final var format = "yyyy-MM-dd HH:mm:ss.SSS"; + final var properties = new HashMap<String, String>(); properties.put("field", "timestamp"); properties.put("target.type", "string"); properties.put("format", format); smt.configure(properties); - Schema schemaWithTimestampMicros = SchemaBuilder.struct() + final var schemaWithTimestampMicros = SchemaBuilder.struct() .field("id", SchemaBuilder.int8()) .field("timestamp", SchemaBuilder.int64().parameter("type", "long").parameter("logicalType", "timestamp-micros")) .optional() .build(); - Instant now = 
Instant.now(); - SimpleDateFormat formatter = new SimpleDateFormat(format); + final var now = Instant.now(); + final var formatter = new SimpleDateFormat(format); formatter.setTimeZone(TimeZone.getTimeZone("UTC")); - Struct recordWithTimestampMicros = new Struct(schemaWithTimestampMicros) + final var recordWithTimestampMicros = new Struct(schemaWithTimestampMicros) .put("id", (byte) 1) .put("timestamp", ChronoUnit.MICROS.between(Instant.EPOCH, now)); - SourceRecord record = new SourceRecord( + final var record = new SourceRecord( null, null, "test", 0, schemaWithTimestampMicros, recordWithTimestampMicros ); - SourceRecord transformedRecord = smt.apply(record); + final var transformedRecord = smt.apply(record); Assertions.assertEquals(formatter.format(Date.from(now)), ((Struct) transformedRecord.value()).get("timestamp")); } @Test - public void shouldConvertTimestampMicroSecondsWhenNoSchema() { - Map<String, String> properties = new HashMap<>(); + void shouldConvertTimestampMicroSecondsWhenNoSchema() { + final var properties = new HashMap<String, String>(); properties.put("field", "timestamp"); properties.put("target.type", "unix"); smt.configure(properties); - Instant now = Instant.now(); + final var now = Instant.now(); - Map<String, Object> dataWithoutSchema = new HashMap<>(); + final var dataWithoutSchema = new HashMap<String, Object>(); dataWithoutSchema.put("id", (byte) 1); dataWithoutSchema.put("timestamp", ChronoUnit.MICROS.between(Instant.EPOCH, now)); - SourceRecord record = new SourceRecord( + final var record = new SourceRecord( null, null, "test", 0, null, dataWithoutSchema ); - SourceRecord transformedRecord = smt.apply(record); + final var transformedRecord = smt.apply(record); Assertions.assertEquals(ChronoUnit.MILLIS.between(Instant.EPOCH, now), ((Map) transformedRecord.value()).get("timestamp")); } } diff --git a/src/test/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatchesTest.java b/src/test/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatchesTest.java index df5ad08..6d0c277 100644 --- a/src/test/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatchesTest.java +++ b/src/test/java/com/michelin/kafka/connect/transforms/predicates/HeaderValueMatchesTest.java @@ -4,121 +4,131 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; +import java.nio.charset.StandardCharsets; import java.util.HashMap; -import java.util.Map; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; -public class HeaderValueMatchesTest { - private HeaderValueMatches<SourceRecord> predicate = new HeaderValueMatches<>(); +class HeaderValueMatchesTest { + private final HeaderValueMatches<SourceRecord> predicate = new HeaderValueMatches<>(); @Test - public void ValueMatches() { - final Map<String, String> props = new HashMap<>(); + void ValueMatches() { + final var props = new HashMap<String, String>(); props.put("header.name", "my-header"); props.put("pattern", "fixed"); predicate.configure(props); - ConnectHeaders headers = new ConnectHeaders(); + final var headers = new ConnectHeaders(); headers.add("my-header", "fixed", null); headers.add("unrelated-header", null, null); - final SourceRecord record = new SourceRecord(null, null, "test", 0, + final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers); - final boolean result = predicate.test(record); - - assertTrue(result); + assertTrue(predicate.test(record)); } @Test - public void ValueRegexMatches() { - final Map<String, String> props = new 
HashMap<>(); + void ValueRegexMatches() { + final var props = new HashMap<String, String>(); props.put("header.name", "license-plate"); props.put("pattern", "[A-Z]{2}-[0-9]{3}-[A-Z]{2}"); predicate.configure(props); - ConnectHeaders headers = new ConnectHeaders(); + final var headers = new ConnectHeaders(); headers.add("license-plate", "CG-768-AP", null); headers.add("unrelated-header", null, null); - final SourceRecord record = new SourceRecord(null, null, "test", 0, + final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers); - final boolean result = predicate.test(record); + assertTrue(predicate.test(record)); } + + @Test + void ByteArrayValueRegexMatches() { + final var props = new HashMap<String, String>(); + + props.put("header.name", "license-plate"); + props.put("pattern", "[A-Z]{2}-[0-9]{3}-[A-Z]{2}"); + + predicate.configure(props); + + final var headers = new ConnectHeaders(); + headers.add("license-plate", "CG-768-AP".getBytes(StandardCharsets.UTF_8), null); + headers.add("unrelated-header", null, null); - assertTrue(result); + final var record = new SourceRecord(null, null, "test", 0, + null, null, null, null, 0L, headers); + + assertTrue(predicate.test(record)); } @Test - public void valueNull() { - final Map<String, String> props = new HashMap<>(); + void valueNull() { + final var props = new HashMap<String, String>(); props.put("header.name", "my-header"); props.put("pattern", "fixed"); predicate.configure(props); - ConnectHeaders headers = new ConnectHeaders(); + final var headers = new ConnectHeaders(); headers.add("my-header", null, null); headers.add("unrelated-header", null, null); - final SourceRecord record = new SourceRecord(null, null, "test", 0, + final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers); - final boolean result = predicate.test(record); - - assertFalse(result); + assertFalse(predicate.test(record)); } @Test - public void ValueNotMatching() { - final Map<String, String> props = new HashMap<>(); + void ValueNotMatching() { + final var props = new HashMap<String, String>(); props.put("header.name", "my-header"); props.put("pattern", "fixed"); predicate.configure(props); - ConnectHeaders headers = new ConnectHeaders(); + final var headers = new ConnectHeaders(); headers.add("my-header", "OTHER", null); headers.add("unrelated-header", null, null); - final SourceRecord record = new SourceRecord(null, null, "test", 0, + final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers); - final boolean result = predicate.test(record); - - assertFalse(result); + assertFalse(predicate.test(record)); } @Test - public void MissingHeaderDefaultBehavior() { - final Map<String, String> props = new HashMap<>(); + void MissingHeaderDefaultBehavior() { + final var props = new HashMap<String, String>(); props.put("header.name", "my-header"); props.put("pattern", "fixed"); predicate.configure(props); - ConnectHeaders headers = new ConnectHeaders(); + final var headers = new ConnectHeaders(); headers.add("other-header", "OTHER", null); headers.add("unrelated-header", null, null); - final SourceRecord record = new SourceRecord(null, null, "test", 0, + final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers); - final boolean result = predicate.test(record); - - assertFalse(result); + assertFalse(predicate.test(record)); } @Test - public void MissingHeaderOverriddenBehavior() { - final Map<String, String> props = new HashMap<>(); + void MissingHeaderOverriddenBehavior() { + final var props = new HashMap<String, String>(); props.put("header.name", "my-header"); props.put("pattern", 
"fixed"); @@ -126,21 +136,19 @@ public void MissingHeaderOverriddenBehavior() { predicate.configure(props); - ConnectHeaders headers = new ConnectHeaders(); + final var headers = new ConnectHeaders(); headers.add("other-header", "OTHER", null); headers.add("unrelated-header", null, null); - final SourceRecord record = new SourceRecord(null, null, "test", 0, + final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers); - final boolean result = predicate.test(record); - - assertTrue(result); + assertTrue(predicate.test(record)); } @Test - public void MultipleHeadersWithMatchingValue() { - final Map props = new HashMap<>(); + void MultipleHeadersWithMatchingValue() { + final var props = new HashMap(); props.put("header.name", "my-header"); props.put("pattern", "fixed"); @@ -148,17 +156,15 @@ public void MultipleHeadersWithMatchingValue() { predicate.configure(props); - ConnectHeaders headers = new ConnectHeaders(); + final var headers = new ConnectHeaders(); headers.add("my-header", "OTHER", null); headers.add("my-header", "DIFFERENT", null); headers.add("my-header", "fixed", null); headers.add("unrelated-header", null, null); - final SourceRecord record = new SourceRecord(null, null, "test", 0, + final var record = new SourceRecord(null, null, "test", 0, null, null, null, null, 0L, headers); - final boolean result = predicate.test(record); - - assertTrue(result); + assertTrue(predicate.test(record)); } }