diff --git a/src/praisonai/praisonai/async_agent_scheduler.py b/src/praisonai/praisonai/async_agent_scheduler.py
index 355bd768b..ee808ced9 100644
--- a/src/praisonai/praisonai/async_agent_scheduler.py
+++ b/src/praisonai/praisonai/async_agent_scheduler.py
@@ -255,7 +255,11 @@ async def stop(self) -> bool:
 
     def get_stats(self) -> Dict[str, Any]:
         """
-        Get execution statistics.
+        Get current execution statistics (synchronous, best-effort).
+
+        Warning: This method provides a best-effort view of stats without
+        guaranteeing atomicity. For consistent snapshots in async context,
+        use get_stats_async() instead.
 
         Returns:
             Dictionary with execution stats
@@ -268,6 +272,40 @@ def get_stats(self) -> Dict[str, Any]:
             "success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0
         }
 
+    async def get_stats_async(self) -> Dict[str, Any]:
+        """
+        Get current execution statistics with atomic snapshot (async).
+
+        Returns:
+            Dictionary with execution stats
+        """
+        if self._stats_lock is None:
+            # Not yet started: stats are all zero, no lock needed
+            execs, success, failed = 0, 0, 0
+        else:
+            # Take atomic snapshot of all counters
+            async with self._stats_lock:
+                execs = self._execution_count
+                success = self._success_count
+                failed = self._failure_count
+
+        return {
+            "is_running": self.is_running,
+            "total_executions": execs,
+            "successful_executions": success,
+            "failed_executions": failed,
+            "success_rate": (success / execs * 100) if execs > 0 else 0
+        }
+
+    def get_stats_sync(self) -> Dict[str, Any]:
+        """
+        Alias for get_stats() for clarity.
+
+        Returns:
+            Dictionary with execution stats
+        """
+        return self.get_stats()
+
     async def _run_schedule(self, interval: int, max_retries: int):
         """Internal method to run scheduled agent executions."""
         try:
@@ -289,7 +327,13 @@ async def _run_schedule(self, interval: int, max_retries: int):
 
     async def _execute_with_retry(self, max_retries: int):
         """Execute agent with retry logic."""
-        self._execution_count += 1
+        # Ensure async primitives are available
+        if self._stats_lock is None:
+            self._ensure_async_primitives()
+
+        # Atomically increment execution count
+        async with self._stats_lock:
+            self._execution_count += 1
         last_exc: Optional[Exception] = None
 
         for attempt in range(max_retries):
@@ -300,7 +344,9 @@ async def _execute_with_retry(self, max_retries: int):
                 logger.info(f"Async agent execution successful on attempt {attempt + 1}")
                 logger.info(f"Result: {result}")
 
-                self._success_count += 1
+                # Atomically increment success count
+                async with self._stats_lock:
+                    self._success_count += 1
                 safe_call(self.on_success, result)
                 return
 
@@ -313,7 +359,9 @@ async def _execute_with_retry(self, max_retries: int):
                     logger.info(f"Waiting {wait_time}s before async retry...")
                     await asyncio.sleep(wait_time)
 
-        self._failure_count += 1
+        # Atomically increment failure count
+        async with self._stats_lock:
+            self._failure_count += 1
         logger.error(f"Async agent execution failed after {max_retries} attempts")
         safe_call(
             self.on_failure,
diff --git a/src/praisonai/praisonai/persistence/orchestrator.py b/src/praisonai/praisonai/persistence/orchestrator.py
index 0c9077efb..38ebc5090 100644
--- a/src/praisonai/praisonai/persistence/orchestrator.py
+++ b/src/praisonai/praisonai/persistence/orchestrator.py
@@ -7,7 +7,9 @@
 
 import logging
 import time
+import threading
 import uuid
+from copy import deepcopy
 from typing import Any, Dict, List, Optional, TYPE_CHECKING
 
 from .conversation.base import ConversationStore, ConversationSession, ConversationMessage
@@ -78,6 +80,7 @@ def __init__(
 
         self._current_session: Optional[ConversationSession] = None
         self._session_cache: Dict[str, ConversationSession] = {}
+        self._cache_lock = threading.RLock()  # RLock allows re-entrant access
 
     @classmethod
     def from_config(cls, config: PersistenceConfig) -> "PersistenceOrchestrator":
@@ -90,6 +93,31 @@ def from_env(cls) -> "PersistenceOrchestrator":
         config = PersistenceConfig.from_env()
         return cls(config=config)
 
+    # =========================================================================
+    # Thread-Safe Cache Operations
+    # =========================================================================
+
+    def _cache_put(self, session: ConversationSession) -> None:
+        """Store session in cache with thread safety."""
+        with self._cache_lock:
+            self._session_cache[session.session_id] = session
+
+    def _cache_get(self, session_id: str) -> Optional[ConversationSession]:
+        """Get session from cache with thread safety and defensive copying."""
+        with self._cache_lock:
+            cached = self._session_cache.get(session_id)
+            return deepcopy(cached) if cached is not None else None
+
+    def _cache_delete(self, session_id: str) -> Optional[ConversationSession]:
+        """Remove session from cache with thread safety."""
+        with self._cache_lock:
+            return self._session_cache.pop(session_id, None)
+
+    def _cache_clear(self) -> None:
+        """Clear all sessions from cache with thread safety."""
+        with self._cache_lock:
+            self._session_cache.clear()
+
     # =========================================================================
     # Agent Lifecycle Hooks
     # =========================================================================
@@ -128,7 +156,7 @@ def on_agent_start(
         if session:
             logger.info(f"Resuming session: {session_id}")
             self._current_session = session
-            self._session_cache[session_id] = session
+            self._cache_put(session)
 
             # Load previous messages
             messages = self.conversation.get_messages(session_id)
@@ -146,7 +174,7 @@ def on_agent_start(
         self.conversation.create_session(session)
         logger.info(f"Created new session: {session_id}")
         self._current_session = session
-        self._session_cache[session_id] = session
+        self._cache_put(session)
         return []
 
     def on_message(
@@ -206,12 +234,14 @@ def on_agent_end(
         if not self.conversation:
             return
 
-        session = self._session_cache.get(session_id) or self.conversation.get_session(session_id)
+        session = self._cache_get(session_id) or self.conversation.get_session(session_id)
         if session:
             session.updated_at = time.time()
             if metadata:
                 session.metadata = {**(session.metadata or {}), **metadata}
             self.conversation.update_session(session)
+            # Update cache with the modified session
+            self._cache_put(session)
             logger.debug(f"Updated session metadata: {session_id}")
 
     # =========================================================================
@@ -315,8 +345,8 @@ def delete_session(self, session_id: str) -> bool:
         if not self.conversation:
             return False
 
-        if session_id in self._session_cache:
-            del self._session_cache[session_id]
+        # Remove from cache using thread-safe method
+        self._cache_delete(session_id)
 
         return self.conversation.delete_session(session_id)
 
@@ -395,7 +425,8 @@ def close(self) -> None:
         if self.state:
             self.state.close()
 
-        self._session_cache.clear()
+        # Clear cache using thread-safe method
+        self._cache_clear()
         logger.info("Persistence orchestrator closed")
 
     def __enter__(self):
diff --git a/src/praisonai/praisonai/sandbox/_shell.py b/src/praisonai/praisonai/sandbox/_shell.py
new file mode 100644
index 000000000..0605e8193
--- /dev/null
+++ b/src/praisonai/praisonai/sandbox/_shell.py
@@ -0,0 +1,42 @@
+"""Shared utilities for safe shell command handling across sandbox backends."""
+
+import shlex
+from typing import List, Union
+
+
+def build_argv(command: Union[str, List[str]], shell: bool = False) -> List[str]:
+    """
+    Safely build command argv with explicit shell control.
+
+    Args:
+        command: String command or list of arguments
+        shell: If True, explicitly use shell. If False, parse safely without shell.
+
+    Returns:
+        List of command arguments safe for subprocess execution
+
+    Raises:
+        ValueError: If shell is False and a string command cannot be tokenized
+            by shlex.split (for example, an unterminated quote).
+
+    Security:
+        - shell=False (default): No shell injection possible
+        - shell=True: Caller explicitly opts into shell evaluation
+    """
+    if isinstance(command, str):
+        if not shell:
+            # Safe parse: convert string to argv without invoking shell
+            return shlex.split(command)
+        else:
+            # Explicit shell: caller has opted in
+            return ["sh", "-c", command]
+    else:
+        # List input
+        cmd_list = list(command)
+        if shell:
+            # Quote each element when combining into shell command
+            quoted_cmd = " ".join(shlex.quote(arg) for arg in cmd_list)
+            return ["sh", "-c", quoted_cmd]
+        else:
+            # Direct argv execution - no shell
+            return cmd_list
diff --git a/src/praisonai/praisonai/sandbox/docker.py b/src/praisonai/praisonai/sandbox/docker.py
index 1850d118b..f5e269e5c 100644
--- a/src/praisonai/praisonai/sandbox/docker.py
+++ b/src/praisonai/praisonai/sandbox/docker.py
@@ -9,6 +9,7 @@
 import asyncio
 import logging
 import os
+import shlex
 import tempfile
 import time
 import uuid
@@ -226,8 +227,17 @@ async def run_command(
         limits: Optional[ResourceLimits] = None,
         env: Optional[Dict[str, str]] = None,
         working_dir: Optional[str] = None,
+        shell: bool = False,
     ) -> SandboxResult:
-        """Run a shell command in the sandbox."""
+        """Run a command in the sandbox.
+
+        Args:
+            command: String command or list of arguments
+            limits: Resource limits to apply
+            env: Environment variables
+            working_dir: Working directory
+            shell: If True, explicitly use shell. If False (default), execute safely without shell.
+        """
         if not self._is_running:
             await self.start()
 
@@ -235,9 +245,16 @@ async def run_command(
         execution_id = str(uuid.uuid4())
 
         if isinstance(command, list):
-            cmd_str = " ".join(command)
+            # Always quote list elements to prevent shell injection
+            cmd_str = " ".join(shlex.quote(arg) for arg in command)
         else:
-            cmd_str = command
+            if shell:
+                # Caller explicitly requested shell evaluation
+                cmd_str = command
+            else:
+                # Parse string safely then re-quote each part
+                cmd_parts = shlex.split(command)
+                cmd_str = " ".join(shlex.quote(part) for part in cmd_parts)
 
         docker_cmd = [
             "docker", "run", "--rm",
diff --git a/src/praisonai/praisonai/sandbox/ssh.py b/src/praisonai/praisonai/sandbox/ssh.py
index 5b6e76b42..f6d948c5a 100644
--- a/src/praisonai/praisonai/sandbox/ssh.py
+++ b/src/praisonai/praisonai/sandbox/ssh.py
@@ -277,8 +277,17 @@ async def run_command(
         limits: Optional[ResourceLimits] = None,
         env: Optional[Dict[str, str]] = None,
         working_dir: Optional[str] = None,
+        shell: bool = False,
     ) -> SandboxResult:
-        """Run a shell command on the remote server."""
+        """Run a command on the remote server.
+
+        Args:
+            command: String command or list of arguments
+            limits: Resource limits to apply
+            env: Environment variables
+            working_dir: Working directory
+            shell: If True, explicitly use shell. If False (default), execute safely without shell.
+        """
         if not self._is_running:
             await self.start()
 
@@ -286,15 +295,31 @@ async def run_command(
         started_at = time.time()
 
         try:
-            # Convert command to string if needed
+            # Convert command to string safely based on shell parameter
             if isinstance(command, list):
-                command = shlex.join(command)
+                # List input: shlex.join quotes every element, so the result is
+                # identical whether or not shell evaluation was requested.
+                command_str = shlex.join(command)
+            else:
+                # String command
+                if not shell:
+                    # Parse then re-join to ensure safe execution
+                    try:
+                        parts = shlex.split(command)
+                        command_str = shlex.join(parts)
+                    except ValueError:
+                        # If parsing fails, quote the whole thing
+                        command_str = shlex.quote(command)
+                else:
+                    # Shell mode: use as-is (caller explicitly opted in)
+                    command_str = command
 
             # Execute command
             result = await self._run_command_with_limits(
-                command,
+                command_str,
                 limits,
-                working_dir or self.working_dir
+                working_dir or self.working_dir,
+                shell
             )
 
             completed_at = time.time()
@@ -491,10 +516,13 @@ async def _run_command_with_limits(
         self,
         command: str,
         limits: Optional[ResourceLimits],
-        working_dir: str
+        working_dir: str,
+        shell: bool = False
     ):
         """Run command with resource limits."""
-        # Change to working directory
+        # Change to working directory and execute command
+        # Note: The command has already been processed for shell safety by the caller
+        # NOTE(review): `shell` is accepted from the caller but not read in the lines this patch touches
         full_command = f"cd {shlex.quote(working_dir)} && {command}"
 
         # Set timeout
diff --git a/src/praisonai/praisonai/sandbox/subprocess.py b/src/praisonai/praisonai/sandbox/subprocess.py
index ae99d2908..49528d82d 100644
--- a/src/praisonai/praisonai/sandbox/subprocess.py
+++ b/src/praisonai/praisonai/sandbox/subprocess.py
@@ -190,18 +190,26 @@ async def run_command(
         limits: Optional[ResourceLimits] = None,
         env: Optional[Dict[str, str]] = None,
         working_dir: Optional[str] = None,
+        shell: bool = False,
     ) -> SandboxResult:
-        """Run a shell command in the sandbox."""
+        """Run a command in the sandbox.
+
+        Args:
+            command: String command or list of arguments
+            limits: Resource limits to apply
+            env: Environment variables
+            working_dir: Working directory
+            shell: If True, explicitly use shell. If False (default), execute safely without shell.
+        """
         if not self._is_running:
             await self.start()
 
         limits = limits or self.config.resource_limits
         execution_id = str(uuid.uuid4())
 
-        if isinstance(command, str):
-            cmd = ["sh", "-c", command]
-        else:
-            cmd = command
+        # Function-scope import; _shell itself imports only shlex and typing
+        from ._shell import build_argv
+        cmd = build_argv(command, shell=shell)
 
         process_env = os.environ.copy()
         if env: