Changes from all commits (45 commits)
b8d1ee3
Add Kafka stream backend architecture with split KV/stream protocols
claude Mar 27, 2026
44f570d
Simplify to single-backend architecture with ops layer wired to all c…
claude Mar 27, 2026
51afd25
Add queue commit/nack semantics and retry support for Kafka resilience
claude Mar 27, 2026
a5a6584
Concurrent task execution per worker and Kafka consumer heartbeat
claude Mar 27, 2026
29685ca
Revert "Concurrent task execution per worker and Kafka consumer heart…
claude Mar 27, 2026
de16450
Add worker_id to Kafka client IDs for observability
claude Mar 27, 2026
8af3122
Route activity system through ops layer with Kafka activity topic
claude Mar 27, 2026
7099247
Split backends into packages with domain-specific modules
claude Mar 27, 2026
35a6a2a
Go full async and rename backend methods to descriptive names
claude Mar 27, 2026
84423a9
Add Kafka integration tests and CI workflow
claude Mar 27, 2026
a4d87cf
Fix CI: use correct Kafka image tag, override addopts, fix readiness …
claude Mar 27, 2026
659f9fe
Switch to apache/kafka:3.9.2 for CI Kafka service
claude Mar 27, 2026
969f6f6
CI: disable fail-fast, add verbose test output for debugging
claude Mar 27, 2026
f763709
Fix Kafka consumer hangs: per-topic group IDs, retry loop, faster hea…
claude Mar 27, 2026
48c3e2d
Use manual partition assignment instead of consumer groups
claude Mar 27, 2026
b50156c
Fix force_metadata_update — use partition discovery retry loop
claude Mar 27, 2026
584994c
Remove consumer group_id to avoid group coordinator hangs
claude Mar 27, 2026
66082e7
CI: capture Kafka test output in job summary on failure
claude Mar 27, 2026
918363a
CI: post Kafka test output as PR comment on failure
claude Mar 27, 2026
896ecdc
Use subscribe() with per-topic group IDs, set rebalance delay to 0
claude Mar 27, 2026
a861c9a
Use admin metadata for partition discovery in manual assignment
claude Mar 27, 2026
1a96fed
CI: use curl for PR comment instead of github-script
claude Mar 27, 2026
67922c9
CI: upload test output as artifact instead of PR comment
claude Mar 27, 2026
86fde70
Add debug prints to queue_pop and test_push_and_pop
claude Mar 27, 2026
686f3d2
Update uv.lock after adding kafka extra dependency
claude Mar 27, 2026
985d483
CI: emit test output as warning annotations for API access
claude Mar 27, 2026
5f6384a
Better debug output: print consumer state on timeout, filter annotations
claude Mar 27, 2026
ce54be7
CI: filter annotations to only show failures and debug output
claude Mar 27, 2026
e7fb281
CI: re-trigger after transient Docker pull failure
claude Mar 27, 2026
3f792b1
CI: retry after transient failures
claude Mar 27, 2026
47c0324
CI: use apache/kafka:latest to avoid Docker pull issues with pinned tag
claude Mar 27, 2026
d0aa733
CI: switch to confluentinc/cp-kafka:7.7.1 for reliable Docker pulls
claude Mar 27, 2026
92ab91c
CI: use docker run instead of service containers for Kafka
claude Mar 27, 2026
e4b3946
Add docker-compose.kafka.yml, clean up debug prints, use docker run i…
claude Mar 27, 2026
424d41c
Fix queue_pop message buffer and produce() key type handling
tcdent Mar 27, 2026
cabf769
Kafka consumer groups, full async, producer-side topic creation
tcdent Mar 27, 2026
a9fdbdd
Class-based backend architecture, eliminate ops passthrough layer
tcdent Mar 27, 2026
b2e28c5
Flatten backend modules, remove dead code, clean up noise
tcdent Mar 27, 2026
a3c3f6d
Kafka headers, stateless activity backend, schedule backend, pool sup…
tcdent Mar 28, 2026
17f9819
Extract activity from backends into producer/consumer pattern
tcdent Mar 28, 2026
4259527
Typed worker messages, Task as pure data, multiprocessing IPC
tcdent Mar 28, 2026
2c7ef1f
Partitioned Redis queues with scan-based fair dequeue
tcdent Mar 28, 2026
b1ef5d5
Remove pubsub, inline dequeue, queue.complete, activity over IPC
tcdent Mar 29, 2026
b382ef3
Schedule backend, session cleanup, dead code removal, resiliency tests
tcdent Mar 29, 2026
4f9ba7c
Kafka state: raise NotImplementedError, drop in-memory KV/counter caches
tcdent Mar 29, 2026
123 changes: 123 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,123 @@
name: CI

on:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
# -----------------------------------------------------------------------
# Unit tests — no external services (fakeredis + SQLite)
# -----------------------------------------------------------------------

test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.12", "3.13"]

steps:
- uses: actions/checkout@v4

- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true

- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}

- name: Install dependencies
run: uv sync --dev

- name: Run unit tests
run: |
uv run pytest tests/ \
--ignore=tests/test_kafka_integration.py \
-o "addopts=" \
-v --tb=long

# -----------------------------------------------------------------------
# Kafka integration tests — real broker via docker run
# -----------------------------------------------------------------------
test-kafka:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Start Kafka broker
run: |
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_NODE_ID=1 \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
-e KAFKA_LOG_CLEANER_MIN_COMPACTION_LAG_MS=0 \
-e KAFKA_LOG_CLEANER_MIN_CLEANABLE_RATIO=0.01 \
-e KAFKA_LOG_RETENTION_MS=60000 \
-e KAFKA_NUM_PARTITIONS=1 \
-e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \
-e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
-e CLUSTER_ID=ciTestCluster0001 \
apache/kafka:3.9.0

- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true

- name: Set up Python
run: uv python install 3.12

- name: Install dependencies
run: uv sync --dev --extra kafka

- name: Wait for Kafka to be ready
run: |
echo "Waiting for Kafka..."
for i in $(seq 1 30); do
if nc -z localhost 9092 2>/dev/null; then
echo "Kafka port is open"
sleep 5
echo "Kafka is ready"
exit 0
fi
echo " attempt $i/30..."
sleep 2
done
echo "Kafka failed to start"
docker logs kafka
exit 1

- name: Run Kafka integration tests
run: |
uv run pytest tests/test_kafka_integration.py \
-o "addopts=" \
-v --tb=long 2>&1 | tee /tmp/kafka_test_output.txt
exit ${PIPESTATUS[0]}
env:
AGENTEXEC_STATE_BACKEND: agentexec.state.kafka
KAFKA_BOOTSTRAP_SERVERS: localhost:9092
AGENTEXEC_KAFKA_DEFAULT_PARTITIONS: "2"
AGENTEXEC_KAFKA_REPLICATION_FACTOR: "1"

- name: Show Kafka logs on failure
if: failure()
run: docker logs kafka 2>&1 | tail -50

- name: Create failure check annotation with output
if: failure()
run: |
if [ -f /tmp/kafka_test_output.txt ]; then
grep -E '\[queue_|FAILED|ERROR|AssertionError|TIMEOUT|short test summary' /tmp/kafka_test_output.txt | tail -9 | while IFS= read -r line; do
echo "::warning::$line"
done
fi
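The failure-annotation step greps the captured test output and re-emits matching lines as `::warning::` annotations so they surface in the Actions UI. A minimal Python sketch of the same filtering logic — the helper name `annotate_failures` is illustrative, not part of the project; the patterns mirror the grep above:

```python
import re

# Patterns mirror the grep in the CI step: queue debug markers, failures,
# errors, assertion/timeout lines, and pytest's summary header.
PATTERN = re.compile(
    r"\[queue_|FAILED|ERROR|AssertionError|TIMEOUT|short test summary"
)


def annotate_failures(output: str, limit: int = 9) -> list[str]:
    """Return the last `limit` matching lines as GitHub warning annotations."""
    matches = [line for line in output.splitlines() if PATTERN.search(line)]
    return [f"::warning::{line}" for line in matches[-limit:]]
```

The `tail -9` / `limit` cap keeps the annotation count under GitHub's per-job limits while still surfacing the pytest summary, which is always printed last.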
48 changes: 48 additions & 0 deletions docker-compose.kafka.yml
@@ -0,0 +1,48 @@
# Kafka development environment for running integration tests locally.
#
# Usage:
# docker compose -f docker-compose.kafka.yml up -d
#
# KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
# AGENTEXEC_STATE_BACKEND=agentexec.state.kafka \
# uv run pytest tests/test_kafka_integration.py -v
#
# docker compose -f docker-compose.kafka.yml down
#
# Kafka UI available at http://localhost:8080

services:
kafka:
image: apache/kafka:3.9.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: "1"
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
CLUSTER_ID: "agentexec-dev-cluster-01"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
healthcheck:
test: /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
interval: 5s
timeout: 10s
retries: 15
start_period: 15s

kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: agentexec
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
kafka:
condition: service_healthy
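Both the CI job and this compose file gate on broker readiness: the CI step polls with `nc -z` plus a fixed sleep, while the compose healthcheck runs `kafka-topics.sh --list` against the broker. A rough pure-Python equivalent of the port probe (the helper name is illustrative, not part of the project):

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 2.0) -> bool:
    """Poll until a TCP connection to host:port succeeds, like `nc -z` in a loop.

    Note: like the nc check, this only proves the port is open, not that the
    broker has finished startup -- hence the extra sleep in the CI step, and
    why the compose healthcheck uses kafka-topics.sh instead.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

The `kafka-topics.sh` healthcheck is the stronger signal of the two, since it exercises the full request path rather than just the TCP listener.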
187 changes: 187 additions & 0 deletions examples/queue-fairness/run.py
@@ -0,0 +1,187 @@
"""Queue fairness test.

Validates that tasks distributed across many partition queues get
roughly equal treatment under the scan-based dequeue strategy.

Usage:
uv run python examples/queue-fairness/run.py
uv run python examples/queue-fairness/run.py --partitions 100 --tasks-per-partition 5 --workers 8
"""

from __future__ import annotations

import argparse
import asyncio
import json
import statistics
import time
from uuid import uuid4

from pydantic import BaseModel

import agentexec as ax
from agentexec.config import CONF
from agentexec.state import backend


class BenchContext(BaseModel):
partition_id: int
task_index: int
queued_at: float


async def enqueue_tasks(partitions: int, tasks_per_partition: int) -> int:
"""Push tasks across N partitions with M tasks each."""
total = 0
for p in range(partitions):
partition_key = f"partition:{p}"
for t in range(tasks_per_partition):
task = ax.Task(
task_name="bench_task",
context={
"partition_id": p,
"task_index": t,
"queued_at": time.time(),
},
agent_id=uuid4(),
)
await backend.queue.push(
CONF.queue_prefix,
task.model_dump_json(),
partition_key=partition_key,
)
total += 1
return total


async def worker(
worker_id: int,
results: list[dict],
stop_event: asyncio.Event,
work_duration: float,
):
"""Simulated worker that pops tasks and records timing."""
while not stop_event.is_set():
data = await backend.queue.pop(CONF.queue_prefix, timeout=1)
if data is None:
            # Queue empty (pop timed out); back off briefly before polling again
await asyncio.sleep(0.1)
continue

        picked_up_at = time.time()
        # queue.push stored task.model_dump_json(), so pop may return a JSON string
        payload = json.loads(data) if isinstance(data, str) else data
        context = payload.get("context", {})
        queued_at = context.get("queued_at", picked_up_at)
        wait_time = picked_up_at - queued_at

results.append({
"worker_id": worker_id,
"partition_id": context.get("partition_id"),
"task_index": context.get("task_index"),
"wait_time": wait_time,
"picked_up_at": picked_up_at,
})

# Simulate work
await asyncio.sleep(work_duration)

# Release the partition lock
partition_key = f"partition:{context.get('partition_id')}"
await backend.queue.release_lock(CONF.queue_prefix, partition_key)


async def run(
partitions: int,
tasks_per_partition: int,
num_workers: int,
work_duration: float,
):
print(f"Enqueueing {partitions} partitions x {tasks_per_partition} tasks = {partitions * tasks_per_partition} total")
total = await enqueue_tasks(partitions, tasks_per_partition)
print(f"Enqueued {total} tasks")

results: list[dict] = []
stop_event = asyncio.Event()

print(f"Starting {num_workers} workers (simulated work: {work_duration}s)")
start = time.time()

workers = [
asyncio.create_task(worker(i, results, stop_event, work_duration))
for i in range(num_workers)
]

# Wait until all tasks are processed
while len(results) < total:
await asyncio.sleep(0.5)
elapsed = time.time() - start
print(f" {len(results)}/{total} tasks processed ({elapsed:.1f}s)", end="\r")

elapsed = time.time() - start
stop_event.set()

# Let workers drain
await asyncio.gather(*workers, return_exceptions=True)

print(f"\n\nCompleted {len(results)} tasks in {elapsed:.1f}s")
print(f"Throughput: {len(results) / elapsed:.1f} tasks/sec")

# Analyze fairness per partition
partition_times: dict[int, list[float]] = {}
for r in results:
pid = r["partition_id"]
if pid not in partition_times:
partition_times[pid] = []
partition_times[pid].append(r["wait_time"])

avg_per_partition = {
pid: statistics.mean(times) for pid, times in partition_times.items()
}

all_waits = [r["wait_time"] for r in results]
all_avgs = list(avg_per_partition.values())

    print("\nWait time (seconds from enqueue to pickup):")
print(f" Overall mean: {statistics.mean(all_waits):.3f}s")
print(f" Overall median: {statistics.median(all_waits):.3f}s")
print(f" Overall stdev: {statistics.stdev(all_waits):.3f}s")
print(f" Min: {min(all_waits):.3f}s")
print(f" Max: {max(all_waits):.3f}s")

print(f"\nFairness across {len(partition_times)} partitions:")
print(f" Mean of partition averages: {statistics.mean(all_avgs):.3f}s")
print(f" Stdev of partition averages: {statistics.stdev(all_avgs):.3f}s")
print(f" Min partition avg: {min(all_avgs):.3f}s")
print(f" Max partition avg: {max(all_avgs):.3f}s")
print(f" Spread (max-min): {max(all_avgs) - min(all_avgs):.3f}s")

# Worker distribution
worker_counts: dict[int, int] = {}
for r in results:
wid = r["worker_id"]
worker_counts[wid] = worker_counts.get(wid, 0) + 1

    print("\nWorker distribution:")
for wid in sorted(worker_counts):
print(f" Worker {wid}: {worker_counts[wid]} tasks")

await backend.close()


def main():
parser = argparse.ArgumentParser(description="Queue fairness benchmark")
parser.add_argument("--partitions", type=int, default=500, help="Number of partition queues")
parser.add_argument("--tasks-per-partition", type=int, default=12, help="Tasks per partition")
parser.add_argument("--workers", type=int, default=4, help="Number of concurrent workers")
parser.add_argument("--work-duration", type=float, default=0.5, help="Simulated work time (seconds)")
args = parser.parse_args()

asyncio.run(run(
partitions=args.partitions,
tasks_per_partition=args.tasks_per_partition,
num_workers=args.workers,
work_duration=args.work_duration,
))


if __name__ == "__main__":
main()
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -28,6 +28,11 @@ dependencies = [
"croniter>=6.0.0",
]

[project.optional-dependencies]
kafka = [
"aiokafka>=0.11.0",
]


[project.urls]
Homepage = "https://github.com/Agent-CI/agentexec"