Skip to content

Commit 2289a34

Browse files
Update JOSDK to v5.1.2 (#93)
Signed-off-by: OwenCorrigan76 <[email protected]>
1 parent f37398b commit 2289a34

File tree

9 files changed

+63
-62
lines changed

9 files changed

+63
-62
lines changed

api/pom.xml

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,6 @@
2121
</properties>
2222

2323
<dependencies>
24-
<dependency>
25-
<groupId>io.javaoperatorsdk</groupId>
26-
<artifactId>operator-framework-core</artifactId>
27-
<exclusions>
28-
<exclusion>
29-
<groupId>io.fabric8</groupId>
30-
<artifactId>kubernetes-httpclient-vertx</artifactId>
31-
</exclusion>
32-
</exclusions>
33-
</dependency>
3424
<dependency>
3525
<groupId>io.fabric8</groupId>
3626
<artifactId>kubernetes-httpclient-jdk</artifactId>

api/src/main/java/io/strimzi/kafka/access/model/KafkaAccess.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,4 @@ public class KafkaAccess extends CustomResource<KafkaAccessSpec, KafkaAccessStat
4545
* The `kind` definition of the KafkaAccess custom resource
4646
*/
4747
public static final String KIND = "KafkaAccess";
48-
49-
@Override
50-
protected KafkaAccessStatus initStatus() {
51-
return new KafkaAccessStatus();
52-
}
5348
}

api/src/main/java/io/strimzi/kafka/access/model/KafkaAccessStatus.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
*/
55
package io.strimzi.kafka.access.model;
66

7-
import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus;
87
import io.strimzi.api.kafka.model.common.Constants;
98
import io.strimzi.api.kafka.model.common.Condition;
109
import io.strimzi.kafka.access.internal.StatusUtils;
@@ -20,9 +19,10 @@
2019
editableEnabled = false,
2120
builderPackage = Constants.FABRIC8_KUBERNETES_API
2221
)
23-
public class KafkaAccessStatus extends ObservedGenerationAwareStatus {
22+
public class KafkaAccessStatus {
2423

2524
private BindingStatus binding;
25+
private long observedGeneration;
2626
private final List<Condition> conditions = new ArrayList<>();
2727

2828
/**
@@ -77,4 +77,17 @@ public void setReadyCondition(final boolean ready, final String message, final S
7777
StatusUtils.buildReadyCondition(ready, reason, message)
7878
);
7979
}
80+
81+
/**
82+
* Gets the observed generation of the KafkaAccess resource.
83+
*
84+
* @return The observed generation.
85+
*/
86+
public long getObservedGeneration() {
87+
return observedGeneration;
88+
}
89+
90+
public void setObservedGeneration(long observedGeneration) {
91+
this.observedGeneration = observedGeneration;
92+
}
8093
}

operator/src/main/java/io/strimzi/kafka/access/KafkaAccessOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ public class KafkaAccessOperator {
2626
*/
2727
public static void main(final String[] args) {
2828
LOGGER.info("Kafka Access operator starting");
29-
final Operator operator = new Operator();
29+
final Operator operator = new Operator(overrider -> overrider
30+
.withUseSSAToPatchPrimaryResource(false));
3031
operator.register(new KafkaAccessReconciler(operator.getKubernetesClient()));
3132
operator.start();
3233
Server server = new Server(HEALTH_CHECK_PORT);

operator/src/main/java/io/strimzi/kafka/access/KafkaAccessReconciler.java

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99
import io.fabric8.kubernetes.api.model.SecretBuilder;
1010
import io.fabric8.kubernetes.client.KubernetesClient;
1111
import io.fabric8.kubernetes.client.KubernetesClientException;
12-
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
12+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
1313
import io.javaoperatorsdk.operator.api.reconciler.Context;
1414
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
15-
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
1615
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
1716
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
18-
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
1917
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
2018
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
2119
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -28,6 +26,7 @@
2826
import io.strimzi.kafka.access.model.BindingStatus;
2927
import io.strimzi.kafka.access.model.KafkaAccess;
3028
import io.strimzi.kafka.access.model.KafkaAccessStatus;
29+
import java.util.List;
3130
import org.slf4j.Logger;
3231
import org.slf4j.LoggerFactory;
3332

@@ -40,7 +39,7 @@
4039
*/
4140
@SuppressWarnings("ClassFanOutComplexity")
4241
@ControllerConfiguration
43-
public class KafkaAccessReconciler implements Reconciler<KafkaAccess>, EventSourceInitializer<KafkaAccess>, ErrorStatusHandler<KafkaAccess> {
42+
public class KafkaAccessReconciler implements Reconciler<KafkaAccess> {
4443

4544
private final KubernetesClient kubernetesClient;
4645
private InformerEventSource<Secret, KafkaAccess> kafkaAccessSecretEventSource;
@@ -95,8 +94,8 @@ public UpdateControl<KafkaAccess> reconcile(final KafkaAccess kafkaAccess, final
9594

9695
kafkaAccessStatus.setBinding(new BindingStatus(secretName));
9796
kafkaAccessStatus.setReadyCondition(true, "Ready", "Ready");
98-
99-
return UpdateControl.updateStatus(kafkaAccess);
97+
kafkaAccessStatus.setObservedGeneration(kafkaAccess.getMetadata().getGeneration());
98+
return UpdateControl.patchStatus(kafkaAccess);
10099
}
101100

102101
private void createOrUpdateSecret(final Map<String, String> data, final KafkaAccess kafkaAccess, final String secretName) {
@@ -142,54 +141,50 @@ private void createOrUpdateSecret(final Map<String, String> data, final KafkaAcc
142141
}
143142

144143
/**
145-
* Prepares the event sources required for triggering the reconciliation
144+
* Prepares the event sources required for triggering the reconciliation.
145+
* It configures the JOSDK framework with resources the operator needs to watch.
146146
*
147147
* @param context The EventSourceContext for KafkaAccess resource
148148
*
149149
* @return A new map of event sources
150150
*/
151-
@Override
152-
public Map<String, EventSource> prepareEventSources(final EventSourceContext<KafkaAccess> context) {
151+
public List<EventSource<?, KafkaAccess>> prepareEventSources(EventSourceContext<KafkaAccess> context) {
153152
LOGGER.info("Preparing event sources");
154-
InformerEventSource<Kafka, KafkaAccess> kafkaEventSource = new InformerEventSource<>(
155-
InformerConfiguration.from(Kafka.class, context)
153+
InformerEventSourceConfiguration<Kafka> kafkaEventSource =
154+
InformerEventSourceConfiguration.from(Kafka.class, KafkaAccess.class)
156155
.withSecondaryToPrimaryMapper(kafka -> KafkaAccessMapper.kafkaSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafka))
157156
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
158-
.build(),
159-
context);
160-
InformerEventSource<KafkaUser, KafkaAccess> kafkaUserEventSource = new InformerEventSource<>(
161-
InformerConfiguration.from(KafkaUser.class, context)
157+
.build();
158+
InformerEventSourceConfiguration<KafkaUser> kafkaUserEventSource =
159+
InformerEventSourceConfiguration.from(KafkaUser.class, KafkaAccess.class)
162160
.withSecondaryToPrimaryMapper(kafkaUser -> KafkaAccessMapper.kafkaUserSecondaryToPrimaryMapper(context.getPrimaryCache().list(), kafkaUser))
163161
.withPrimaryToSecondaryMapper(kafkaAccess -> KafkaAccessMapper.kafkaUserPrimaryToSecondaryMapper((KafkaAccess) kafkaAccess))
164-
.build(),
165-
context);
166-
InformerEventSource<Secret, KafkaAccess> strimziSecretEventSource = new InformerEventSource<>(
167-
InformerConfiguration.from(Secret.class)
162+
.build();
163+
InformerEventSourceConfiguration<Secret> strimziSecretEventSource =
164+
InformerEventSourceConfiguration.from(Secret.class, KafkaAccess.class)
165+
.withName(STRIMZI_SECRET_EVENT_SOURCE)
168166
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_CLUSTER_LABEL_VALUE))
169167
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
170-
.build(),
171-
context);
172-
InformerEventSource<Secret, KafkaAccess> strimziKafkaUserSecretEventSource = new InformerEventSource<>(
173-
InformerConfiguration.from(Secret.class)
168+
.build();
169+
InformerEventSourceConfiguration<Secret> strimziKafkaUserSecretEventSource =
170+
InformerEventSourceConfiguration.from(Secret.class, KafkaAccess.class)
171+
.withName(KAFKA_USER_SECRET_EVENT_SOURCE)
174172
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.STRIMZI_USER_LABEL_VALUE))
175173
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
176-
.build(),
177-
context);
174+
.build();
178175
kafkaAccessSecretEventSource = new InformerEventSource<>(
179-
InformerConfiguration.from(Secret.class)
176+
InformerEventSourceConfiguration.from(Secret.class, KafkaAccess.class)
180177
.withLabelSelector(String.format("%s=%s", KafkaAccessMapper.MANAGED_BY_LABEL_KEY, KafkaAccessMapper.KAFKA_ACCESS_LABEL_VALUE))
181178
.withSecondaryToPrimaryMapper(secret -> KafkaAccessMapper.secretSecondaryToPrimaryMapper(context.getPrimaryCache().list(), secret))
182179
.build(),
183180
context);
184-
Map<String, EventSource> eventSources = EventSourceInitializer.nameEventSources(
185-
kafkaEventSource,
186-
kafkaUserEventSource,
187-
kafkaAccessSecretEventSource
188-
);
189-
eventSources.put(STRIMZI_SECRET_EVENT_SOURCE, strimziSecretEventSource);
190-
eventSources.put(KAFKA_USER_SECRET_EVENT_SOURCE, strimziKafkaUserSecretEventSource);
191181
LOGGER.info("Finished preparing event sources");
192-
return eventSources;
182+
return List.of(
183+
new InformerEventSource<>(kafkaEventSource, context),
184+
new InformerEventSource<>(kafkaUserEventSource, context),
185+
new InformerEventSource<>(strimziSecretEventSource, context),
186+
new InformerEventSource<>(strimziKafkaUserSecretEventSource, context),
187+
kafkaAccessSecretEventSource);
193188
}
194189

195190
@Override
@@ -207,8 +202,9 @@ public ErrorStatusUpdateControl<KafkaAccess> updateErrorStatus(KafkaAccess kafka
207202
reason = "InvalidUserKind";
208203
}
209204
status.setReadyCondition(false, e.getMessage(), reason);
205+
status.setObservedGeneration(kafkaAccess.getMetadata().getGeneration());
210206

211-
return ErrorStatusUpdateControl.updateStatus(kafkaAccess);
207+
return ErrorStatusUpdateControl.patchStatus(kafkaAccess);
212208
}
213209

214210
/**

operator/src/main/java/io/strimzi/kafka/access/SecretDependentResource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ private Map<String, String> getKafkaUserSecretData(final Context<KafkaAccess> co
104104
.map(KafkaUserStatus::getSecret)
105105
.orElseThrow(missingKubernetesResourceException("Secret in KafkaUser status", kafkaUserNamespace, kafkaUserName));
106106
final InformerEventSource<Secret, KafkaAccess> kafkaUserSecretEventSource = (InformerEventSource<Secret, KafkaAccess>) context.eventSourceRetriever()
107-
.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE);
107+
.getEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE);
108108
final Secret kafkaUserSecret = kafkaUserSecretEventSource.get(new ResourceID(userSecretName, kafkaUserNamespace))
109109
.orElseThrow(missingKubernetesResourceException(String.format("Secret %s for KafkaUser", userSecretName), kafkaUserNamespace, kafkaUserName));
110110
return new KafkaUserData(kafkaUser).withSecret(kafkaUserSecret).getConnectionSecretData();
@@ -113,7 +113,7 @@ private Map<String, String> getKafkaUserSecretData(final Context<KafkaAccess> co
113113
private Map<String, String> getKafkaCaCertData(final Context<KafkaAccess> context, String kafkaClusterName, String kafkaClusterNamespace) {
114114
final String caCertSecretName = KafkaResources.clusterCaCertificateSecretName(kafkaClusterName);
115115
final InformerEventSource<Secret, KafkaAccess> strimziSecretEventSource = (InformerEventSource<Secret, KafkaAccess>) context.eventSourceRetriever()
116-
.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.STRIMZI_SECRET_EVENT_SOURCE);
116+
.getEventSourceFor(Secret.class, KafkaAccessReconciler.STRIMZI_SECRET_EVENT_SOURCE);
117117
return strimziSecretEventSource.get(new ResourceID(caCertSecretName, kafkaClusterNamespace))
118118
.map(Secret::getData)
119119
.orElse(Map.of());

operator/src/test/java/io/strimzi/kafka/access/KafkaAccessReconcilerTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,13 @@ public class KafkaAccessReconcilerTest {
7171

7272
@BeforeEach
7373
void beforeEach() {
74-
operator = new Operator(overrider -> overrider.withKubernetesClient(client));
74+
operator = new Operator(overrider -> overrider.withKubernetesClient(client)
75+
/**
76+
* Disables the use of Server-Side Apply for patching the primary resource.
77+
* Motivation: Mock Kubernetes client doesn't fully support SSA features.
78+
* See: <a href="https://github.com/fabric8io/kubernetes-client/issues/5337">fabric8io/kubernetes-client Issue #5337</a>
79+
**/
80+
.withUseSSAToPatchPrimaryResource(false));
7581
operator.register(new KafkaAccessReconciler(operator.getKubernetesClient()));
7682
operator.start();
7783
}
@@ -566,7 +572,7 @@ void testReconcileWithUserProvidedSecretAndTestDeleteSecretWithNameChange() {
566572
.map(KafkaAccessStatus::getBinding)
567573
.map(BindingStatus::getName);
568574
return bindingName.isPresent() && NEW_USER_PROVIDED_SECRET_NAME.equals(bindingName.get());
569-
}, 100, TimeUnit.MILLISECONDS);
575+
}, TEST_TIMEOUT, TimeUnit.MILLISECONDS);
570576

571577
Secret oldSecretAfterRename = client.secrets().inNamespace(NAMESPACE).withName(USER_PROVIDED_SECRET_NAME).get();
572578
assertThat(oldSecretAfterRename).isNull();

operator/src/test/java/io/strimzi/kafka/access/SecretDependentResourceTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ void testSecretDataWithTls() {
9393
List.of(ResourceProvider.getListener(LISTENER_1, KafkaListenerType.INTERNAL, true)),
9494
List.of(ResourceProvider.getListenerStatus(LISTENER_1, BOOTSTRAP_HOST, BOOTSTRAP_PORT_9092))
9595
);
96-
final Context<KafkaAccess> mockContext = mock(Context.class);
96+
final Context mockContext = mock(Context.class);
9797
when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.of(kafka));
9898

9999
final KafkaReference kafkaReference = ResourceProvider.getKafkaReference(KAFKA_NAME, KAFKA_NAMESPACE);
@@ -102,7 +102,7 @@ void testSecretDataWithTls() {
102102
final EventSourceRetriever<KafkaAccess> mockEventSourceRetriever = mock(EventSourceRetriever.class);
103103
final InformerEventSource<Secret, KafkaAccess> mockInformerEventSource = mock(InformerEventSource.class);
104104
when(mockContext.eventSourceRetriever()).thenReturn(mockEventSourceRetriever);
105-
when(mockEventSourceRetriever.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.STRIMZI_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
105+
when(mockEventSourceRetriever.getEventSourceFor(Secret.class, KafkaAccessReconciler.STRIMZI_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
106106
when(mockInformerEventSource.get(any(ResourceID.class))).thenReturn(Optional.of(certSecret));
107107

108108
Map<String, String> data = new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext);
@@ -138,7 +138,7 @@ void testSecretDataWithKafkaUser() {
138138
final EventSourceRetriever<KafkaAccess> mockEventSourceRetriever = mock(EventSourceRetriever.class);
139139
final InformerEventSource<Secret, KafkaAccess> mockInformerEventSource = mock(InformerEventSource.class);
140140
when(mockContext.eventSourceRetriever()).thenReturn(mockEventSourceRetriever);
141-
when(mockEventSourceRetriever.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
141+
when(mockEventSourceRetriever.getEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
142142
when(mockInformerEventSource.get(any(ResourceID.class))).thenReturn(Optional.of(ResourceProvider.getStrimziUserSecret(KAFKA_USER_SECRET_NAME, KAFKA_NAMESPACE, KAFKA_NAME)));
143143

144144
Map<String, String> data = new SecretDependentResource().desired(kafkaAccess.getSpec(), NAMESPACE, mockContext);
@@ -244,7 +244,7 @@ void testDesiredMissingKafkaUserSecret() {
244244
final InformerEventSource<Secret, KafkaAccess> mockInformerEventSource = mock(InformerEventSource.class);
245245
when(mockContext.getSecondaryResource(Kafka.class)).thenReturn(Optional.of(kafka));
246246
when(mockContext.eventSourceRetriever()).thenReturn(mockEventSourceRetriever);
247-
when(mockEventSourceRetriever.getResourceEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
247+
when(mockEventSourceRetriever.getEventSourceFor(Secret.class, KafkaAccessReconciler.KAFKA_USER_SECRET_EVENT_SOURCE)).thenReturn(mockInformerEventSource);
248248
when(mockInformerEventSource.get(any(ResourceID.class))).thenReturn(Optional.empty());
249249

250250
final KafkaUser kafkaUser = ResourceProvider.getKafkaUserWithStatus(KAFKA_USER_NAME, KAFKA_NAMESPACE, KAFKA_USER_SECRET_NAME, "user", new KafkaUserScramSha512ClientAuthentication());

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
<central-publishing-maven-plugin.version>0.7.0</central-publishing-maven-plugin.version>
7070

7171
<!-- Runtime dependencies -->
72-
<javaoperatorsdk.version>4.4.2</javaoperatorsdk.version>
72+
<javaoperatorsdk.version>5.1.2</javaoperatorsdk.version>
7373
<fabric8.version>7.2.0</fabric8.version>
7474
<sundrio.version>0.200.0</sundrio.version>
7575
<strimzi.version>0.48.0</strimzi.version>

0 commit comments

Comments
 (0)