Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourcePool;
import io.grpc.netty.InternalNettyServerBuilder;
import io.grpc.netty.InternalNettyServerCredentials;
import io.grpc.netty.InternalProtocolNegotiator;
Expand Down Expand Up @@ -128,7 +130,8 @@ public Server build() {
}
InternalNettyServerBuilder.eagAttributes(delegate, builder.build());
return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener,
filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry);
filterChainSelectorManager, xdsClientPoolFactory, bootstrapOverride, filterRegistry,
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE));
}

@VisibleForTesting
Expand Down
34 changes: 16 additions & 18 deletions xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
Expand Down Expand Up @@ -100,7 +99,7 @@ public void uncaughtException(Thread t, Throwable e) {
static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
private final String listenerAddress;
private final ServerBuilder<?> delegateBuilder;
private boolean sharedTimeService;
private final ObjectPool<ScheduledExecutorService> timeServicePool;
private final ScheduledExecutorService timeService;
private final FilterRegistry filterRegistry;
private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
Expand Down Expand Up @@ -128,14 +127,16 @@ public void uncaughtException(Thread t, Throwable e) {
// NamedFilterConfig.filterStateKey -> filter_instance.
private final HashMap<String, Filter> activeFiltersDefaultChain = new HashMap<>();

@VisibleForTesting
XdsServerWrapper(
String listenerAddress,
ServerBuilder<?> delegateBuilder,
XdsServingStatusListener listener,
FilterChainSelectorManager filterChainSelectorManager,
XdsClientPoolFactory xdsClientPoolFactory,
@Nullable Map<String, ?> bootstrapOverride,
FilterRegistry filterRegistry) {
String listenerAddress,
ServerBuilder<?> delegateBuilder,
XdsServingStatusListener listener,
FilterChainSelectorManager filterChainSelectorManager,
XdsClientPoolFactory xdsClientPoolFactory,
@Nullable Map<String, ?> bootstrapOverride,
FilterRegistry filterRegistry,
ScheduledExecutorService timeService) {
this(
listenerAddress,
delegateBuilder,
Expand All @@ -144,11 +145,9 @@ public void uncaughtException(Thread t, Throwable e) {
xdsClientPoolFactory,
bootstrapOverride,
filterRegistry,
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
sharedTimeService = true;
new FixedObjectPool<>(timeService));
}

@VisibleForTesting
XdsServerWrapper(
String listenerAddress,
ServerBuilder<?> delegateBuilder,
Expand All @@ -157,7 +156,7 @@ public void uncaughtException(Thread t, Throwable e) {
XdsClientPoolFactory xdsClientPoolFactory,
@Nullable Map<String, ?> bootstrapOverride,
FilterRegistry filterRegistry,
ScheduledExecutorService timeService) {
ObjectPool<ScheduledExecutorService> timeServicePool) {
this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
Expand All @@ -166,7 +165,8 @@ public void uncaughtException(Thread t, Throwable e) {
= checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.bootstrapOverride = bootstrapOverride;
this.timeService = checkNotNull(timeService, "timeService");
this.timeServicePool = checkNotNull(timeServicePool, "timeServicePool");
this.timeService = checkNotNull(timeServicePool.getObject(), "timeService");
this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry");
this.delegate = delegateBuilder.build();
}
Expand Down Expand Up @@ -275,9 +275,7 @@ private void internalShutdown() {
if (restartTimer != null) {
restartTimer.cancel();
}
if (sharedTimeService) {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
}
timeServicePool.returnObject(timeService);
isServing = false;
internalTerminationLatch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.inprocess.InProcessSocketAddress;
import io.grpc.internal.FakeClock;
import io.grpc.internal.TestUtils.NoopChannelLogger;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalProtocolNegotiationEvent;
Expand Down Expand Up @@ -122,7 +123,8 @@ public void setUp() {
when(mockServer.isShutdown()).thenReturn(false);
xdsServerWrapper = new XdsServerWrapper("0.0.0.0:" + PORT, mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient),
XdsServerTestHelper.RAW_BOOTSTRAP, FilterRegistry.newRegistry());
XdsServerTestHelper.RAW_BOOTSTRAP, FilterRegistry.newRegistry(),
new FakeClock().getScheduledExecutorService());
}

@Test
Expand Down
11 changes: 7 additions & 4 deletions xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testBootstrap() throws Exception {
when(xdsClient.getBootstrapInfo()).thenReturn(b);
xdsServerWrapper = new XdsServerWrapper("[::FFFF:129.144.52.38]:80", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient),
XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry);
XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry, executor.getScheduledExecutorService());
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -194,7 +194,8 @@ private void verifyBootstrapFail(Bootstrapper.BootstrapInfo b) throws Exception
when(xdsClient.getBootstrapInfo()).thenReturn(b);
xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient),
XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry);
XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry,
executor.getScheduledExecutorService());
final SettableFuture<Server> start = SettableFuture.create();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
Expand Down Expand Up @@ -234,7 +235,8 @@ public void testBootstrap_templateWithXdstp() throws Exception {
when(xdsClient.getBootstrapInfo()).thenReturn(b);
xdsServerWrapper = new XdsServerWrapper("[::FFFF:129.144.52.38]:80", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient),
XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry);
XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry,
executor.getScheduledExecutorService());
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -1859,7 +1861,8 @@ private FilterRegistry filterStateTestFilterRegistry(
private SettableFuture<Server> filterStateTestStartServer(FilterRegistry filterRegistry) {
xdsServerWrapper = new XdsServerWrapper("0.0.0.0:1", mockBuilder, listener,
selectorManager, new FakeXdsClientPoolFactory(xdsClient),
XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry);
XdsServerTestHelper.RAW_BOOTSTRAP, filterRegistry,
executor.getScheduledExecutorService());
SettableFuture<Server> serverStart = SettableFuture.create();
scheduleServerStart(xdsServerWrapper, serverStart);
return serverStart;
Expand Down
Loading