From 7590c8132bc1bdec73cc40d40cb1552831cd631a Mon Sep 17 00:00:00 2001 From: noxymon Date: Sat, 14 Mar 2026 06:29:30 +0700 Subject: [PATCH] fix: resolve resource leaks in transport session and httpclient --- .../HttpClientSseClientTransport.java | 1 + .../HttpClientStreamableHttpTransport.java | 7 +- .../spec/DefaultMcpTransportSession.java | 5 +- .../io/modelcontextprotocol/util/Utils.java | 37 +++++++ .../StdioClientTransportReproductionTest.java | 81 ++++++++++++++ .../common/ResourceLeakTests.java | 103 ++++++++++++++++++ 6 files changed, 228 insertions(+), 6 deletions(-) create mode 100644 mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportReproductionTest.java create mode 100644 mcp-test/src/test/java/io/modelcontextprotocol/common/ResourceLeakTests.java diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index be4e4cf97..820874be0 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -483,6 +483,7 @@ public Mono closeGracefully() { if (subscription != null && !subscription.isDisposed()) { subscription.dispose(); } + Utils.closeHttpClient(this.httpClient); }); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 57a27a3fd..9642f7ce4 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -237,10 +237,9 @@ public Mono closeGracefully() { return Mono.defer(() -> { logger.debug("Graceful close triggered"); McpTransportSession currentSession = this.activeSession.getAndUpdate(this::createClosedSession); - if (currentSession != null) { - return Mono.from(currentSession.closeGracefully()); - } - return Mono.empty(); + Mono closeSessionMono = (currentSession != null) ? Mono.from(currentSession.closeGracefully()) + : Mono.empty(); + return closeSessionMono.doFinally(signalType -> Utils.closeHttpClient(this.httpClient)); }); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java index fdb7bfd89..6c2930abb 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java @@ -77,8 +77,9 @@ public void close() { @Override public Mono closeGracefully() { - return Mono.from(this.onClose.apply(this.sessionId.get())) - .then(Mono.fromRunnable(this.openConnections::dispose)); + return Mono.defer(() -> Mono.from(this.onClose.apply(this.sessionId.get()))) + .doFinally(signalType -> this.openConnections.dispose()) + .then(); } } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/util/Utils.java b/mcp-core/src/main/java/io/modelcontextprotocol/util/Utils.java index cd420100c..a1635df9f 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/util/Utils.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/util/Utils.java @@ -4,7 +4,10 @@ package io.modelcontextprotocol.util; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.URI; +import java.net.http.HttpClient; import java.util.Collection; import java.util.Map; @@ -18,6 +21,40 @@ public final class Utils { + /** + * Closes the given {@link HttpClient} if it implements {@link AutoCloseable} (as it + * does in JDK 21+), or tries to close it using reflection for older JDKs. + * @param httpClient the HTTP client to close + */ + public static void closeHttpClient(HttpClient httpClient) { + if (httpClient == null) { + return; + } + + if (httpClient instanceof AutoCloseable autoCloseable) { + try { + autoCloseable.close(); + } + catch (Exception e) { + // ignore + } + return; + } + + // Fallback for JDK < 21 using reflection to access internal 'stop' method. + try { + Field implField = httpClient.getClass().getDeclaredField("impl"); + implField.setAccessible(true); + Object impl = implField.get(httpClient); + Method stopMethod = impl.getClass().getDeclaredMethod("stop"); + stopMethod.setAccessible(true); + stopMethod.invoke(impl); + } + catch (Exception e) { + // Fallback if reflection fails + } + } + /** * Check whether the given {@code String} contains actual text. *

diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportReproductionTest.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportReproductionTest.java new file mode 100644 index 000000000..0e45cc5b8 --- /dev/null +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/StdioClientTransportReproductionTest.java @@ -0,0 +1,81 @@ +package io.modelcontextprotocol.client.transport; + +import java.io.IOException; +import java.time.Duration; + +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.TypeRef; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.assertj.core.api.Assertions.assertThat; + +public class StdioClientTransportReproductionTest { + + private static final Logger logger = LoggerFactory.getLogger(StdioClientTransportReproductionTest.class); + + @Test + void testCloseGracefullyDoesNotHang() { + ServerParameters params = ServerParameters.builder("cat").build(); + StdioClientTransport transport = new StdioClientTransport(params, new McpJsonMapper() { + @Override + public T readValue(String content, Class type) throws IOException { + return null; + } + + @Override + public T readValue(byte[] content, Class type) throws IOException { + return null; + } + + @Override + public T readValue(String content, TypeRef type) throws IOException { + return null; + } + + @Override + public T readValue(byte[] content, TypeRef type) throws IOException { + return null; + } + + @Override + public T convertValue(Object fromValue, Class type) { + return null; + } + + @Override + public T convertValue(Object fromValue, TypeRef type) { + return null; + } + + @Override + public String writeValueAsString(Object value) throws IOException { + return "{}"; + } + + @Override + public byte[] writeValueAsBytes(Object value) throws IOException { + return "{}".getBytes(); + } + }); + + transport.connect((message) -> message).block(Duration.ofMillis(5000)); + + logger.info("Connected to 'cat' server"); + + logger.info("Closing transport..."); + long start = System.currentTimeMillis(); + try { + transport.closeGracefully().block(Duration.ofMillis(5000)); + long duration = System.currentTimeMillis() - start; + logger.info("Transport closed in {} ms", duration); + assertThat(duration).isLessThan(5000); + } + catch (Exception ex) { + logger.error("Transport failed to close or timed out", ex); + throw ex; + } + } + +} diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/common/ResourceLeakTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/common/ResourceLeakTests.java new file mode 100644 index 000000000..4b95afbd2 --- /dev/null +++ b/mcp-test/src/test/java/io/modelcontextprotocol/common/ResourceLeakTests.java @@ -0,0 +1,103 @@ +/* + * Copyright 2024-2025 the original author or authors. + */ + +package io.modelcontextprotocol.common; + +import java.net.http.HttpClient; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; +import io.modelcontextprotocol.spec.DefaultMcpTransportSession; +import io.modelcontextprotocol.util.Utils; +import org.junit.jupiter.api.Test; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Tests for resource leaks, specifically HttpClient threads and transport session + * connection disposal. + * + * @author Christian Tzolov + */ +public class ResourceLeakTests { + + @Test + public void testSelectorManagerThreadCleanup() { + long baselineThreads = countSelectorManagerThreads(); + + // Create and close several clients + for (int i = 0; i < 10; i++) { + HttpClient client = HttpClient.newBuilder().build(); + Utils.closeHttpClient(client); + } + + // Verify that SelectorManager threads are cleaned up. + // Note: Thread termination is asynchronous, so we wait for it. + await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + long currentThreads = countSelectorManagerThreads(); + assertThat(currentThreads).as("HttpClient SelectorManager threads should be cleaned up") + .isLessThanOrEqualTo(baselineThreads); + }); + } + + @Test + public void testConnectionDisposalOnCloseFailure() { + AtomicBoolean disposed = new AtomicBoolean(false); + Disposable connection = new Disposable() { + @Override + public void dispose() { + disposed.set(true); + } + + @Override + public boolean isDisposed() { + return disposed.get(); + } + }; + + // Session that fails on close + DefaultMcpTransportSession session = new DefaultMcpTransportSession( + sessionId -> Mono.error(new RuntimeException("Simulated closure failure"))); + session.addConnection(connection); + + // Trigger close - error is swallowed by the session implementation but we still + // wait for completion + session.closeGracefully().onErrorResume(t -> Mono.empty()).block(Duration.ofSeconds(2)); + + assertThat(disposed.get()).as("Connection should be disposed even if session closure fails").isTrue(); + } + + @Test + public void testTransportCloseGracefully() { + long baselineThreads = countSelectorManagerThreads(); + + // Create a transport that uses a real HttpClient + HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder("http://localhost:8080") + .build(); + + // Close the transport + transport.closeGracefully().block(Duration.ofSeconds(5)); + + // Verify that SelectorManager threads are cleaned up. + await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + long currentThreads = countSelectorManagerThreads(); + assertThat(currentThreads) + .as("HttpClient SelectorManager threads should be cleaned up after transport close") + .isLessThanOrEqualTo(baselineThreads); + }); + } + + private long countSelectorManagerThreads() { + return Thread.getAllStackTraces() + .keySet() + .stream() + .filter(t -> t.getName().contains("HttpClient-") && t.getName().contains("-SelectorManager")) + .count(); + } + +}