From e16745bc66fc348ddc7607a319b741b62c6c245b Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 3 Jun 2026 15:03:52 -0700 Subject: [PATCH] xds: Use ObjectPool to manage scheduler in XdsServerWrapper Also, we should be passing the executor in all tests, as we don't want other threads to be running if we can avoid it. --- .../java/io/grpc/xds/XdsServerBuilder.java | 5 ++- .../java/io/grpc/xds/XdsServerWrapper.java | 34 +++++++++---------- .../XdsClientWrapperForServerSdsTestMisc.java | 4 ++- .../io/grpc/xds/XdsServerWrapperTest.java | 11 +++--- 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java index 4a4fb71aa84..792f666d8de 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java @@ -31,6 +31,8 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCredentials; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SharedResourcePool; import io.grpc.netty.InternalNettyServerBuilder; import io.grpc.netty.InternalNettyServerCredentials; import io.grpc.netty.InternalProtocolNegotiator; @@ -128,7 +130,8 @@ public Server build() { } InternalNettyServerBuilder.eagAttributes(delegate, builder.build()); return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener, - filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry); + filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry, + SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE)); } @VisibleForTesting diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index 5529f96c7a2..66942246957 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -45,9 +45,8 @@ import io.grpc.StatusOr; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; -import io.grpc.internal.GrpcUtil; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.ObjectPool; -import io.grpc.internal.SharedResourceHolder; import io.grpc.xds.EnvoyServerProtoData.FilterChain; import io.grpc.xds.Filter.FilterConfig; import io.grpc.xds.Filter.NamedFilterConfig; @@ -100,7 +99,7 @@ public void uncaughtException(Thread t, Throwable e) { static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1); private final String listenerAddress; private final ServerBuilder delegateBuilder; - private boolean sharedTimeService; + private final ObjectPool timeServicePool; private final ScheduledExecutorService timeService; private final FilterRegistry filterRegistry; private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance; @@ -128,14 +127,16 @@ public void uncaughtException(Thread t, Throwable e) { // NamedFilterConfig.filterStateKey -> filter_instance. private final HashMap activeFiltersDefaultChain = new HashMap<>(); + @VisibleForTesting XdsServerWrapper( - String listenerAddress, - ServerBuilder delegateBuilder, - XdsServingStatusListener listener, - FilterChainSelectorManager filterChainSelectorManager, - XdsClientPoolFactory xdsClientPoolFactory, - @Nullable Map bootstrapOverride, - FilterRegistry filterRegistry) { + String listenerAddress, + ServerBuilder delegateBuilder, + XdsServingStatusListener listener, + FilterChainSelectorManager filterChainSelectorManager, + XdsClientPoolFactory xdsClientPoolFactory, + @Nullable Map bootstrapOverride, + FilterRegistry filterRegistry, + ScheduledExecutorService timeService) { this( listenerAddress, delegateBuilder, @@ -144,11 +145,9 @@ public void uncaughtException(Thread t, Throwable e) { xdsClientPoolFactory, bootstrapOverride, filterRegistry, - SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE)); - sharedTimeService = true; + new FixedObjectPool<>(timeService)); } - @VisibleForTesting XdsServerWrapper( String listenerAddress, ServerBuilder delegateBuilder, @@ -157,7 +156,7 @@ public void uncaughtException(Thread t, Throwable e) { XdsClientPoolFactory xdsClientPoolFactory, @Nullable Map bootstrapOverride, FilterRegistry filterRegistry, - ScheduledExecutorService timeService) { + ObjectPool timeServicePool) { this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress"); this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder"); this.delegateBuilder.intercept(new ConfigApplyingInterceptor()); @@ -166,7 +165,8 @@ public void uncaughtException(Thread t, Throwable e) { = checkNotNull(filterChainSelectorManager, "filterChainSelectorManager"); this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.bootstrapOverride = bootstrapOverride; - this.timeService = checkNotNull(timeService, "timeService"); + this.timeServicePool = checkNotNull(timeServicePool, "timeServicePool"); + this.timeService = checkNotNull(timeServicePool.getObject(), "timeService"); this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry"); this.delegate = delegateBuilder.build(); } @@ -275,9 +275,7 @@ private void internalShutdown() { if (restartTimer != null) { restartTimer.cancel(); } - if (sharedTimeService) { - SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService); - } + timeServicePool.returnObject(timeService); isServing = false; internalTerminationLatch.countDown(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java index 81186d0639c..11e49db3e35 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java @@ -38,6 +38,7 @@ import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.inprocess.InProcessSocketAddress; +import io.grpc.internal.FakeClock; import io.grpc.internal.TestUtils.NoopChannelLogger; import io.grpc.netty.GrpcHttp2ConnectionHandler; import io.grpc.netty.InternalProtocolNegotiationEvent; @@ -122,7 +123,8 @@ public void setUp() { when(mockServer.isShutdown()).thenReturn(false); xdsServerWrapper = new XdsServerWrapper("0.0.0.0:" + PORT, mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, FilterRegistry.newRegistry()); + XdsServerTestHelper.RAW_BOOTSTRAP, FilterRegistry.newRegistry(), + new FakeClock().getScheduledExecutorService()); } @Test diff --git a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java index 99e3911307a..4aaec91d1ff 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java @@ -160,7 +160,7 @@ public void testBootstrap() throws Exception { when(xdsClient.getBootstrapInfo()).thenReturn(b); xdsServerWrapper = new XdsServerWrapper("[::FFFF:129.144.52.38]:80", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry); + XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, executor.getScheduledExecutorService()); Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { @@ -194,7 +194,8 @@ private void verifyBootstrapFail(Bootstrapper.BootstrapInfo b) throws Exception when(xdsClient.getBootstrapInfo()).thenReturn(b); xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry); + XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, + executor.getScheduledExecutorService()); final SettableFuture start = SettableFuture.create(); Executors.newSingleThreadExecutor().execute(new Runnable() { @Override @@ -234,7 +235,8 @@ public void testBootstrap_templateWithXdstp() throws Exception { when(xdsClient.getBootstrapInfo()).thenReturn(b); xdsServerWrapper = new XdsServerWrapper("[::FFFF:129.144.52.38]:80", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry); + XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, + executor.getScheduledExecutorService()); Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { @@ -1859,7 +1861,8 @@ private FilterRegistry filterStateTestFilterRegistry( private SettableFuture filterStateTestStartServer(FilterRegistry filterRegistry) { xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry); + XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, + executor.getScheduledExecutorService()); SettableFuture serverStart = SettableFuture.create(); scheduleServerStart(xdsServerWrapper, serverStart); return serverStart;