From a1743df57dcddb5e1855b3310a6202dd9d8843ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Fri, 13 Mar 2026 16:02:56 +0100 Subject: [PATCH 1/5] chore: update version to 999-SNAPSHOT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- bootstrapper-maven-plugin/pom.xml | 2 +- caffeine-bounded-cache-support/pom.xml | 2 +- micrometer-support/pom.xml | 2 +- operator-framework-bom/pom.xml | 2 +- operator-framework-core/pom.xml | 2 +- operator-framework-junit/pom.xml | 2 +- operator-framework/pom.xml | 2 +- pom.xml | 2 +- sample-operators/controller-namespace-deletion/pom.xml | 2 +- sample-operators/leader-election/pom.xml | 2 +- sample-operators/metrics-processing/pom.xml | 7 +------ sample-operators/mysql-schema/pom.xml | 2 +- sample-operators/pom.xml | 2 +- sample-operators/tomcat-operator/pom.xml | 2 +- sample-operators/webpage/pom.xml | 2 +- test-index-processor/pom.xml | 2 +- 16 files changed, 16 insertions(+), 21 deletions(-) diff --git a/bootstrapper-maven-plugin/pom.xml b/bootstrapper-maven-plugin/pom.xml index cf383f0a47..9edf011170 100644 --- a/bootstrapper-maven-plugin/pom.xml +++ b/bootstrapper-maven-plugin/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT bootstrapper diff --git a/caffeine-bounded-cache-support/pom.xml b/caffeine-bounded-cache-support/pom.xml index 873173cd92..be70ab9a2e 100644 --- a/caffeine-bounded-cache-support/pom.xml +++ b/caffeine-bounded-cache-support/pom.xml @@ -21,7 +21,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT caffeine-bounded-cache-support diff --git a/micrometer-support/pom.xml b/micrometer-support/pom.xml index 8437197c5a..ae3c4d0be1 100644 --- a/micrometer-support/pom.xml +++ b/micrometer-support/pom.xml @@ -21,7 +21,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT micrometer-support diff --git a/operator-framework-bom/pom.xml b/operator-framework-bom/pom.xml index 4627e79210..7068fb4987 100644 --- a/operator-framework-bom/pom.xml +++ b/operator-framework-bom/pom.xml @@ -21,7 +21,7 @@ io.javaoperatorsdk operator-framework-bom - 5.3.4-SNAPSHOT + 999-SNAPSHOT pom Operator SDK - Bill of Materials Java SDK for implementing Kubernetes operators diff --git a/operator-framework-core/pom.xml b/operator-framework-core/pom.xml index 8fb28c8d73..2356433ca9 100644 --- a/operator-framework-core/pom.xml +++ b/operator-framework-core/pom.xml @@ -21,7 +21,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT ../pom.xml diff --git a/operator-framework-junit/pom.xml b/operator-framework-junit/pom.xml index e43c38e534..aa18d5c778 100644 --- a/operator-framework-junit/pom.xml +++ b/operator-framework-junit/pom.xml @@ -21,7 +21,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT operator-framework-junit diff --git a/operator-framework/pom.xml b/operator-framework/pom.xml index 90fc061b0f..f94dfa757d 100644 --- a/operator-framework/pom.xml +++ b/operator-framework/pom.xml @@ -21,7 +21,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT operator-framework diff --git a/pom.xml b/pom.xml index 7fa7964a9b..3059ee2a00 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT pom Operator SDK for Java Java SDK for implementing Kubernetes operators diff --git a/sample-operators/controller-namespace-deletion/pom.xml b/sample-operators/controller-namespace-deletion/pom.xml index bcda986cd3..33b0227db6 100644 --- a/sample-operators/controller-namespace-deletion/pom.xml +++ b/sample-operators/controller-namespace-deletion/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk sample-operators - 5.3.4-SNAPSHOT + 999-SNAPSHOT sample-controller-namespace-deletion diff --git a/sample-operators/leader-election/pom.xml b/sample-operators/leader-election/pom.xml index e1df94fa1e..4f896485d1 100644 --- a/sample-operators/leader-election/pom.xml +++ b/sample-operators/leader-election/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk sample-operators - 5.3.4-SNAPSHOT + 999-SNAPSHOT sample-leader-election diff --git a/sample-operators/metrics-processing/pom.xml b/sample-operators/metrics-processing/pom.xml index c67f623e33..0476ea927d 100644 --- a/sample-operators/metrics-processing/pom.xml +++ b/sample-operators/metrics-processing/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk sample-operators - 5.3.4-SNAPSHOT + 999-SNAPSHOT sample-metrics-processing @@ -101,11 +101,6 @@ metrics-processing-operator - - - -Dlog4j.configurationFile=/config/log4j2.xml - - diff --git a/sample-operators/mysql-schema/pom.xml b/sample-operators/mysql-schema/pom.xml index ace10a21ad..7f0d9219f4 100644 --- a/sample-operators/mysql-schema/pom.xml +++ b/sample-operators/mysql-schema/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk sample-operators - 5.3.4-SNAPSHOT + 999-SNAPSHOT sample-mysql-schema-operator diff --git a/sample-operators/pom.xml b/sample-operators/pom.xml index 25a745012c..c8a70b2701 100644 --- a/sample-operators/pom.xml +++ b/sample-operators/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT sample-operators diff --git a/sample-operators/tomcat-operator/pom.xml b/sample-operators/tomcat-operator/pom.xml index 09ca43036c..0417f337f4 100644 --- a/sample-operators/tomcat-operator/pom.xml +++ b/sample-operators/tomcat-operator/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk sample-operators - 5.3.4-SNAPSHOT + 999-SNAPSHOT sample-tomcat-operator diff --git a/sample-operators/webpage/pom.xml b/sample-operators/webpage/pom.xml index 67435ef617..3f23c6f8de 100644 --- a/sample-operators/webpage/pom.xml +++ b/sample-operators/webpage/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk sample-operators - 5.3.4-SNAPSHOT + 999-SNAPSHOT sample-webpage-operator diff --git a/test-index-processor/pom.xml b/test-index-processor/pom.xml index 51e15247ff..2ae7c5f454 100644 --- a/test-index-processor/pom.xml +++ b/test-index-processor/pom.xml @@ -22,7 +22,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT test-index-processor From 533cbdea251cbadf328947bd00f08997ef1d4dbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 30 Mar 2026 13:55:55 +0200 Subject: [PATCH 2/5] fix: set migration module version to correct one (#3263) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- migration/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migration/pom.xml b/migration/pom.xml index aa4f62a5e4..47c7fe36b0 100644 --- a/migration/pom.xml +++ b/migration/pom.xml @@ -21,7 +21,7 @@ io.javaoperatorsdk java-operator-sdk - 5.3.4-SNAPSHOT + 999-SNAPSHOT migration From b6f199acd2426b9c5d8a5b442fc08a94e0566510 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 29 Apr 2026 15:25:39 +0200 Subject: [PATCH 3/5] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/InformerManager.java | 1 - .../source/pool/AbstractEventSourcePool.java | 3 ++ .../event/source/pool/EventSourcePool.java | 8 ++++ .../event/source/pool/InformerClassifier.java | 10 +++++ .../event/source/pool/InformerPool.java | 38 +++++++++++++++++++ 5 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 6632ce631e..635212564a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -149,7 +149,6 @@ private InformerWrapper createEventSource( ResourceEventHandler eventHandler, String namespaceIdentifier) { final var informerConfig = configuration.getInformerConfig(); - if (informerConfig.getFieldSelector() != null && !informerConfig.getFieldSelector().getFields().isEmpty()) { for (var f : informerConfig.getFieldSelector().getFields()) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java new file mode 100644 index 0000000000..3c371fbc12 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java @@ -0,0 +1,3 @@ +package io.javaoperatorsdk.operator.processing.event.source.pool; + +public abstract class AbstractEventSourcePool implements EventSourcePool {} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java new file mode 100644 index 0000000000..7712cd2d22 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java @@ -0,0 +1,8 @@ +package io.javaoperatorsdk.operator.processing.event.source.pool; + +public interface EventSourcePool { + + T getEventSource(C classifier); + + void removeEventSource(T informerEventSource); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java new file mode 100644 index 0000000000..f0135b0f5f --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java @@ -0,0 +1,10 @@ +package io.javaoperatorsdk.operator.processing.event.source.pool; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.informer.FieldSelector; + +public record InformerClassifier( + String labelSelector, + String namespaceIdentifier, + Class resourceClass, + FieldSelector fieldSelector) {} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java new file mode 100644 index 0000000000..9185bdb15a --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java @@ -0,0 +1,38 @@ +package io.javaoperatorsdk.operator.processing.event.source.pool; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +public class InformerPool + extends AbstractEventSourcePool> { + + private final KubernetesClient client; + + private Map> informers = new ConcurrentHashMap<>(); + private Map, AtomicInteger> counters = new ConcurrentHashMap<>(); + + public InformerPool(KubernetesClient client) { + this.client = client; + } + + @Override + public SharedIndexInformer getEventSource(InformerClassifier classifier) { + var actual = informers.get(classifier); + if (actual == null) { + actual = null; // create Informer + } + incrementCounter(actual); + return null; + } + + private synchronized void incrementCounter(SharedIndexInformer actual) { + counters.compute(actual, (k, v) -> new AtomicInteger(v == null ? 0 : v.incrementAndGet())); + } + + @Override + public void removeEventSource(SharedIndexInformer informerEventSource) {} +} From 0feecd1d22d88952832b371b8f213537a3f6d129 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 4 May 2026 09:48:47 +0200 Subject: [PATCH 4/5] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/pool/AbstractEventSourcePool.java | 15 +++++++++++++++ .../event/source/pool/EventSourcePool.java | 15 +++++++++++++++ .../event/source/pool/InformerClassifier.java | 15 +++++++++++++++ .../event/source/pool/InformerPool.java | 15 +++++++++++++++ 4 files changed, 60 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java index 3c371fbc12..8ced98edcf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java @@ -1,3 +1,18 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.javaoperatorsdk.operator.processing.event.source.pool; public abstract class AbstractEventSourcePool implements EventSourcePool {} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java index 7712cd2d22..c09061a796 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java @@ -1,3 +1,18 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.javaoperatorsdk.operator.processing.event.source.pool; public interface EventSourcePool { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java index f0135b0f5f..67232e132e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java @@ -1,3 +1,18 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.javaoperatorsdk.operator.processing.event.source.pool; import io.fabric8.kubernetes.api.model.HasMetadata; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java index 9185bdb15a..3be4bb876a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java @@ -1,3 +1,18 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.javaoperatorsdk.operator.processing.event.source.pool; import java.util.Map; From 73c9d76153332d86f2a5ac1071fbc9c7d6be59fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 7 May 2026 15:50:37 +0200 Subject: [PATCH 5/5] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/config/ConfigurationService.java | 6 + .../source/informer/InformerManager.java | 7 +- .../source/informer/InformerWrapper.java | 98 +-------- .../informer/pool/DefaultInformerPool.java | 186 ++++++++++++++++++ .../pool/InformerClassifier.java | 2 +- .../pool/InformerPool.java} | 10 +- .../pool/NoOpInformerPool.java} | 15 +- .../event/source/pool/InformerPool.java | 53 ----- 8 files changed, 220 insertions(+), 157 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/DefaultInformerPool.java rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/{ => informer}/pool/InformerClassifier.java (92%) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/{pool/EventSourcePool.java => informer/pool/InformerPool.java} (66%) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/{pool/AbstractEventSourcePool.java => informer/pool/NoOpInformerPool.java} (60%) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 6ed9b7ff64..4b1fb4320e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -22,6 +22,8 @@ import java.util.concurrent.Executors; import java.util.function.Consumer; +import io.javaoperatorsdk.operator.processing.event.source.informer.pool.DefaultInformerPool; +import io.javaoperatorsdk.operator.processing.event.source.informer.pool.InformerPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -476,4 +478,8 @@ default boolean useSSAToPatchPrimaryResource() { default boolean cloneSecondaryResourcesWhenGettingFromCache() { return false; } + + default InformerPool informerPool() { + return new DefaultInformerPool(getKubernetesClient(),this); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 635212564a..7aa5081293 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -22,6 +22,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import io.javaoperatorsdk.operator.processing.event.source.informer.pool.InformerPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,7 @@ class InformerManager> private final ResourceEventHandler eventHandler; private final Map>> indexers = new HashMap<>(); private ControllerConfiguration controllerConfiguration; + private InformerPool informerPool; InformerManager( MixedOperation, Resource> client, @@ -67,6 +69,7 @@ class InformerManager> void setControllerConfiguration(ControllerConfiguration controllerConfiguration) { this.controllerConfiguration = controllerConfiguration; + this.controllerConfiguration.getConfigurationService().informerPool(); } @Override @@ -166,9 +169,7 @@ private InformerWrapper createEventSource( .orElse(filteredBySelectorClient) .runnableInformer(0); Optional.ofNullable(informerConfig.getItemStore()).ifPresent(informer::itemStore); - var source = - new InformerWrapper<>( - informer, controllerConfiguration.getConfigurationService(), namespaceIdentifier); + var source = new InformerWrapper<>(informer, namespaceIdentifier); source.addEventHandler(eventHandler); sources.put(namespaceIdentifier, source); return source; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 2f57f879b8..29290f6c13 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -18,9 +18,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; @@ -28,15 +25,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.fabric8.kubernetes.api.model.GenericKubernetesResource; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.informers.ExceptionHandler; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.LifecycleAware; @@ -51,99 +44,16 @@ class InformerWrapper private final SharedIndexInformer informer; private final Cache cache; private final String namespaceIdentifier; - private final ConfigurationService configurationService; - public InformerWrapper( - SharedIndexInformer informer, - ConfigurationService configurationService, - String namespaceIdentifier) { + public InformerWrapper(SharedIndexInformer informer, String namespaceIdentifier) { this.informer = informer; this.namespaceIdentifier = namespaceIdentifier; this.cache = (Cache) informer.getStore(); - this.configurationService = configurationService; } @Override - public void start() throws OperatorException { - try { - - // register stopped handler if we have one defined - configurationService - .getInformerStoppedHandler() - .ifPresent( - ish -> { - final var stopped = informer.stopped(); - if (stopped != null) { - stopped.handle( - (res, ex) -> { - ish.onStop(informer, ex); - return null; - }); - } else { - final var apiTypeClass = informer.getApiTypeClass(); - final var fullResourceName = HasMetadata.getFullResourceName(apiTypeClass); - final var version = HasMetadata.getVersion(apiTypeClass); - throw new IllegalStateException( - "Cannot retrieve 'stopped' callback to listen to informer stopping for" - + " informer for " - + fullResourceName - + "/" - + version); - } - }); - if (!configurationService.stopOnInformerErrorDuringStartup()) { - informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t)); - } - // change thread name for easier debugging - final var thread = Thread.currentThread(); - final var name = thread.getName(); - try { - thread.setName(informerInfo() + " " + thread.getId()); - final var resourceName = informer.getApiTypeClass().getSimpleName(); - log.debug( - "Starting informer for namespace: {} resource: {}", namespaceIdentifier, resourceName); - var start = informer.start(); - // note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is - // false, and there is a rbac issue the get never returns; therefore operator never really - // starts - log.trace( - "Waiting informer to start namespace: {} resource: {}", - namespaceIdentifier, - resourceName); - start - .toCompletableFuture() - .get(configurationService.cacheSyncTimeout().toMillis(), TimeUnit.MILLISECONDS); - log.debug( - "Started informer for namespace: {} resource: {}", namespaceIdentifier, resourceName); - } catch (TimeoutException | ExecutionException e) { - if (configurationService.stopOnInformerErrorDuringStartup()) { - log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e); - throw new OperatorException(e); - } else { - log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e); - } - } catch (InterruptedException e) { - thread.interrupt(); - throw new IllegalStateException(e); - } finally { - // restore original name - thread.setName(name); - } - - } catch (Exception e) { - ReconcilerUtilsInternal.handleKubernetesClientException( - e, HasMetadata.getFullResourceName(informer.getApiTypeClass())); - throw new OperatorException( - "Couldn't start informer for " + versionedFullResourceName() + " resources", e); - } - } - - private String versionedFullResourceName() { - final var apiTypeClass = informer.getApiTypeClass(); - if (apiTypeClass.isAssignableFrom(GenericKubernetesResource.class)) { - return GenericKubernetesResource.class.getSimpleName(); - } - return ReconcilerUtilsInternal.getResourceTypeNameWithVersion(apiTypeClass); + public void start() { + // no-op: informer initialization is handled by InformerPool } @Override @@ -201,7 +111,7 @@ public String toString() { } private String informerInfo() { - return "InformerWrapper [" + versionedFullResourceName() + "]"; + return "InformerWrapper [" + informer.getApiTypeClass().getSimpleName() + "]"; } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/DefaultInformerPool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/DefaultInformerPool.java new file mode 100644 index 0000000000..799d89ffb3 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/DefaultInformerPool.java @@ -0,0 +1,186 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.processing.event.source.informer.pool; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.informers.ExceptionHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; + +import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACES; + +public class DefaultInformerPool implements InformerPool { + + private static final Logger log = LoggerFactory.getLogger(DefaultInformerPool.class); + + private final KubernetesClient client; + private final ConfigurationService configurationService; + + private final Map> informers = new HashMap<>(); + private final Map, AtomicInteger> counters = new HashMap<>(); + + public DefaultInformerPool(KubernetesClient client, ConfigurationService configurationService) { + this.client = client; + this.configurationService = configurationService; + } + + public synchronized SharedIndexInformer getResource(InformerClassifier classifier) { + var informer = informers.get(classifier); + if (informer == null) { + informer = createInformer(client, classifier); + initInformer(informer, classifier.namespaceIdentifier()); + informers.put(classifier, informer); + counters.put(informer, new AtomicInteger(1)); + } else { + counters.get(informer).incrementAndGet(); + } + return informer; + } + + public synchronized void releaseResource(SharedIndexInformer informer) { + var counter = counters.get(informer); + if (counter != null && counter.decrementAndGet() <= 0) { + informer.stop(); + counters.remove(informer); + informers.values().remove(informer); + } else { + log.warn("No informer found in the pool."); + } + } + + private void initInformer(SharedIndexInformer informer, String namespaceIdentifier) { + try { + configurationService + .getInformerStoppedHandler() + .ifPresent( + ish -> { + final var stopped = informer.stopped(); + if (stopped != null) { + stopped.handle( + (res, ex) -> { + ish.onStop(informer, ex); + return null; + }); + } else { + final var apiTypeClass = informer.getApiTypeClass(); + final var fullResourceName = HasMetadata.getFullResourceName(apiTypeClass); + final var version = HasMetadata.getVersion(apiTypeClass); + throw new IllegalStateException( + "Cannot retrieve 'stopped' callback to listen to informer stopping for" + + " informer for " + + fullResourceName + + "/" + + version); + } + }); + if (!configurationService.stopOnInformerErrorDuringStartup()) { + informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t)); + } + // change thread name for easier debugging + final var thread = Thread.currentThread(); + final var name = thread.getName(); + try { + thread.setName( + "InformerPool [" + versionedFullResourceName(informer) + "] " + thread.getId()); + final var resourceName = informer.getApiTypeClass().getSimpleName(); + log.debug( + "Starting informer for namespace: {} resource: {}", namespaceIdentifier, resourceName); + var start = informer.start(); + // note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is + // false, and there is a rbac issue the get never returns; therefore operator never really + // starts + log.trace( + "Waiting informer to start namespace: {} resource: {}", + namespaceIdentifier, + resourceName); + start + .toCompletableFuture() + .get(configurationService.cacheSyncTimeout().toMillis(), TimeUnit.MILLISECONDS); + log.debug( + "Started informer for namespace: {} resource: {}", namespaceIdentifier, resourceName); + } catch (TimeoutException | ExecutionException e) { + if (configurationService.stopOnInformerErrorDuringStartup()) { + log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e); + throw new OperatorException(e); + } else { + log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e); + } + } catch (InterruptedException e) { + thread.interrupt(); + throw new IllegalStateException(e); + } finally { + // restore original name + thread.setName(name); + } + } catch (Exception e) { + ReconcilerUtilsInternal.handleKubernetesClientException( + e, HasMetadata.getFullResourceName(informer.getApiTypeClass())); + throw new OperatorException( + "Couldn't start informer for " + versionedFullResourceName(informer) + " resources", e); + } + } + + @SuppressWarnings("unchecked") + private static String versionedFullResourceName(SharedIndexInformer informer) { + return ReconcilerUtilsInternal.getResourceTypeNameWithVersion( + (Class) informer.getApiTypeClass()); + } + + @SuppressWarnings("rawtypes") + static SharedIndexInformer createInformer( + KubernetesClient client, InformerClassifier classifier) { + FilterWatchListDeletable filteredClient; + if (WATCH_ALL_NAMESPACES.equals(classifier.namespaceIdentifier())) { + filteredClient = + client + .resources(classifier.resourceClass()) + .inAnyNamespace() + .withLabelSelector(classifier.labelSelector()); + } else { + filteredClient = + client + .resources(classifier.resourceClass()) + .inNamespace(classifier.namespaceIdentifier()) + .withLabelSelector(classifier.labelSelector()); + } + + if (classifier.fieldSelector() != null && !classifier.fieldSelector().getFields().isEmpty()) { + for (var f : classifier.fieldSelector().getFields()) { + if (f.negated()) { + filteredClient = + (FilterWatchListDeletable) filteredClient.withoutField(f.path(), f.value()); + } else { + filteredClient = (FilterWatchListDeletable) filteredClient.withField(f.path(), f.value()); + } + } + } + return filteredClient.runnableInformer(0); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/InformerClassifier.java similarity index 92% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/InformerClassifier.java index 67232e132e..d823ebd513 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerClassifier.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/InformerClassifier.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.javaoperatorsdk.operator.processing.event.source.pool; +package io.javaoperatorsdk.operator.processing.event.source.informer.pool; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.informer.FieldSelector; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/InformerPool.java similarity index 66% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/InformerPool.java index c09061a796..1ca63e324c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/EventSourcePool.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/InformerPool.java @@ -13,11 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.javaoperatorsdk.operator.processing.event.source.pool; +package io.javaoperatorsdk.operator.processing.event.source.informer.pool; -public interface EventSourcePool { +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; - T getEventSource(C classifier); +public interface InformerPool { - void removeEventSource(T informerEventSource); + SharedIndexInformer getResource(InformerClassifier classifier); + + void releaseResource(SharedIndexInformer informerEventSource); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/NoOpInformerPool.java similarity index 60% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/NoOpInformerPool.java index 8ced98edcf..7ed050dc2e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/AbstractEventSourcePool.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/pool/NoOpInformerPool.java @@ -13,6 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.javaoperatorsdk.operator.processing.event.source.pool; +package io.javaoperatorsdk.operator.processing.event.source.informer.pool; -public abstract class AbstractEventSourcePool implements EventSourcePool {} +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +public class NoOpInformerPool implements InformerPool { + + @Override + public SharedIndexInformer getResource(InformerClassifier classifier) { + return null; + } + + @Override + public void releaseResource(SharedIndexInformer informerEventSource) {} +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java deleted file mode 100644 index 3be4bb876a..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/pool/InformerPool.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright Java Operator SDK Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.javaoperatorsdk.operator.processing.event.source.pool; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; - -public class InformerPool - extends AbstractEventSourcePool> { - - private final KubernetesClient client; - - private Map> informers = new ConcurrentHashMap<>(); - private Map, AtomicInteger> counters = new ConcurrentHashMap<>(); - - public InformerPool(KubernetesClient client) { - this.client = client; - } - - @Override - public SharedIndexInformer getEventSource(InformerClassifier classifier) { - var actual = informers.get(classifier); - if (actual == null) { - actual = null; // create Informer - } - incrementCounter(actual); - return null; - } - - private synchronized void incrementCounter(SharedIndexInformer actual) { - counters.compute(actual, (k, v) -> new AtomicInteger(v == null ? 0 : v.incrementAndGet())); - } - - @Override - public void removeEventSource(SharedIndexInformer informerEventSource) {} -}