Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ public Mono<Void> closeGracefully() {
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
Utils.closeHttpClient(this.httpClient);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,9 @@ public Mono<Void> closeGracefully() {
return Mono.defer(() -> {
logger.debug("Graceful close triggered");
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
if (currentSession != null) {
return Mono.from(currentSession.closeGracefully());
}
return Mono.empty();
Mono<Void> closeSessionMono = (currentSession != null) ? Mono.from(currentSession.closeGracefully())
: Mono.empty();
return closeSessionMono.doFinally(signalType -> Utils.closeHttpClient(this.httpClient));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ public void close() {

@Override
public Mono<Void> closeGracefully() {
return Mono.from(this.onClose.apply(this.sessionId.get()))
.then(Mono.fromRunnable(this.openConnections::dispose));
return Mono.defer(() -> Mono.from(this.onClose.apply(this.sessionId.get())))
.doFinally(signalType -> this.openConnections.dispose())
.then();
}

}
37 changes: 37 additions & 0 deletions mcp-core/src/main/java/io/modelcontextprotocol/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.modelcontextprotocol.util;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.http.HttpClient;
import java.util.Collection;
import java.util.Map;

Expand All @@ -18,6 +21,40 @@

public final class Utils {

/**
* Closes the given {@link HttpClient} if it implements {@link AutoCloseable} (as it
* does in JDK 21+), or tries to close it using reflection for older JDKs.
* @param httpClient the HTTP client to close
*/
public static void closeHttpClient(HttpClient httpClient) {
if (httpClient == null) {
return;
}

if (httpClient instanceof AutoCloseable autoCloseable) {
try {
autoCloseable.close();
}
catch (Exception e) {
// ignore
}
return;
}

// Fallback for JDK < 21 using reflection to access internal 'stop' method.
try {
Field implField = httpClient.getClass().getDeclaredField("impl");
implField.setAccessible(true);
Object impl = implField.get(httpClient);
Method stopMethod = impl.getClass().getDeclaredMethod("stop");
stopMethod.setAccessible(true);
stopMethod.invoke(impl);
}
catch (Exception e) {
// Fallback if reflection fails
}
}

/**
* Check whether the given {@code String} contains actual <em>text</em>.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.modelcontextprotocol.client.transport;

import java.io.IOException;
import java.time.Duration;

import io.modelcontextprotocol.json.McpJsonMapper;
import io.modelcontextprotocol.json.TypeRef;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.assertj.core.api.Assertions.assertThat;

public class StdioClientTransportReproductionTest {

private static final Logger logger = LoggerFactory.getLogger(StdioClientTransportReproductionTest.class);

@Test
void testCloseGracefullyDoesNotHang() {
ServerParameters params = ServerParameters.builder("cat").build();
StdioClientTransport transport = new StdioClientTransport(params, new McpJsonMapper() {
@Override
public <T> T readValue(String content, Class<T> type) throws IOException {
return null;
}

@Override
public <T> T readValue(byte[] content, Class<T> type) throws IOException {
return null;
}

@Override
public <T> T readValue(String content, TypeRef<T> type) throws IOException {
return null;
}

@Override
public <T> T readValue(byte[] content, TypeRef<T> type) throws IOException {
return null;
}

@Override
public <T> T convertValue(Object fromValue, Class<T> type) {
return null;
}

@Override
public <T> T convertValue(Object fromValue, TypeRef<T> type) {
return null;
}

@Override
public String writeValueAsString(Object value) throws IOException {
return "{}";
}

@Override
public byte[] writeValueAsBytes(Object value) throws IOException {
return "{}".getBytes();
}
});

transport.connect((message) -> message).block(Duration.ofMillis(5000));

logger.info("Connected to 'cat' server");

logger.info("Closing transport...");
long start = System.currentTimeMillis();
try {
transport.closeGracefully().block(Duration.ofMillis(5000));
long duration = System.currentTimeMillis() - start;
logger.info("Transport closed in {} ms", duration);
assertThat(duration).isLessThan(5000);
}
catch (Exception ex) {
logger.error("Transport failed to close or timed out", ex);
throw ex;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2024-2025 the original author or authors.
*/

package io.modelcontextprotocol.common;

import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.util.Utils;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
* Tests for resource leaks, specifically HttpClient threads and transport session
* connection disposal.
*
* @author Christian Tzolov
*/
public class ResourceLeakTests {

@Test
public void testSelectorManagerThreadCleanup() {
long baselineThreads = countSelectorManagerThreads();

// Create and close several clients
for (int i = 0; i < 10; i++) {
HttpClient client = HttpClient.newBuilder().build();
Utils.closeHttpClient(client);
}

// Verify that SelectorManager threads are cleaned up.
// Note: Thread termination is asynchronous, so we wait for it.
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
long currentThreads = countSelectorManagerThreads();
assertThat(currentThreads).as("HttpClient SelectorManager threads should be cleaned up")
.isLessThanOrEqualTo(baselineThreads);
});
}

@Test
public void testConnectionDisposalOnCloseFailure() {
AtomicBoolean disposed = new AtomicBoolean(false);
Disposable connection = new Disposable() {
@Override
public void dispose() {
disposed.set(true);
}

@Override
public boolean isDisposed() {
return disposed.get();
}
};

// Session that fails on close
DefaultMcpTransportSession session = new DefaultMcpTransportSession(
sessionId -> Mono.error(new RuntimeException("Simulated closure failure")));
session.addConnection(connection);

// Trigger close - error is swallowed by the session implementation but we still
// wait for completion
session.closeGracefully().onErrorResume(t -> Mono.empty()).block(Duration.ofSeconds(2));

assertThat(disposed.get()).as("Connection should be disposed even if session closure fails").isTrue();
}

@Test
public void testTransportCloseGracefully() {
long baselineThreads = countSelectorManagerThreads();

// Create a transport that uses a real HttpClient
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder("http://localhost:8080")
.build();

// Close the transport
transport.closeGracefully().block(Duration.ofSeconds(5));

// Verify that SelectorManager threads are cleaned up.
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
long currentThreads = countSelectorManagerThreads();
assertThat(currentThreads)
.as("HttpClient SelectorManager threads should be cleaned up after transport close")
.isLessThanOrEqualTo(baselineThreads);
});
}

private long countSelectorManagerThreads() {
return Thread.getAllStackTraces()
.keySet()
.stream()
.filter(t -> t.getName().contains("HttpClient-") && t.getName().contains("-SelectorManager"))
.count();
}

}