Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,22 @@
@SuppressWarnings("unchecked")
public interface ResourceCache<T extends HasMetadata> extends Cache<T> {

/**
* Lists all resources in the given namespace.
*
* @param namespace the namespace to list resources from
* @return a stream of all cached resources in the namespace
*/
default Stream<T> list(String namespace) {
return list(namespace, TRUE);
}

/**
* Lists resources in the given namespace that match the provided predicate.
*
* @param namespace the namespace to list resources from
* @param predicate filter to apply on the resources
* @return a stream of cached resources matching the predicate
*/
Stream<T> list(String namespace, Predicate<T> predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,45 @@
public interface Cache<T> {
Predicate TRUE = (a) -> true;

/**
* Retrieves a resource from the cache by its {@link ResourceID}.
*
* @param resourceID the identifier of the resource
* @return an optional containing the resource if present in the cache
*/
Optional<T> get(ResourceID resourceID);

/**
* Checks whether a resource with the given {@link ResourceID} exists in the cache.
*
* @param resourceID the identifier of the resource
* @return {@code true} if the resource is present in the cache
*/
default boolean contains(ResourceID resourceID) {
return get(resourceID).isPresent();
}

/**
* Returns a stream of all {@link ResourceID}s currently in the cache.
*
* @return a stream of resource identifiers
*/
Stream<ResourceID> keys();

/**
* Lists all resources in the cache.
*
* @return a stream of all cached resources
*/
default Stream<T> list() {
return list(TRUE);
}

/**
* Lists resources in the cache that match the provided predicate.
*
* @param predicate filter to apply on the resources
* @return a stream of cached resources matching the predicate
*/
Stream<T> list(Predicate<T> predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,52 +234,52 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
this.indexers.putAll(indexers);
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
Comment on lines +238 to +242
@Override
public Stream<R> list(String namespace, Predicate<R> predicate) {
return manager().list(namespace, predicate);
return mergeWithWithTempCacheResources(
manager().list(namespace, predicate), namespace, predicate);
Comment on lines +237 to +246
}
Comment on lines +244 to 247
Comment on lines +245 to 247

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public Stream<R> list(Predicate<R> predicate) {
return cache.list(predicate);
}

@Override
public List<R> byIndex(String indexName, String indexKey) {
return manager().byIndex(indexName, indexKey);
}

public Stream<R> byIndexStream(String indexName, String indexKey) {
return manager().byIndexStream(indexName, indexKey);
return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate);
}

/**
* Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is
* useful when resources are updated using {@link
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
public Stream<R> listWithStrongConsistency(String namespace, Predicate<R> predicate) {
@Override
public Stream<R> byIndexStream(String indexName, String indexKey) {
return mergeWithWithTempCacheResources(
manager().list(namespace, predicate), namespace, predicate);
}

/**
* Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when
* resources are updated using {@link
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
*/
public Stream<R> listWithStrongConsistency(Predicate<R> predicate) {
return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate);
manager().byIndexStream(indexName, indexKey), indexName, indexKey);
}
Comment on lines +267 to 270
Comment on lines 260 to 270

/**
* Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is
* useful when resources are updated using {@link
* io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
public Stream<R> byIndexStreamWithStrongConsistency(String indexName, String indexKey) {
@Override
public List<R> byIndex(String indexName, String indexKey) {
return mergeWithWithTempCacheResources(
manager().byIndexStream(indexName, indexKey), indexName, indexKey);
manager().byIndexStream(indexName, indexKey), indexName, indexKey)
.collect(Collectors.toList());
}

private Stream<R> mergeWithWithTempCacheResources(
Expand Down Expand Up @@ -359,9 +359,19 @@ private Stream<R> mergeWithWithTempCacheResources(
return Stream.concat(tempResourceStream, upToDateList.stream());
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Keys from the temporary resource
* cache (ghost resources) are included in the result.
*/
@Override
public Stream<ResourceID> keys() {
return cache.keys();
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
return manager().keys();
}
var tempKeys = temporaryResourceCache.getResources().keySet();
return Stream.concat(manager().keys(), tempKeys.stream().filter(k -> !manager().contains(k)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
}

@Test
void listWithStrongConsistencyReplacesResourceFromTempCache() {
void listReplacesResourceFromTempCache() {
var original = testDeployment();
var newer = testDeployment();
newer.getMetadata().setResourceVersion("5");
Expand All @@ -553,13 +553,13 @@ void listWithStrongConsistencyReplacesResourceFromTempCache() {
when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original));
doReturn(mim).when(informerEventSource).manager();

var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList();
var result = informerEventSource.list(null, r -> true).toList();

assertThat(result).containsExactly(newer);
}

@Test
void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() {
void listKeepsResourceWhenNotInTempCache() {
var original = testDeployment();

when(temporaryResourceCache.getResources()).thenReturn(Map.of());
Expand All @@ -568,13 +568,13 @@ void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() {
when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original));
doReturn(mim).when(informerEventSource).manager();

var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList();
var result = informerEventSource.list("default", r -> true).toList();

assertThat(result).containsExactly(original);
}

@Test
void listWithStrongConsistencyReplacesOnlyMatchingResources() {
void listReplacesOnlyMatchingResources() {
var dep1 = testDeployment();
var dep2 = testDeployment();
dep2.getMetadata().setName("other");
Expand All @@ -588,13 +588,13 @@ void listWithStrongConsistencyReplacesOnlyMatchingResources() {
when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2));
doReturn(informerManager).when(informerEventSource).manager();

var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList();
var result = informerEventSource.list(null, r -> true).toList();

assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2);
}

@Test
void byIndexStreamWithStrongConsistencyReplacesFromTempCache() {
void byIndexStreamReplacesFromTempCache() {
var original = testDeployment();
var newer = testDeployment();
newer.getMetadata().setResourceVersion("5");
Expand All @@ -607,13 +607,13 @@ void byIndexStreamWithStrongConsistencyReplacesFromTempCache() {
doReturn(informerManager).when(informerEventSource).manager();
informerEventSource.addIndexers(Map.of("idx", d -> List.of("key")));

var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList();
var result = informerEventSource.byIndexStream("idx", "key").toList();

assertThat(result).containsExactly(newer);
}

@Test
void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() {
void listKeepsResourceWhenTempCacheHasOlderVersion() {
var original = testDeployment();
original.getMetadata().setResourceVersion("5");
var olderTemp = testDeployment();
Expand All @@ -626,13 +626,13 @@ void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() {
when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original));
doReturn(mim).when(informerEventSource).manager();

var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList();
var result = informerEventSource.list(null, r -> true).toList();

assertThat(result).containsExactly(original);
}

@Test
void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() {
void byIndexStreamKeepsResourceWhenTempCacheHasOlderVersion() {
var original = testDeployment();
original.getMetadata().setResourceVersion("5");
var olderTemp = testDeployment();
Expand All @@ -646,13 +646,13 @@ void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion
doReturn(mim).when(informerEventSource).manager();
informerEventSource.addIndexers(Map.of("idx", d -> List.of("key")));

var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList();
var result = informerEventSource.byIndexStream("idx", "key").toList();

assertThat(result).containsExactly(original);
}

@Test
void listWithStrongConsistencyAddsGhostResources() {
void listAddsGhostResources() {
var resource = testDeployment();
var ghostResource = testDeployment();
ghostResource.getMetadata().setName("ghost");
Expand All @@ -664,11 +664,54 @@ void listWithStrongConsistencyAddsGhostResources() {
when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource));
doReturn(mim).when(informerEventSource).manager();

var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList();
var result = informerEventSource.list(null, r -> true).toList();

assertThat(result).containsExactlyInAnyOrder(resource, ghostResource);
}

@Test
void keysIncludesGhostResourceKeys() {
var resource = testDeployment();
var ghostResource = testDeployment();
ghostResource.getMetadata().setName("ghost");

var resourceId = ResourceID.fromResource(resource);
var ghostResourceId = ResourceID.fromResource(ghostResource);

when(temporaryResourceCache.getResources()).thenReturn(Map.of(ghostResourceId, ghostResource));
when(temporaryResourceCache.isEmpty()).thenReturn(false);

var mim = mock(InformerManager.class);
when(mim.keys()).thenReturn(Stream.of(resourceId));
when(mim.contains(ghostResourceId)).thenReturn(false);
doReturn(mim).when(informerEventSource).manager();

var result = informerEventSource.keys().toList();

assertThat(result).containsExactlyInAnyOrder(resourceId, ghostResourceId);
}

@Test
void keysDoesNotDuplicateExistingKeys() {
var resource = testDeployment();
var newerResource = testDeployment();
newerResource.getMetadata().setResourceVersion("5");

var resourceId = ResourceID.fromResource(resource);

when(temporaryResourceCache.getResources()).thenReturn(Map.of(resourceId, newerResource));
when(temporaryResourceCache.isEmpty()).thenReturn(false);

var mim = mock(InformerManager.class);
when(mim.keys()).thenReturn(Stream.of(resourceId));
when(mim.contains(resourceId)).thenReturn(true);
doReturn(mim).when(informerEventSource).manager();

var result = informerEventSource.keys().toList();

assertThat(result).containsExactly(resourceId);
}

Deployment testDeployment() {
Deployment deployment = new Deployment();
deployment.setMetadata(new ObjectMeta());
Expand Down
Loading