Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion flocks/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,16 @@ class ConfigInfo(BaseModel):
"workspace_access (none/ro/rw), workspace_root, docker, tools, prune."
),
)

allow_read_paths: Optional[List[str]] = Field(
None,
alias="allowReadPaths",
description=(
"Extra absolute paths (directories or prefixes) allowed for HTTP "
"/api/file/content and /api/file/list. Does not replace project root, data, or "
"workspace; the Flocks config directory and ~/.ssh are never allowed."
),
)

# Channel configuration (IM platform integrations)
channels: Optional[Dict[str, ChannelConfig]] = Field(
None,
Expand Down
16 changes: 14 additions & 2 deletions flocks/server/routes/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

from flocks.config.config import Config
from flocks.utils.file import File, FileNode, FileContent, FileInfo
from flocks.utils.http_file_read_guard import resolve_path_for_http_file_access
from flocks.utils.log import Log

router = APIRouter()
Expand All @@ -23,8 +25,13 @@ async def list_files(path: str = Query(..., description="Directory path")):
List files and directories in a specified path.
"""
try:
nodes = await File.list(path)
cfg = await Config.get()
safe_path = await resolve_path_for_http_file_access(path, cfg)
nodes = await File.list(safe_path)
return nodes
except PermissionError:
log.warning("http_file.list.denied")
raise HTTPException(status_code=403, detail="Access denied")
except Exception as e:
log.error("file.list.error", {"error": str(e), "path": path})
raise HTTPException(status_code=500, detail=str(e))
Expand All @@ -38,10 +45,15 @@ async def read_file(path: str = Query(..., description="File path")):
Read the content of a specified file.
"""
try:
content = await File.read(path)
cfg = await Config.get()
safe_path = await resolve_path_for_http_file_access(path, cfg)
content = await File.read(safe_path)
return content
except FileNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except PermissionError:
log.warning("http_file.read.denied")
raise HTTPException(status_code=403, detail="Access denied")
except Exception as e:
log.error("file.read.error", {"error": str(e), "path": path})
raise HTTPException(status_code=500, detail=str(e))
Expand Down
11 changes: 9 additions & 2 deletions flocks/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
from flocks.utils.log import Log
from flocks.utils.id import Identifier
from flocks.utils.json_repair import parse_json_robust, repair_truncated_json
from flocks.utils.paths import find_project_root
from flocks.utils.paths import find_flocks_project_root, find_project_root

__all__ = ["Log", "Identifier", "parse_json_robust", "repair_truncated_json", "find_project_root"]
__all__ = [
"Log",
"Identifier",
"parse_json_robust",
"repair_truncated_json",
"find_project_root",
"find_flocks_project_root",
]
201 changes: 201 additions & 0 deletions flocks/utils/http_file_read_guard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
"""
Path checks for HTTP ``/api/file/*`` endpoints: blocks arbitrary file read without
changing :meth:`flocks.utils.file.File.read` used by internal callers.

- Used only from ``flocks.server.routes.file``.
- Reads ``allowReadPaths`` via :meth:`flocks.config.config.Config.get` (Pydantic alias).
- Uses :func:`flocks.sandbox.paths.assert_sandbox_path` for containment and symlink checks.
- When no ``.flocks`` project root exists, does not fall back to cwd as a sandbox root.
"""

from __future__ import annotations

import os
from pathlib import Path
from typing import List, Optional, Set

from flocks.config.config import Config, ConfigInfo
from flocks.sandbox.paths import assert_sandbox_path
from flocks.utils.paths import find_flocks_project_root


def _safe_system_files_resolved() -> Set[str]:
"""Small built-in allowlist of safe system files (resolved absolute paths)."""
out: Set[str] = set()
for p in ("/etc/hosts", "/etc/hostname", "/etc/resolv.conf"):
try:
rp = Path(p).resolve()
if rp.is_file():
out.add(str(rp))
except OSError:
continue
return out


_SAFE_SYSTEM_FILES: Set[str] = _safe_system_files_resolved()


def _normalize_allow_read_entries(entries: Optional[List[str]]) -> List[str]:
"""Validate extra readable paths from config: absolute, not FS root, not under config or ``~/.ssh``."""
if not entries:
return []

cfg_dir = Config.get_config_path().resolve()
try:
ssh_dir = (Path.home() / ".ssh").resolve()
except OSError:
ssh_dir = None

out: List[str] = []
seen: Set[str] = set()

for raw in entries:
if not raw or not isinstance(raw, str):
continue
expanded = os.path.normpath(os.path.expanduser(raw.strip()))
if not expanded:
continue
if not os.path.isabs(expanded):
continue
try:
p = Path(expanded).resolve()
except OSError:
continue

if p.parent == p:
continue

ps = str(p)

if ps == str(cfg_dir) or p.is_relative_to(cfg_dir):
continue
if ssh_dir is not None and (ps == str(ssh_dir) or p.is_relative_to(ssh_dir)):
continue

if ps not in seen:
seen.add(ps)
out.append(ps)

return out


def _blocked_for_http_read(resolved_str: str) -> bool:
"""Always deny these sensitive locations, even if another allow-root would match."""
try:
r = Path(resolved_str).resolve()
except OSError:
return True

cfg_dir = Config.get_config_path().resolve()
if r == cfg_dir or r.is_relative_to(cfg_dir):
return True

try:
ssh = (Path.home() / ".ssh").resolve()
if r == ssh or r.is_relative_to(ssh):
return True
except OSError:
pass

rs = str(r)
if os.name == "posix":
for prefix in ("/proc", "/sys", "/dev"):
if rs == prefix or rs.startswith(prefix + os.sep):
return True

return False


def _is_filesystem_root(p: Path) -> bool:
try:
r = p.resolve()
except OSError:
return True
return r.parent == r


def _initial_abs_path(user_path: str, project_root: Optional[Path]) -> str:
"""Turn the query path into an absolute path; relative paths are only allowed under the found project root."""
raw = user_path.strip()
if not raw:
raise PermissionError("Empty path")

expanded = str(Path(raw).expanduser())
if os.path.isabs(expanded):
return os.path.normpath(expanded)

if project_root is None:
raise PermissionError("Relative paths require a Flocks project root (.flocks/)")

return os.path.normpath(os.path.join(str(project_root.resolve()), expanded))


async def resolve_path_for_http_file_access(
user_path: str,
config: ConfigInfo,
) -> str:
"""
Resolve and authorize ``user_path``; on success return an absolute path safe to open.

Raises:
PermissionError: Path is not allowed for remote HTTP file access.
"""
project_root = find_flocks_project_root()
abs_guess = _initial_abs_path(user_path, project_root)

extra_roots = _normalize_allow_read_entries(config.allow_read_paths)

roots: List[str] = []
if project_root is not None and not _is_filesystem_root(project_root):
roots.append(str(project_root.resolve()))
data_dir = Config.get_data_path().resolve()
if not _is_filesystem_root(data_dir):
roots.append(str(data_dir))

from flocks.workspace.manager import WorkspaceManager

ws_dir = WorkspaceManager.get_instance().get_workspace_dir().resolve()
if not _is_filesystem_root(ws_dir):
roots.append(str(ws_dir))

for r in extra_roots:
if r not in roots:
roots.append(r)

# Deduplicate while preserving order
seen: Set[str] = set()
uniq: List[str] = []
for r in roots:
if r not in seen:
seen.add(r)
uniq.append(r)

for root in uniq:
try:
result = await assert_sandbox_path(
file_path=abs_guess,
cwd=root,
root=root,
)
cand = str(Path(result.resolved).resolve())
if not _blocked_for_http_read(cand):
return cand
except ValueError:
continue

try:
safe_resolved = str(Path(abs_guess).resolve())
except OSError as e:
raise PermissionError("Invalid path") from e

if (
_SAFE_SYSTEM_FILES
and safe_resolved in _SAFE_SYSTEM_FILES
and not _blocked_for_http_read(safe_resolved)
):
return safe_resolved

raise PermissionError("Path is not allowed for remote file access")


__all__ = ["resolve_path_for_http_file_access"]
26 changes: 19 additions & 7 deletions flocks/utils/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@
from pathlib import Path


def find_flocks_project_root() -> Path | None:
"""Walk upward from cwd and return the first directory that contains ``.flocks/``.

Unlike :func:`find_project_root`, this returns ``None`` when none is found—no
fallback to cwd. Use this for security-sensitive checks (e.g. avoid treating
the whole filesystem as a project when cwd is ``/``).
"""
current = Path.cwd().resolve()
for directory in (current, *current.parents):
if (directory / ".flocks").is_dir():
return directory
return None


def find_project_root() -> Path:
"""Walk up from cwd to locate the Flocks project root.

Expand All @@ -13,17 +27,15 @@ def find_project_root() -> Path:
until the filesystem root is reached.

Falls back to ``Path.cwd()`` when nothing is found (e.g. first-run before
``.flocks/`` has been created).
``.flocks/`` has been created). Prefer :func:`find_flocks_project_root` for
HTTP file access and other security-sensitive path checks.

Returns:
The nearest ancestor directory (inclusive of cwd) that contains a
``.flocks/`` sub-directory, or ``Path.cwd()`` as a fallback.
"""
current = Path.cwd().resolve()
for directory in [current, *current.parents]:
if (directory / ".flocks").is_dir():
return directory
return current
found = find_flocks_project_root()
return found if found is not None else Path.cwd().resolve()


__all__ = ["find_project_root"]
__all__ = ["find_project_root", "find_flocks_project_root"]