-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix: resolve shell injection, session cache races, and dead stats lock #1609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0b4d2ba
da759ef
ba6fc02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,9 @@ | |
|
|
||
| import logging | ||
| import time | ||
| import threading | ||
| import uuid | ||
| from copy import deepcopy | ||
| from typing import Any, Dict, List, Optional, TYPE_CHECKING | ||
|
|
||
| from .conversation.base import ConversationStore, ConversationSession, ConversationMessage | ||
|
|
@@ -78,6 +80,7 @@ def __init__( | |
|
|
||
| self._current_session: Optional[ConversationSession] = None | ||
| self._session_cache: Dict[str, ConversationSession] = {} | ||
| self._cache_lock = threading.RLock() # RLock allows re-entrant access | ||
|
|
||
| @classmethod | ||
| def from_config(cls, config: PersistenceConfig) -> "PersistenceOrchestrator": | ||
|
|
@@ -90,6 +93,31 @@ def from_env(cls) -> "PersistenceOrchestrator": | |
| config = PersistenceConfig.from_env() | ||
| return cls(config=config) | ||
|
|
||
| # ========================================================================= | ||
| # Thread-Safe Cache Operations | ||
| # ========================================================================= | ||
|
|
||
| def _cache_put(self, session: ConversationSession) -> None: | ||
| """Store session in cache with thread safety.""" | ||
| with self._cache_lock: | ||
| self._session_cache[session.session_id] = session | ||
|
|
||
| def _cache_get(self, session_id: str) -> Optional[ConversationSession]: | ||
| """Get session from cache with thread safety and defensive copying.""" | ||
| with self._cache_lock: | ||
| cached = self._session_cache.get(session_id) | ||
| return deepcopy(cached) if cached is not None else None | ||
|
|
||
| def _cache_delete(self, session_id: str) -> Optional[ConversationSession]: | ||
| """Remove session from cache with thread safety.""" | ||
| with self._cache_lock: | ||
| return self._session_cache.pop(session_id, None) | ||
|
|
||
| def _cache_clear(self) -> None: | ||
| """Clear all sessions from cache with thread safety.""" | ||
| with self._cache_lock: | ||
| self._session_cache.clear() | ||
|
|
||
| # ========================================================================= | ||
| # Agent Lifecycle Hooks | ||
| # ========================================================================= | ||
|
|
@@ -128,7 +156,7 @@ def on_agent_start( | |
| if session: | ||
| logger.info(f"Resuming session: {session_id}") | ||
| self._current_session = session | ||
| self._session_cache[session_id] = session | ||
| self._cache_put(session) | ||
|
|
||
| # Load previous messages | ||
| messages = self.conversation.get_messages(session_id) | ||
|
|
@@ -146,7 +174,7 @@ def on_agent_start( | |
| self.conversation.create_session(session) | ||
| logger.info(f"Created new session: {session_id}") | ||
| self._current_session = session | ||
| self._session_cache[session_id] = session | ||
| self._cache_put(session) | ||
| return [] | ||
|
|
||
| def on_message( | ||
|
|
@@ -206,12 +234,14 @@ def on_agent_end( | |
| if not self.conversation: | ||
| return | ||
|
|
||
| session = self._session_cache.get(session_id) or self.conversation.get_session(session_id) | ||
| session = self._cache_get(session_id) or self.conversation.get_session(session_id) | ||
| if session: | ||
| session.updated_at = time.time() | ||
| if metadata: | ||
| session.metadata = {**(session.metadata or {}), **metadata} | ||
| self.conversation.update_session(session) | ||
| # Update cache with the modified session | ||
| self._cache_put(session) | ||
|
Comment on lines
+237
to
+244
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The compound read-modify-write in The lock must be held for the entire read-modify-write sequence — a single |
||
| logger.debug(f"Updated session metadata: {session_id}") | ||
|
|
||
| # ========================================================================= | ||
|
|
@@ -315,8 +345,8 @@ def delete_session(self, session_id: str) -> bool: | |
| if not self.conversation: | ||
| return False | ||
|
|
||
| if session_id in self._session_cache: | ||
| del self._session_cache[session_id] | ||
| # Remove from cache using thread-safe method | ||
| self._cache_delete(session_id) | ||
|
|
||
| return self.conversation.delete_session(session_id) | ||
|
|
||
|
|
@@ -395,7 +425,8 @@ def close(self) -> None: | |
| if self.state: | ||
| self.state.close() | ||
|
|
||
| self._session_cache.clear() | ||
| # Clear cache using thread-safe method | ||
| self._cache_clear() | ||
| logger.info("Persistence orchestrator closed") | ||
|
|
||
| def __enter__(self): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| """Shared utilities for safe shell command handling across sandbox backends.""" | ||
|
|
||
| import shlex | ||
| from typing import List, Union | ||
|
|
||
|
|
||
| def build_argv(command: Union[str, List[str]], shell: bool = False) -> List[str]: | ||
| """ | ||
| Safely build command argv with explicit shell control. | ||
|
|
||
| Args: | ||
| command: String command or list of arguments | ||
| shell: If True, explicitly use shell. If False, parse safely without shell. | ||
|
|
||
| Returns: | ||
| List of command arguments safe for subprocess execution | ||
|
|
||
| Security: | ||
| - shell=False (default): No shell injection possible | ||
| - shell=True: Caller explicitly opts into shell evaluation | ||
| """ | ||
| if isinstance(command, str): | ||
| if not shell: | ||
| # Safe parse: convert string to argv without invoking shell | ||
| return shlex.split(command) | ||
| else: | ||
| # Explicit shell: caller has opted in | ||
| return ["sh", "-c", command] | ||
| else: | ||
| # List input | ||
| cmd_list = list(command) | ||
| if shell: | ||
| # Quote each element when combining into shell command | ||
| quoted_cmd = " ".join(shlex.quote(arg) for arg in cmd_list) | ||
| return ["sh", "-c", quoted_cmd] | ||
| else: | ||
| # Direct argv execution - no shell | ||
| return cmd_list |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| import asyncio | ||
| import logging | ||
| import os | ||
| import shlex | ||
| import tempfile | ||
| import time | ||
| import uuid | ||
|
|
@@ -226,18 +227,34 @@ async def run_command( | |
| limits: Optional[ResourceLimits] = None, | ||
| env: Optional[Dict[str, str]] = None, | ||
| working_dir: Optional[str] = None, | ||
| shell: bool = False, | ||
| ) -> SandboxResult: | ||
| """Run a shell command in the sandbox.""" | ||
| """Run a command in the sandbox. | ||
|
|
||
| Args: | ||
| command: String command or list of arguments | ||
| limits: Resource limits to apply | ||
| env: Environment variables | ||
| working_dir: Working directory | ||
| shell: If True, explicitly use shell. If False (default), execute safely without shell. | ||
| """ | ||
| if not self._is_running: | ||
| await self.start() | ||
|
|
||
| limits = limits or self.config.resource_limits | ||
| execution_id = str(uuid.uuid4()) | ||
|
|
||
| if isinstance(command, list): | ||
| cmd_str = " ".join(command) | ||
| # Always quote list elements to prevent shell injection | ||
| cmd_str = " ".join(shlex.quote(arg) for arg in command) | ||
| else: | ||
| cmd_str = command | ||
| if shell: | ||
| # Caller explicitly requested shell evaluation | ||
| cmd_str = command | ||
| else: | ||
| # Parse string safely then re-quote each part | ||
| cmd_parts = shlex.split(command) | ||
| cmd_str = " ".join(shlex.quote(part) for part in cmd_parts) | ||
|
Comment on lines
+256
to
+257
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In the new |
||
|
|
||
| docker_cmd = [ | ||
| "docker", "run", "--rm", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -190,18 +190,26 @@ async def run_command( | |
| limits: Optional[ResourceLimits] = None, | ||
| env: Optional[Dict[str, str]] = None, | ||
| working_dir: Optional[str] = None, | ||
| shell: bool = False, | ||
| ) -> SandboxResult: | ||
| """Run a shell command in the sandbox.""" | ||
| """Run a command in the sandbox. | ||
|
|
||
| Args: | ||
| command: String command or list of arguments | ||
| limits: Resource limits to apply | ||
| env: Environment variables | ||
| working_dir: Working directory | ||
| shell: If True, explicitly use shell. If False (default), execute safely without shell. | ||
| """ | ||
| if not self._is_running: | ||
| await self.start() | ||
|
|
||
| limits = limits or self.config.resource_limits | ||
| execution_id = str(uuid.uuid4()) | ||
|
|
||
| if isinstance(command, str): | ||
| cmd = ["sh", "-c", command] | ||
| else: | ||
| cmd = command | ||
| # Import here to avoid circular import | ||
| from ._shell import build_argv | ||
| cmd = build_argv(command, shell=shell) | ||
|
Comment on lines
+193
to
+212
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check if SandlockSandbox or other callers pass shell parameter to run_command
rg -n --type=py 'run_command\(' src/praisonai/praisonai/sandbox/ -A2 | head -60Repository: MervinPraison/PraisonAI Length of output: 1697 🏁 Script executed: #!/bin/bash
# Get full signature of run_command in subprocess.py
echo "=== SubprocessSandbox.run_command signature ==="
sed -n '187,215p' src/praisonai/praisonai/sandbox/subprocess.py
echo ""
echo "=== SandlockSandbox.run_command signature and fallback call ==="
sed -n '345,365p' src/praisonai/praisonai/sandbox/sandlock.py
echo ""
echo "=== Check if other implementations have shell parameter ==="
rg -n 'async def run_command' src/praisonai/praisonai/sandbox/ -A5 | grep -E '(shell|def run_command)'Repository: MervinPraison/PraisonAI Length of output: 2511 🏁 Script executed: #!/bin/bash
# Get full signatures of other implementations to see if they have shell parameter
echo "=== SSH run_command signature ==="
sed -n '274,290p' src/praisonai/praisonai/sandbox/ssh.py
echo ""
echo "=== Docker run_command signature ==="
sed -n '223,240p' src/praisonai/praisonai/sandbox/docker.py
echo ""
echo "=== Modal run_command signature ==="
sed -n '320,337p' src/praisonai/praisonai/sandbox/modal.py
echo ""
echo "=== Daytona run_command signature ==="
sed -n '262,279p' src/praisonai/praisonai/sandbox/daytona.py
echo ""
echo "=== Check the base class signature ==="
find src/praisonai/praisonai/sandbox -name '*.py' -type f | xargs grep -l 'class.*Sandbox' | head -5
rg -n 'class.*Sandbox|async def run_command' src/praisonai/praisonai/sandbox/base.py -A3 | head -30Repository: MervinPraison/PraisonAI Length of output: 3044 🏁 Script executed: #!/bin/bash
# Check if there's an abstract base class
echo "=== Looking for base class definition ==="
find src/praisonai/praisonai/sandbox -name '*.py' -type f -exec grep -l 'ABC\|abstractmethod' {} \;
echo ""
echo "=== Check for Sandbox protocol or base class ==="
rg -n 'class.*Sandbox.*:' src/praisonai/praisonai/sandbox/ -A2 | head -40
echo ""
echo "=== Check imports in sandlock.py to understand relationship ==="
head -30 src/praisonai/praisonai/sandbox/sandlock.pyRepository: MervinPraison/PraisonAI Length of output: 2188 🏁 Script executed: #!/bin/bash
# Confirm the exact fallback line and check if this is a newly added parameter issue
echo "=== Full fallback code context ==="
sed -n '355,365p' src/praisonai/praisonai/sandbox/sandlock.py
echo ""
echo "=== Check git history to see if shell parameter is new ==="
git log --oneline -p src/praisonai/praisonai/sandbox/subprocess.py | grep -A5 -B5 'shell.*bool' | head -30Repository: MervinPraison/PraisonAI Length of output: 1003 🏁 Script executed: #!/bin/bash
# Check if SandlockSandbox.run_command signature should have the shell parameter
echo "=== Check if other implementations that fallback also updated signatures ==="
rg -n 'fallback.*run_command|SubprocessSandbox.*run_command' src/praisonai/praisonai/sandbox/ -B5 -A2
echo ""
echo "=== Check Modal and Daytona implementations to see if they should have shell ==="
sed -n '320,328p' src/praisonai/praisonai/sandbox/modal.py
echo ""
echo "=== Verify: does SandlockSandbox need to be updated with shell parameter? ==="
grep -n 'def run_command\|shell' src/praisonai/praisonai/sandbox/sandlock.py | head -20Repository: MervinPraison/PraisonAI Length of output: 1485 🏁 Script executed: #!/bin/bash
# Check SandlockSandbox docstring and full method to understand intent
echo "=== SandlockSandbox.run_command full implementation ==="
sed -n '345,380p' src/praisonai/praisonai/sandbox/sandlock.py
echo ""
echo "=== Check what build_argv does when shell parameter is not passed ==="
grep -n 'def build_argv' src/praisonai/praisonai/sandbox/_shell.py -A20 | head -30Repository: MervinPraison/PraisonAI Length of output: 2219 🏁 Script executed: #!/bin/bash
# Verify the exact behavior difference between SandlockSandbox native and fallback
echo "=== SandlockSandbox native behavior (always shells) ==="
sed -n '367,371p' src/praisonai/praisonai/sandbox/sandlock.py
echo ""
echo "=== SubprocessSandbox with shell=False (what fallback will use) ==="
sed -n '210,220p' src/praisonai/praisonai/sandbox/subprocess.py
echo ""
echo "=== Check if Modal and Daytona might have similar issues ==="
grep -n 'run_command.*(' src/praisonai/praisonai/sandbox/modal.py -A8 | head -15Repository: MervinPraison/PraisonAI Length of output: 1037 API contract violation in fallback path: Recommended fix: Update 🤖 Prompt for AI Agents |
||
|
|
||
| process_env = os.environ.copy() | ||
| if env: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-atomic cache read-modify-write in
on_agent_end— lost-update race_cache_get(line 237) acquires and releases the lock before the modification, and_cache_put(line 244) acquires it again. The gap between them is unguarded: two concurrenton_agent_endcalls for the samesession_id(possible viaPersistentAgent.end_session()orPersistentSession.end()from different threads) both obtain a deepcopy of the same baseline session, both modify it, and whichever thread calls_cache_putlast silently discards the other's update — the exact lost-update race the PR set out to fix.The I/O call (
update_session) must remain outside the lock, but the cache portion of the read-modify-write can be made atomic by folding it into onewith self._cache_lockblock:🔒 Proposed fix — atomic cache RMW with I/O outside the lock
This keeps
update_sessionoutside the lock while making the cache get→modify→put an atomic unit. The store-level race (two threads persisting concurrent updates) is inherent to the design and requires session-level serialization at a higher layer if needed.🤖 Prompt for AI Agents