Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,6 @@
</properties>

<dependencies>
<dependency>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>operator-framework-core</artifactId>
<exclusions>
<exclusion>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-vertx</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-jdk</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,4 @@ public class KafkaAccess extends CustomResource<KafkaAccessSpec, KafkaAccessStat
* The `kind` definition of the KafkaAccess custom resource
*/
public static final String KIND = "KafkaAccess";

/**
 * Initializes the status sub-resource of the KafkaAccess custom resource.
 * Called by the operator framework when the resource has no status yet,
 * so reconcilers never observe a {@code null} status.
 *
 * @return A new, empty {@link KafkaAccessStatus} instance.
 */
@Override
protected KafkaAccessStatus initStatus() {
return new KafkaAccessStatus();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package io.strimzi.kafka.access.model;

import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus;
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.kafka.access.internal.StatusUtils;
Expand All @@ -20,9 +19,10 @@
editableEnabled = false,
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
public class KafkaAccessStatus extends ObservedGenerationAwareStatus {
public class KafkaAccessStatus {

private BindingStatus binding;
private long observedGeneration;
private final List<Condition> conditions = new ArrayList<>();

/**
Expand Down Expand Up @@ -77,4 +77,17 @@ public void setReadyCondition(final boolean ready, final String message, final S
StatusUtils.buildReadyCondition(ready, reason, message)
);
}

/**
* Gets the observed generation of the KafkaAccess resource.
*
* @return The observed generation.
*/
public long getObservedGeneration() {
return observedGeneration;
}

/**
 * Sets the observed generation of the KafkaAccess resource.
 *
 * @param observedGeneration The generation of the resource that was last reconciled.
 */
public void setObservedGeneration(long observedGeneration) {
this.observedGeneration = observedGeneration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public class KafkaAccessOperator {
*/
public static void main(final String[] args) {
LOGGER.info("Kafka Access operator starting");
final Operator operator = new Operator();
final Operator operator = new Operator(overrider -> overrider
.withUseSSAToPatchPrimaryResource(false));
operator.register(new KafkaAccessReconciler(operator.getKubernetesClient()));
operator.start();
Server server = new Server(HEALTH_CHECK_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
import io.fabric8.kubernetes.api.model.SecretBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
Expand All @@ -28,6 +26,7 @@
import io.strimzi.kafka.access.model.BindingStatus;
import io.strimzi.kafka.access.model.KafkaAccess;
import io.strimzi.kafka.access.model.KafkaAccessStatus;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,7 +39,7 @@
*/
@SuppressWarnings("ClassFanOutComplexity")
@ControllerConfiguration
public class KafkaAccessReconciler implements Reconciler<KafkaAccess>, EventSourceInitializer<KafkaAccess>, ErrorStatusHandler<KafkaAccess> {
public class KafkaAccessReconciler implements Reconciler<KafkaAccess> {

private final KubernetesClient kubernetesClient;
private InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource;
Expand Down Expand Up @@ -95,8 +94,8 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final

kafkaAccessStatus.setBinding(new BindingStatus(secretName));
kafkaAccessStatus.setReadyCondition(true, "Ready", "Ready");

return UpdateControl.updateStatus(kafkaAccess);
kafkaAccessStatus.setObservedGeneration(kafkaAccess.getMetadata().getGeneration());
return UpdateControl.patchStatus(kafkaAccess);
}

private void createOrUpdateSecret(final Map<String, String> data, final KafkaAccess kafkaAccess, final String secretName) {
Expand Down Expand Up @@ -142,54 +141,50 @@ private void createOrUpdateSecret(final Map<String, String> data, final KafkaAcc
}

/**
* Prepares the event sources required for triggering the reconciliation
* Prepares the event sources required for triggering the reconciliation.
* It configures the JOSDK framework with resources the operator needs to watch.
*
* @param context The EventSourceContext for KafkaAccess resource
*
* @return A new map of event sources
*/
@Override
public Map<String, EventSource> prepareEventSources(final EventSourceContext<KafkaAccess> context) {
public List<EventSource<?, KafkaAccess>> prepareEventSources(EventSourceContext<KafkaAccess> context) {
LOGGER.info("Preparing event sources");
InformerEventSource<Kafka, KafkaAccess> kafkaEventSource = new InformerEventSource<>(
InformerConfiguration.from(Kafka.class, context)
InformerEventSourceConfiguration<Kafka> kafkaEventSource =
InformerEventSourceConfiguration.from(Kafka.class, KafkaAccess.class)
.withSecondaryToPrimaryMapper(kafka -> KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<KafkaUser, KafkaAccess> kafkaUserEventSource = new InformerEventSource<>(
InformerConfiguration.from(KafkaUser.class, context)
.build();
InformerEventSourceConfiguration<KafkaUser> kafkaUserEventSource =
InformerEventSourceConfiguration.from(KafkaUser.class, KafkaAccess.class)
.withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.build();
InformerEventSourceConfiguration<Secret> strimziSecretEventSource =
InformerEventSourceConfiguration.from(Secret.class, KafkaAccess.class)
.withName(STRIMZI_SECRET_EVENT_SOURCE)
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
InformerEventSource<Secret, KafkaAccess> strimziKafkaUserSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
.build();
InformerEventSourceConfiguration<Secret> strimziKafkaUserSecretEventSource =
InformerEventSourceConfiguration.from(Secret.class, KafkaAccess.class)
.withName(KAFKA_USER_SECRET_EVENT_SOURCE)
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_USER_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
.build();
kafkaAccessSecretEventSource = new InformerEventSource<>(
InformerConfiguration.from(Secret.class)
InformerEventSourceConfiguration.from(Secret.class, KafkaAccess.class)
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE))
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
.build(),
context);
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(
kafkaEventSource,
kafkaUserEventSource,
kafkaAccessSecretEventSource
);
eventSources.put(STRIMZI_SECRET_EVENT_SOURCE, strimziSecretEventSource);
eventSources.put(KAFKA_USER_SECRET_EVENT_SOURCE, strimziKafkaUserSecretEventSource);
LOGGER.info("Finished preparing event sources");
return eventSources;
return List.of(
new InformerEventSource<>(kafkaEventSource, context),
new InformerEventSource<>(kafkaUserEventSource, context),
new InformerEventSource<>(strimziSecretEventSource, context),
new InformerEventSource<>(strimziKafkaUserSecretEventSource, context),
kafkaAccessSecretEventSource);
}

@Override
Expand All @@ -207,8 +202,9 @@ public ErrorStatusUpdateControl<KafkaAccess> updateErrorStatus(KafkaAccess kafka
reason = "InvalidUserKind";
}
status.setReadyCondition(false, e.getMessage(), reason);
status.setObservedGeneration(kafkaAccess.getMetadata().getGeneration());

return ErrorStatusUpdateControl.updateStatus(kafkaAccess);
return ErrorStatusUpdateControl.patchStatus(kafkaAccess);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private Map<String, String> getKafkaUserSecretData(final Context<KafkaAccess> co
.map(KafkaUserStatus::getSecret)
.orElseThrow(missingKubernetesResourceException("Secret in KafkaUser status", kafkaUserNamespace, kafkaUserName));
final InformerEventSource<Secret, KafkaAccess> kafkaUserSecretEventSource = (InformerEventSource<Secret, KafkaAccess>) context.eventSourceRetriever()
.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE);
.getEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE);
final Secret kafkaUserSecret = kafkaUserSecretEventSource.get(new ResourceID(userSecretName, kafkaUserNamespace))
.orElseThrow(missingKubernetesResourceException(String.format("Secret %s for KafkaUser", userSecretName), kafkaUserNamespace, kafkaUserName));
return new KafkaUserData(kafkaUser).withSecret(kafkaUserSecret).getConnectionSecretData();
Expand All @@ -113,7 +113,7 @@ private Map<String, String> getKafkaUserSecretData(final Context<KafkaAccess> co
private Map<String, String> getKafkaCaCertData(final Context<KafkaAccess> context, String kafkaClusterName, String kafkaClusterNamespace) {
final String caCertSecretName = KafkaResources.clusterCaCertificateSecretName(kafkaClusterName);
final InformerEventSource<Secret, KafkaAccess> strimziSecretEventSource = (InformerEventSource<Secret, KafkaAccess>) context.eventSourceRetriever()
.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.STRIMZI_SECRET_EVENT_SOURCE);
.getEventSourceFor(Secret.class, KafkaAccessReconciler.STRIMZI_SECRET_EVENT_SOURCE);
return strimziSecretEventSource.get(new ResourceID(caCertSecretName, kafkaClusterNamespace))
.map(Secret::getData)
.orElse(Map.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ public class KafkaAccessReconcilerTest {

@BeforeEach
void beforeEach() {
operator = new Operator(overrider -> overrider.withKubernetesClient(client));
operator = new Operator(overrider -> overrider.withKubernetesClient(client)
/*
 * Disables the use of Server-Side Apply for patching the primary resource.
 * Motivation: the mock Kubernetes client doesn't fully support SSA features.
 * See: <a href="https://github.com/fabric8io/kubernetes-client/issues/5337">fabric8io/kubernetes-client Issue #5337</a>
 */
.withUseSSAToPatchPrimaryResource(false));
operator.register(new KafkaAccessReconciler(operator.getKubernetesClient()));
operator.start();
}
Expand Down Expand Up @@ -566,7 +572,7 @@ void testReconcileWithUserProvidedSecretAndTestDeleteSecretWithNameChange() {
.map(KafkaAccessStatus::getBinding)
.map(BindingStatus::getName);
return bindingName.isPresent() && NEW_USER_PROVIDED_SECRET_NAME.equals(bindingName.get());
}, 100, TimeUnit.MILLISECONDS);
}, TEST_TIMEOUT, TimeUnit.MILLISECONDS);

Secret oldSecretAfterRename = client.secrets().inNamespace(NAMESPACE).withName(USER_PROVIDED_SECRET_NAME).get();
assertThat(oldSecretAfterRename).isNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void testSecretDataWithTls() {
List.of(ResourceProvider.getListener(LISTENER_1, KafkaListenerType.INTERNAL, true)),
List.of(ResourceProvider.getListenerStatus(LISTENER_1, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9092))
);
final Context<KafkaAccess> mockContext = mock(Context.class);
final Context mockContext = mock(Context.class);
when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.of(kafka));

final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
Expand All @@ -102,7 +102,7 @@ void testSecretDataWithTls() {
final EventSourceRetriever<KafkaAccess> mockEventSourceRetriever = mock(EventSourceRetriever.class);
final InformerEventSource<Secret, KafkaAccess> mockInformerEventSource = mock(InformerEventSource.class);
when(mockContext.eventSourceRetriever()).thenReturn(mockEventSourceRetriever);
when(mockEventSourceRetriever.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.STRIMZI_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
when(mockEventSourceRetriever.getEventSourceFor(Secret.class, KafkaAccessReconciler.STRIMZI_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
when(mockInformerEventSource.get(any(ResourceID.class))).thenReturn(Optional.of(certSecret));

Map<String, String> data = new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext);
Expand Down Expand Up @@ -138,7 +138,7 @@ void testSecretDataWithKafkaUser() {
final EventSourceRetriever<KafkaAccess> mockEventSourceRetriever = mock(EventSourceRetriever.class);
final InformerEventSource<Secret, KafkaAccess> mockInformerEventSource = mock(InformerEventSource.class);
when(mockContext.eventSourceRetriever()).thenReturn(mockEventSourceRetriever);
when(mockEventSourceRetriever.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
when(mockEventSourceRetriever.getEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
when(mockInformerEventSource.get(any(ResourceID.class))).thenReturn(Optional.of(ResourceProvider.getStrimziUserSecret(KAFKA_USER_SECRET_NAME, KAFKA_NAMESPACE, KAFKA_NAME)));

Map<String, String> data = new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext);
Expand Down Expand Up @@ -244,7 +244,7 @@ void testDesiredMissingKafkaUserSecret() {
final InformerEventSource<Secret, KafkaAccess> mockInformerEventSource = mock(InformerEventSource.class);
when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.of(kafka));
when(mockContext.eventSourceRetriever()).thenReturn(mockEventSourceRetriever);
when(mockEventSourceRetriever.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
when(mockEventSourceRetriever.getEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
when(mockInformerEventSource.get(any(ResourceID.class))).thenReturn(Optional.empty());

final KafkaUser kafkaUser = ResourceProvider.getKafkaUserWithStatus(KAFKA_USER_NAME, KAFKA_NAMESPACE, KAFKA_USER_SECRET_NAME, "user", new KafkaUserScramSha512ClientAuthentication());
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<central-publishing-maven-plugin.version>0.7.0</central-publishing-maven-plugin.version>

<!-- Runtime dependencies -->
<javaoperatorsdk.version>4.4.2</javaoperatorsdk.version>
<javaoperatorsdk.version>5.1.2</javaoperatorsdk.version>
<fabric8.version>7.2.0</fabric8.version>
<sundrio.version>0.200.0</sundrio.version>
<strimzi.version>0.48.0</strimzi.version>
Expand Down