From 0cd6775d038a3e79f3a39627fc8588c51d5f5adb Mon Sep 17 00:00:00 2001 From: Yannick RAFFIN Date: Tue, 3 Jan 2023 16:54:51 +0100 Subject: [PATCH] Fixed AES256 config provider error when restart of connectors (#3) --- .../providers/AES256ConfigProvider.java | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java b/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java index 5b79ea8..9914c10 100644 --- a/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java +++ b/src/main/java/com/michelin/kafka/config/providers/AES256ConfigProvider.java @@ -9,13 +9,17 @@ import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.types.Password; -import javax.crypto.*; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import javax.crypto.SecretKeyFactory; import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.PBEKeySpec; import javax.crypto.spec.SecretKeySpec; -import java.io.IOException; import java.security.spec.KeySpec; -import java.util.*; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; public class AES256ConfigProvider implements ConfigProvider { @@ -31,29 +35,21 @@ public class AES256ConfigProvider implements ConfigProvider { new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH, "The AES256 salt."); + /** + * Represents the aes256 key + */ + private Password aesKey; - - private Cipher cipher; + /** + * Represents the aes256 salt + */ + private String salt; @Override public void configure(Map configs) { Map parsedConfigs = CONFIG_DEF.parse(configs); - Password aesKey = (Password) parsedConfigs.get(AES_KEY_CONFIG); - String salt = parsedConfigs.get(SALT_CONFIG).toString(); - try { - byte[] iv = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - IvParameterSpec ivspec = new IvParameterSpec(iv); - - SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256"); - KeySpec spec = new PBEKeySpec(aesKey.value().toCharArray(), salt.getBytes(), 65536, 256); - SecretKey tmp = factory.generateSecret(spec); - SecretKeySpec secretKey = new SecretKeySpec(tmp.getEncoded(), "AES"); - - cipher = Cipher.getInstance("AES/CBC/PKCS5PADDING"); - cipher.init(Cipher.DECRYPT_MODE, secretKey, ivspec); - } catch (Exception e) { - throw new ConfigException("Error during initialization", e); - } + this.aesKey = (Password) parsedConfigs.get(AES_KEY_CONFIG); + this.salt = parsedConfigs.get(SALT_CONFIG).toString(); } @Override @@ -63,9 +59,9 @@ public ConfigData get(String path) { @Override public ConfigData get(String path, Set keys) { + Map decoded = new HashMap<>(); - Map decoded = new HashMap<>(); - + final Cipher cipher = this.getCipher(); keys.forEach(key -> { try { decoded.put(key, new String(cipher.doFinal(Base64.getDecoder().decode(key)))); @@ -78,8 +74,31 @@ public ConfigData get(String path, Set keys) { } @Override - public void close() throws IOException { + public void close() { + // nothing to destroy while closing the config provider. + } + + /** + * Gets the cipher instance for decryption. + * + * @return The cipher used to decrypt the keys. + */ + private Cipher getCipher() { + try { + byte[] iv = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + final IvParameterSpec ivspec = new IvParameterSpec(iv); + final SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256"); + final KeySpec spec = new PBEKeySpec(this.aesKey.value().toCharArray(), this.salt.getBytes(), 65536, 256); + final SecretKey tmp = factory.generateSecret(spec); + final SecretKeySpec secretKey = new SecretKeySpec(tmp.getEncoded(), "AES"); + + final Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5PADDING"); + cipher.init(Cipher.DECRYPT_MODE, secretKey, ivspec); + return cipher; + } catch (Exception e) { + throw new ConfigException("Error during Cipher initialization", e); + } } }