Skip to content

Commit

Permalink
Fixed AES256 config provider error when restart of connectors (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
yraffin authored Jan 3, 2023
1 parent 4268c69 commit 0cd6775
Showing 1 changed file with 43 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.types.Password;

import javax.crypto.*;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.security.spec.KeySpec;
import java.util.*;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class AES256ConfigProvider implements ConfigProvider {

Expand All @@ -31,29 +35,21 @@ public class AES256ConfigProvider implements ConfigProvider {
new ConfigDef.NonEmptyString(), ConfigDef.Importance.HIGH,
"The AES256 salt.");

/**
* Represents the aes256 key
*/
private Password aesKey;


private Cipher cipher;
/**
* Represents the aes256 salt
*/
private String salt;

@Override
public void configure(Map<String, ?> configs) {
Map<String, Object> parsedConfigs = CONFIG_DEF.parse(configs);
Password aesKey = (Password) parsedConfigs.get(AES_KEY_CONFIG);
String salt = parsedConfigs.get(SALT_CONFIG).toString();
try {
byte[] iv = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
IvParameterSpec ivspec = new IvParameterSpec(iv);

SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
KeySpec spec = new PBEKeySpec(aesKey.value().toCharArray(), salt.getBytes(), 65536, 256);
SecretKey tmp = factory.generateSecret(spec);
SecretKeySpec secretKey = new SecretKeySpec(tmp.getEncoded(), "AES");

cipher = Cipher.getInstance("AES/CBC/PKCS5PADDING");
cipher.init(Cipher.DECRYPT_MODE, secretKey, ivspec);
} catch (Exception e) {
throw new ConfigException("Error during initialization", e);
}
this.aesKey = (Password) parsedConfigs.get(AES_KEY_CONFIG);
this.salt = parsedConfigs.get(SALT_CONFIG).toString();
}

@Override
Expand All @@ -63,9 +59,9 @@ public ConfigData get(String path) {

@Override
public ConfigData get(String path, Set<String> keys) {
Map<String, String> decoded = new HashMap<>();

Map<String,String> decoded = new HashMap<>();

final Cipher cipher = this.getCipher();
keys.forEach(key -> {
try {
decoded.put(key, new String(cipher.doFinal(Base64.getDecoder().decode(key))));
Expand All @@ -78,8 +74,31 @@ public ConfigData get(String path, Set<String> keys) {
}

@Override
public void close() throws IOException {
public void close() {
// nothing to destroy while closing the config provider.
}

/**
* Gets the cipher instance for decryption.
*
* @return The cipher used to decrypt the keys.
*/
private Cipher getCipher() {
try {
byte[] iv = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
final IvParameterSpec ivspec = new IvParameterSpec(iv);

final SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
final KeySpec spec = new PBEKeySpec(this.aesKey.value().toCharArray(), this.salt.getBytes(), 65536, 256);
final SecretKey tmp = factory.generateSecret(spec);
final SecretKeySpec secretKey = new SecretKeySpec(tmp.getEncoded(), "AES");

final Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5PADDING");
cipher.init(Cipher.DECRYPT_MODE, secretKey, ivspec);
return cipher;
} catch (Exception e) {
throw new ConfigException("Error during Cipher initialization", e);
}
}

}

0 comments on commit 0cd6775

Please sign in to comment.