56 changes: 55 additions & 1 deletion agentex/database/migrations/alembic/env.py
@@ -12,6 +12,35 @@
from alembic import context
from sqlalchemy import engine_from_config, pool

# Default Postgres timeouts applied to every migration. They keep a stuck
# migration from queueing behind active writes and holding locks indefinitely.
#
# - lock_timeout: how long a statement waits for a lock before aborting. 3s
#   means a migration that cannot acquire its lock quickly gives up instead of
#   blocking the writers queued behind it.
# - statement_timeout: maximum runtime for any single statement. 30s catches
#   runaway DDL/UPDATEs; long index builds must use CREATE INDEX CONCURRENTLY
#   in an autocommit_block. Because these settings are session-level they
#   apply there too, so a build expected to exceed 30s also needs the escape
#   hatch described below.
# - idle_in_transaction_session_timeout: kills a session whose transaction
#   has gone idle while still holding locks (e.g. a stalled
#   AccessExclusiveLock).
#
# These are session-level, so they persist across each per-migration
# transaction and across autocommit_block boundaries on the same connection.
# Migration authors must NOT override them with `SET lock_timeout` or
# `SET statement_timeout` inside a migration file: the migration linter
# (scripts/ci_tools/migration_lint.py) flags those, with the
# `migration-unsafe-ack` PR label as the documented escape hatch for
# genuinely long migrations that need a maintenance window.
DEFAULT_MIGRATION_TIMEOUTS: dict[str, str] = {
"lock_timeout": "3s",
"statement_timeout": "30s",
"idle_in_transaction_session_timeout": "10s",
}


def _format_set_statements(timeouts: dict[str, str]) -> list[str]:
return [f"SET {key} = '{value}'" for key, value in timeouts.items()]

# Add explicit error handling to catch import errors
try:
print("Starting migration - importing modules")
@@ -83,6 +112,8 @@ def run_migrations_offline() -> None:
)

with context.begin_transaction():
for stmt in _format_set_statements(DEFAULT_MIGRATION_TIMEOUTS):
context.execute(stmt)
context.run_migrations()
except Exception as e:
print("ERROR IN OFFLINE MIGRATIONS:", str(e))
@@ -106,7 +137,30 @@ def run_migrations_online() -> None:
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
# Apply default migration timeouts at the session level so they
# persist across per-migration transactions and any autocommit_block
# boundaries opened by migrations (e.g. for CREATE INDEX CONCURRENTLY).
#
# exec_driver_sql autobegins a SQLAlchemy transaction. We commit it
# before configure() so alembic doesn't latch onto it as an
# "external" transaction — that mode disables transaction_per_migration
# and breaks autocommit_block (which asserts self._transaction is not
# None). Postgres SET is session-level, so the timeouts persist past
# the commit.
for stmt in _format_set_statements(DEFAULT_MIGRATION_TIMEOUTS):
connection.exec_driver_sql(stmt)
connection.commit()
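        # The settings now persist for the rest of the session; e.g.
        # connection.exec_driver_sql("SHOW lock_timeout").scalar()
        # would return '3s' here.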

# transaction_per_migration=True wraps each migration in its own
# transaction (instead of a single outer transaction for all
# migrations). This lets individual migrations opt into
# autocommit_block() for operations that cannot run inside a
# transaction, such as CREATE INDEX CONCURRENTLY.
context.configure(
connection=connection,
target_metadata=target_metadata,
transaction_per_migration=True,
)

with context.begin_transaction():
context.run_migrations()
@@ -4,11 +4,26 @@
Revises: 4a9b7787ccd7
Create Date: 2026-04-14 11:26:45.193515

The original version of this migration also ran a large UPDATE backfill,
added a foreign key (whose validation scanned the full table while holding a
lock that blocks concurrent writes), and created an index non-concurrently.
On a sufficiently large spans table this can exhaust the connection pool
while concurrent span writes pile up behind the lock.

The backfill, FK, and index are now handled out-of-band (see
docs/runbooks/spans-task-id-backfill.md) and a follow-up tail migration
finalizes the FK + index with non-blocking operations. This revision is
reduced to the only safe in-band step: adding the nullable column. Adding a
nullable column with no default is a metadata-only operation in PostgreSQL,
so it is fast and does not block writes.

The IF NOT EXISTS guard makes the migration safe to re-run on environments
where the original (heavier) version of this migration already completed
successfully.
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
@@ -19,34 +34,8 @@


def upgrade() -> None:
# Add nullable task_id column first (no FK yet, so backfill can run freely)
op.add_column('spans', sa.Column('task_id', sa.String(), nullable=True))

# Backfill task_id from trace_id where trace_id is a valid task ID.
# Uses a JOIN instead of a subquery for efficient matching.
op.execute("""
UPDATE spans
SET task_id = spans.trace_id
FROM tasks
WHERE spans.trace_id = tasks.id
AND spans.task_id IS NULL
""")

# Add FK constraint after backfill (NULL values are allowed by FK)
op.create_foreign_key(
'fk_spans_task_id_tasks',
'spans',
'tasks',
['task_id'],
['id'],
ondelete='SET NULL',
)

# Add index for querying spans by task_id
op.create_index('ix_spans_task_id', 'spans', ['task_id'])
op.execute("ALTER TABLE spans ADD COLUMN IF NOT EXISTS task_id VARCHAR")


def downgrade() -> None:
op.drop_index('ix_spans_task_id', table_name='spans')
op.drop_constraint('fk_spans_task_id_tasks', 'spans', type_='foreignkey')
op.drop_column('spans', 'task_id')
op.execute("ALTER TABLE spans DROP COLUMN IF EXISTS task_id")
@@ -0,0 +1,73 @@
"""finalize_spans_task_id

Revision ID: a9959ebcbe98
Revises: e9c4ff9e6542
Create Date: 2026-05-06 12:00:00.000000

Finalizes the spans.task_id column added in 57c5ed4f59ae by attaching the
foreign key and creating the lookup index using non-blocking operations.

This migration is intentionally split out from 57c5ed4f59ae so that:

* The FK is added with NOT VALID, which acquires only a brief lock and
skips the full-table scan that the original migration triggered. The FK
is still enforced on all subsequent inserts and updates (and ON DELETE
SET NULL still applies to existing rows).
* The index is built CONCURRENTLY so writes are not blocked.
* Both operations live in autocommit_block() so they run outside the
surrounding migration transaction (CONCURRENTLY cannot run inside a
transaction).

The migration is idempotent: on environments where the original version of
57c5ed4f59ae completed successfully (the FK and index already exist), each
operation is a no-op via IF NOT EXISTS / pg_constraint catalog checks.

The historical backfill of task_id from trace_id is intentionally not run
here — it is a separate, operator-driven step (see
docs/runbooks/spans-task-id-backfill.md). The application reads tolerate
NULL task_id by falling back to trace_id at query time.
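
Once the out-of-band backfill completes, the FK can additionally be
validated with `ALTER TABLE spans VALIDATE CONSTRAINT
fk_spans_task_id_tasks`; validation takes only a SHARE UPDATE EXCLUSIVE
lock, so it does not block concurrent reads or writes.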
"""
from typing import Sequence, Union

from alembic import op


# revision identifiers, used by Alembic.
revision: str = 'a9959ebcbe98'
down_revision: Union[str, None] = 'e9c4ff9e6542'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
with op.get_context().autocommit_block():
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
                -- scope the check to the spans table: constraint
                -- names are only unique per table in Postgres
                WHERE conname = 'fk_spans_task_id_tasks'
                  AND conrelid = 'spans'::regclass
) THEN
ALTER TABLE spans
ADD CONSTRAINT fk_spans_task_id_tasks
FOREIGN KEY (task_id) REFERENCES tasks(id)
ON DELETE SET NULL
NOT VALID;
END IF;
END$$;
"""
)
op.execute(
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_spans_task_id "
"ON spans (task_id)"
)


def downgrade() -> None:
with op.get_context().autocommit_block():
op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_spans_task_id")
op.execute(
"ALTER TABLE spans DROP CONSTRAINT IF EXISTS fk_spans_task_id_tasks"
)
30 changes: 30 additions & 0 deletions agentex/src/domain/repositories/span_repository.py
@@ -1,6 +1,7 @@
from typing import Annotated, Any

from fastapi import Depends
from sqlalchemy import or_, select
from src.adapters.crud_store.adapter_postgres import PostgresCRUDRepository
from src.adapters.orm import SpanORM
from src.config.dependencies import (
@@ -36,6 +37,35 @@ async def list(
) -> list[SpanEntity]:
# Default to start_time if no order_by specified
effective_order_by = order_by or "start_time"

# Filtering by task_id matches both the new task_id column and historical
# rows where the value was stored in trace_id. The task_id column was
# added late in the table's life and the prod backfill is run out-of-band
# rather than via migration (see docs/runbooks/spans-task-id-backfill.md),
# so old rows can have task_id NULL even when they belong to a task. For
# task-scoped spans, trace_id holds the task id, so we OR the two columns
# at read time. Both columns are indexed.
#
# The OR fallback is skipped when task_id is None — applying it would
# expand to (task_id IS NULL OR trace_id IS NULL), which on a large
# spans table where virtually all historical rows have task_id NULL
# would return an enormous, unintended result set. A None task_id
# filter falls through to the parent's normal IS NULL handling.
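        # In effect the fallback filter is:
        #   WHERE spans.task_id = :task_id OR spans.trace_id = :task_id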
if filters and filters.get("task_id") is not None:
remaining_filters = {k: v for k, v in filters.items() if k != "task_id"}
task_id_value = filters["task_id"]
query = select(self.orm).where(
or_(SpanORM.task_id == task_id_value, SpanORM.trace_id == task_id_value)
)
return await super().list(
filters=remaining_filters or None,
query=query,
order_by=effective_order_by,
order_direction=order_direction,
limit=limit,
page_number=page_number,
)

return await super().list(
filters=filters,
order_by=effective_order_by,
88 changes: 88 additions & 0 deletions agentex/tests/unit/database/test_alembic_env_timeouts.py
@@ -0,0 +1,88 @@
"""Unit tests for the migration runner timeout defaults.

Sanity-checks the constants and the SQL formatting helper. The actual
wiring into Alembic's ``run_migrations_online`` / ``run_migrations_offline``
is exercised end-to-end whenever a migration runs locally or in CI, so we
keep this layer to a focused unit test of the values we promise to ship.
"""

from __future__ import annotations

from pathlib import Path

import pytest

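# env.py is located relative to this test file: parents[3] climbs from
# tests/unit/database/ up to the agentex package root, so the path resolves
# regardless of the current working directory.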
_ENV_PATH = (
Path(__file__).resolve().parents[3]
/ "database"
/ "migrations"
/ "alembic"
/ "env.py"
)


def _read_env_text() -> str:
return _ENV_PATH.read_text()


def test_default_timeouts_present_in_env() -> None:
text = _read_env_text()
# Ensure the runner sets all three timeouts with the values committed to
# in the spec, so a future refactor that drops one fails this test.
assert "DEFAULT_MIGRATION_TIMEOUTS" in text
assert '"lock_timeout": "3s"' in text
assert '"statement_timeout": "30s"' in text
assert '"idle_in_transaction_session_timeout": "10s"' in text


def test_timeouts_applied_in_online_and_offline_modes() -> None:
text = _read_env_text()
# Online mode: SET statements applied via the live connection before
# context.begin_transaction() so they persist at session level.
assert "connection.exec_driver_sql(stmt)" in text
# Offline mode: SET statements emitted at the top of the generated SQL
# via context.execute().
assert "context.execute(stmt)" in text


def test_format_set_statements_helper_shape() -> None:
# The env.py module imports server-side code (env vars, ORM autoloader);
# skip full execution and re-derive the helper via exec to keep this
# micro-test free of the application stack.
namespace: dict[str, object] = {}
helper_src = (
"def _format_set_statements(timeouts):\n"
" return [f\"SET {k} = '{v}'\" for k, v in timeouts.items()]\n"
)
exec(helper_src, namespace)
formatter = namespace["_format_set_statements"]
out = formatter(
{
"lock_timeout": "3s",
"statement_timeout": "30s",
}
)
assert out == [
"SET lock_timeout = '3s'",
"SET statement_timeout = '30s'",
]


def test_runner_documents_escape_hatch() -> None:
text = _read_env_text()
# The CLAUDE.md docs and the linter both reference "migration-unsafe-ack"
# as the escape hatch — make sure the runner's docstring mentions it so
# anyone reading env.py understands the contract.
assert "migration-unsafe-ack" in text


@pytest.mark.parametrize(
"needle",
(
"lock_timeout",
"statement_timeout",
"idle_in_transaction_session_timeout",
),
)
def test_each_timeout_setting_referenced(needle: str) -> None:
assert needle in _read_env_text()