Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ void closeAllExisting(RaftGroupId groupId) {
zeroCopyMetrics.addUnreleased("client_protocol", zeroCopyRequestMarshaller::getUnclosedCount);
}

void close() {
zeroCopyRequestMarshaller.close();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good catch! We never call zeroCopyRequestMarshaller.close() except for tests.

}

RaftPeerId getId() {
return idSupplier.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ ServerServiceDefinition bindServiceWithZeroCopy() {
return builder.build();
}

void close() {
zeroCopyRequestMarshaller.close();
}

@Override
public void requestVote(RequestVoteRequestProto request,
StreamObserver<RequestVoteReplyProto> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,12 @@ private boolean separateClientServer() {
return clientPort > 0 && clientPort != serverPort;
}

Server newServer(GrpcClientProtocolService client, ZeroCopyMetrics zeroCopyMetrics, ServerInterceptor interceptor) {
Server newServer(GrpcClientProtocolService client, GrpcServerProtocolService service,
ServerInterceptor interceptor) {
final EnumSet<GrpcServices.Type> types = EnumSet.of(GrpcServices.Type.SERVER);
final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer();
final ServerServiceDefinition service = newGrpcServerProtocolService(zeroCopyMetrics).bindServiceWithZeroCopy();
serverBuilder.addService(ServerInterceptors.intercept(service, interceptor));
final ServerServiceDefinition serviceDefinition = service.bindServiceWithZeroCopy();
serverBuilder.addService(ServerInterceptors.intercept(serviceDefinition, interceptor));

if (!separateAdminServer()) {
types.add(GrpcServices.Type.ADMIN);
Expand Down Expand Up @@ -285,6 +286,7 @@ public static Builder newBuilder() {

private final ExecutorService executor;
private final GrpcClientProtocolService clientProtocolService;
private final GrpcServerProtocolService serverProtocolService;

private final MetricServerInterceptor serverInterceptor;
private final ZeroCopyMetrics zeroCopyMetrics = new ZeroCopyMetrics();
Expand All @@ -294,8 +296,9 @@ private GrpcServicesImpl(Builder b) {

this.executor = b.newExecutor();
this.clientProtocolService = b.newGrpcClientProtocolService(executor, zeroCopyMetrics);
this.serverProtocolService = b.newGrpcServerProtocolService(zeroCopyMetrics);
this.serverInterceptor = b.newMetricServerInterceptor();
final Server server = b.newServer(clientProtocolService, zeroCopyMetrics, serverInterceptor);
final Server server = b.newServer(clientProtocolService, serverProtocolService, serverInterceptor);

servers.put(GrpcServerProtocolService.class.getSimpleName(), server);
addressSupplier = newAddressSupplier(b.serverPort, server);
Expand Down Expand Up @@ -373,6 +376,8 @@ public void closeImpl() throws IOException {

serverInterceptor.close();
ConcurrentUtils.shutdownAndWait(executor);
clientProtocolService.close();
serverProtocolService.close();
zeroCopyMetrics.unregister();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,10 @@ public void close() {
if (unclosedStreams.isEmpty()) {
return;
}
for (InputStream stream : unclosedStreams.values()) {
for (Map.Entry<T, InputStream> entry : unclosedStreams.entrySet()) {
try {
stream.close();
entry.getValue().close();
releasedCount.accept(entry.getKey());
} catch (IOException e) {
LOG.warn("{}: Failed to close leaked stream.", name, e);
}
Expand Down
Loading