From 272a82dc87b20e7e38c07f7b91ffdca5fae9d2e1 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 5 May 2026 15:19:50 +0800 Subject: [PATCH] RATIS-2512. Fix unreleased zero-copy messages during gRPC service shutdown. --- .../grpc/server/GrpcClientProtocolService.java | 4 ++++ .../grpc/server/GrpcServerProtocolService.java | 4 ++++ .../apache/ratis/grpc/server/GrpcServicesImpl.java | 13 +++++++++---- .../ratis/grpc/util/ZeroCopyMessageMarshaller.java | 5 +++-- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java index b7548780cd..acabd1c0e6 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java @@ -165,6 +165,10 @@ void closeAllExisting(RaftGroupId groupId) { zeroCopyMetrics.addUnreleased("client_protocol", zeroCopyRequestMarshaller::getUnclosedCount); } + void close() { + zeroCopyRequestMarshaller.close(); + } + RaftPeerId getId() { return idSupplier.get(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 7e17cb3cf4..405d9f53f2 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -278,6 +278,10 @@ ServerServiceDefinition bindServiceWithZeroCopy() { return builder.build(); } + void close() { + zeroCopyRequestMarshaller.close(); + } + @Override public void requestVote(RequestVoteRequestProto request, StreamObserver responseObserver) { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java index d6f6a0c866..ce247df90e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java @@ -235,11 +235,12 @@ private boolean separateClientServer() { return clientPort > 0 && clientPort != serverPort; } - Server newServer(GrpcClientProtocolService client, ZeroCopyMetrics zeroCopyMetrics, ServerInterceptor interceptor) { + Server newServer(GrpcClientProtocolService client, GrpcServerProtocolService service, + ServerInterceptor interceptor) { final EnumSet types = EnumSet.of(GrpcServices.Type.SERVER); final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer(); - final ServerServiceDefinition service = newGrpcServerProtocolService(zeroCopyMetrics).bindServiceWithZeroCopy(); - serverBuilder.addService(ServerInterceptors.intercept(service, interceptor)); + final ServerServiceDefinition serviceDefinition = service.bindServiceWithZeroCopy(); + serverBuilder.addService(ServerInterceptors.intercept(serviceDefinition, interceptor)); if (!separateAdminServer()) { types.add(GrpcServices.Type.ADMIN); @@ -285,6 +286,7 @@ public static Builder newBuilder() { private final ExecutorService executor; private final GrpcClientProtocolService clientProtocolService; + private final GrpcServerProtocolService serverProtocolService; private final MetricServerInterceptor serverInterceptor; private final ZeroCopyMetrics zeroCopyMetrics = new ZeroCopyMetrics(); @@ -294,8 +296,9 @@ private GrpcServicesImpl(Builder b) { this.executor = b.newExecutor(); this.clientProtocolService = b.newGrpcClientProtocolService(executor, zeroCopyMetrics); + this.serverProtocolService = b.newGrpcServerProtocolService(zeroCopyMetrics); this.serverInterceptor = b.newMetricServerInterceptor(); - final Server server = b.newServer(clientProtocolService, zeroCopyMetrics, serverInterceptor); + final Server server = b.newServer(clientProtocolService, serverProtocolService, serverInterceptor); servers.put(GrpcServerProtocolService.class.getSimpleName(), server); addressSupplier = newAddressSupplier(b.serverPort, server); @@ -373,6 +376,8 @@ public void closeImpl() throws IOException { serverInterceptor.close(); ConcurrentUtils.shutdownAndWait(executor); + clientProtocolService.close(); + serverProtocolService.close(); zeroCopyMetrics.unregister(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java index eddf2495e4..50bb5cf5c9 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java @@ -243,9 +243,10 @@ public void close() { if (unclosedStreams.isEmpty()) { return; } - for (InputStream stream : unclosedStreams.values()) { + for (Map.Entry entry : unclosedStreams.entrySet()) { try { - stream.close(); + entry.getValue().close(); + releasedCount.accept(entry.getKey()); } catch (IOException e) { LOG.warn("{}: Failed to close leaked stream.", name, e); }