diff --git a/pyproject.toml b/pyproject.toml index efc3d55..7d1eb4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-runtime" -version = "0.10.0" +version = "0.10.1" description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/runtime/logging/_interceptor.py b/src/uipath/runtime/logging/_interceptor.py index 85d4a1e..ed69f3d 100644 --- a/src/uipath/runtime/logging/_interceptor.py +++ b/src/uipath/runtime/logging/_interceptor.py @@ -148,9 +148,16 @@ def setup(self) -> None: # logger.propagate remains True (default) self.patched_loggers.add(logger_name) - # Child executions should redirect stdout/stderr to their own handler - # This ensures print statements are captured per execution - self._redirect_stdout_stderr() + # Register our handler on stdout/stderr loggers so that + # print() output routed through the master's LoggerWriter + # is captured per-execution via filters. + # We do NOT replace sys.stdout/sys.stderr — the master owns those. + if not isinstance(sys.stdout, LoggerWriter): + self.logger.warning( + "Child interceptor set up without a master LoggerWriter on sys.stdout. " + "print() output will not be captured for this execution context." + ) + self._register_stdout_stderr_handlers() else: # Master execution mode: remove all handlers and add only ours self._clean_all_handlers(self.root_logger) @@ -165,28 +172,33 @@ def setup(self) -> None: # Master redirects stdout/stderr self._redirect_stdout_stderr() + def _register_stdout_stderr_handlers(self) -> None: + """Register our handler on stdout/stderr loggers without replacing the streams.""" + stdout_logger = logging.getLogger("stdout") + stderr_logger = logging.getLogger("stderr") + + stdout_logger.propagate = False + stderr_logger.propagate = False + + if self.log_handler not in stdout_logger.handlers: + stdout_logger.addHandler(self.log_handler) + if self.log_handler not in stderr_logger.handlers: + stderr_logger.addHandler(self.log_handler) + def _redirect_stdout_stderr(self) -> None: - """Redirect stdout and stderr to the logging system.""" - # Set up stdout and stderr loggers + """Redirect stdout and stderr to the logging system. + + Only called by master execution mode. Replaces sys.stdout/sys.stderr + with LoggerWriter instances that route output through the logging system. + """ stdout_logger = logging.getLogger("stdout") stderr_logger = logging.getLogger("stderr") - if self.execution_id: - # Child execution: add our handler to stdout/stderr loggers - stdout_logger.propagate = False - stderr_logger.propagate = False - - if self.log_handler not in stdout_logger.handlers: - stdout_logger.addHandler(self.log_handler) - if self.log_handler not in stderr_logger.handlers: - stderr_logger.addHandler(self.log_handler) - else: - # Master execution: clean and set up handlers - stdout_logger.propagate = False - stderr_logger.propagate = False + stdout_logger.propagate = False + stderr_logger.propagate = False - self._clean_all_handlers(stdout_logger) - self._clean_all_handlers(stderr_logger) + self._clean_all_handlers(stdout_logger) + self._clean_all_handlers(stderr_logger) # Use the min_level in the LoggerWriter to filter messages sys.stdout = LoggerWriter( @@ -197,16 +209,37 @@ def _redirect_stdout_stderr(self) -> None: ) def teardown(self) -> None: - """Restore original logging configuration.""" - # Clear the context variable + """Restore original logging configuration. + + IMPORTANT: The ordering below is critical. Flushing must happen before + clearing the context variable and before removing handlers. Otherwise: + - If context is cleared first, the execution filter won't match the + flushed records and they'll be dropped. + - If handlers are removed first, the flushed records have no destination. + """ + # Step 1: Flush LoggerWriter buffers while context and handlers are still active. + # Child mode: flush only this context's buffer from the shared LoggerWriter. + # Master mode: flush ALL remaining buffers before restoring streams. + if self.execution_id: + if isinstance(sys.stdout, LoggerWriter): + sys.stdout.flush() + if isinstance(sys.stderr, LoggerWriter): + sys.stderr.flush() + else: + if isinstance(sys.stdout, LoggerWriter): + sys.stdout.flush_all() + if isinstance(sys.stderr, LoggerWriter): + sys.stderr.flush_all() + + # Step 2: Clear the context variable (after flush used it) if self.execution_id: current_execution_id.set(None) - # Restore the original disable level + # Step 3: Restore the original disable level if not self.execution_id: logging.disable(self.original_disable_level) - # Remove our handler and filter + # Step 4: Remove our handler and filter if self.execution_filter: self.log_handler.removeFilter(self.execution_filter) @@ -240,8 +273,10 @@ def teardown(self) -> None: if self._owns_handler: self.log_handler.close() - # Only restore streams if we redirected them - if self.original_stdout and self.original_stderr: + # Step 5: Only master restores streams. Children never replaced + # sys.stdout/sys.stderr (they only registered handlers on the loggers), + # so there is nothing for them to restore here. + if not self.execution_id and self.original_stdout and self.original_stderr: sys.stdout = self.original_stdout sys.stderr = self.original_stderr diff --git a/src/uipath/runtime/logging/_writers.py b/src/uipath/runtime/logging/_writers.py index f33732b..898a4b9 100644 --- a/src/uipath/runtime/logging/_writers.py +++ b/src/uipath/runtime/logging/_writers.py @@ -3,9 +3,15 @@ import logging from typing import TextIO +from uipath.runtime.logging._context import current_execution_id + class LoggerWriter: - """Redirect stdout/stderr to logging system.""" + """Redirect stdout/stderr to logging system. + + Maintains per-execution-context buffers so that concurrent async tasks + (e.g. parallel eval runs) do not interleave partial lines. + """ def __init__( self, @@ -18,7 +24,10 @@ def __init__( self.logger = logger self.level = level self.min_level = min_level - self.buffer = "" + # Keyed by current_execution_id (None for master context). + # A single shared buffer would interleave partial lines from + # concurrent async tasks writing to the same sys.stdout. + self._buffers: dict[str | None, str] = {} self.sys_file = sys_file self._in_logging = False # Recursion guard @@ -35,17 +44,22 @@ def write(self, message: str) -> None: try: self._in_logging = True - self.buffer += message - while "\n" in self.buffer: - line, self.buffer = self.buffer.split("\n", 1) + ctx = current_execution_id.get() + buf = self._buffers.get(ctx, "") + message + while "\n" in buf: + line, buf = buf.split("\n", 1) # Only log if the message is not empty and the level is sufficient if line and self.level >= self.min_level: self.logger._log(self.level, line, ()) + if buf: + self._buffers[ctx] = buf + else: + self._buffers.pop(ctx, None) finally: self._in_logging = False def flush(self) -> None: - """Flush any remaining buffered messages to the logger.""" + """Flush the current execution context's buffered messages to the logger.""" if self._in_logging: if self.sys_file: try: @@ -56,10 +70,28 @@ def flush(self) -> None: try: self._in_logging = True - # Log any remaining content in the buffer on flush - if self.buffer and self.level >= self.min_level: - self.logger._log(self.level, self.buffer, ()) - self.buffer = "" + ctx = current_execution_id.get() + buf = self._buffers.pop(ctx, "") + if buf and self.level >= self.min_level: + self.logger._log(self.level, buf, ()) + finally: + self._in_logging = False + + def flush_all(self) -> None: + """Flush all execution contexts' buffered messages. Called by master teardown. + + Intentionally ignores current_execution_id — iterates all keys + directly so that no context's partial lines are lost. + """ + if self._in_logging: + return + + try: + self._in_logging = True + for buf in self._buffers.values(): + if buf and self.level >= self.min_level: + self.logger._log(self.level, buf, ()) + self._buffers.clear() finally: self._in_logging = False diff --git a/tests/test_executor.py b/tests/test_executor.py index ebb5d60..9c47372 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -1,5 +1,7 @@ """Simple test for runtime factory and executor span capture.""" +import logging +import sys from typing import Any, AsyncGenerator, TypeVar import pytest @@ -13,6 +15,7 @@ UiPathRuntimeProtocol, ) from uipath.runtime.base import UiPathStreamOptions +from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus from uipath.runtime.schema import UiPathRuntimeSchema @@ -119,9 +122,32 @@ async def new_runtime( return self.runtime_class() +@pytest.fixture(autouse=True) +def _isolate_logging(): + """Save and restore logging state so tests don't leak into each other.""" + root = logging.getLogger() + original_level = root.level + original_handlers = list(root.handlers) + original_stdout = sys.stdout + original_stderr = sys.stderr + yield + root.setLevel(original_level) + root.handlers = original_handlers + sys.stdout = original_stdout + sys.stderr = original_stderr + logging.disable(logging.NOTSET) + + @pytest.mark.asyncio -async def test_multiple_factories_same_executor(): +async def test_multiple_factories_same_executor(tmp_path): """Test factories using same trace manager, verify spans are captured correctly.""" + # Set up a master interceptor so that sys.stdout is a LoggerWriter, + # matching real production usage where UiPathRuntimeContext provides one. + master = UiPathRuntimeLogsInterceptor( + job_id="test-job", dir=str(tmp_path), file="test.log" + ) + master.setup() + trace_manager = UiPathTraceManager() # Create factories for different runtimes @@ -228,3 +254,5 @@ async def test_multiple_factories_same_executor(): assert execution_runtime_c.log_handler assert len(execution_runtime_c.log_handler.buffer) > 0 assert execution_runtime_c.log_handler.buffer[0].msg == "executing {'input': 'c'}" + + master.teardown() diff --git a/tests/test_interceptor.py b/tests/test_interceptor.py index b69e093..8200459 100644 --- a/tests/test_interceptor.py +++ b/tests/test_interceptor.py @@ -1,5 +1,6 @@ -"""Tests for UiPathRuntimeLogsInterceptor teardown with non-UTF-8 stdout.""" +"""Tests for UiPathRuntimeLogsInterceptor.""" +import asyncio import io import logging import sys @@ -7,7 +8,10 @@ import pytest +from uipath.runtime.logging._context import current_execution_id from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor +from uipath.runtime.logging._writers import LoggerWriter +from uipath.runtime.logging.handlers import UiPathRuntimeExecutionLogHandler @pytest.fixture(autouse=True) @@ -169,3 +173,238 @@ def test_no_utf8_wrapper_with_job_id(self, tmp_path): ) assert not hasattr(interceptor, "utf8_stdout") + + +class TestLoggerWriterPerContextBuffers: + """Verify LoggerWriter isolates buffers by execution context.""" + + def _make_writer(self) -> tuple[LoggerWriter, logging.Logger, list[logging.LogRecord]]: + logger = logging.getLogger("test_writer") + logger.handlers.clear() + logger.setLevel(logging.DEBUG) + logger.propagate = False + records: list[logging.LogRecord] = [] + handler = logging.Handler() + handler.emit = lambda r: records.append(r) + logger.addHandler(handler) + writer = LoggerWriter(logger, logging.INFO, logging.INFO, sys.__stdout__) + return writer, logger, records + + def test_no_context_uses_none_key(self): + """Writes without execution context use None as buffer key.""" + writer, _, records = self._make_writer() + + writer.write("hello\n") + assert None not in writer._buffers # fully flushed line, no leftover + assert len(records) == 1 + + writer.write("partial") + assert writer._buffers.get(None) == "partial" + + def test_separate_buffers_per_context(self): + """Different execution contexts get independent buffers.""" + writer, _, _ = self._make_writer() + + current_execution_id.set("exec-1") + writer.write("from exec 1") + + current_execution_id.set("exec-2") + writer.write("from exec 2") + + assert writer._buffers.get("exec-1") == "from exec 1" + assert writer._buffers.get("exec-2") == "from exec 2" + + current_execution_id.set(None) + + def test_flush_only_current_context(self): + """flush() only flushes the current context's buffer.""" + writer, _, records = self._make_writer() + + current_execution_id.set("exec-1") + writer.write("line 1 partial") + + current_execution_id.set("exec-2") + writer.write("line 2 partial") + + # Flush only exec-2 + writer.flush() + + assert "exec-2" not in writer._buffers + assert writer._buffers.get("exec-1") == "line 1 partial" + assert len(records) == 1 + assert records[0].getMessage() == "line 2 partial" + + current_execution_id.set(None) + + def test_flush_all_flushes_every_context(self): + """flush_all() flushes all contexts.""" + writer, _, records = self._make_writer() + + current_execution_id.set("exec-1") + writer.write("partial 1") + + current_execution_id.set("exec-2") + writer.write("partial 2") + + writer.flush_all() + + assert len(writer._buffers) == 0 + messages = {r.getMessage() for r in records} + assert messages == {"partial 1", "partial 2"} + + current_execution_id.set(None) + + def test_complete_lines_emitted_immediately(self): + """Lines ending with newline are emitted, not buffered.""" + writer, _, records = self._make_writer() + + writer.write("complete line\n") + assert len(records) == 1 + assert records[0].getMessage() == "complete line" + assert len(writer._buffers) == 0 + + +class TestChildInterceptorNoStreamReplace: + """Verify child interceptors don't replace sys.stdout/sys.stderr.""" + + def test_child_does_not_replace_stdout(self, tmp_path): + """Child interceptor must not overwrite sys.stdout.""" + master = UiPathRuntimeLogsInterceptor( + job_id="job-1", dir=str(tmp_path), file="test.log" + ) + master.setup() + + master_stdout = sys.stdout + assert isinstance(master_stdout, LoggerWriter) + + child_handler = UiPathRuntimeExecutionLogHandler("exec-1") + child = UiPathRuntimeLogsInterceptor( + execution_id="exec-1", log_handler=child_handler + ) + child.setup() + + # sys.stdout must still be the master's LoggerWriter + assert sys.stdout is master_stdout + + child.teardown() + master.teardown() + + def test_child_registers_handler_on_stdout_logger(self, tmp_path): + """Child interceptor must register its handler on the stdout logger.""" + master = UiPathRuntimeLogsInterceptor( + job_id="job-1", dir=str(tmp_path), file="test.log" + ) + master.setup() + + child_handler = UiPathRuntimeExecutionLogHandler("exec-1") + child = UiPathRuntimeLogsInterceptor( + execution_id="exec-1", log_handler=child_handler + ) + child.setup() + + stdout_logger = logging.getLogger("stdout") + assert child_handler in stdout_logger.handlers + + child.teardown() + + # After child teardown, handler should be removed + assert child_handler not in stdout_logger.handlers + + master.teardown() + + +class TestTeardownFlushOrder: + """Verify partial lines are flushed before teardown completes.""" + + def test_child_flushes_partial_line_on_teardown(self, tmp_path): + """Partial line in LoggerWriter buffer must be captured on child teardown.""" + master = UiPathRuntimeLogsInterceptor( + job_id="job-1", dir=str(tmp_path), file="test.log" + ) + master.setup() + + child_handler = UiPathRuntimeExecutionLogHandler("exec-1") + child = UiPathRuntimeLogsInterceptor( + execution_id="exec-1", log_handler=child_handler + ) + child.setup() + + # Write a partial line (no newline) via print + print("partial line no newline", end="") + + child.teardown() + + # The partial line should have been flushed to the child's handler + messages = [child_handler.formatter.format(r) for r in child_handler.buffer] + assert any("partial line no newline" in m for m in messages) + + master.teardown() + + def test_master_flushes_all_on_teardown(self, tmp_path): + """Master teardown flushes all remaining buffers.""" + master = UiPathRuntimeLogsInterceptor( + job_id="job-1", dir=str(tmp_path), file="test.log" + ) + master.setup() + + # Write a partial line in master context (no execution_id) + print("master partial", end="") + + master.teardown() + + # Read the log file to verify the partial line was written + log_file = tmp_path / "test.log" + log_content = log_file.read_text() + assert "master partial" in log_content + + +class TestParallelExecutionLogIsolation: + """End-to-end test: parallel async tasks produce correctly separated logs.""" + + @pytest.mark.asyncio + async def test_parallel_tasks_isolated(self, tmp_path): + """Concurrent eval executions must not interleave log output.""" + master = UiPathRuntimeLogsInterceptor( + job_id="job-1", dir=str(tmp_path), file="test.log" + ) + master.setup() + + child_handlers: dict[str, UiPathRuntimeExecutionLogHandler] = {} + + async def run_execution(exec_id: str) -> None: + handler = UiPathRuntimeExecutionLogHandler(exec_id) + child_handlers[exec_id] = handler + + child = UiPathRuntimeLogsInterceptor( + execution_id=exec_id, log_handler=handler + ) + child.setup() + + try: + # Mix logging and print calls with yields between them + for i in range(5): + logging.info(f"log from {exec_id} iter {i}") + print(f"print from {exec_id} iter {i}") + await asyncio.sleep(0) # yield to other tasks + finally: + child.teardown() + + # Run 4 concurrent executions + exec_ids = [f"exec-{i}" for i in range(4)] + await asyncio.gather(*(run_execution(eid) for eid in exec_ids)) + + master.teardown() + + # Verify each handler only contains its own messages + for exec_id in exec_ids: + handler = child_handlers[exec_id] + messages = [handler.formatter.format(r) for r in handler.buffer] + for msg in messages: + assert exec_id in msg, ( + f"Handler for {exec_id} contains foreign message: {msg}" + ) + # Each execution should have captured both logging and print output + # 5 logging.info + 5 print = 10 messages + assert len(messages) == 10, ( + f"Handler for {exec_id} has {len(messages)} messages, expected 10" + ) diff --git a/uv.lock b/uv.lock index f514162..d45affe 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11" [[package]] @@ -1005,7 +1005,7 @@ wheels = [ [[package]] name = "uipath-runtime" -version = "0.10.0" +version = "0.10.1" source = { editable = "." } dependencies = [ { name = "uipath-core" },