diff --git a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java index 4a4fb71aa84..792f666d8de 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerBuilder.java @@ -31,6 +31,8 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCredentials; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SharedResourcePool; import io.grpc.netty.InternalNettyServerBuilder; import io.grpc.netty.InternalNettyServerCredentials; import io.grpc.netty.InternalProtocolNegotiator; @@ -128,7 +130,8 @@ public Server build() { } InternalNettyServerBuilder.eagAttributes(delegate, builder.build()); return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener, - filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry); + filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry, + SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE)); } @VisibleForTesting diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index 5529f96c7a2..66942246957 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -45,9 +45,8 @@ import io.grpc.StatusOr; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; -import io.grpc.internal.GrpcUtil; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.ObjectPool; -import io.grpc.internal.SharedResourceHolder; import io.grpc.xds.EnvoyServerProtoData.FilterChain; import io.grpc.xds.Filter.FilterConfig; import io.grpc.xds.Filter.NamedFilterConfig; @@ -100,7 +99,7 @@ public void uncaughtException(Thread t, Throwable e) { static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1); private final String listenerAddress; private final ServerBuilder delegateBuilder; - private boolean sharedTimeService; + private final ObjectPool timeServicePool; private final ScheduledExecutorService timeService; private final FilterRegistry filterRegistry; private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance; @@ -128,14 +127,16 @@ public void uncaughtException(Thread t, Throwable e) { // NamedFilterConfig.filterStateKey -> filter_instance. private final HashMap activeFiltersDefaultChain = new HashMap<>(); + @VisibleForTesting XdsServerWrapper( - String listenerAddress, - ServerBuilder delegateBuilder, - XdsServingStatusListener listener, - FilterChainSelectorManager filterChainSelectorManager, - XdsClientPoolFactory xdsClientPoolFactory, - @Nullable Map bootstrapOverride, - FilterRegistry filterRegistry) { + String listenerAddress, + ServerBuilder delegateBuilder, + XdsServingStatusListener listener, + FilterChainSelectorManager filterChainSelectorManager, + XdsClientPoolFactory xdsClientPoolFactory, + @Nullable Map bootstrapOverride, + FilterRegistry filterRegistry, + ScheduledExecutorService timeService) { this( listenerAddress, delegateBuilder, @@ -144,11 +145,9 @@ public void uncaughtException(Thread t, Throwable e) { xdsClientPoolFactory, bootstrapOverride, filterRegistry, - SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE)); - sharedTimeService = true; + new FixedObjectPool<>(timeService)); } - @VisibleForTesting XdsServerWrapper( String listenerAddress, ServerBuilder delegateBuilder, @@ -157,7 +156,7 @@ public void uncaughtException(Thread t, Throwable e) { XdsClientPoolFactory xdsClientPoolFactory, @Nullable Map bootstrapOverride, FilterRegistry filterRegistry, - ScheduledExecutorService timeService) { + ObjectPool timeServicePool) { this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress"); this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder"); this.delegateBuilder.intercept(new ConfigApplyingInterceptor()); @@ -166,7 +165,8 @@ public void uncaughtException(Thread t, Throwable e) { = checkNotNull(filterChainSelectorManager, "filterChainSelectorManager"); this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.bootstrapOverride = bootstrapOverride; - this.timeService = checkNotNull(timeService, "timeService"); + this.timeServicePool = checkNotNull(timeServicePool, "timeServicePool"); + this.timeService = checkNotNull(timeServicePool.getObject(), "timeService"); this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry"); this.delegate = delegateBuilder.build(); } @@ -275,9 +275,7 @@ private void internalShutdown() { if (restartTimer != null) { restartTimer.cancel(); } - if (sharedTimeService) { - SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService); - } + timeServicePool.returnObject(timeService); isServing = false; internalTerminationLatch.countDown(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java index 81186d0639c..11e49db3e35 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java @@ -38,6 +38,7 @@ import io.grpc.Status; import io.grpc.StatusOr; import io.grpc.inprocess.InProcessSocketAddress; +import io.grpc.internal.FakeClock; import io.grpc.internal.TestUtils.NoopChannelLogger; import io.grpc.netty.GrpcHttp2ConnectionHandler; import io.grpc.netty.InternalProtocolNegotiationEvent; @@ -122,7 +123,8 @@ public void setUp() { when(mockServer.isShutdown()).thenReturn(false); xdsServerWrapper = new XdsServerWrapper("0.0.0.0:" + PORT, mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, FilterRegistry.newRegistry()); + XdsServerTestHelper.RAW_BOOTSTRAP, FilterRegistry.newRegistry(), + new FakeClock().getScheduledExecutorService()); } @Test diff --git a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java index 99e3911307a..4aaec91d1ff 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java @@ -160,7 +160,7 @@ public void testBootstrap() throws Exception { when(xdsClient.getBootstrapInfo()).thenReturn(b); xdsServerWrapper = new XdsServerWrapper("[::FFFF:129.144.52.38]:80", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry); + XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, executor.getScheduledExecutorService()); Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { @@ -194,7 +194,8 @@ private void verifyBootstrapFail(Bootstrapper.BootstrapInfo b) throws Exception when(xdsClient.getBootstrapInfo()).thenReturn(b); xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry); + XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, + executor.getScheduledExecutorService()); final SettableFuture start = SettableFuture.create(); Executors.newSingleThreadExecutor().execute(new Runnable() { @Override @@ -234,7 +235,8 @@ public void testBootstrap_templateWithXdstp() throws Exception { when(xdsClient.getBootstrapInfo()).thenReturn(b); xdsServerWrapper = new XdsServerWrapper("[::FFFF:129.144.52.38]:80", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry); + XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, + executor.getScheduledExecutorService()); Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { @@ -1859,7 +1861,8 @@ private FilterRegistry filterStateTestFilterRegistry( private SettableFuture filterStateTestStartServer(FilterRegistry filterRegistry) { xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener, selectorManager, new FakeXdsClientPoolFactory(xdsClient), - XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry); + XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, + executor.getScheduledExecutorService()); SettableFuture serverStart = SettableFuture.create(); scheduleServerStart(xdsServerWrapper, serverStart); return serverStart;