Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions src/praisonai/praisonai/async_agent_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,11 @@ async def stop(self) -> bool:

def get_stats(self) -> Dict[str, Any]:
"""
Get execution statistics.
Get current execution statistics (synchronous, best-effort).

Warning: This method provides a best-effort view of stats without
guaranteeing atomicity. For consistent snapshots in async context,
use get_stats_async() instead.

Returns:
Dictionary with execution stats
Expand All @@ -268,6 +272,40 @@ def get_stats(self) -> Dict[str, Any]:
"success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0
}

async def get_stats_async(self) -> Dict[str, Any]:
    """
    Return an atomic snapshot of execution statistics (async).

    Holds the stats lock (when one exists) while reading the counters so
    all three values come from the same moment in time.

    Returns:
        Dictionary with execution stats
    """
    if self._stats_lock is None:
        # Scheduler never started: counters are still at their initial zeros,
        # so there is nothing to synchronize against.
        snapshot = (0, 0, 0)
    else:
        # Read every counter under the lock so the snapshot is consistent.
        async with self._stats_lock:
            snapshot = (
                self._execution_count,
                self._success_count,
                self._failure_count,
            )

    total, ok, bad = snapshot
    rate = (ok / total * 100) if total > 0 else 0
    return {
        "is_running": self.is_running,
        "total_executions": total,
        "successful_executions": ok,
        "failed_executions": bad,
        "success_rate": rate,
    }

def get_stats_sync(self) -> Dict[str, Any]:
    """
    Explicitly-named synchronous alias for get_stats().

    Provided so call sites can make it obvious they want the best-effort,
    non-atomic view rather than get_stats_async().

    Returns:
        Dictionary with execution stats
    """
    return self.get_stats()

async def _run_schedule(self, interval: int, max_retries: int):
"""Internal method to run scheduled agent executions."""
try:
Expand All @@ -289,7 +327,13 @@ async def _run_schedule(self, interval: int, max_retries: int):

async def _execute_with_retry(self, max_retries: int):
"""Execute agent with retry logic."""
self._execution_count += 1
# Ensure async primitives are available
if self._stats_lock is None:
self._ensure_async_primitives()

# Atomically increment execution count
async with self._stats_lock:
self._execution_count += 1

last_exc: Optional[Exception] = None
for attempt in range(max_retries):
Expand All @@ -300,7 +344,9 @@ async def _execute_with_retry(self, max_retries: int):
logger.info(f"Async agent execution successful on attempt {attempt + 1}")
logger.info(f"Result: {result}")

self._success_count += 1
# Atomically increment success count
async with self._stats_lock:
self._success_count += 1
safe_call(self.on_success, result)
return

Expand All @@ -313,7 +359,9 @@ async def _execute_with_retry(self, max_retries: int):
logger.info(f"Waiting {wait_time}s before async retry...")
await asyncio.sleep(wait_time)

self._failure_count += 1
# Atomically increment failure count
async with self._stats_lock:
self._failure_count += 1
logger.error(f"Async agent execution failed after {max_retries} attempts")
safe_call(
self.on_failure,
Expand Down
43 changes: 37 additions & 6 deletions src/praisonai/praisonai/persistence/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

import logging
import time
import threading
import uuid
from copy import deepcopy
from typing import Any, Dict, List, Optional, TYPE_CHECKING

from .conversation.base import ConversationStore, ConversationSession, ConversationMessage
Expand Down Expand Up @@ -78,6 +80,7 @@ def __init__(

self._current_session: Optional[ConversationSession] = None
self._session_cache: Dict[str, ConversationSession] = {}
self._cache_lock = threading.RLock() # RLock allows re-entrant access

@classmethod
def from_config(cls, config: PersistenceConfig) -> "PersistenceOrchestrator":
Expand All @@ -90,6 +93,31 @@ def from_env(cls) -> "PersistenceOrchestrator":
config = PersistenceConfig.from_env()
return cls(config=config)

# =========================================================================
# Thread-Safe Cache Operations
# =========================================================================

def _cache_put(self, session: ConversationSession) -> None:
    """Insert or replace *session* in the cache under its session_id (thread-safe)."""
    key = session.session_id
    with self._cache_lock:
        self._session_cache[key] = session

def _cache_get(self, session_id: str) -> Optional[ConversationSession]:
    """Thread-safe cache lookup.

    Returns a deep copy of the cached session (so callers cannot mutate the
    cached object in place), or None when the id is not cached. The copy is
    made while the lock is still held so it reflects a consistent state.
    """
    with self._cache_lock:
        hit = self._session_cache.get(session_id)
        return deepcopy(hit) if hit is not None else None

def _cache_delete(self, session_id: str) -> Optional[ConversationSession]:
    """Thread-safely remove and return the cached session, or None if absent."""
    with self._cache_lock:
        removed = self._session_cache.pop(session_id, None)
    return removed

def _cache_clear(self) -> None:
"""Clear all sessions from cache with thread safety."""
with self._cache_lock:
self._session_cache.clear()

# =========================================================================
# Agent Lifecycle Hooks
# =========================================================================
Expand Down Expand Up @@ -128,7 +156,7 @@ def on_agent_start(
if session:
logger.info(f"Resuming session: {session_id}")
self._current_session = session
self._session_cache[session_id] = session
self._cache_put(session)

# Load previous messages
messages = self.conversation.get_messages(session_id)
Expand All @@ -146,7 +174,7 @@ def on_agent_start(
self.conversation.create_session(session)
logger.info(f"Created new session: {session_id}")
self._current_session = session
self._session_cache[session_id] = session
self._cache_put(session)
return []

def on_message(
Expand Down Expand Up @@ -206,12 +234,14 @@ def on_agent_end(
if not self.conversation:
return

session = self._session_cache.get(session_id) or self.conversation.get_session(session_id)
session = self._cache_get(session_id) or self.conversation.get_session(session_id)
if session:
session.updated_at = time.time()
if metadata:
session.metadata = {**(session.metadata or {}), **metadata}
self.conversation.update_session(session)
# Update cache with the modified session
self._cache_put(session)
Comment on lines +237 to +244
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Non-atomic cache read-modify-write in on_agent_end — lost-update race

_cache_get (line 237) acquires and releases the lock before the modification, and _cache_put (line 244) acquires it again. The gap between them is unguarded: two concurrent on_agent_end calls for the same session_id (possible via PersistentAgent.end_session() or PersistentSession.end() from different threads) both obtain a deepcopy of the same baseline session, both modify it, and whichever thread calls _cache_put last silently discards the other's update — the exact lost-update race the PR set out to fix.

The I/O call (update_session) must remain outside the lock, but the cache portion of the read-modify-write can be made atomic by folding it into one with self._cache_lock block:

🔒 Proposed fix — atomic cache RMW with I/O outside the lock
-        session = self._cache_get(session_id) or self.conversation.get_session(session_id)
+        # Fetch raw reference under lock; fall back to store only if not cached
+        with self._cache_lock:
+            _cached = self._session_cache.get(session_id)
+        session = deepcopy(_cached) if _cached is not None else self.conversation.get_session(session_id)
         if session:
             session.updated_at = time.time()
             if metadata:
                 session.metadata = {**(session.metadata or {}), **metadata}
             self.conversation.update_session(session)
-            # Update cache with the modified session
-            self._cache_put(session)
+            # Single atomic put — no separate get/put gap
+            with self._cache_lock:
+                self._session_cache[session_id] = session
             logger.debug(f"Updated session metadata: {session_id}")

This keeps update_session outside the lock while making the cache get→modify→put an atomic unit. The store-level race (two threads persisting concurrent updates) is inherent to the design and requires session-level serialization at a higher layer if needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/persistence/orchestrator.py` around lines 237 - 244,
on_agent_end currently does a non-atomic cache read-modify-write (calls
self._cache_get, modifies session, then self._cache_put) which allows
lost-update races; to fix, make the cache RMW atomic by acquiring
self._cache_lock around the get→modify→put sequence (use a single with
self._cache_lock: block that reads via _cache_get, updates session.updated_at
and merges metadata into session.metadata, then calls _cache_put) while keeping
the I/O call self.conversation.update_session(session) outside the lock;
reference symbols: on_agent_end, _cache_get, _cache_put, _cache_lock,
conversation.update_session, session.updated_at, session.metadata.

Comment on lines +237 to +244
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 TOCTOU race in on_agent_end update path

The compound read-modify-write in on_agent_end is not atomic under the new RLock design. _cache_get releases the lock after returning a deepcopy, so a second thread can enter between the _cache_get call and the final _cache_put. Both threads will overwrite each other's changes: whichever thread reaches _cache_put last silently discards the other's metadata updates.

The lock must be held for the entire read-modify-write sequence — a single with self._cache_lock: block covering the get, mutation, and put — not just the individual cache operations.

logger.debug(f"Updated session metadata: {session_id}")

# =========================================================================
Expand Down Expand Up @@ -315,8 +345,8 @@ def delete_session(self, session_id: str) -> bool:
if not self.conversation:
return False

if session_id in self._session_cache:
del self._session_cache[session_id]
# Remove from cache using thread-safe method
self._cache_delete(session_id)

return self.conversation.delete_session(session_id)

Expand Down Expand Up @@ -395,7 +425,8 @@ def close(self) -> None:
if self.state:
self.state.close()

self._session_cache.clear()
# Clear cache using thread-safe method
self._cache_clear()
logger.info("Persistence orchestrator closed")

def __enter__(self):
Expand Down
38 changes: 38 additions & 0 deletions src/praisonai/praisonai/sandbox/_shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Shared utilities for safe shell command handling across sandbox backends."""

import shlex
from typing import List, Union


def build_argv(command: Union[str, List[str]], shell: bool = False) -> List[str]:
    """
    Safely build command argv with explicit shell control.

    Args:
        command: String command or list of arguments.
        shell: If True, explicitly wrap the command in ``sh -c``. If False
            (default), produce a plain argv that never invokes a shell.

    Returns:
        List of command arguments safe for subprocess execution.

    Security:
        - shell=False (default): no shell is involved, so shell injection is
          impossible; string commands are tokenized with shlex.split.
        - shell=True: caller explicitly opts into shell evaluation; list
          elements are individually quoted before being joined.

    Note:
        A malformed string (e.g. an unclosed quote such as ``echo 'hello``)
        cannot be tokenized by shlex.split, which raises ValueError. Rather
        than letting that exception propagate to callers that expect a
        failing subprocess result, the whole string is returned as a single
        literal argv element; execution then fails with an ordinary
        "command not found"-style error, and no shell is ever invoked.
    """
    if isinstance(command, str):
        if shell:
            # Explicit shell: caller has opted in, pass the string through.
            return ["sh", "-c", command]
        try:
            # Safe parse: convert string to argv without invoking a shell.
            return shlex.split(command)
        except ValueError:
            # Unparseable (e.g. unclosed quote): degrade to one literal
            # token instead of raising — and never fall back to a shell,
            # which would reopen the injection vector this module closes.
            return [command]

    # List input.
    cmd_list = list(command)
    if shell:
        # Quote each element when combining into one shell command string.
        return ["sh", "-c", " ".join(shlex.quote(arg) for arg in cmd_list)]
    # Direct argv execution — no shell.
    return cmd_list
23 changes: 20 additions & 3 deletions src/praisonai/praisonai/sandbox/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import asyncio
import logging
import os
import shlex
import tempfile
import time
import uuid
Expand Down Expand Up @@ -226,18 +227,34 @@ async def run_command(
limits: Optional[ResourceLimits] = None,
env: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
shell: bool = False,
) -> SandboxResult:
"""Run a shell command in the sandbox."""
"""Run a command in the sandbox.

Args:
command: String command or list of arguments
limits: Resource limits to apply
env: Environment variables
working_dir: Working directory
shell: If True, explicitly use shell. If False (default), execute safely without shell.
"""
if not self._is_running:
await self.start()

limits = limits or self.config.resource_limits
execution_id = str(uuid.uuid4())

if isinstance(command, list):
cmd_str = " ".join(command)
# Always quote list elements to prevent shell injection
cmd_str = " ".join(shlex.quote(arg) for arg in command)
else:
cmd_str = command
if shell:
# Caller explicitly requested shell evaluation
cmd_str = command
else:
# Parse string safely then re-quote each part
cmd_parts = shlex.split(command)
cmd_str = " ".join(shlex.quote(part) for part in cmd_parts)
Comment on lines +256 to +257
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 shlex.split can raise ValueError, crashing the sandbox

In the new shell=False code path, shlex.split(command) is called without a try/except. A command string with an unclosed quote (e.g. echo 'hello) will raise ValueError: No closing quotation and propagate unhandled to the caller. Before this PR, the string was passed directly to sh -c and the shell would return a non-zero exit code inside a SandboxResult. Now callers can receive a raw Python exception instead of a SandboxResult, which is a behavioral regression. Compare with ssh.py which wraps the same call in a try/except ValueError. The same issue exists in sandbox/_shell.py's build_argv(), affecting subprocess.py via the same path.


docker_cmd = [
"docker", "run", "--rm",
Expand Down
44 changes: 37 additions & 7 deletions src/praisonai/praisonai/sandbox/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,24 +277,52 @@ async def run_command(
limits: Optional[ResourceLimits] = None,
env: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
shell: bool = False,
) -> SandboxResult:
"""Run a shell command on the remote server."""
"""Run a command on the remote server.

Args:
command: String command or list of arguments
limits: Resource limits to apply
env: Environment variables
working_dir: Working directory
shell: If True, explicitly use shell. If False (default), execute safely without shell.
"""
if not self._is_running:
await self.start()

execution_id = str(uuid.uuid4())
started_at = time.time()

try:
# Convert command to string if needed
# Convert command to string safely based on shell parameter
if isinstance(command, list):
command = shlex.join(command)
if shell:
# Shell mode: join with proper quoting
command_str = " ".join(shlex.quote(arg) for arg in command)
else:
# Non-shell mode: use shlex.join for proper escaping
command_str = shlex.join(command)
else:
# String command
if not shell:
# Parse then re-join to ensure safe execution
try:
parts = shlex.split(command)
command_str = shlex.join(parts)
except ValueError:
# If parsing fails, quote the whole thing
command_str = shlex.quote(command)
else:
# Shell mode: use as-is (caller explicitly opted in)
command_str = command

# Execute command
result = await self._run_command_with_limits(
command,
command_str,
limits,
working_dir or self.working_dir
working_dir or self.working_dir,
shell
)

completed_at = time.time()
Expand Down Expand Up @@ -491,10 +519,12 @@ async def _run_command_with_limits(
self,
command: str,
limits: Optional[ResourceLimits],
working_dir: str
working_dir: str,
shell: bool = False
):
"""Run command with resource limits."""
# Change to working directory
# Change to working directory and execute command
# Note: The command has already been processed for shell safety by the caller
full_command = f"cd {shlex.quote(working_dir)} && {command}"

# Set timeout
Expand Down
18 changes: 13 additions & 5 deletions src/praisonai/praisonai/sandbox/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,26 @@ async def run_command(
limits: Optional[ResourceLimits] = None,
env: Optional[Dict[str, str]] = None,
working_dir: Optional[str] = None,
shell: bool = False,
) -> SandboxResult:
"""Run a shell command in the sandbox."""
"""Run a command in the sandbox.

Args:
command: String command or list of arguments
limits: Resource limits to apply
env: Environment variables
working_dir: Working directory
shell: If True, explicitly use shell. If False (default), execute safely without shell.
"""
if not self._is_running:
await self.start()

limits = limits or self.config.resource_limits
execution_id = str(uuid.uuid4())

if isinstance(command, str):
cmd = ["sh", "-c", command]
else:
cmd = command
# Import here to avoid circular import
from ._shell import build_argv
cmd = build_argv(command, shell=shell)
Comment on lines +193 to +212
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if SandlockSandbox or other callers pass shell parameter to run_command
rg -n --type=py 'run_command\(' src/praisonai/praisonai/sandbox/ -A2 | head -60

Repository: MervinPraison/PraisonAI

Length of output: 1697


🏁 Script executed:

#!/bin/bash
# Get full signature of run_command in subprocess.py
echo "=== SubprocessSandbox.run_command signature ==="
sed -n '187,215p' src/praisonai/praisonai/sandbox/subprocess.py

echo ""
echo "=== SandlockSandbox.run_command signature and fallback call ==="
sed -n '345,365p' src/praisonai/praisonai/sandbox/sandlock.py

echo ""
echo "=== Check if other implementations have shell parameter ==="
rg -n 'async def run_command' src/praisonai/praisonai/sandbox/ -A5 | grep -E '(shell|def run_command)'

Repository: MervinPraison/PraisonAI

Length of output: 2511


🏁 Script executed:

#!/bin/bash
# Get full signatures of other implementations to see if they have shell parameter
echo "=== SSH run_command signature ==="
sed -n '274,290p' src/praisonai/praisonai/sandbox/ssh.py

echo ""
echo "=== Docker run_command signature ==="
sed -n '223,240p' src/praisonai/praisonai/sandbox/docker.py

echo ""
echo "=== Modal run_command signature ==="
sed -n '320,337p' src/praisonai/praisonai/sandbox/modal.py

echo ""
echo "=== Daytona run_command signature ==="
sed -n '262,279p' src/praisonai/praisonai/sandbox/daytona.py

echo ""
echo "=== Check the base class signature ==="
find src/praisonai/praisonai/sandbox -name '*.py' -type f | xargs grep -l 'class.*Sandbox' | head -5
rg -n 'class.*Sandbox|async def run_command' src/praisonai/praisonai/sandbox/base.py -A3 | head -30

Repository: MervinPraison/PraisonAI

Length of output: 3044


🏁 Script executed:

#!/bin/bash
# Check if there's an abstract base class
echo "=== Looking for base class definition ==="
find src/praisonai/praisonai/sandbox -name '*.py' -type f -exec grep -l 'ABC\|abstractmethod' {} \;

echo ""
echo "=== Check for Sandbox protocol or base class ==="
rg -n 'class.*Sandbox.*:' src/praisonai/praisonai/sandbox/ -A2 | head -40

echo ""
echo "=== Check imports in sandlock.py to understand relationship ==="
head -30 src/praisonai/praisonai/sandbox/sandlock.py

Repository: MervinPraison/PraisonAI

Length of output: 2188


🏁 Script executed:

#!/bin/bash
# Confirm the exact fallback line and check if this is a newly added parameter issue
echo "=== Full fallback code context ==="
sed -n '355,365p' src/praisonai/praisonai/sandbox/sandlock.py

echo ""
echo "=== Check git history to see if shell parameter is new ==="
git log --oneline -p src/praisonai/praisonai/sandbox/subprocess.py | grep -A5 -B5 'shell.*bool' | head -30

Repository: MervinPraison/PraisonAI

Length of output: 1003


🏁 Script executed:

#!/bin/bash
# Check if SandlockSandbox.run_command signature should have the shell parameter
echo "=== Check if other implementations that fallback also updated signatures ==="
rg -n 'fallback.*run_command|SubprocessSandbox.*run_command' src/praisonai/praisonai/sandbox/ -B5 -A2

echo ""
echo "=== Check Modal and Daytona implementations to see if they should have shell ==="
sed -n '320,328p' src/praisonai/praisonai/sandbox/modal.py

echo ""
echo "=== Verify: does SandlockSandbox need to be updated with shell parameter? ==="
grep -n 'def run_command\|shell' src/praisonai/praisonai/sandbox/sandlock.py | head -20

Repository: MervinPraison/PraisonAI

Length of output: 1485


🏁 Script executed:

#!/bin/bash
# Check SandlockSandbox docstring and full method to understand intent
echo "=== SandlockSandbox.run_command full implementation ==="
sed -n '345,380p' src/praisonai/praisonai/sandbox/sandlock.py

echo ""
echo "=== Check what build_argv does when shell parameter is not passed ==="
grep -n 'def build_argv' src/praisonai/praisonai/sandbox/_shell.py -A20 | head -30

Repository: MervinPraison/PraisonAI

Length of output: 2219


🏁 Script executed:

#!/bin/bash
# Verify the exact behavior difference between SandlockSandbox native and fallback
echo "=== SandlockSandbox native behavior (always shells) ==="
sed -n '367,371p' src/praisonai/praisonai/sandbox/sandlock.py

echo ""
echo "=== SubprocessSandbox with shell=False (what fallback will use) ==="
sed -n '210,220p' src/praisonai/praisonai/sandbox/subprocess.py

echo ""
echo "=== Check if Modal and Daytona might have similar issues ==="
grep -n 'run_command.*(' src/praisonai/praisonai/sandbox/modal.py -A8 | head -15

Repository: MervinPraison/PraisonAI

Length of output: 1037


API contract violation in fallback path: SandlockSandbox.run_command explicitly documents "Run a shell command in the sandbox" and natively wraps string commands as ["sh", "-c", command] (lines 367-370). However, the fallback to SubprocessSandbox (line 360) does not pass the new shell parameter, causing it to default to shell=False. This changes the behavior: string commands with shell syntax (pipes, redirects, variable expansion) will fail to parse correctly in the fallback path, breaking the API contract for callers relying on shell execution when Sandlock is unavailable.

Recommended fix: Update SandlockSandbox.run_command to accept a shell: bool = False parameter and pass it through to the fallback call, or explicitly pass shell=True to preserve the documented shell execution behavior.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/sandbox/subprocess.py` around lines 193 - 212, The
SandlockSandbox.run_command API currently omits the shell: bool parameter when
delegating to the SubprocessSandbox fallback, violating the documented behavior
for string commands; modify SandlockSandbox.run_command to accept shell: bool =
False (or default True if docs require) and pass that shell value through when
calling SubprocessSandbox.run_command (or explicitly forward shell=True) so that
the fallback uses build_argv and shell execution consistently; update the
signature and the fallback invocation in SandlockSandbox.run_command to
reference the shell parameter.


process_env = os.environ.copy()
if env:
Expand Down