Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions burr/tracking/server/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import importlib
import json
import os.path
import re
import sys
from datetime import datetime
from typing import Any, Optional, Sequence, Tuple, Type, TypeVar
Expand Down Expand Up @@ -292,6 +293,44 @@ def get_uri(project_id: str) -> str:

DEFAULT_PATH = os.path.expanduser("~/.burr")

# Regex for valid project/app identifiers — no path separators or traversal.
# The end is anchored with ``\Z`` rather than ``$`` because ``$`` also matches
# just before a trailing newline, which would let ``"name\n"`` through.
_VALID_IDENTIFIER_RE = re.compile(r"^[a-zA-Z0-9_\-:]+\Z")


def _validate_identifier(value: str, name: str = "identifier") -> str:
    """Validate that a project/app identifier contains only safe characters.

    The whole string must consist of ASCII letters, digits, underscore,
    hyphen or colon. Anything else (path separators, dots, whitespace,
    a trailing newline, or the empty string) is rejected.

    :param value: the identifier to validate
    :param name: human-readable name for error messages
    :return: the identifier, unchanged, if valid
    :raises fastapi.HTTPException: 400 if the identifier contains invalid characters
    """
    # fullmatch (not match) so the entire value must satisfy the pattern.
    if not _VALID_IDENTIFIER_RE.fullmatch(value):
        raise fastapi.HTTPException(
            status_code=400,
            detail=f"Invalid {name}: '{value}'. Only alphanumeric, underscore, hyphen and colon are allowed.",
        )
    return value


def _safe_join(base: str, *parts: str) -> str:
    """Safely join path components and ensure the result stays within ``base``.

    Both ``base`` and the joined path are resolved with ``os.path.realpath``
    (symlinks and ``..`` collapsed) before the containment check, so the
    returned path is the resolved one.

    :param base: the allowed base directory
    :param parts: path components to join
    :return: the resolved path
    :raises fastapi.HTTPException: 400 if the resolved path escapes the base directory
    """
    try:
        base_real = os.path.realpath(base)
        target = os.path.realpath(os.path.join(base, *parts))
        # commonpath compares whole path components, so a sibling such as
        # "<base>_evil" is not mistaken for a child, and a base that is the
        # filesystem root (where "base + os.sep" would be "//") still works.
        within_base = os.path.commonpath([base_real, target]) == base_real
    except ValueError:
        # Raised when the paths cannot be compared or resolved at all, e.g.
        # different drives on Windows or a component the OS rejects (such as
        # an embedded NUL byte). Treat it as an escape attempt, not a 500.
        within_base = False
    if not within_base:
        raise fastapi.HTTPException(
            status_code=400,
            detail="Path traversal detected: attempted to escape the base directory.",
        )
    return target


class LocalBackend(BackendBase, AnnotationsBackendMixin):
"""Quick implementation of a local backend for testing purposes. This is not a production backend.
Expand All @@ -303,7 +342,8 @@ def __init__(self, path: str = DEFAULT_PATH):
self.path = path

def _get_annotation_path(self, project_id: str) -> str:
return os.path.join(self.path, project_id, "annotations.jsonl")
_validate_identifier(project_id, "project_id")
return _safe_join(self.path, project_id, "annotations.jsonl")

async def _load_project_annotations(self, project_id: str):
annotations_path = self._get_annotation_path(project_id)
Expand Down Expand Up @@ -464,7 +504,8 @@ async def list_apps(
limit: int = 100,
offset: int = 0,
) -> Tuple[Sequence[ApplicationSummary], int]:
project_filepath = os.path.join(self.path, project_id)
_validate_identifier(project_id, "project_id")
project_filepath = _safe_join(self.path, project_id)
if not os.path.exists(project_filepath):
return [], 0
# raise fastapi.HTTPException(status_code=404, detail=f"Project: {project_id} not found")
Expand Down Expand Up @@ -506,7 +547,9 @@ async def get_application_logs(
) -> ApplicationLogs:
# TODO -- handle partition key here
# This currently assumes uniqueness
app_filepath = os.path.join(self.path, project_id, app_id)
_validate_identifier(project_id, "project_id")
_validate_identifier(app_id, "app_id")
app_filepath = _safe_join(self.path, project_id, app_id)
if not os.path.exists(app_filepath):
raise fastapi.HTTPException(
status_code=404, detail=f"App: {app_id} from project: {project_id} not found"
Expand Down
2 changes: 1 addition & 1 deletion burr/tracking/server/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,4 +445,4 @@ def mount_burr_ui(

if __name__ == "__main__":
port = int(os.getenv("PORT", 8000)) # Default to 8000 if no PORT environment variable is set
uvicorn.run(app, host="0.0.0.0", port=port)
uvicorn.run(app, host="127.0.0.1", port=port)
112 changes: 112 additions & 0 deletions tests/tracking/test_local_backend_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest
from fastapi import HTTPException

from burr.tracking.server.backend import LocalBackend, _safe_join, _validate_identifier


class TestValidateIdentifier:
    """Unit tests for ``_validate_identifier``."""

    def test_valid_identifiers(self):
        """Identifiers built from the allowed characters are returned unchanged."""
        for identifier in ("hello_world", "hello-world", "Hello:World_123"):
            assert _validate_identifier(identifier) == identifier

    def test_invalid_identifiers(self):
        """Identifiers containing separators or dots are rejected with HTTP 400."""
        rejected_inputs = (
            "../etc/passwd",
            "hello/world",
            "hello\\world",
            "hello..world",
        )
        for identifier in rejected_inputs:
            with pytest.raises(HTTPException) as excinfo:
                _validate_identifier(identifier)
            assert excinfo.value.status_code == 400


class TestSafeJoin:
    """Unit tests for ``_safe_join``."""

    def test_safe_join_within_base(self, tmp_path):
        """Paths below the base are returned in resolved form."""
        base = str(tmp_path)
        # _safe_join returns a symlink-resolved path, so compare against the
        # resolved temp dir rather than assuming tmp_path has no symlinks.
        resolved_base = tmp_path.resolve()
        assert _safe_join(base, "project1") == str(resolved_base / "project1")
        assert _safe_join(base, "project1", "app1") == str(resolved_base / "project1" / "app1")

    def test_safe_join_blocks_traversal(self, tmp_path):
        """Components that climb out of the base are rejected with HTTP 400."""
        base = str(tmp_path)
        for parts in (("..", "etc"), ("project", "..", "..", "etc")):
            with pytest.raises(HTTPException) as excinfo:
                _safe_join(base, *parts)
            assert excinfo.value.status_code == 400

    def test_safe_join_blocks_sibling_with_shared_prefix(self, tmp_path):
        """A sibling directory whose name merely starts with the base name is outside the base."""
        sibling_name = tmp_path.name + "_evil"
        with pytest.raises(HTTPException) as excinfo:
            _safe_join(str(tmp_path), "..", sibling_name)
        assert excinfo.value.status_code == 400

    def test_safe_join_allows_exact_base(self, tmp_path):
        """Joining with no components yields the (resolved) base itself."""
        assert _safe_join(str(tmp_path)) == str(tmp_path.resolve())


class TestLocalBackendPathTraversal:
    """Checks that ``LocalBackend`` entry points reject traversal in project/app ids."""

    def test_get_annotation_path_rejects_traversal(self, tmp_path):
        """``_get_annotation_path`` refuses a project id containing ``..``."""
        local_backend = LocalBackend(path=str(tmp_path))
        with pytest.raises(HTTPException) as excinfo:
            local_backend._get_annotation_path("../etc")
        assert excinfo.value.status_code == 400

    @pytest.mark.asyncio
    async def test_list_apps_rejects_traversal(self, tmp_path):
        """``list_apps`` refuses a project id that climbs out of the base."""
        local_backend = LocalBackend(path=str(tmp_path))
        with pytest.raises(HTTPException) as excinfo:
            await local_backend.list_apps(None, "../../../etc", None)
        assert excinfo.value.status_code == 400

    @pytest.mark.asyncio
    async def test_get_application_logs_rejects_traversal_project(self, tmp_path):
        """``get_application_logs`` refuses traversal in the project id."""
        local_backend = LocalBackend(path=str(tmp_path))
        with pytest.raises(HTTPException) as excinfo:
            await local_backend.get_application_logs(None, "../etc", "app1", None)
        assert excinfo.value.status_code == 400

    @pytest.mark.asyncio
    async def test_get_application_logs_rejects_traversal_app(self, tmp_path):
        """``get_application_logs`` refuses traversal in the app id."""
        local_backend = LocalBackend(path=str(tmp_path))
        with pytest.raises(HTTPException) as excinfo:
            await local_backend.get_application_logs(None, "project1", "../etc", None)
        assert excinfo.value.status_code == 400

    @pytest.mark.asyncio
    async def test_get_application_logs_allows_valid(self, tmp_path):
        """Well-formed ids still resolve to an existing app directory."""
        local_backend = LocalBackend(path=str(tmp_path))
        # Lay out <base>/project1/app1 with the three files written below.
        app_directory = tmp_path / "project1" / "app1"
        app_directory.mkdir(parents=True)
        fixture_files = {
            # Minimal graph.json intended to match the ApplicationModel schema.
            "graph.json": '{"entrypoint": "counter", "actions": [{"name": "counter", "reads": [], "writes": ["counter"], "code": "pass"}], "transitions": []}',
            "log.jsonl": "",
            "metadata.json": "{}",
        }
        for filename, contents in fixture_files.items():
            (app_directory / filename).write_text(contents)

        logs = await local_backend.get_application_logs(None, "project1", "app1", None)
        assert logs is not None
Loading