diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java index dd2844d9f8..672b48e540 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java @@ -24,9 +24,22 @@ @SuppressWarnings("unchecked") public interface ResourceCache extends Cache { + /** + * Lists all resources in the given namespace. + * + * @param namespace the namespace to list resources from + * @return a stream of all cached resources in the namespace + */ default Stream list(String namespace) { return list(namespace, TRUE); } + /** + * Lists resources in the given namespace that match the provided predicate. + * + * @param namespace the namespace to list resources from + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(String namespace, Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java index ffcfd2df58..c93262b5d9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java @@ -25,17 +25,45 @@ public interface Cache { Predicate TRUE = (a) -> true; + /** + * Retrieves a resource from the cache by its {@link ResourceID}. + * + * @param resourceID the identifier of the resource + * @return an optional containing the resource if present in the cache + */ Optional get(ResourceID resourceID); + /** + * Checks whether a resource with the given {@link ResourceID} exists in the cache. + * + * @param resourceID the identifier of the resource + * @return {@code true} if the resource is present in the cache + */ default boolean contains(ResourceID resourceID) { return get(resourceID).isPresent(); } + /** + * Returns a stream of all {@link ResourceID}s currently in the cache. + * + * @return a stream of resource identifiers + */ Stream keys(); + /** + * Lists all resources in the cache. + * + * @return a stream of all cached resources + */ default Stream list() { return list(TRUE); } + /** + * Lists resources in the cache that match the provided predicate. + * + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 69a5f36bf4..6debf13047 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -234,52 +234,52 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); + return mergeWithWithTempCacheResources( + manager().list(namespace, predicate), namespace, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream list(Predicate predicate) { - return cache.list(predicate); - } - - @Override - public List byIndex(String indexName, String indexKey) { - return manager().byIndex(indexName, indexKey); - } - - public Stream byIndexStream(String indexName, String indexKey) { - return manager().byIndexStream(indexName, indexKey); + return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); } /** - * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is - * useful when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. */ - public Stream listWithStrongConsistency(String namespace, Predicate predicate) { + @Override + public Stream byIndexStream(String indexName, String indexKey) { return mergeWithWithTempCacheResources( - manager().list(namespace, predicate), namespace, predicate); - } - - /** - * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when - * resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. - */ - public Stream listWithStrongConsistency(Predicate predicate) { - return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); + manager().byIndexStream(indexName, indexKey), indexName, indexKey); } /** - * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is - * useful when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. */ - public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { + @Override + public List byIndex(String indexName, String indexKey) { return mergeWithWithTempCacheResources( - manager().byIndexStream(indexName, indexKey), indexName, indexKey); + manager().byIndexStream(indexName, indexKey), indexName, indexKey) + .collect(Collectors.toList()); } private Stream mergeWithWithTempCacheResources( @@ -359,9 +359,19 @@ private Stream mergeWithWithTempCacheResources( return Stream.concat(tempResourceStream, upToDateList.stream()); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Keys from the temporary resource + * cache (ghost resources) are included in the result. + */ @Override public Stream keys() { - return cache.keys(); + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return manager().keys(); + } + var tempKeys = temporaryResourceCache.getResources().keySet(); + return Stream.concat(manager().keys(), tempKeys.stream().filter(k -> !manager().contains(k))); } @Override diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 7313cc3a48..fde605caa7 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -541,7 +541,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { } @Test - void listWithStrongConsistencyReplacesResourceFromTempCache() { + void listReplacesResourceFromTempCache() { var original = testDeployment(); var newer = testDeployment(); newer.getMetadata().setResourceVersion("5"); @@ -553,13 +553,13 @@ void listWithStrongConsistencyReplacesResourceFromTempCache() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(newer); } @Test - void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { + void listKeepsResourceWhenNotInTempCache() { var original = testDeployment(); when(temporaryResourceCache.getResources()).thenReturn(Map.of()); @@ -568,13 +568,13 @@ void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + var result = informerEventSource.list("default", r -> true).toList(); assertThat(result).containsExactly(original); } @Test - void listWithStrongConsistencyReplacesOnlyMatchingResources() { + void listReplacesOnlyMatchingResources() { var dep1 = testDeployment(); var dep2 = testDeployment(); dep2.getMetadata().setName("other"); @@ -588,13 +588,13 @@ void listWithStrongConsistencyReplacesOnlyMatchingResources() { when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2)); doReturn(informerManager).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2); } @Test - void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { + void byIndexStreamReplacesFromTempCache() { var original = testDeployment(); var newer = testDeployment(); newer.getMetadata().setResourceVersion("5"); @@ -607,13 +607,13 @@ void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { doReturn(informerManager).when(informerEventSource).manager(); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); - var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + var result = informerEventSource.byIndexStream("idx", "key").toList(); assertThat(result).containsExactly(newer); } @Test - void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + void listKeepsResourceWhenTempCacheHasOlderVersion() { var original = testDeployment(); original.getMetadata().setResourceVersion("5"); var olderTemp = testDeployment(); @@ -626,13 +626,13 @@ void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(original); } @Test - void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + void byIndexStreamKeepsResourceWhenTempCacheHasOlderVersion() { var original = testDeployment(); original.getMetadata().setResourceVersion("5"); var olderTemp = testDeployment(); @@ -646,13 +646,13 @@ void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion doReturn(mim).when(informerEventSource).manager(); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); - var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + var result = informerEventSource.byIndexStream("idx", "key").toList(); assertThat(result).containsExactly(original); } @Test - void listWithStrongConsistencyAddsGhostResources() { + void listAddsGhostResources() { var resource = testDeployment(); var ghostResource = testDeployment(); ghostResource.getMetadata().setName("ghost"); @@ -664,11 +664,54 @@ void listWithStrongConsistencyAddsGhostResources() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); } + @Test + void keysIncludesGhostResourceKeys() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + var resourceId = ResourceID.fromResource(resource); + var ghostResourceId = ResourceID.fromResource(ghostResource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(ghostResourceId, ghostResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(ghostResourceId)).thenReturn(false); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactlyInAnyOrder(resourceId, ghostResourceId); + } + + @Test + void keysDoesNotDuplicateExistingKeys() { + var resource = testDeployment(); + var newerResource = testDeployment(); + newerResource.getMetadata().setResourceVersion("5"); + + var resourceId = ResourceID.fromResource(resource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(resourceId, newerResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(resourceId)).thenReturn(true); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactly(resourceId); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta());