Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bootstrapper-maven-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>java-operator-sdk</artifactId>
<version>5.3.4-SNAPSHOT</version>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>bootstrapper</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion caffeine-bounded-cache-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>java-operator-sdk</artifactId>
<version>5.3.4-SNAPSHOT</version>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>caffeine-bounded-cache-support</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion micrometer-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>java-operator-sdk</artifactId>
<version>5.3.4-SNAPSHOT</version>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>micrometer-support</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion migration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>java-operator-sdk</artifactId>
<version>5.3.4-SNAPSHOT</version>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>migration</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion operator-framework-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<groupId>io.javaoperatorsdk</groupId>
<artifactId>operator-framework-bom</artifactId>
<version>5.3.4-SNAPSHOT</version>
<version>999-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Operator SDK - Bill of Materials</name>
<description>Java SDK for implementing Kubernetes operators</description>
Expand Down
2 changes: 1 addition & 1 deletion operator-framework-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>java-operator-sdk</artifactId>
<version>5.3.4-SNAPSHOT</version>
<version>999-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import io.javaoperatorsdk.operator.processing.event.source.informer.pool.DefaultInformerPool;
import io.javaoperatorsdk.operator.processing.event.source.informer.pool.InformerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -476,4 +478,8 @@ default boolean useSSAToPatchPrimaryResource() {
default boolean cloneSecondaryResourcesWhenGettingFromCache() {
return false;
}

default InformerPool informerPool() {
return new DefaultInformerPool(getKubernetesClient(),this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.javaoperatorsdk.operator.processing.event.source.informer.pool.InformerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,6 +56,7 @@ class InformerManager<R extends HasMetadata, C extends Informable<R>>
private final ResourceEventHandler<R> eventHandler;
private final Map<String, Function<R, List<String>>> indexers = new HashMap<>();
private ControllerConfiguration<R> controllerConfiguration;
private InformerPool informerPool;

InformerManager(
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client,
Expand All @@ -67,6 +69,7 @@ class InformerManager<R extends HasMetadata, C extends Informable<R>>

void setControllerConfiguration(ControllerConfiguration<R> controllerConfiguration) {
this.controllerConfiguration = controllerConfiguration;
this.controllerConfiguration.getConfigurationService().informerPool();
}

@Override
Expand Down Expand Up @@ -149,7 +152,6 @@ private InformerWrapper<R> createEventSource(
ResourceEventHandler<R> eventHandler,
String namespaceIdentifier) {
final var informerConfig = configuration.getInformerConfig();

if (informerConfig.getFieldSelector() != null
&& !informerConfig.getFieldSelector().getFields().isEmpty()) {
for (var f : informerConfig.getFieldSelector().getFields()) {
Expand All @@ -167,9 +169,7 @@ private InformerWrapper<R> createEventSource(
.orElse(filteredBySelectorClient)
.runnableInformer(0);
Optional.ofNullable(informerConfig.getItemStore()).ifPresent(informer::itemStore);
var source =
new InformerWrapper<>(
informer, controllerConfiguration.getConfigurationService(), namespaceIdentifier);
var source = new InformerWrapper<>(informer, namespaceIdentifier);
source.addEventHandler(eventHandler);
sources.put(namespaceIdentifier, source);
return source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,18 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
Expand All @@ -51,99 +44,16 @@ class InformerWrapper<T extends HasMetadata>
private final SharedIndexInformer<T> informer;
private final Cache<T> cache;
private final String namespaceIdentifier;
private final ConfigurationService configurationService;

public InformerWrapper(
SharedIndexInformer<T> informer,
ConfigurationService configurationService,
String namespaceIdentifier) {
public InformerWrapper(SharedIndexInformer<T> informer, String namespaceIdentifier) {
this.informer = informer;
this.namespaceIdentifier = namespaceIdentifier;
this.cache = (Cache<T>) informer.getStore();
this.configurationService = configurationService;
}

@Override
public void start() throws OperatorException {
try {

// register stopped handler if we have one defined
configurationService
.getInformerStoppedHandler()
.ifPresent(
ish -> {
final var stopped = informer.stopped();
if (stopped != null) {
stopped.handle(
(res, ex) -> {
ish.onStop(informer, ex);
return null;
});
} else {
final var apiTypeClass = informer.getApiTypeClass();
final var fullResourceName = HasMetadata.getFullResourceName(apiTypeClass);
final var version = HasMetadata.getVersion(apiTypeClass);
throw new IllegalStateException(
"Cannot retrieve 'stopped' callback to listen to informer stopping for"
+ " informer for "
+ fullResourceName
+ "/"
+ version);
}
});
if (!configurationService.stopOnInformerErrorDuringStartup()) {
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
}
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
thread.setName(informerInfo() + " " + thread.getId());
final var resourceName = informer.getApiTypeClass().getSimpleName();
log.debug(
"Starting informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
var start = informer.start();
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
// false, and there is a rbac issue the get never returns; therefore operator never really
// starts
log.trace(
"Waiting informer to start namespace: {} resource: {}",
namespaceIdentifier,
resourceName);
start
.toCompletableFuture()
.get(configurationService.cacheSyncTimeout().toMillis(), TimeUnit.MILLISECONDS);
log.debug(
"Started informer for namespace: {} resource: {}", namespaceIdentifier, resourceName);
} catch (TimeoutException | ExecutionException e) {
if (configurationService.stopOnInformerErrorDuringStartup()) {
log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e);
throw new OperatorException(e);
} else {
log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e);
}
} catch (InterruptedException e) {
thread.interrupt();
throw new IllegalStateException(e);
} finally {
// restore original name
thread.setName(name);
}

} catch (Exception e) {
ReconcilerUtilsInternal.handleKubernetesClientException(
e, HasMetadata.getFullResourceName(informer.getApiTypeClass()));
throw new OperatorException(
"Couldn't start informer for " + versionedFullResourceName() + " resources", e);
}
}

private String versionedFullResourceName() {
final var apiTypeClass = informer.getApiTypeClass();
if (apiTypeClass.isAssignableFrom(GenericKubernetesResource.class)) {
return GenericKubernetesResource.class.getSimpleName();
}
return ReconcilerUtilsInternal.getResourceTypeNameWithVersion(apiTypeClass);
public void start() {
// no-op: informer initialization is handled by InformerPool
}

@Override
Expand Down Expand Up @@ -201,7 +111,7 @@ public String toString() {
}

private String informerInfo() {
return "InformerWrapper [" + versionedFullResourceName() + "]";
return "InformerWrapper [" + informer.getApiTypeClass().getSimpleName() + "]";
}

@Override
Expand Down
Loading