
Commit 029f710

Hotfix/celery import issues (#705)
1 parent 161afb4 commit 029f710

3 files changed: 36 additions & 38 deletions


backend/app/celery/celery_app.py

Lines changed: 0 additions & 29 deletions
@@ -1,4 +1,3 @@
-import importlib
 import logging
 
 from celery import Celery
@@ -9,34 +8,6 @@
 
 logger = logging.getLogger(__name__)
 
-
-# All modules referenced as function_path in execute_high/low_priority_task calls.
-# Pre-importing these at worker process startup eliminates the 2-5s cold-import
-# penalty on the first task execution.
-_JOB_MODULES = [
-    "app.services.llm.jobs",
-    "app.services.response.jobs",
-    "app.services.doctransform.job",
-    "app.services.collections.create_collection",
-    "app.services.collections.delete_collection",
-    "app.services.stt_evaluations.batch_job",
-    "app.services.tts_evaluations.batch_job",
-    "app.services.tts_evaluations.batch_result_processing",
-    "app.services.stt_evaluations.metric_job",
-]
-
-
-@worker_process_init.connect
-def warmup_job_modules(sender, **kwargs: object) -> None:
-    """Pre-import all job modules so the first task execution is not delayed by cold imports."""
-    for module_path in _JOB_MODULES:
-        try:
-            importlib.import_module(module_path)
-            logger.debug(f"[warmup_job_modules] Pre-imported {module_path}")
-        except Exception as exc:
-            logger.warning(
-                f"[warmup_job_modules] Failed to pre-import {module_path}: {exc}"
-            )
-
 
 # Create Celery instance
 celery_app = Celery(
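
Note on the memory reasoning behind this removal: under Celery's default prefork pool, a worker_process_init handler like warmup_job_modules runs once in every forked child, so each child imports (and allocates memory for) the job modules independently. A plain module-level import, which is what job_execution.py now does, runs once in the main worker process before the pool forks, and the children inherit the loaded modules via copy-on-write. A minimal sketch of the contrast, using a hypothetical heavy_jobs module rather than anything from this repo:

import importlib

from celery.signals import worker_process_init

# Old approach (removed here): this hook fires in every forked pool process,
# so with N pool processes the import cost and memory are paid N times.
@worker_process_init.connect
def warmup_job_modules(sender=None, **kwargs):
    importlib.import_module("heavy_jobs")  # "heavy_jobs" is a placeholder module

# New approach (see job_execution.py below): a top-level import runs once in the
# parent worker process, before it forks; children share the already-loaded
# module copy-on-write.
# import heavy_jobs as _heavy_jobs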

backend/app/celery/tasks/job_execution.py

Lines changed: 32 additions & 5 deletions
@@ -1,12 +1,38 @@
 import logging
-import importlib
+from collections.abc import Callable
 from celery import current_task
 from asgi_correlation_id import correlation_id
 
 from app.celery.celery_app import celery_app
+import app.services.llm.jobs as _llm_jobs
+import app.services.response.jobs as _response_jobs
+import app.services.doctransform.job as _doctransform_job
+import app.services.collections.create_collection as _create_collection
+import app.services.collections.delete_collection as _delete_collection
+import app.services.stt_evaluations.batch_job as _stt_batch_job
+import app.services.stt_evaluations.metric_job as _stt_metric_job
+import app.services.tts_evaluations.batch_job as _tts_batch_job
+import app.services.tts_evaluations.batch_result_processing as _tts_result_processing
 
 logger = logging.getLogger(__name__)
 
+# Hardcoded dispatch table — avoids dynamic importlib at task execution time.
+# Imports above happen once in the main Celery process before worker forks,
+# so all child workers inherit them via copy-on-write instead of each loading
+# them independently (which was causing OOM with warmup_job_modules).
+_FUNCTION_REGISTRY: dict[str, Callable] = {
+    "app.services.llm.jobs.execute_job": _llm_jobs.execute_job,
+    "app.services.llm.jobs.execute_chain_job": _llm_jobs.execute_chain_job,
+    "app.services.response.jobs.execute_job": _response_jobs.execute_job,
+    "app.services.doctransform.job.execute_job": _doctransform_job.execute_job,
+    "app.services.collections.create_collection.execute_job": _create_collection.execute_job,
+    "app.services.collections.delete_collection.execute_job": _delete_collection.execute_job,
+    "app.services.stt_evaluations.batch_job.execute_batch_submission": _stt_batch_job.execute_batch_submission,
+    "app.services.stt_evaluations.metric_job.execute_metric_computation": _stt_metric_job.execute_metric_computation,
+    "app.services.tts_evaluations.batch_job.execute_batch_submission": _tts_batch_job.execute_batch_submission,
+    "app.services.tts_evaluations.batch_result_processing.execute_tts_result_processing": _tts_result_processing.execute_tts_result_processing,
+}
+
 
 @celery_app.task(bind=True, queue="high_priority")
 def execute_high_priority_task(
@@ -85,10 +111,11 @@ def _execute_job_internal(
     logger.info(f"Set correlation ID context: {trace_id} for job {job_id}")
 
     try:
-        # Dynamically import and resolve the function
-        module_path, function_name = function_path.rsplit(".", 1)
-        module = importlib.import_module(module_path)
-        execute_function = getattr(module, function_name)
+        execute_function = _FUNCTION_REGISTRY.get(function_path)
+        if execute_function is None:
+            raise ValueError(
+                f"[_execute_job_internal] Unknown function path: {function_path}"
+            )
 
         logger.info(
             f"Executing {priority} job {job_id} (task {task_id}) using function {function_path}"

backend/app/celery/utils.py

Lines changed: 4 additions & 4 deletions
@@ -7,10 +7,6 @@
 from celery.result import AsyncResult
 
 from app.celery.celery_app import celery_app
-from app.celery.tasks.job_execution import (
-    execute_high_priority_task,
-    execute_low_priority_task,
-)
 
 logger = logging.getLogger(__name__)
 
@@ -31,6 +27,8 @@ def start_high_priority_job(
     Returns:
         Celery task ID (different from job_id)
     """
+    from app.celery.tasks.job_execution import execute_high_priority_task
+
     task = execute_high_priority_task.delay(
         function_path=function_path,
         project_id=project_id,
@@ -59,6 +57,8 @@ def start_low_priority_job(
     Returns:
         Celery task ID (different from job_id)
     """
+    from app.celery.tasks.job_execution import execute_low_priority_task
+
     task = execute_low_priority_task.delay(
         function_path=function_path,
         project_id=project_id,
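
Moving these imports into the function bodies defers them until the first call to start_high_priority_job / start_low_priority_job. The diff does not state the reason, but the usual motivation for this pattern, and a plausible one here given that job_execution.py now imports the service modules at module load, is to avoid a circular import during startup. A small sketch of the pattern with hypothetical modules tasks.py and utils.py:

# tasks.py (hypothetical)
#   imports service code, which in turn imports utils at module level
#
# utils.py (hypothetical)
#   a module-level "from tasks import execute_task" would re-enter tasks.py while it
#   is still initialising; deferring the import to call time avoids that, because by
#   the time start_job() runs, both modules have finished loading.

def start_job(**kwargs):
    from tasks import execute_task  # local import, resolved on first call
    return execute_task.delay(**kwargs)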
