From c5c4bcd8e94cb7d06f66174a3bfbe86c033e950f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 7 May 2026 13:53:49 +0200 Subject: [PATCH 1/5] improve: use the default list methods with read-cache-after-write consistency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the index and list related methods "strong consistency variant". The rational behind this change is that the user either uses read-cache-after-write consistency feature, than the "strong read" should be used. If on the other hand tha feature, is not used, the current implementation does not impose any overhead. In addition to that strong consistency is not a good name, we call this feature as read-cache-after-write consistency, that might be too long though. Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 31 ++++++------------- .../informer/InformerEventSourceTest.java | 14 ++++----- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 69a5f36bf4..0f946ca010 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -234,31 +234,12 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } - @Override - public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); - } - - @Override - public Stream list(Predicate predicate) { - return cache.list(predicate); - } - - @Override - public List byIndex(String indexName, String indexKey) { - return manager().byIndex(indexName, indexKey); - } - - public Stream byIndexStream(String indexName, String indexKey) { - return manager().byIndexStream(indexName, indexKey); - } - /** * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is * useful when resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ - public Stream listWithStrongConsistency(String namespace, Predicate predicate) { + public Stream list(String namespace, Predicate predicate) { return mergeWithWithTempCacheResources( manager().list(namespace, predicate), namespace, predicate); } @@ -268,7 +249,7 @@ public Stream listWithStrongConsistency(String namespace, Predicate predic * resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ - public Stream listWithStrongConsistency(Predicate predicate) { + public Stream list(Predicate predicate) { return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); } @@ -277,11 +258,17 @@ public Stream listWithStrongConsistency(Predicate predicate) { * useful when resources are updated using {@link * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. */ - public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { + public Stream byIndexStream(String indexName, String indexKey) { return mergeWithWithTempCacheResources( manager().byIndexStream(indexName, indexKey), indexName, indexKey); } + public List byIndex(String indexName, String indexKey) { + return mergeWithWithTempCacheResources( + manager().byIndexStream(indexName, indexKey), indexName, indexKey) + .toList(); + } + private Stream mergeWithWithTempCacheResources( Stream stream, String indexName, String indexKey) { return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 7313cc3a48..4a5c7b2e96 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -553,7 +553,7 @@ void listWithStrongConsistencyReplacesResourceFromTempCache() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(newer); } @@ -568,7 +568,7 @@ void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + var result = informerEventSource.list("default", r -> true).toList(); assertThat(result).containsExactly(original); } @@ -588,7 +588,7 @@ void listWithStrongConsistencyReplacesOnlyMatchingResources() { when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2)); doReturn(informerManager).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2); } @@ -607,7 +607,7 @@ void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { doReturn(informerManager).when(informerEventSource).manager(); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); - var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + var result = informerEventSource.byIndexStream("idx", "key").toList(); assertThat(result).containsExactly(newer); } @@ -626,7 +626,7 @@ void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(original); } @@ -646,7 +646,7 @@ void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion doReturn(mim).when(informerEventSource).manager(); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); - var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + var result = informerEventSource.byIndexStream("idx", "key").toList(); assertThat(result).containsExactly(original); } @@ -664,7 +664,7 @@ void listWithStrongConsistencyAddsGhostResources() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); } From 574911fe48e91f0ac8b2e2e9ffeb45569e476b4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 7 May 2026 14:03:23 +0200 Subject: [PATCH 2/5] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 19 ++++--------------- .../informer/InformerEventSourceTest.java | 14 +++++++------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 0f946ca010..f25e65ea7e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -234,35 +234,24 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } - /** - * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is - * useful when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. - */ + @Override public Stream list(String namespace, Predicate predicate) { return mergeWithWithTempCacheResources( manager().list(namespace, predicate), namespace, predicate); } - /** - * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when - * resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. - */ + @Override public Stream list(Predicate predicate) { return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); } - /** - * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is - * useful when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. - */ + @Override public Stream byIndexStream(String indexName, String indexKey) { return mergeWithWithTempCacheResources( manager().byIndexStream(indexName, indexKey), indexName, indexKey); } + @Override public List byIndex(String indexName, String indexKey) { return mergeWithWithTempCacheResources( manager().byIndexStream(indexName, indexKey), indexName, indexKey) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 4a5c7b2e96..fad61fe940 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -541,7 +541,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { } @Test - void listWithStrongConsistencyReplacesResourceFromTempCache() { + void listReplacesResourceFromTempCache() { var original = testDeployment(); var newer = testDeployment(); newer.getMetadata().setResourceVersion("5"); @@ -559,7 +559,7 @@ void listWithStrongConsistencyReplacesResourceFromTempCache() { } @Test - void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { + void listKeepsResourceWhenNotInTempCache() { var original = testDeployment(); when(temporaryResourceCache.getResources()).thenReturn(Map.of()); @@ -574,7 +574,7 @@ void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { } @Test - void listWithStrongConsistencyReplacesOnlyMatchingResources() { + void listReplacesOnlyMatchingResources() { var dep1 = testDeployment(); var dep2 = testDeployment(); dep2.getMetadata().setName("other"); @@ -594,7 +594,7 @@ void listWithStrongConsistencyReplacesOnlyMatchingResources() { } @Test - void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { + void byIndexStreamReplacesFromTempCache() { var original = testDeployment(); var newer = testDeployment(); newer.getMetadata().setResourceVersion("5"); @@ -613,7 +613,7 @@ void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { } @Test - void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + void listKeepsResourceWhenTempCacheHasOlderVersion() { var original = testDeployment(); original.getMetadata().setResourceVersion("5"); var olderTemp = testDeployment(); @@ -632,7 +632,7 @@ void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { } @Test - void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + void byIndexStreamKeepsResourceWhenTempCacheHasOlderVersion() { var original = testDeployment(); original.getMetadata().setResourceVersion("5"); var olderTemp = testDeployment(); @@ -652,7 +652,7 @@ void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion } @Test - void listWithStrongConsistencyAddsGhostResources() { + void listAddsGhostResources() { var resource = testDeployment(); var ghostResource = testDeployment(); ghostResource.getMetadata().setName("ghost"); From 99a4092e0e9208b867ac44a6171773097d426b6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 7 May 2026 14:07:46 +0200 Subject: [PATCH 3/5] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/reconciler/ResourceCache.java | 13 +++++++++ .../processing/event/source/Cache.java | 28 +++++++++++++++++++ .../informer/ManagedInformerEventSource.java | 24 ++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java index dd2844d9f8..672b48e540 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java @@ -24,9 +24,22 @@ @SuppressWarnings("unchecked") public interface ResourceCache extends Cache { + /** + * Lists all resources in the given namespace. + * + * @param namespace the namespace to list resources from + * @return a stream of all cached resources in the namespace + */ default Stream list(String namespace) { return list(namespace, TRUE); } + /** + * Lists resources in the given namespace that match the provided predicate. + * + * @param namespace the namespace to list resources from + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(String namespace, Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java index ffcfd2df58..c93262b5d9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java @@ -25,17 +25,45 @@ public interface Cache { Predicate TRUE = (a) -> true; + /** + * Retrieves a resource from the cache by its {@link ResourceID}. + * + * @param resourceID the identifier of the resource + * @return an optional containing the resource if present in the cache + */ Optional get(ResourceID resourceID); + /** + * Checks whether a resource with the given {@link ResourceID} exists in the cache. + * + * @param resourceID the identifier of the resource + * @return {@code true} if the resource is present in the cache + */ default boolean contains(ResourceID resourceID) { return get(resourceID).isPresent(); } + /** + * Returns a stream of all {@link ResourceID}s currently in the cache. + * + * @return a stream of resource identifiers + */ Stream keys(); + /** + * Lists all resources in the cache. + * + * @return a stream of all cached resources + */ default Stream list() { return list(TRUE); } + /** + * Lists resources in the cache that match the provided predicate. + * + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index f25e65ea7e..346518186f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -234,23 +234,47 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream list(String namespace, Predicate predicate) { return mergeWithWithTempCacheResources( manager().list(namespace, predicate), namespace, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream list(Predicate predicate) { return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream byIndexStream(String indexName, String indexKey) { return mergeWithWithTempCacheResources( manager().byIndexStream(indexName, indexKey), indexName, indexKey); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public List byIndex(String indexName, String indexKey) { return mergeWithWithTempCacheResources( From 24dadbf7e4f2c2e8dd80799778cdd3e3057b0021 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 7 May 2026 14:21:49 +0200 Subject: [PATCH 4/5] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 346518186f..1fbfc9ec4a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -279,7 +279,7 @@ public Stream byIndexStream(String indexName, String indexKey) { public List byIndex(String indexName, String indexKey) { return mergeWithWithTempCacheResources( manager().byIndexStream(indexName, indexKey), indexName, indexKey) - .toList(); + .collect(Collectors.toList()); } private Stream mergeWithWithTempCacheResources( From 2bfb6dac6b55a6160ce722ab2fa40f900f2432bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 7 May 2026 14:53:49 +0200 Subject: [PATCH 5/5] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 12 +++++- .../informer/InformerEventSourceTest.java | 43 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 1fbfc9ec4a..6debf13047 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -359,9 +359,19 @@ private Stream mergeWithWithTempCacheResources( return Stream.concat(tempResourceStream, upToDateList.stream()); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Keys from the temporary resource + * cache (ghost resources) are included in the result. + */ @Override public Stream keys() { - return cache.keys(); + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return manager().keys(); + } + var tempKeys = temporaryResourceCache.getResources().keySet(); + return Stream.concat(manager().keys(), tempKeys.stream().filter(k -> !manager().contains(k))); } @Override diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index fad61fe940..fde605caa7 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -669,6 +669,49 @@ void listAddsGhostResources() { assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); } + @Test + void keysIncludesGhostResourceKeys() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + var resourceId = ResourceID.fromResource(resource); + var ghostResourceId = ResourceID.fromResource(ghostResource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(ghostResourceId, ghostResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(ghostResourceId)).thenReturn(false); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactlyInAnyOrder(resourceId, ghostResourceId); + } + + @Test + void keysDoesNotDuplicateExistingKeys() { + var resource = testDeployment(); + var newerResource = testDeployment(); + newerResource.getMetadata().setResourceVersion("5"); + + var resourceId = ResourceID.fromResource(resource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(resourceId, newerResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(resourceId)).thenReturn(true); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactly(resourceId); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta());