Skip to content

feat: primary resource caching for followup reconciliation(s) #2761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
3aa6d17
feat: resource cache
csviri Apr 11, 2025
0125d66
wip
csviri Apr 11, 2025
8870c14
wip
csviri Apr 14, 2025
bff907c
wip
csviri Apr 14, 2025
32823e0
wip
csviri Apr 14, 2025
b016bf0
wip
csviri Apr 14, 2025
00fd9e6
wip
csviri Apr 14, 2025
3b99f78
Integration tests
csviri Apr 15, 2025
1812851
wip
csviri Apr 15, 2025
e09472a
fix
csviri Apr 15, 2025
608fb09
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Apr 15, 2025
21b2ef5
fix
csviri Apr 15, 2025
870db57
additional test
csviri Apr 15, 2025
9c58fd4
doc
csviri Apr 15, 2025
e481342
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Apr 15, 2025
51f1ca0
Update operator-framework/src/test/java/io/javaoperatorsdk/operator/b…
csviri Apr 16, 2025
3409053
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Apr 16, 2025
68ca625
remove with lock versions
csviri Apr 17, 2025
84eec7b
remove not used code
csviri Apr 17, 2025
42b9ead
Revert "remove not used code"
csviri Apr 17, 2025
d51f0e3
Revert "remove with lock versions"
csviri Apr 17, 2025
14c63bb
wip
csviri Apr 17, 2025
e9bcfbe
fix: typos and start improving javadoc
metacosm Apr 17, 2025
e8ede1a
refactor
csviri Apr 22, 2025
a71eafe
docs
csviri Apr 22, 2025
eff1ccb
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Apr 25, 2025
217629f
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Apr 25, 2025
a8e7efc
Update docs/content/en/docs/documentation/reconciler.md
csviri Apr 25, 2025
a1d303d
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Apr 25, 2025
7b58dca
Update docs/content/en/docs/documentation/reconciler.md
csviri Apr 25, 2025
a656160
improvements
csviri Apr 25, 2025
1f1e1d0
docs
csviri Apr 25, 2025
95a9f2e
improve
csviri Apr 25, 2025
720a4c5
docs
csviri Apr 25, 2025
a510cc7
wip
csviri Apr 25, 2025
0a79dd7
wip
csviri Apr 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.base.PatchType;
import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class PrimaryUpdateAndCacheUtils {

// All members of this class are static; the private constructor prevents instantiation.
private PrimaryUpdateAndCacheUtils() {}

// Logger used by the helpers of this class, e.g. for the resource version warning.
private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class);

/**
 * Updates the status of the primary resource with an update (PUT) request and reports the result
 * to the controller event source, so that the up-to-date primary resource is present during the
 * next reconciliation.
 *
 * @param primary resource to update; its resource version must be set
 * @param context of reconciliation
 * @return the result of {@link #patchAndCacheStatusWithLock(HasMetadata, Context, BiFunction)}
 * @param <P> primary resource type
 */
public static <P extends HasMetadata> P updateAndCacheStatusWithLock(
    P primary, Context<P> context) {
  final BiFunction<P, KubernetesClient, P> statusUpdate =
      (resource, client) -> client.resource(primary).updateStatus();
  return patchAndCacheStatusWithLock(primary, context, statusUpdate);
}

/**
 * Patches the status of the primary resource with a JSON Merge patch and reports the result to
 * the controller event source, so that the up-to-date primary resource is present during the next
 * reconciliation.
 *
 * @param primary resource to patch; its resource version must be set
 * @param context of reconciliation
 * @return the result of {@link #patchAndCacheStatusWithLock(HasMetadata, Context, BiFunction)}
 * @param <P> primary resource type
 */
public static <P extends HasMetadata> P patchAndCacheStatusWithLock(
    P primary, Context<P> context) {
  final BiFunction<P, KubernetesClient, P> statusPatch =
      (resource, client) -> client.resource(primary).patchStatus();
  return patchAndCacheStatusWithLock(primary, context, statusPatch);
}

/**
 * Edits the status of the primary resource with a JSON Patch and reports the result to the
 * controller event source, so that the up-to-date primary resource is present during the next
 * reconciliation.
 *
 * @param primary resource to edit; its resource version must be set
 * @param context of reconciliation
 * @param operation modification passed to {@code editStatus} of the client
 * @return the result of {@link #patchAndCacheStatusWithLock(HasMetadata, Context, BiFunction)}
 * @param <P> primary resource type
 */
public static <P extends HasMetadata> P editAndCacheStatusWithLock(
    P primary, Context<P> context, UnaryOperator<P> operation) {
  final BiFunction<P, KubernetesClient, P> statusEdit =
      (resource, client) -> client.resource(primary).editStatus(operation);
  return patchAndCacheStatusWithLock(primary, context, statusEdit);
}

/**
 * Makes sure that the up-to-date primary resource will be present during the next reconciliation.
 *
 * <p>Applies the given {@code patch} function and then reports the outcome to the controller
 * event source through {@code handleRecentResourceUpdate}.
 *
 * @param primary resource; its resource version must be set
 * @param context of reconciliation
 * @param patch free implementation of the patch - make sure you use optimistic locking during
 *     the update
 * @return the updated resource, as returned by {@code patch}
 * @throws IllegalStateException if the resource version of {@code primary} is {@code null}
 * @param <P> primary resource type
 */
public static <P extends HasMetadata> P patchAndCacheStatusWithLock(
    P primary, Context<P> context, BiFunction<P, KubernetesClient, P> patch) {
  checkResourceVersionPresent(primary);
  var updatedResource = patch.apply(primary, context.getClient());
  context
      .eventSourceRetriever()
      .getControllerEventSource()
      .handleRecentResourceUpdate(ResourceID.fromResource(primary), updatedResource, primary);
  // Hand the patched resource back to the caller; previously this method always returned null.
  return updatedResource;
}

/**
 * Applies the status of {@code freshResourceWithStatus} using Server Side Apply and reports the
 * result to the controller event source, so that the up-to-date primary resource is present
 * during the next reconciliation.
 *
 * @param primary resource
 * @param freshResourceWithStatus - fresh resource with target state; its resource version must be
 *     set
 * @param context of reconciliation
 * @return the updated resource.
 * @param <P> primary resource type
 */
public static <P extends HasMetadata> P ssaPatchAndCacheStatusWithLock(
    P primary, P freshResourceWithStatus, Context<P> context) {
  checkResourceVersionPresent(freshResourceWithStatus);

  final var statusOfFresh =
      context.getClient().resource(freshResourceWithStatus).subresource("status");
  final PatchContext ssaContext =
      new PatchContext.Builder()
          .withForce(true)
          .withFieldManager(context.getControllerConfiguration().fieldManager())
          .withPatchType(PatchType.SERVER_SIDE_APPLY)
          .build();
  final P applied = statusOfFresh.patch(ssaContext);

  final var controllerEventSource = context.eventSourceRetriever().getControllerEventSource();
  controllerEventSource.handleRecentResourceUpdate(
      ResourceID.fromResource(primary), applied, primary);
  return applied;
}

/**
* Patches the resource and adds it to the {@link PrimaryResourceCache} provided. Optimistic
* locking is not required.
*
* @param primary resource
* @param freshResourceWithStatus - fresh resource with target state
* @param context of reconciliation
* @param cache - resource cache managed by user
* @return the updated resource.
* @param <P> primary resource type
*/
public static <P extends HasMetadata> P ssaPatchAndCacheStatus(
P primary, P freshResourceWithStatus, Context<P> context, PrimaryResourceCache<P> cache) {
logWarnIfResourceVersionPresent(freshResourceWithStatus);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If optimistic locking is not required, why do we need to log a warning?

return patchAndCacheStatus(
primary,
context.getClient(),
cache,
(P p, KubernetesClient c) ->
c.resource(freshResourceWithStatus)
.subresource("status")
.patch(
new PatchContext.Builder()
.withForce(true)
.withFieldManager(context.getControllerConfiguration().fieldManager())
.withPatchType(PatchType.SERVER_SIDE_APPLY)
.build()));
}

/**
* Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache} provided.
* Optimistic locking is not required.
*
* @param primary resource*
* @param context of reconciliation
* @param cache - resource cache managed by user
* @return the updated resource.
* @param <P> primary resource type
*/
public static <P extends HasMetadata> P edithAndCacheStatus(
P primary, Context<P> context, PrimaryResourceCache<P> cache, UnaryOperator<P> operation) {
logWarnIfResourceVersionPresent(primary);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If optimistic locking is not required, why do we need to log a warning?

return patchAndCacheStatus(
primary,
context.getClient(),
cache,
(P p, KubernetesClient c) -> c.resource(primary).editStatus(operation));
}

/**
* Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache}
* provided. Optimistic locking is not required.
*
* @param primary resource*
* @param context of reconciliation
* @param cache - resource cache managed by user
* @return the updated resource.
* @param <P> primary resource type
*/
public static <P extends HasMetadata> P patchAndCacheStatus(
P primary, Context<P> context, PrimaryResourceCache<P> cache) {
logWarnIfResourceVersionPresent(primary);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If optimistic locking is not required, why do we need to log a warning?

return patchAndCacheStatus(
primary,
context.getClient(),
cache,
(P p, KubernetesClient c) -> c.resource(primary).patchStatus());
}

/**
 * Updates the status of the resource with an update request and stores the result in the
 * provided {@link PrimaryResourceCache}. Optimistic locking is not required.
 *
 * @param primary resource
 * @param context of reconciliation
 * @param cache - resource cache managed by user
 * @return the updated resource.
 * @param <P> primary resource type
 */
public static <P extends HasMetadata> P updateAndCacheStatus(
    P primary, Context<P> context, PrimaryResourceCache<P> cache) {
  logWarnIfResourceVersionPresent(primary);

  final BiFunction<P, KubernetesClient, P> statusUpdate =
      (resource, client) -> client.resource(primary).updateStatus();
  return patchAndCacheStatus(primary, context.getClient(), cache, statusUpdate);
}

public static <P extends HasMetadata> P patchAndCacheStatus(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add small JavaDoc for this exposed method as well?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still would be nice to have a JavaDoc for this public method.

P primary,
KubernetesClient client,
PrimaryResourceCache<P> cache,
BiFunction<P, KubernetesClient, P> patch) {
var updatedResource = patch.apply(primary, client);
cache.cacheResource(primary, updatedResource);
return updatedResource;
}

// Fails fast when the resource carries no resource version; the locking variants require it.
private static <P extends HasMetadata> void checkResourceVersionPresent(P primary) {
  final var metadata = primary.getMetadata();
  if (metadata.getResourceVersion() != null) {
    return;
  }
  throw new IllegalStateException(
      "Primary resource version is null, it is expected to set resource version for updates for caching. Name: %s namespace: %s"
          .formatted(metadata.getName(), metadata.getNamespace()));
}

// Logs a warning when a resource version is set, pointing callers to the alternative methods
// meant for caching with optimistic locking.
private static <P extends HasMetadata> void logWarnIfResourceVersionPresent(P primary) {
  final var metadata = primary.getMetadata();
  if (metadata.getResourceVersion() == null) {
    return;
  }
  log.warn(
      "Primary resource version is NOT null, for caching with optimistic locking use"
          + " alternative methods. Name: {} namespace: {}",
      metadata.getName(),
      metadata.getNamespace());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.javaoperatorsdk.operator.api.reconciler.support;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiPredicate;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

/**
 * Cache of recently updated primary resources, keyed by {@link ResourceID}. Each entry stores the
 * resource before and after an update; an eviction predicate decides when a cached entry is
 * superseded by a newer version of the resource.
 *
 * @param <P> primary resource type
 */
public class PrimaryResourceCache<P extends HasMetadata> {

  private final ConcurrentHashMap<ResourceID, Pair<P>> cache = new ConcurrentHashMap<>();
  private final BiPredicate<Pair<P>, P> evictionPredicate;

  /** Creates a cache using {@link ResourceVersionParsingEvictionPredicate} for eviction. */
  public PrimaryResourceCache() {
    this(new ResourceVersionParsingEvictionPredicate<>());
  }

  /**
   * @param evictionPredicate tested with the cached pair and an incoming resource; when it
   *     returns {@code true} the cached entry is dropped
   */
  public PrimaryResourceCache(BiPredicate<Pair<P>, P> evictionPredicate) {
    this.evictionPredicate = evictionPredicate;
  }

  /**
   * Stores the resource as the state after an update, without a before-update counterpart.
   *
   * @param afterUpdate resource after the update
   */
  public void cacheResource(P afterUpdate) {
    cache.put(ResourceID.fromResource(afterUpdate), new Pair<>(null, afterUpdate));
  }

  /**
   * Stores the before/after pair under the ID of the resource before the update.
   *
   * @param beforeUpdate resource before the update
   * @param afterUpdate resource after the update
   */
  public void cacheResource(P beforeUpdate, P afterUpdate) {
    cache.put(ResourceID.fromResource(beforeUpdate), new Pair<>(beforeUpdate, afterUpdate));
  }

  /**
   * Resolves which version of the resource to use. The cached after-update resource is returned
   * only if an entry exists, its UID equals the UID of {@code newVersion} and the eviction
   * predicate rejects eviction. Otherwise {@code newVersion} is returned and any existing entry
   * is removed.
   *
   * @param newVersion incoming version of the resource
   * @return the cached resource or {@code newVersion}
   */
  public P getFreshResource(P newVersion) {
    final var key = ResourceID.fromResource(newVersion);
    final var cached = cache.get(key);
    if (cached == null) {
      return newVersion;
    }
    final boolean sameUid =
        newVersion.getMetadata().getUid().equals(cached.afterUpdate().getMetadata().getUid());
    // The predicate is consulted only for the same UID, thanks to short-circuit evaluation.
    if (sameUid && !evictionPredicate.test(cached, newVersion)) {
      return cached.afterUpdate();
    }
    cache.remove(key);
    return newVersion;
  }

  /**
   * Removes the entry of the given resource, if any.
   *
   * @param resource whose entry should be removed
   */
  public void cleanup(P resource) {
    final var key = ResourceID.fromResource(resource);
    cache.remove(key);
  }

  /** Resource state before and after an update; {@code beforeUpdate} may be {@code null}. */
  public record Pair<T extends HasMetadata>(T beforeUpdate, T afterUpdate) {}

  /**
   * Evicts when the resource version of the incoming resource, parsed as a number, is at least
   * the version of the cached after-update resource. This works in general, but it does not
   * strictly follow the contract with k8s API.
   */
  public static class ResourceVersionParsingEvictionPredicate<T extends HasMetadata>
      implements BiPredicate<Pair<T>, T> {
    @Override
    public boolean test(Pair<T> updatePair, T newVersion) {
      final long cachedVersion =
          Long.parseLong(updatePair.afterUpdate().getMetadata().getResourceVersion());
      final long incomingVersion = Long.parseLong(newVersion.getMetadata().getResourceVersion());
      return incomingVersion >= cachedVersion;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public Stream<? extends EventSource<?, P>> getEventSourcesStream() {
return eventSources.flatMappedSources();
}

/** Returns the controller event source held by {@code eventSources}. */
@Override
public ControllerEventSource<P> getControllerEventSource() {
  final ControllerEventSource<P> controllerSource = eventSources.controllerEventSource();
  return controllerSource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;

public interface EventSourceRetriever<P extends HasMetadata> {

Expand All @@ -17,6 +18,8 @@ default <R> EventSource<R, P> getEventSourceFor(Class<R> dependentType) {

<R> List<EventSource<R, P>> getEventSourcesFor(Class<R> dependentType);

/**
 * Provides the {@link ControllerEventSource}, presumably the event source watching the primary
 * resources of this controller (TODO confirm).
 *
 * @return the controller event source
 */
ControllerEventSource<P> getControllerEventSource();

/**
* Registers (and starts) the specified {@link EventSource} dynamically during the reconciliation.
* If an EventSource is already registered with the specified name, the registration will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

Expand Down Expand Up @@ -167,9 +167,9 @@ public synchronized boolean isKnownResourceVersion(T resource) {
}

/**
* @return true if {@link InformerEventSourceConfiguration#parseResourceVersions()} is enabled and
* the resourceVersion of newResource is numerically greater than cachedResource, otherwise
* false
* @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()}
* is enabled and the resourceVersion of newResource is numerically greater than
* cachedResource, otherwise false
*/
private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) {
try {
Expand Down
Loading