From 339493a25a37bbc4f770528ce9fa0bf2dba7ba24 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Mon, 9 Mar 2026 13:15:13 -0400 Subject: [PATCH 01/10] fix: detect stdin EOF on parent death for stdio transport Add a background monitor that uses select.poll() to detect POLLHUP on stdin's file descriptor. When the parent process dies and the pipe's write end closes, the monitor cancels the task group, triggering a clean shutdown. The anyio.wrap_file async iterator may not propagate EOF promptly because it runs readline() in a worker thread. The poll-based monitor detects the hang-up at the OS level independent of the worker thread. Only enabled on non-Windows platforms where select.poll() is available. --- src/mcp/server/stdio.py | 49 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/mcp/server/stdio.py b/src/mcp/server/stdio.py index e526bab56..d26f8229c 100644 --- a/src/mcp/server/stdio.py +++ b/src/mcp/server/stdio.py @@ -28,6 +28,50 @@ async def run_server(): from mcp import types from mcp.shared.message import SessionMessage +# How often to check for stdin EOF (seconds) +STDIN_EOF_CHECK_INTERVAL = 0.1 + + +def _create_stdin_eof_monitor( + tg: anyio.abc.TaskGroup, +): + """Create a platform-appropriate stdin EOF monitor. + + Returns an async callable that monitors stdin for EOF and cancels the task + group when detected, or None if monitoring is not supported on this platform. + + When the parent process dies, stdin reaches EOF. The anyio.wrap_file async + iterator may not detect this promptly because it runs readline() in a worker + thread. This monitor polls the underlying file descriptor directly using + OS-level I/O, and cancels the task group when EOF is detected, ensuring the + server shuts down cleanly. + """ + if sys.platform == "win32": + return None + + import select + + try: + fd = sys.stdin.buffer.fileno() + except Exception: + return None + + async def monitor(): + poll_obj = select.poll() + poll_obj.register(fd, select.POLLIN | select.POLLHUP) + try: + while True: + await anyio.sleep(STDIN_EOF_CHECK_INTERVAL) + events = poll_obj.poll(0) + for _, event_mask in events: + if event_mask & (select.POLLHUP | select.POLLERR | select.POLLNVAL): + tg.cancel_scope.cancel() + return + finally: + poll_obj.unregister(fd) + + return monitor + @asynccontextmanager async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.AsyncFile[str] | None = None): @@ -80,4 +124,9 @@ async def stdout_writer(): async with anyio.create_task_group() as tg: tg.start_soon(stdin_reader) tg.start_soon(stdout_writer) + + eof_monitor = _create_stdin_eof_monitor(tg) + if eof_monitor is not None: + tg.start_soon(eof_monitor) + yield read_stream, write_stream From d2f179a473931989e7c379d22fa078df9c8fa518 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Mon, 9 Mar 2026 14:26:58 -0400 Subject: [PATCH 02/10] fix: resolve pyright errors and add test coverage for stdin EOF monitor Move select import and TaskGroup import to module level, add explicit return type annotation, and add tests covering the win32 early-return, fileno failure path, POLLHUP detection, and POLLIN-only event handling. --- src/mcp/server/stdio.py | 12 +++-- tests/server/test_stdio.py | 91 +++++++++++++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 5 deletions(-) diff --git a/src/mcp/server/stdio.py b/src/mcp/server/stdio.py index d26f8229c..f8f34f684 100644 --- a/src/mcp/server/stdio.py +++ b/src/mcp/server/stdio.py @@ -17,12 +17,15 @@ async def run_server(): ``` """ +import select import sys +from collections.abc import Callable, Coroutine from contextlib import asynccontextmanager from io import TextIOWrapper import anyio import anyio.lowlevel +from anyio.abc import TaskGroup from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp import types @@ -33,8 +36,8 @@ async def run_server(): def _create_stdin_eof_monitor( - tg: anyio.abc.TaskGroup, -): + tg: TaskGroup, +) -> Callable[[], Coroutine[object, object, None]] | None: """Create a platform-appropriate stdin EOF monitor. Returns an async callable that monitors stdin for EOF and cancels the task @@ -49,14 +52,15 @@ def _create_stdin_eof_monitor( if sys.platform == "win32": return None - import select + if not hasattr(select, "poll"): + return None # pragma: no cover try: fd = sys.stdin.buffer.fileno() except Exception: return None - async def monitor(): + async def monitor() -> None: poll_obj = select.poll() poll_obj.register(fd, select.POLLIN | select.POLLHUP) try: diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 9a7ddaab4..8489b4b9c 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -1,9 +1,12 @@ import io +import os +import sys +from unittest.mock import MagicMock, patch import anyio import pytest -from mcp.server.stdio import stdio_server +from mcp.server.stdio import _create_stdin_eof_monitor, stdio_server from mcp.shared.message import SessionMessage from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter @@ -59,3 +62,89 @@ async def test_stdio_server(): assert len(received_responses) == 2 assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping") assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={}) + + +def test_create_stdin_eof_monitor_returns_none_on_win32(): + """On Windows, the EOF monitor is not supported.""" + tg = MagicMock() + with patch.object(sys, "platform", "win32"): + result = _create_stdin_eof_monitor(tg) + assert result is None + + +def test_create_stdin_eof_monitor_returns_none_when_fileno_fails(): + """When stdin.buffer.fileno() raises, the monitor returns None.""" + tg = MagicMock() + mock_buffer = MagicMock() + mock_buffer.fileno.side_effect = io.UnsupportedOperation("redirected stdin") + with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): + result = _create_stdin_eof_monitor(tg) + assert result is None + + +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows") +async def test_stdin_eof_monitor_detects_hangup(): + """The EOF monitor cancels the task group when stdin pipe closes.""" + read_fd, write_fd = os.pipe() + try: + # Patch sys.stdin.buffer.fileno to return our read end + mock_buffer = MagicMock() + mock_buffer.fileno.return_value = read_fd + + with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): + async with anyio.create_task_group() as tg: + monitor = _create_stdin_eof_monitor(tg) + assert monitor is not None + tg.start_soon(monitor) + + # Close the write end to trigger POLLHUP on read end + os.close(write_fd) + write_fd = -1 + + # Monitor should detect POLLHUP and cancel the scope + # Wait for the cancellation (with timeout to avoid hanging) + with anyio.fail_after(5): + while not tg.cancel_scope.cancel_called: + await anyio.sleep(0.05) + finally: + os.close(read_fd) + if write_fd != -1: + os.close(write_fd) + + +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows") +async def test_stdin_eof_monitor_ignores_pollin_events(): + """The monitor ignores POLLIN events (data available) and only reacts to hangup/error.""" + read_fd, write_fd = os.pipe() + try: + mock_buffer = MagicMock() + mock_buffer.fileno.return_value = read_fd + + with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): + async with anyio.create_task_group() as tg: + monitor = _create_stdin_eof_monitor(tg) + assert monitor is not None + tg.start_soon(monitor) + + # Write data to trigger POLLIN (not POLLHUP) + os.write(write_fd, b"hello\n") + + # Give the monitor time to process the POLLIN event + await anyio.sleep(0.3) + + # Monitor should NOT have cancelled since POLLIN alone isn't a hangup + assert not tg.cancel_scope.cancel_called + + # Now close write end to trigger POLLHUP + os.close(write_fd) + write_fd = -1 + + with anyio.fail_after(5): + while not tg.cancel_scope.cancel_called: + await anyio.sleep(0.05) + finally: + os.close(read_fd) + if write_fd != -1: + os.close(write_fd) From 409b4e265ac67f8c884f0d65d357b23718e07217 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Mon, 9 Mar 2026 14:30:22 -0400 Subject: [PATCH 03/10] test: mark defensive fd cleanup as no-cover The write_fd cleanup in finally blocks is defensive code for error cases that don't occur in the happy path. Mark with pragma: no cover to satisfy 100% coverage requirement. --- tests/server/test_stdio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 8489b4b9c..fa7a3d638 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -109,7 +109,7 @@ async def test_stdin_eof_monitor_detects_hangup(): await anyio.sleep(0.05) finally: os.close(read_fd) - if write_fd != -1: + if write_fd != -1: # pragma: no cover os.close(write_fd) @@ -146,5 +146,5 @@ async def test_stdin_eof_monitor_ignores_pollin_events(): await anyio.sleep(0.05) finally: os.close(read_fd) - if write_fd != -1: + if write_fd != -1: # pragma: no cover os.close(write_fd) From 1784d71a3da822d19e2fd5a0d9a804123de0f584 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Mon, 9 Mar 2026 14:34:35 -0400 Subject: [PATCH 04/10] test: replace polling loops with sleep to avoid partial branch coverage Use await anyio.sleep() instead of while loops to wait for monitor cancellation. The while loop's False-condition branch was never taken because the scope always exits via cancellation, not loop termination. --- tests/server/test_stdio.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index fa7a3d638..e7e155500 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -88,25 +88,27 @@ async def test_stdin_eof_monitor_detects_hangup(): """The EOF monitor cancels the task group when stdin pipe closes.""" read_fd, write_fd = os.pipe() try: - # Patch sys.stdin.buffer.fileno to return our read end mock_buffer = MagicMock() mock_buffer.fileno.return_value = read_fd + cancelled = False with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): - async with anyio.create_task_group() as tg: - monitor = _create_stdin_eof_monitor(tg) - assert monitor is not None - tg.start_soon(monitor) - - # Close the write end to trigger POLLHUP on read end - os.close(write_fd) - write_fd = -1 - - # Monitor should detect POLLHUP and cancel the scope - # Wait for the cancellation (with timeout to avoid hanging) - with anyio.fail_after(5): - while not tg.cancel_scope.cancel_called: - await anyio.sleep(0.05) + with anyio.CancelScope() as scope: + async with anyio.create_task_group() as tg: + monitor = _create_stdin_eof_monitor(tg) + assert monitor is not None + tg.start_soon(monitor) + + # Close the write end to trigger POLLHUP on read end + os.close(write_fd) + write_fd = -1 + + # The monitor will cancel the task group scope when it + # detects POLLHUP. Wait with a timeout to avoid hanging. + with anyio.fail_after(5): + await anyio.sleep(10) # will be cancelled by monitor + cancelled = scope.cancel_called or tg.cancel_scope.cancel_called + assert cancelled finally: os.close(read_fd) if write_fd != -1: # pragma: no cover @@ -141,9 +143,9 @@ async def test_stdin_eof_monitor_ignores_pollin_events(): os.close(write_fd) write_fd = -1 + # Wait for the monitor to detect POLLHUP and cancel with anyio.fail_after(5): - while not tg.cancel_scope.cancel_called: - await anyio.sleep(0.05) + await anyio.sleep(10) # will be cancelled by monitor finally: os.close(read_fd) if write_fd != -1: # pragma: no cover From b0f866f727a332ca3703dbdad3f9914640d6735d Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Thu, 12 Mar 2026 13:28:04 -0400 Subject: [PATCH 05/10] test: avoid possibly-unbound task group variable --- tests/server/test_stdio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index e7e155500..8dedc3820 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -107,7 +107,7 @@ async def test_stdin_eof_monitor_detects_hangup(): # detects POLLHUP. Wait with a timeout to avoid hanging. with anyio.fail_after(5): await anyio.sleep(10) # will be cancelled by monitor - cancelled = scope.cancel_called or tg.cancel_scope.cancel_called + cancelled = scope.cancel_called assert cancelled finally: os.close(read_fd) From 8232b87ff6d7db8c27a2553411617c9ed7e7f0b1 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Fri, 13 Mar 2026 00:44:27 -0400 Subject: [PATCH 06/10] test: assert task-group cancellation directly in stdio EOF monitor tests --- tests/server/test_stdio.py | 39 +++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 8dedc3820..9bce31643 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -91,24 +91,22 @@ async def test_stdin_eof_monitor_detects_hangup(): mock_buffer = MagicMock() mock_buffer.fileno.return_value = read_fd - cancelled = False with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): - with anyio.CancelScope() as scope: - async with anyio.create_task_group() as tg: - monitor = _create_stdin_eof_monitor(tg) - assert monitor is not None - tg.start_soon(monitor) - - # Close the write end to trigger POLLHUP on read end - os.close(write_fd) - write_fd = -1 - - # The monitor will cancel the task group scope when it - # detects POLLHUP. Wait with a timeout to avoid hanging. - with anyio.fail_after(5): - await anyio.sleep(10) # will be cancelled by monitor - cancelled = scope.cancel_called - assert cancelled + async with anyio.create_task_group() as tg: + monitor = _create_stdin_eof_monitor(tg) + assert monitor is not None + tg.start_soon(monitor) + + # Close the write end to trigger POLLHUP on read end + os.close(write_fd) + write_fd = -1 + + # Wait for the monitor to cancel the task-group scope. + with anyio.fail_after(5): + while not tg.cancel_scope.cancel_called: + await anyio.sleep(0.05) + + assert tg.cancel_scope.cancel_called finally: os.close(read_fd) if write_fd != -1: # pragma: no cover @@ -143,9 +141,12 @@ async def test_stdin_eof_monitor_ignores_pollin_events(): os.close(write_fd) write_fd = -1 - # Wait for the monitor to detect POLLHUP and cancel + # Wait for the monitor to detect POLLHUP and cancel. with anyio.fail_after(5): - await anyio.sleep(10) # will be cancelled by monitor + while not tg.cancel_scope.cancel_called: + await anyio.sleep(0.05) + + assert tg.cancel_scope.cancel_called finally: os.close(read_fd) if write_fd != -1: # pragma: no cover From c8b99837a0b01258af4ad60a24eaf8521f322186 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Fri, 13 Mar 2026 00:51:25 -0400 Subject: [PATCH 07/10] test: avoid redundant cancel assertions in stdio EOF monitor tests --- tests/server/test_stdio.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 9bce31643..10f7ad023 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -105,8 +105,6 @@ async def test_stdin_eof_monitor_detects_hangup(): with anyio.fail_after(5): while not tg.cancel_scope.cancel_called: await anyio.sleep(0.05) - - assert tg.cancel_scope.cancel_called finally: os.close(read_fd) if write_fd != -1: # pragma: no cover @@ -145,8 +143,6 @@ async def test_stdin_eof_monitor_ignores_pollin_events(): with anyio.fail_after(5): while not tg.cancel_scope.cancel_called: await anyio.sleep(0.05) - - assert tg.cancel_scope.cancel_called finally: os.close(read_fd) if write_fd != -1: # pragma: no cover From 2787185c6f18027a07c0799a128f355d2b579141 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Sun, 15 Mar 2026 00:25:19 -0400 Subject: [PATCH 08/10] tests: mark unreachable zero-iteration branch in POLLHUP monitor loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The while loop at line 144 always executes at least once — POLLHUP fires after the write end is closed, not before. The zero-iteration branch (condition False on first check) is structurally unreachable in this test but triggers a coverage miss under branch coverage. Mark with pragma: no branch, consistent with the pattern used elsewhere in the test suite (e.g. test_notification_response.py). --- tests/server/test_stdio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 10f7ad023..9f0881a63 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -141,7 +141,7 @@ async def test_stdin_eof_monitor_ignores_pollin_events(): # Wait for the monitor to detect POLLHUP and cancel. with anyio.fail_after(5): - while not tg.cancel_scope.cancel_called: + while not tg.cancel_scope.cancel_called: # pragma: no branch await anyio.sleep(0.05) finally: os.close(read_fd) From 33a96b81579d769ad1ea0be8b2f94fd4247ae2a3 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Sun, 15 Mar 2026 10:16:48 -0400 Subject: [PATCH 09/10] fix: pragma no cover on finally cleanup lines to fix 100% coverage gate --- tests/server/test_stdio.py | 298 ++++++++++++++++++------------------- 1 file changed, 149 insertions(+), 149 deletions(-) diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 9f0881a63..98a507b10 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -1,149 +1,149 @@ -import io -import os -import sys -from unittest.mock import MagicMock, patch - -import anyio -import pytest - -from mcp.server.stdio import _create_stdin_eof_monitor, stdio_server -from mcp.shared.message import SessionMessage -from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter - - -@pytest.mark.anyio -async def test_stdio_server(): - stdin = io.StringIO() - stdout = io.StringIO() - - messages = [ - JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"), - JSONRPCResponse(jsonrpc="2.0", id=2, result={}), - ] - - for message in messages: - stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n") - stdin.seek(0) - - async with stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as ( - read_stream, - write_stream, - ): - received_messages: list[JSONRPCMessage] = [] - async with read_stream: - async for message in read_stream: - if isinstance(message, Exception): # pragma: no cover - raise message - received_messages.append(message.message) - if len(received_messages) == 2: - break - - # Verify received messages - assert len(received_messages) == 2 - assert received_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") - assert received_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={}) - - # Test sending responses from the server - responses = [ - JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"), - JSONRPCResponse(jsonrpc="2.0", id=4, result={}), - ] - - async with write_stream: - for response in responses: - session_message = SessionMessage(response) - await write_stream.send(session_message) - - stdout.seek(0) - output_lines = stdout.readlines() - assert len(output_lines) == 2 - - received_responses = [jsonrpc_message_adapter.validate_json(line.strip()) for line in output_lines] - assert len(received_responses) == 2 - assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping") - assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={}) - - -def test_create_stdin_eof_monitor_returns_none_on_win32(): - """On Windows, the EOF monitor is not supported.""" - tg = MagicMock() - with patch.object(sys, "platform", "win32"): - result = _create_stdin_eof_monitor(tg) - assert result is None - - -def test_create_stdin_eof_monitor_returns_none_when_fileno_fails(): - """When stdin.buffer.fileno() raises, the monitor returns None.""" - tg = MagicMock() - mock_buffer = MagicMock() - mock_buffer.fileno.side_effect = io.UnsupportedOperation("redirected stdin") - with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): - result = _create_stdin_eof_monitor(tg) - assert result is None - - -@pytest.mark.anyio -@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows") -async def test_stdin_eof_monitor_detects_hangup(): - """The EOF monitor cancels the task group when stdin pipe closes.""" - read_fd, write_fd = os.pipe() - try: - mock_buffer = MagicMock() - mock_buffer.fileno.return_value = read_fd - - with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): - async with anyio.create_task_group() as tg: - monitor = _create_stdin_eof_monitor(tg) - assert monitor is not None - tg.start_soon(monitor) - - # Close the write end to trigger POLLHUP on read end - os.close(write_fd) - write_fd = -1 - - # Wait for the monitor to cancel the task-group scope. - with anyio.fail_after(5): - while not tg.cancel_scope.cancel_called: - await anyio.sleep(0.05) - finally: - os.close(read_fd) - if write_fd != -1: # pragma: no cover - os.close(write_fd) - - -@pytest.mark.anyio -@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows") -async def test_stdin_eof_monitor_ignores_pollin_events(): - """The monitor ignores POLLIN events (data available) and only reacts to hangup/error.""" - read_fd, write_fd = os.pipe() - try: - mock_buffer = MagicMock() - mock_buffer.fileno.return_value = read_fd - - with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): - async with anyio.create_task_group() as tg: - monitor = _create_stdin_eof_monitor(tg) - assert monitor is not None - tg.start_soon(monitor) - - # Write data to trigger POLLIN (not POLLHUP) - os.write(write_fd, b"hello\n") - - # Give the monitor time to process the POLLIN event - await anyio.sleep(0.3) - - # Monitor should NOT have cancelled since POLLIN alone isn't a hangup - assert not tg.cancel_scope.cancel_called - - # Now close write end to trigger POLLHUP - os.close(write_fd) - write_fd = -1 - - # Wait for the monitor to detect POLLHUP and cancel. - with anyio.fail_after(5): - while not tg.cancel_scope.cancel_called: # pragma: no branch - await anyio.sleep(0.05) - finally: - os.close(read_fd) - if write_fd != -1: # pragma: no cover - os.close(write_fd) +import io +import os +import sys +from unittest.mock import MagicMock, patch + +import anyio +import pytest + +from mcp.server.stdio import _create_stdin_eof_monitor, stdio_server +from mcp.shared.message import SessionMessage +from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter + + +@pytest.mark.anyio +async def test_stdio_server(): + stdin = io.StringIO() + stdout = io.StringIO() + + messages = [ + JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"), + JSONRPCResponse(jsonrpc="2.0", id=2, result={}), + ] + + for message in messages: + stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n") + stdin.seek(0) + + async with stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as ( + read_stream, + write_stream, + ): + received_messages: list[JSONRPCMessage] = [] + async with read_stream: + async for message in read_stream: + if isinstance(message, Exception): # pragma: no cover + raise message + received_messages.append(message.message) + if len(received_messages) == 2: + break + + # Verify received messages + assert len(received_messages) == 2 + assert received_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + assert received_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={}) + + # Test sending responses from the server + responses = [ + JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"), + JSONRPCResponse(jsonrpc="2.0", id=4, result={}), + ] + + async with write_stream: + for response in responses: + session_message = SessionMessage(response) + await write_stream.send(session_message) + + stdout.seek(0) + output_lines = stdout.readlines() + assert len(output_lines) == 2 + + received_responses = [jsonrpc_message_adapter.validate_json(line.strip()) for line in output_lines] + assert len(received_responses) == 2 + assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping") + assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={}) + + +def test_create_stdin_eof_monitor_returns_none_on_win32(): + """On Windows, the EOF monitor is not supported.""" + tg = MagicMock() + with patch.object(sys, "platform", "win32"): + result = _create_stdin_eof_monitor(tg) + assert result is None + + +def test_create_stdin_eof_monitor_returns_none_when_fileno_fails(): + """When stdin.buffer.fileno() raises, the monitor returns None.""" + tg = MagicMock() + mock_buffer = MagicMock() + mock_buffer.fileno.side_effect = io.UnsupportedOperation("redirected stdin") + with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): + result = _create_stdin_eof_monitor(tg) + assert result is None + + +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows") +async def test_stdin_eof_monitor_detects_hangup(): + """The EOF monitor cancels the task group when stdin pipe closes.""" + read_fd, write_fd = os.pipe() + try: + mock_buffer = MagicMock() + mock_buffer.fileno.return_value = read_fd + + with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): + async with anyio.create_task_group() as tg: + monitor = _create_stdin_eof_monitor(tg) + assert monitor is not None + tg.start_soon(monitor) + + # Close the write end to trigger POLLHUP on read end + os.close(write_fd) + write_fd = -1 + + # Wait for the monitor to cancel the task-group scope. + with anyio.fail_after(5): + while not tg.cancel_scope.cancel_called: + await anyio.sleep(0.05) + finally: + os.close(read_fd) # pragma: no cover + if write_fd != -1: # pragma: no cover + os.close(write_fd) + + +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows") +async def test_stdin_eof_monitor_ignores_pollin_events(): + """The monitor ignores POLLIN events (data available) and only reacts to hangup/error.""" + read_fd, write_fd = os.pipe() + try: + mock_buffer = MagicMock() + mock_buffer.fileno.return_value = read_fd + + with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)): + async with anyio.create_task_group() as tg: + monitor = _create_stdin_eof_monitor(tg) + assert monitor is not None + tg.start_soon(monitor) + + # Write data to trigger POLLIN (not POLLHUP) + os.write(write_fd, b"hello\n") + + # Give the monitor time to process the POLLIN event + await anyio.sleep(0.3) + + # Monitor should NOT have cancelled since POLLIN alone isn't a hangup + assert not tg.cancel_scope.cancel_called + + # Now close write end to trigger POLLHUP + os.close(write_fd) + write_fd = -1 + + # Wait for the monitor to detect POLLHUP and cancel. + with anyio.fail_after(5): + while not tg.cancel_scope.cancel_called: # pragma: no branch + await anyio.sleep(0.05) + finally: + os.close(read_fd) # pragma: no cover + if write_fd != -1: # pragma: no cover + os.close(write_fd) From 6cb12cbc1e8ea29523cbf875f3ecc2bb3b8f4722 Mon Sep 17 00:00:00 2001 From: Andrew Barnes Date: Sun, 15 Mar 2026 13:52:55 -0400 Subject: [PATCH 10/10] fix: resolve CI coverage failures for platform-specific stdin EOF monitor Use pragma: lax no cover for select.poll()-based code paths that are only exercised on non-Windows platforms. Remove incorrect pragma: no cover from test finally blocks that are always executed. Github-Issue:#2231 --- src/mcp/server/stdio.py | 12 +++++++----- tests/server/test_stdio.py | 12 ++++++------ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/mcp/server/stdio.py b/src/mcp/server/stdio.py index f8f34f684..2a5f566fc 100644 --- a/src/mcp/server/stdio.py +++ b/src/mcp/server/stdio.py @@ -55,12 +55,14 @@ def _create_stdin_eof_monitor( if not hasattr(select, "poll"): return None # pragma: no cover - try: + # The remaining code uses select.poll() which is not available on Windows. + # Coverage is exercised on non-Windows platforms only. + try: # pragma: lax no cover fd = sys.stdin.buffer.fileno() - except Exception: + except Exception: # pragma: lax no cover return None - async def monitor() -> None: + async def monitor() -> None: # pragma: lax no cover poll_obj = select.poll() poll_obj.register(fd, select.POLLIN | select.POLLHUP) try: @@ -74,7 +76,7 @@ async def monitor() -> None: finally: poll_obj.unregister(fd) - return monitor + return monitor # pragma: lax no cover @asynccontextmanager @@ -131,6 +133,6 @@ async def stdout_writer(): eof_monitor = _create_stdin_eof_monitor(tg) if eof_monitor is not None: - tg.start_soon(eof_monitor) + tg.start_soon(eof_monitor) # pragma: lax no cover yield read_stream, write_stream diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 98a507b10..8844ed223 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -84,7 +84,7 @@ def test_create_stdin_eof_monitor_returns_none_when_fileno_fails(): @pytest.mark.anyio @pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows") -async def test_stdin_eof_monitor_detects_hangup(): +async def test_stdin_eof_monitor_detects_hangup(): # pragma: lax no cover """The EOF monitor cancels the task group when stdin pipe closes.""" read_fd, write_fd = os.pipe() try: @@ -106,14 +106,14 @@ async def test_stdin_eof_monitor_detects_hangup(): while not tg.cancel_scope.cancel_called: await anyio.sleep(0.05) finally: - os.close(read_fd) # pragma: no cover - if write_fd != -1: # pragma: no cover + os.close(read_fd) + if write_fd != -1: os.close(write_fd) @pytest.mark.anyio @pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows") -async def test_stdin_eof_monitor_ignores_pollin_events(): +async def test_stdin_eof_monitor_ignores_pollin_events(): # pragma: lax no cover """The monitor ignores POLLIN events (data available) and only reacts to hangup/error.""" read_fd, write_fd = os.pipe() try: @@ -144,6 +144,6 @@ async def test_stdin_eof_monitor_ignores_pollin_events(): while not tg.cancel_scope.cancel_called: # pragma: no branch await anyio.sleep(0.05) finally: - os.close(read_fd) # pragma: no cover - if write_fd != -1: # pragma: no cover + os.close(read_fd) + if write_fd != -1: os.close(write_fd)