diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java index 1b4eaca97..1dd5052a7 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java @@ -11,7 +11,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.function.Function; @@ -88,9 +87,9 @@ public StdioClientTransport(ServerParameters params, McpJsonMapper jsonMapper) { this.errorSink = Sinks.many().unicast().onBackpressureBuffer(); // Start threads - this.inboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "inbound"); - this.outboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "outbound"); - this.errorScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "error"); + this.inboundScheduler = Schedulers.newSingle("inbound", true); + this.outboundScheduler = Schedulers.newSingle("outbound", true); + this.errorScheduler = Schedulers.newSingle("error", true); } /** @@ -348,6 +347,14 @@ public Mono closeGracefully() { })).then(Mono.defer(() -> { logger.debug("Sending TERM to process"); if (this.process != null) { + try { + this.process.getInputStream().close(); + this.process.getErrorStream().close(); + this.process.getOutputStream().close(); + } + catch (IOException e) { + logger.warn("Failed to close process streams: {}", e.getMessage()); + } this.process.destroy(); return Mono.fromFuture(process.onExit()); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java index 66cc304d6..6dd70a329 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java @@ -11,7 +11,6 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -151,10 +150,8 @@ public StdioMcpSessionTransport() { this.outboundSink = Sinks.many().unicast().onBackpressureBuffer(); // Use bounded schedulers for better resource management - this.inboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), - "stdio-inbound"); - this.outboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), - "stdio-outbound"); + this.inboundScheduler = Schedulers.newSingle("stdio-inbound", true); + this.outboundScheduler = Schedulers.newSingle("stdio-outbound", true); } @Override