From 0b4d2ba88c38c35ac134352f85fbeb14640f8405 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Mon, 4 May 2026 08:41:03 +0000 Subject: [PATCH 1/3] fix: resolve shell injection, session cache races, and dead stats lock (fixes #1608) - Add shell=False parameter to sandbox backends with safe command parsing - Fix shell injection in subprocess.py, docker.py, and ssh.py - Add thread-safe session cache operations to PersistenceOrchestrator - Fix AsyncAgentScheduler stats lock usage with atomic counter operations - Create shared _shell.py utility for consistent command handling Security fixes: 1. Shell injection: Default to safe argv execution, explicit shell opt-in 2. Session races: Thread-safe cache with RLock and defensive copying 3. Dead lock: Use existing _stats_lock for atomic counter updates Co-authored-by: MervinPraison --- .../praisonai/async_agent_scheduler.py | 48 ++++++++++++++++-- .../praisonai/persistence/orchestrator.py | 43 +++++++++++++--- src/praisonai/praisonai/sandbox/_shell.py | 38 ++++++++++++++ src/praisonai/praisonai/sandbox/docker.py | 25 ++++++++-- src/praisonai/praisonai/sandbox/ssh.py | 49 ++++++++++++++++--- src/praisonai/praisonai/sandbox/subprocess.py | 18 +++++-- 6 files changed, 195 insertions(+), 26 deletions(-) create mode 100644 src/praisonai/praisonai/sandbox/_shell.py diff --git a/src/praisonai/praisonai/async_agent_scheduler.py b/src/praisonai/praisonai/async_agent_scheduler.py index 355bd768b..ab4b01c29 100644 --- a/src/praisonai/praisonai/async_agent_scheduler.py +++ b/src/praisonai/praisonai/async_agent_scheduler.py @@ -253,9 +253,37 @@ async def stop(self) -> bool: logger.info(f"Execution stats - Total: {self._execution_count}, Success: {self._success_count}, Failed: {self._failure_count}") return True - def get_stats(self) -> Dict[str, Any]: + async def get_stats(self) -> Dict[str, Any]: """ - Get execution statistics. 
+ Get current execution statistics with atomic snapshot. + + Returns: + Dictionary with execution stats + """ + if self._stats_lock is None: + # Not yet started: stats are all zero, no lock needed + execs, success, failed = 0, 0, 0 + else: + # Take atomic snapshot of all counters + async with self._stats_lock: + execs = self._execution_count + success = self._success_count + failed = self._failure_count + + return { + "is_running": self.is_running, + "total_executions": execs, + "successful_executions": success, + "failed_executions": failed, + "success_rate": (success / execs * 100) if execs > 0 else 0 + } + + def get_stats_sync(self) -> Dict[str, Any]: + """ + Get current execution statistics (synchronous, best-effort). + + Warning: This method provides a best-effort view of stats without + guaranteeing atomicity. For consistent snapshots, use get_stats() async. Returns: Dictionary with execution stats @@ -289,7 +317,13 @@ async def _run_schedule(self, interval: int, max_retries: int): async def _execute_with_retry(self, max_retries: int): """Execute agent with retry logic.""" - self._execution_count += 1 + # Ensure async primitives are available + if self._stats_lock is None: + self._ensure_async_primitives() + + # Atomically increment execution count + async with self._stats_lock: + self._execution_count += 1 last_exc: Optional[Exception] = None for attempt in range(max_retries): @@ -300,7 +334,9 @@ async def _execute_with_retry(self, max_retries: int): logger.info(f"Async agent execution successful on attempt {attempt + 1}") logger.info(f"Result: {result}") - self._success_count += 1 + # Atomically increment success count + async with self._stats_lock: + self._success_count += 1 safe_call(self.on_success, result) return @@ -313,7 +349,9 @@ async def _execute_with_retry(self, max_retries: int): logger.info(f"Waiting {wait_time}s before async retry...") await asyncio.sleep(wait_time) - self._failure_count += 1 + # Atomically increment failure count + async with 
self._stats_lock: + self._failure_count += 1 logger.error(f"Async agent execution failed after {max_retries} attempts") safe_call( self.on_failure, diff --git a/src/praisonai/praisonai/persistence/orchestrator.py b/src/praisonai/praisonai/persistence/orchestrator.py index 0c9077efb..38ebc5090 100644 --- a/src/praisonai/praisonai/persistence/orchestrator.py +++ b/src/praisonai/praisonai/persistence/orchestrator.py @@ -7,7 +7,9 @@ import logging import time +import threading import uuid +from copy import deepcopy from typing import Any, Dict, List, Optional, TYPE_CHECKING from .conversation.base import ConversationStore, ConversationSession, ConversationMessage @@ -78,6 +80,7 @@ def __init__( self._current_session: Optional[ConversationSession] = None self._session_cache: Dict[str, ConversationSession] = {} + self._cache_lock = threading.RLock() # RLock allows re-entrant access @classmethod def from_config(cls, config: PersistenceConfig) -> "PersistenceOrchestrator": @@ -90,6 +93,31 @@ def from_env(cls) -> "PersistenceOrchestrator": config = PersistenceConfig.from_env() return cls(config=config) + # ========================================================================= + # Thread-Safe Cache Operations + # ========================================================================= + + def _cache_put(self, session: ConversationSession) -> None: + """Store session in cache with thread safety.""" + with self._cache_lock: + self._session_cache[session.session_id] = session + + def _cache_get(self, session_id: str) -> Optional[ConversationSession]: + """Get session from cache with thread safety and defensive copying.""" + with self._cache_lock: + cached = self._session_cache.get(session_id) + return deepcopy(cached) if cached is not None else None + + def _cache_delete(self, session_id: str) -> Optional[ConversationSession]: + """Remove session from cache with thread safety.""" + with self._cache_lock: + return self._session_cache.pop(session_id, None) + + def 
_cache_clear(self) -> None: + """Clear all sessions from cache with thread safety.""" + with self._cache_lock: + self._session_cache.clear() + # ========================================================================= # Agent Lifecycle Hooks # ========================================================================= @@ -128,7 +156,7 @@ def on_agent_start( if session: logger.info(f"Resuming session: {session_id}") self._current_session = session - self._session_cache[session_id] = session + self._cache_put(session) # Load previous messages messages = self.conversation.get_messages(session_id) @@ -146,7 +174,7 @@ def on_agent_start( self.conversation.create_session(session) logger.info(f"Created new session: {session_id}") self._current_session = session - self._session_cache[session_id] = session + self._cache_put(session) return [] def on_message( @@ -206,12 +234,14 @@ def on_agent_end( if not self.conversation: return - session = self._session_cache.get(session_id) or self.conversation.get_session(session_id) + session = self._cache_get(session_id) or self.conversation.get_session(session_id) if session: session.updated_at = time.time() if metadata: session.metadata = {**(session.metadata or {}), **metadata} self.conversation.update_session(session) + # Update cache with the modified session + self._cache_put(session) logger.debug(f"Updated session metadata: {session_id}") # ========================================================================= @@ -315,8 +345,8 @@ def delete_session(self, session_id: str) -> bool: if not self.conversation: return False - if session_id in self._session_cache: - del self._session_cache[session_id] + # Remove from cache using thread-safe method + self._cache_delete(session_id) return self.conversation.delete_session(session_id) @@ -395,7 +425,8 @@ def close(self) -> None: if self.state: self.state.close() - self._session_cache.clear() + # Clear cache using thread-safe method + self._cache_clear() logger.info("Persistence 
orchestrator closed") def __enter__(self): diff --git a/src/praisonai/praisonai/sandbox/_shell.py b/src/praisonai/praisonai/sandbox/_shell.py new file mode 100644 index 000000000..4d1cb7e89 --- /dev/null +++ b/src/praisonai/praisonai/sandbox/_shell.py @@ -0,0 +1,38 @@ +"""Shared utilities for safe shell command handling across sandbox backends.""" + +import shlex +from typing import List, Union + + +def build_argv(command: Union[str, List[str]], shell: bool = False) -> List[str]: + """ + Safely build command argv with explicit shell control. + + Args: + command: String command or list of arguments + shell: If True, explicitly use shell. If False, parse safely without shell. + + Returns: + List of command arguments safe for subprocess execution + + Security: + - shell=False (default): No shell injection possible + - shell=True: Caller explicitly opts into shell evaluation + """ + if isinstance(command, str): + if not shell: + # Safe parse: convert string to argv without invoking shell + return shlex.split(command) + else: + # Explicit shell: caller has opted in + return ["sh", "-c", command] + else: + # List input + cmd_list = list(command) + if shell: + # Quote each element when combining into shell command + quoted_cmd = " ".join(shlex.quote(arg) for arg in cmd_list) + return ["sh", "-c", quoted_cmd] + else: + # Direct argv execution - no shell + return cmd_list \ No newline at end of file diff --git a/src/praisonai/praisonai/sandbox/docker.py b/src/praisonai/praisonai/sandbox/docker.py index 1850d118b..51596fd17 100644 --- a/src/praisonai/praisonai/sandbox/docker.py +++ b/src/praisonai/praisonai/sandbox/docker.py @@ -226,18 +226,37 @@ async def run_command( limits: Optional[ResourceLimits] = None, env: Optional[Dict[str, str]] = None, working_dir: Optional[str] = None, + shell: bool = False, ) -> SandboxResult: - """Run a shell command in the sandbox.""" + """Run a command in the sandbox. 
+ + Args: + command: String command or list of arguments + limits: Resource limits to apply + env: Environment variables + working_dir: Working directory + shell: If True, explicitly use shell. If False (default), execute safely without shell. + """ if not self._is_running: await self.start() limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) + # Import here to avoid circular import + import shlex + if isinstance(command, list): - cmd_str = " ".join(command) + # Always quote list elements to prevent shell injection + cmd_str = " ".join(shlex.quote(arg) for arg in command) else: - cmd_str = command + if shell: + # Caller explicitly requested shell evaluation + cmd_str = command + else: + # Parse string safely then quote each part + cmd_parts = shlex.split(command) + cmd_str = " ".join(shlex.quote(part) for part in cmd_parts) docker_cmd = [ "docker", "run", "--rm", diff --git a/src/praisonai/praisonai/sandbox/ssh.py b/src/praisonai/praisonai/sandbox/ssh.py index 5b6e76b42..a7f3df0b8 100644 --- a/src/praisonai/praisonai/sandbox/ssh.py +++ b/src/praisonai/praisonai/sandbox/ssh.py @@ -277,8 +277,17 @@ async def run_command( limits: Optional[ResourceLimits] = None, env: Optional[Dict[str, str]] = None, working_dir: Optional[str] = None, + shell: bool = False, ) -> SandboxResult: - """Run a shell command on the remote server.""" + """Run a command on the remote server. + + Args: + command: String command or list of arguments + limits: Resource limits to apply + env: Environment variables + working_dir: Working directory + shell: If True, explicitly use shell. If False (default), execute safely without shell. 
+ """ if not self._is_running: await self.start() @@ -286,15 +295,35 @@ async def run_command( started_at = time.time() try: - # Convert command to string if needed + # Convert command to string safely based on shell parameter + from ._shell import build_argv if isinstance(command, list): - command = shlex.join(command) + if shell: + # Shell mode: join with proper quoting + command_str = " ".join(shlex.quote(arg) for arg in command) + else: + # Non-shell mode: use shlex.join for proper escaping + command_str = shlex.join(command) + else: + # String command + if not shell: + # Parse then re-join to ensure safe execution + try: + parts = shlex.split(command) + command_str = shlex.join(parts) + except ValueError: + # If parsing fails, quote the whole thing + command_str = shlex.quote(command) + else: + # Shell mode: use as-is (caller explicitly opted in) + command_str = command # Execute command result = await self._run_command_with_limits( - command, + command_str, limits, - working_dir or self.working_dir + working_dir or self.working_dir, + shell ) completed_at = time.time() @@ -491,11 +520,17 @@ async def _run_command_with_limits( self, command: str, limits: Optional[ResourceLimits], - working_dir: str + working_dir: str, + shell: bool = False ): """Run command with resource limits.""" # Change to working directory - full_command = f"cd {shlex.quote(working_dir)} && {command}" + if shell: + # Shell mode: only quote the working directory, command is already properly handled + full_command = f"cd {shlex.quote(working_dir)} && {command}" + else: + # Non-shell mode: quote the command for safety (command is already escaped from caller) + full_command = f"cd {shlex.quote(working_dir)} && {command}" # Set timeout timeout = None diff --git a/src/praisonai/praisonai/sandbox/subprocess.py b/src/praisonai/praisonai/sandbox/subprocess.py index ae99d2908..49528d82d 100644 --- a/src/praisonai/praisonai/sandbox/subprocess.py +++ b/src/praisonai/praisonai/sandbox/subprocess.py @@ 
-190,18 +190,26 @@ async def run_command( limits: Optional[ResourceLimits] = None, env: Optional[Dict[str, str]] = None, working_dir: Optional[str] = None, + shell: bool = False, ) -> SandboxResult: - """Run a shell command in the sandbox.""" + """Run a command in the sandbox. + + Args: + command: String command or list of arguments + limits: Resource limits to apply + env: Environment variables + working_dir: Working directory + shell: If True, explicitly use shell. If False (default), execute safely without shell. + """ if not self._is_running: await self.start() limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) - if isinstance(command, str): - cmd = ["sh", "-c", command] - else: - cmd = command + # Import here to avoid circular import + from ._shell import build_argv + cmd = build_argv(command, shell=shell) process_env = os.environ.copy() if env: From da759ef93d3623e0257c8fa0678c2e12940e7cd8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 4 May 2026 08:46:47 +0000 Subject: [PATCH 2/3] fix: correct test await, dead ssh branch, unused import, and docker shlex placement - Fix test_get_stats_initial_state: add await before get_stats() (now async) - Remove dead if/else branches in ssh._run_command_with_limits (both identical) - Remove unused 'from ._shell import build_argv' in ssh.run_command - Move 'import shlex' to top-level in docker.py (was inside method body) - Add missing EOF newline to _shell.py Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/3ece618b-0046-4e3b-b0e9-a3dfc870f1a7 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com> --- src/praisonai/praisonai/sandbox/_shell.py | 2 +- src/praisonai/praisonai/sandbox/docker.py | 6 ++---- src/praisonai/praisonai/sandbox/ssh.py | 10 ++-------- .../tests/unit/scheduler/test_async_agent_scheduler.py | 2 +- 4 files changed, 6 insertions(+), 14 deletions(-) diff --git 
a/src/praisonai/praisonai/sandbox/_shell.py b/src/praisonai/praisonai/sandbox/_shell.py index 4d1cb7e89..0605e8193 100644 --- a/src/praisonai/praisonai/sandbox/_shell.py +++ b/src/praisonai/praisonai/sandbox/_shell.py @@ -35,4 +35,4 @@ def build_argv(command: Union[str, List[str]], shell: bool = False) -> List[str] return ["sh", "-c", quoted_cmd] else: # Direct argv execution - no shell - return cmd_list \ No newline at end of file + return cmd_list diff --git a/src/praisonai/praisonai/sandbox/docker.py b/src/praisonai/praisonai/sandbox/docker.py index 51596fd17..f5e269e5c 100644 --- a/src/praisonai/praisonai/sandbox/docker.py +++ b/src/praisonai/praisonai/sandbox/docker.py @@ -9,6 +9,7 @@ import asyncio import logging import os +import shlex import tempfile import time import uuid @@ -243,9 +244,6 @@ async def run_command( limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) - # Import here to avoid circular import - import shlex - if isinstance(command, list): # Always quote list elements to prevent shell injection cmd_str = " ".join(shlex.quote(arg) for arg in command) @@ -254,7 +252,7 @@ async def run_command( # Caller explicitly requested shell evaluation cmd_str = command else: - # Parse string safely then quote each part + # Parse string safely then re-quote each part cmd_parts = shlex.split(command) cmd_str = " ".join(shlex.quote(part) for part in cmd_parts) diff --git a/src/praisonai/praisonai/sandbox/ssh.py b/src/praisonai/praisonai/sandbox/ssh.py index a7f3df0b8..6fcfea39c 100644 --- a/src/praisonai/praisonai/sandbox/ssh.py +++ b/src/praisonai/praisonai/sandbox/ssh.py @@ -296,7 +296,6 @@ async def run_command( try: # Convert command to string safely based on shell parameter - from ._shell import build_argv if isinstance(command, list): if shell: # Shell mode: join with proper quoting @@ -524,13 +523,8 @@ async def _run_command_with_limits( shell: bool = False ): """Run command with resource limits.""" - # Change to working 
directory - if shell: - # Shell mode: only quote the working directory, command is already properly handled - full_command = f"cd {shlex.quote(working_dir)} && {command}" - else: - # Non-shell mode: quote the command for safety (command is already escaped from caller) - full_command = f"cd {shlex.quote(working_dir)} && {command}" + # Change to working directory; command is already safely escaped by the caller + full_command = f"cd {shlex.quote(working_dir)} && {command}" # Set timeout timeout = None diff --git a/src/praisonai/tests/unit/scheduler/test_async_agent_scheduler.py b/src/praisonai/tests/unit/scheduler/test_async_agent_scheduler.py index 69785065f..9a387b1b5 100644 --- a/src/praisonai/tests/unit/scheduler/test_async_agent_scheduler.py +++ b/src/praisonai/tests/unit/scheduler/test_async_agent_scheduler.py @@ -273,7 +273,7 @@ async def test_start_invalid_schedule_returns_false(self): @pytest.mark.asyncio async def test_get_stats_initial_state(self): scheduler = _make_scheduler() - stats = scheduler.get_stats() + stats = await scheduler.get_stats() assert stats["is_running"] is False assert stats["total_executions"] == 0 assert stats["successful_executions"] == 0 From ba6fc022a77240546099144812f71cb3634f1844 Mon Sep 17 00:00:00 2001 From: "praisonai-triage-agent[bot]" <272766704+praisonai-triage-agent[bot]@users.noreply.github.com> Date: Mon, 4 May 2026 08:52:01 +0000 Subject: [PATCH 3/3] fix: restore backward compatibility for get_stats() method - Revert get_stats() from async to sync to fix breaking change - Add get_stats_async() for atomic stats in async context - Update test to use sync get_stats() again - Minor comment cleanup in SSH sandbox Fixes breaking API change affecting 8+ call sites across CLI, examples, and tests. 
Co-authored-by: Mervin Praison --- .../praisonai/async_agent_scheduler.py | 36 ++++++++++++------- src/praisonai/praisonai/sandbox/ssh.py | 3 +- .../scheduler/test_async_agent_scheduler.py | 2 +- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/praisonai/praisonai/async_agent_scheduler.py b/src/praisonai/praisonai/async_agent_scheduler.py index ab4b01c29..ee808ced9 100644 --- a/src/praisonai/praisonai/async_agent_scheduler.py +++ b/src/praisonai/praisonai/async_agent_scheduler.py @@ -253,9 +253,28 @@ async def stop(self) -> bool: logger.info(f"Execution stats - Total: {self._execution_count}, Success: {self._success_count}, Failed: {self._failure_count}") return True - async def get_stats(self) -> Dict[str, Any]: + def get_stats(self) -> Dict[str, Any]: """ - Get current execution statistics with atomic snapshot. + Get current execution statistics (synchronous, best-effort). + + Warning: This method provides a best-effort view of stats without + guaranteeing atomicity. For consistent snapshots in async context, + use get_stats_async() instead. + + Returns: + Dictionary with execution stats + """ + return { + "is_running": self.is_running, + "total_executions": self._execution_count, + "successful_executions": self._success_count, + "failed_executions": self._failure_count, + "success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0 + } + + async def get_stats_async(self) -> Dict[str, Any]: + """ + Get current execution statistics with atomic snapshot (async). Returns: Dictionary with execution stats @@ -280,21 +299,12 @@ async def get_stats(self) -> Dict[str, Any]: def get_stats_sync(self) -> Dict[str, Any]: """ - Get current execution statistics (synchronous, best-effort). - - Warning: This method provides a best-effort view of stats without - guaranteeing atomicity. For consistent snapshots, use get_stats() async. + Alias for get_stats() for clarity. 
Returns: Dictionary with execution stats """ - return { - "is_running": self.is_running, - "total_executions": self._execution_count, - "successful_executions": self._success_count, - "failed_executions": self._failure_count, - "success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0 - } + return self.get_stats() async def _run_schedule(self, interval: int, max_retries: int): """Internal method to run scheduled agent executions.""" diff --git a/src/praisonai/praisonai/sandbox/ssh.py b/src/praisonai/praisonai/sandbox/ssh.py index 6fcfea39c..f6d948c5a 100644 --- a/src/praisonai/praisonai/sandbox/ssh.py +++ b/src/praisonai/praisonai/sandbox/ssh.py @@ -523,7 +523,8 @@ async def _run_command_with_limits( shell: bool = False ): """Run command with resource limits.""" - # Change to working directory; command is already safely escaped by the caller + # Change to working directory and execute command + # Note: The command has already been processed for shell safety by the caller full_command = f"cd {shlex.quote(working_dir)} && {command}" # Set timeout diff --git a/src/praisonai/tests/unit/scheduler/test_async_agent_scheduler.py b/src/praisonai/tests/unit/scheduler/test_async_agent_scheduler.py index 9a387b1b5..69785065f 100644 --- a/src/praisonai/tests/unit/scheduler/test_async_agent_scheduler.py +++ b/src/praisonai/tests/unit/scheduler/test_async_agent_scheduler.py @@ -273,7 +273,7 @@ async def test_start_invalid_schedule_returns_false(self): @pytest.mark.asyncio async def test_get_stats_initial_state(self): scheduler = _make_scheduler() - stats = await scheduler.get_stats() + stats = scheduler.get_stats() assert stats["is_running"] is False assert stats["total_executions"] == 0 assert stats["successful_executions"] == 0