Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved bridge configuration setup within the operator #11032

Merged
merged 6 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Added support to configure `dnsPolicy` and `dnsConfig` using the `template` sections.
* Store Kafka node certificates in separate Secrets, one Secret per pod.
* Allow configuring `ssl.principal.mapping.rules` and custom trusted CAs in Kafka brokers with `type: custom` authentication
* Moved HTTP bridge configuration to the ConfigMap setup by the operator.

### Major changes, deprecations and removals

Expand Down
2 changes: 1 addition & 1 deletion bridge.version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.31.1
latest
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.Util;
import io.strimzi.operator.common.model.InvalidResourceException;
Expand Down Expand Up @@ -86,29 +87,23 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
// Kafka Bridge configuration keys (EnvVariables)
protected static final String ENV_VAR_PREFIX = "KAFKA_BRIDGE_";
protected static final String ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS = "KAFKA_BRIDGE_BOOTSTRAP_SERVERS";
protected static final String ENV_VAR_KAFKA_BRIDGE_TLS = "KAFKA_BRIDGE_TLS";
protected static final String ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS = "KAFKA_BRIDGE_TRUSTED_CERTS";
protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth-certs/";
protected static final String ENV_VAR_STRIMZI_TRACING = "STRIMZI_TRACING";

protected static final String ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG = "KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG = "KAFKA_BRIDGE_PRODUCER_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG = "KAFKA_BRIDGE_CONSUMER_CONFIG";
protected static final String ENV_VAR_KAFKA_BRIDGE_ID = "KAFKA_BRIDGE_ID";

protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_HOST = "KAFKA_BRIDGE_HTTP_HOST";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PORT = "KAFKA_BRIDGE_HTTP_PORT";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT = "KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED = "KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED = "KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED = "KAFKA_BRIDGE_CORS_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS = "KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS";
protected static final String ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS = "KAFKA_BRIDGE_CORS_ALLOWED_METHODS";

protected static final String CO_ENV_VAR_CUSTOM_BRIDGE_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_LABELS";
protected static final String INIT_VOLUME_MOUNT = "/opt/strimzi/init";

/**
* Key under which the bridge configuration is stored in ConfigMap
*/
public static final String BRIDGE_CONFIGURATION_FILENAME = "application.properties";

/**
* Annotation for rolling the bridge whenever the configuration within the application.properties file is changed.
* When the configuration hash annotation change is detected, we force a pod restart.
*/
public static final String ANNO_STRIMZI_IO_CONFIGURATION_HASH = Annotations.STRIMZI_DOMAIN + "configuration-hash";

private int replicas;
private ClientTls tls;
private KafkaClientAuthentication authentication;
Expand Down Expand Up @@ -411,59 +406,12 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));
JvmOptionUtils.javaOptions(varList, jvmOptions);

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_BOOTSTRAP_SERVERS, bootstrapServers));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ADMIN_CLIENT_CONFIG, kafkaBridgeAdminClient == null ? "" : new KafkaBridgeAdminClientConfiguration(reconciliation, kafkaBridgeAdminClient.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_ID, cluster));

if (kafkaBridgeConsumer != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, new KafkaBridgeConsumerConfiguration(reconciliation, kafkaBridgeConsumer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, String.valueOf(kafkaBridgeConsumer.isEnabled())));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(kafkaBridgeConsumer.getTimeoutSeconds())));
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CONSUMER_CONFIG, ""));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_ENABLED, "true"));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_CONSUMER_TIMEOUT, String.valueOf(KafkaBridgeConsumerSpec.HTTP_DEFAULT_TIMEOUT)));
}

if (kafkaBridgeProducer != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, new KafkaBridgeProducerConfiguration(reconciliation, kafkaBridgeProducer.getConfig().entrySet()).getConfiguration()));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, String.valueOf(kafkaBridgeProducer.isEnabled())));
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_PRODUCER_CONFIG, ""));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PRODUCER_ENABLED, "true"));
}

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_HOST, KafkaBridgeHttpConfig.HTTP_DEFAULT_HOST));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_HTTP_PORT, String.valueOf(http != null ? http.getPort() : KafkaBridgeHttpConfig.HTTP_DEFAULT_PORT)));

if (http != null && http.getCors() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "true"));

if (http.getCors().getAllowedOrigins() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_ORIGINS, String.join(",", http.getCors().getAllowedOrigins())));
}

if (http.getCors().getAllowedMethods() != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ALLOWED_METHODS, String.join(",", http.getCors().getAllowedMethods())));
}
} else {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_CORS_ENABLED, "false"));
}

if (tls != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TLS, "true"));

if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates())));
}
if (tls != null && tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS, CertUtils.trustedCertsEnvVar(tls.getTrustedCertificates())));
}

AuthenticationUtils.configureClientAuthenticationEnvVars(authentication, varList, name -> ENV_VAR_PREFIX + name);

if (tracing != null) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_TRACING, tracing.getType()));
}

// Add shared environment variables used for all containers
varList.addAll(sharedEnvironmentProvider.variables());

Expand Down Expand Up @@ -600,21 +548,38 @@ protected List<EnvVar> getInitContainerEnvVars() {
}

/**
* Generates a metrics and logging ConfigMap according to the configuration. If this operand doesn't support logging
* or metrics, they will nto be set.
* Generates a ConfigMap containing the bridge configuration related to HTTP and Kafka clients.
* It also generates the metrics and logging configuration. If this operand doesn't support logging
* or metrics, they will not be set.
*
* @param metricsAndLogging The external CMs with logging and metrics configuration
*
* @return The generated ConfigMap
*/
public ConfigMap generateMetricsAndLogConfigMap(MetricsAndLogging metricsAndLogging) {
public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) {
// generate the ConfigMap data entries for the metrics and logging configuration
Map<String, String> data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging);
// add the ConfigMap data entry for the bridge HTTP and Kafka clients related configuration
data.put(
BRIDGE_CONFIGURATION_FILENAME,
new KafkaBridgeConfigurationBuilder(reconciliation, cluster, bootstrapServers)
.withTracing(tracing)
.withTls(tls)
.withAuthentication(authentication)
.withKafkaAdminClient(kafkaBridgeAdminClient)
.withKafkaProducer(kafkaBridgeProducer)
.withKafkaConsumer(kafkaBridgeConsumer)
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer)
.build()
);

return ConfigMapUtils
.createConfigMap(
KafkaBridgeResources.metricsAndLogConfigMapName(cluster),
namespace,
labels,
ownerReference,
ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging)
data
);
}

Expand Down
Loading
Loading