Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b5b2ebd
feature ok / needs to be tested
IdirLISN Mar 31, 2026
c8cbf4e
feature deactivated by default + hiden behind a button by default + v…
IdirLISN Apr 2, 2026
7ada400
worker monitoring button behind user menu
IdirLISN Apr 2, 2026
d6adcb5
files blacked for fixing the formatting issues
IdirLISN Apr 2, 2026
92f5247
fixing synthax and format
IdirLISN Apr 2, 2026
047cbbd
fixing synthax and format
IdirLISN Apr 2, 2026
7c058fa
fixing synthax and format
IdirLISN Apr 2, 2026
f477464
feature in progress
IdirLISN Apr 7, 2026
a2f0769
new file for monitoring implentation imported into frontend
IdirLISN May 6, 2026
ecf464b
test monitoring private queues
IdirLISN May 7, 2026
b7de924
compute worker monitoring on private queues (amazing stuff)
IdirLISN May 7, 2026
9666084
test number 234
IdirLISN May 7, 2026
cad27cf
test number 238
IdirLISN May 7, 2026
15aacef
test number 245
IdirLISN May 7, 2026
5c611a4
test number 246
IdirLISN May 7, 2026
4eba4b7
test number 249
IdirLISN May 7, 2026
781af58
test number 435
IdirLISN May 7, 2026
6766dc8
add d'un fichier manquant pour test
IdirLISN May 7, 2026
bceaab7
model queue = a problem
IdirLISN May 7, 2026
75031b5
clean feature/ needs to be tested
IdirLISN May 7, 2026
3fe688a
clean feature code and imports
IdirLISN May 11, 2026
d8e1f72
feature production ready
IdirLISN May 11, 2026
bd5268b
fix comment // feature production ready
IdirLISN May 11, 2026
cef370c
debug site worker
IdirLISN May 11, 2026
06bcc21
add collapse button on default worker panel
IdirLISN May 12, 2026
92e0345
feature production ready
IdirLISN May 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ dependencies = [
"django-cors-headers==4.9.0",
"nh3==0.3.3",
"configobj==5.0.9",
"black>=26.3.1",
"redis-cli>=1.0.1",
]

[tool.uv]
Expand Down
118 changes: 117 additions & 1 deletion src/apps/competitions/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
import os
import re
import traceback
Expand All @@ -8,10 +9,13 @@
from io import BytesIO
from tempfile import TemporaryDirectory, NamedTemporaryFile

import urllib

import oyaml as yaml
import requests
from celery._state import app_or_default
from django.conf import settings
from django_redis import get_redis_connection
from django.core.exceptions import ObjectDoesNotExist
from django.core.files.base import ContentFile
from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F
Expand All @@ -20,9 +24,10 @@
from django.utils.timezone import now
from rest_framework.exceptions import ValidationError

from celery_config import app
from celery_config import app, app_for_vhost
from competitions.models import Submission, CompetitionCreationTaskStatus, SubmissionDetails, Competition, \
CompetitionDump, Phase
from queues.models import Queue
from competitions.unpackers.utils import CompetitionUnpackingException
from competitions.unpackers.v1 import V15Unpacker
from competitions.unpackers.v2 import V2Unpacker
Expand All @@ -31,8 +36,12 @@
from datasets.models import Data
from utils.data import make_url_sassy
from utils.email import codalab_send_markdown_email
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

import logging

from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY, extract_queue_names, is_compute_worker, known_compute_queue_names
logger = logging.getLogger(__name__)

COMPETITION_FIELDS = [
Expand Down Expand Up @@ -791,3 +800,110 @@ def submission_status_cleanup():
sub.parent.cancel(status=Submission.FAILED)
else:
sub.cancel(status=Submission.FAILED)


# -------------------------------------------------
def _broadcast_worker_state(payload):
channel_layer = get_channel_layer()
if not channel_layer:
return

async_to_sync(channel_layer.group_send)(
"compute_workers",
{
"type": "worker.health",
"worker": payload,
},
)


@app.task(queue="site-worker", soft_time_limit=120)
def refresh_compute_worker_health():
celery_app = app
r = get_redis_connection("default")
known_queue_names = known_compute_queue_names()
broker_sources = []
broker_sources.append(("default", celery_app.conf.broker_url, celery_app))

private_queues = (
Queue.objects.filter(competitions__isnull=False)
.exclude(name__isnull=True)
.exclude(name="")
.distinct()
)
for queue in private_queues:
if not queue.broker_url:
continue
parsed = urllib.parse.urlparse(queue.broker_url)
vhost = parsed.path
broker_url = urllib.parse.urljoin(celery_app.conf.broker_url, vhost)
broker_sources.append((queue.name, broker_url, app_for_vhost(vhost)))

inspected_brokers = set()
for source_name, broker_url, broker_app in broker_sources:
if broker_url in inspected_brokers:
continue
inspected_brokers.add(broker_url)

try:
# timeout=5 : 4 appels × 5s × N brokers
inspector = broker_app.control.inspect(timeout=5)
if inspector is None:
logger.warning(
"Celery inspect returned None for broker=%s", source_name
)
continue
stats = inspector.stats() or {}
active = inspector.active() or {}
reserved = inspector.reserved() or {}
active_queues = inspector.active_queues() or {}
except Exception:
logger.exception(
"Unable to inspect Celery workers for broker %s", source_name
)
continue

for worker_name in stats.keys():
queues = active_queues.get(worker_name, []) or []
queue_names = extract_queue_names(queues)
if not is_compute_worker(worker_name, queue_names, known_queue_names):
continue

running_jobs = len(active.get(worker_name, [])) + len(
reserved.get(worker_name, [])
)
status = "busy" if running_jobs > 0 else "available"
payload = {
"hostname": worker_name,
"status": status,
"running_jobs": running_jobs,
"timestamp": now().timestamp(),
"queue_source": source_name,
"queue_names": sorted(queue_names),
}
heartbeat_key = f"worker:{source_name}:{worker_name}:heartbeat"
r.set(heartbeat_key, json.dumps(payload), ex=WORKER_HEARTBEAT_TTL)
r.hset(
WORKERS_REGISTRY_KEY,
f"{source_name}:{worker_name}",
json.dumps(
{
"hostname": worker_name,
"status": status,
"running_jobs": running_jobs,
"last_seen": payload["timestamp"],
"queue_source": source_name,
"queue_names": sorted(queue_names),
}
),
)
_broadcast_worker_state(payload)
# Logs about CW health HERE
# logger.info(
# "[WORKER-HEALTH] source=%s worker=%s status=%s jobs=%d queues=%s",
# source_name,
# worker_name,
# status,
# running_jobs,
# sorted(queue_names),
# )
18 changes: 12 additions & 6 deletions src/celery_config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import copy
import urllib.parse

from celery import Celery
from kombu import Queue, Exchange
from django.conf import settings
import urllib.parse
import copy
from kombu import Exchange, Queue

app = Celery()

from django.conf import settings # noqa

app.config_from_object('django.conf:settings', namespace='CELERY')
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.task_queues = [
# Mostly defining queue here so we can set x-max-priority
Queue('compute-worker', Exchange('compute-worker'), routing_key='compute-worker', queue_arguments={'x-max-priority': 10}),
Queue(
"compute-worker",
Exchange("compute-worker"),
routing_key="compute-worker",
queue_arguments={"x-max-priority": 10},
),
]

_vhost_apps = {}
Expand All @@ -32,7 +38,7 @@ def app_for_vhost(vhost):
# Copy the settings so we can modify the broker url to include the vhost
django_settings = copy.copy(settings)
django_settings.CELERY_BROKER_URL = broker_url
vhost_app.config_from_object(django_settings, namespace='CELERY')
vhost_app.config_from_object(django_settings, namespace="CELERY")
vhost_app.conf.task_queues = app.conf.task_queues
_vhost_apps[vhost] = vhost_app
return _vhost_apps[vhost]
3 changes: 3 additions & 0 deletions src/routing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from django.urls import re_path
from apps.competitions.consumers import SubmissionIOConsumer, SubmissionOutputConsumer
from utils.consumers import ComputeWorkersConsumer


websocket_urlpatterns = [
re_path(r'submission_input/(?P<user_pk>\d+)/(?P<submission_id>\d+)/(?P<secret>[^/]+)/$', SubmissionIOConsumer.as_asgi()),
re_path(r'submission_output/$', SubmissionOutputConsumer.as_asgi()),
re_path(r"ws/workers/$", ComputeWorkersConsumer.as_asgi()),
]
4 changes: 4 additions & 0 deletions src/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@
'task': 'profiles.tasks.clean_non_activated_users',
'schedule': timedelta(days=1), # Run every 24 hours
},
"refresh_compute_worker_health": {
"task": "competitions.tasks.refresh_compute_worker_health",
"schedule": 60,
},
}
CELERY_TIMEZONE = 'UTC'
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
Expand Down
7 changes: 7 additions & 0 deletions src/static/riot/competitions/detail/_header.tag
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
<button class="ui small button" onclick="{show_modal.bind(this, '.migration.modal')}">
Migrate
</button>

<worker-monitor-toggle
if="{competition.admin}"
can_view_workers_panel="true"
competition_id="{ competition.id }">
</worker-monitor-toggle>

</div>
<div class="row">
<div class="column">
Expand Down
2 changes: 1 addition & 1 deletion src/static/riot/competitions/detail/detail.tag
Original file line number Diff line number Diff line change
Expand Up @@ -645,4 +645,4 @@
}
}
</style>
</competition-detail>
</competition-detail>
Loading