Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 201 additions & 10 deletions src/kernel_ci_cloud_labs/pull_labs_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@

Long-lived service (or one-shot job) that:
1. Polls kernelci-api /events for new pull-lab jobs.
2. Fetches each job's PULL_LABS job_definition JSON.
3. Translates it into a pullab_cloud run config and runs the pipeline.
4. Submits per-test results directly to KCIDB.
2. Claims each job node (state=running) so other pollers skip it.
3. Fetches each job's PULL_LABS job_definition JSON.
4. Translates it into a pullab_cloud run config and runs the pipeline.
5. Submits per-test results directly to KCIDB.
6. Marks the job node done in kernelci-api (state=done + result, plus
data.error_code / data.error_msg on an infrastructure failure).

Generic Python only — uses stdlib urllib for HTTP, supports env-var and
config-file configuration, can be invoked as a CLI, a long-running
Expand All @@ -35,6 +38,7 @@
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple

from kernel_ci_cloud_labs.kcidb_submit import (
Expand Down Expand Up @@ -116,6 +120,27 @@ def _http_get_json(url: str, token: Optional[str] = None, timeout: float = 30.0)
return json.loads(body) if body else None


def _http_put_json(
    url: str,
    payload: Any,
    token: Optional[str] = None,
    timeout: float = 30.0,
) -> Any:
    """Send *payload* to *url* as the JSON body of an HTTP PUT.

    A bearer ``Authorization`` header is attached when *token* is truthy.
    Returns the decoded JSON response, or None if the response body is
    empty.

    Raises urllib.error.URLError / HTTPError on transport or HTTP errors,
    mirroring _http_get_json so callers can handle both with one except.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
    if token:
        request.add_header("Authorization", f"Bearer {token}")

    with urllib.request.urlopen(request, timeout=timeout) as response:
        # Undecodable bytes are replaced rather than raising, so only the
        # JSON parse below can reject a malformed body.
        text = response.read().decode("utf-8", errors="replace")

    if not text:
        return None
    return json.loads(text)


# ---------------------------------------------------------------------------
# Cursor persistence — generic filesystem backend by default.
# A deployment can swap in a custom CursorStore (e.g. backed by S3) by
Expand Down Expand Up @@ -270,6 +295,45 @@ def _extract_test_results(summary: Dict[str, Any]) -> Tuple[List[Dict[str, Any]]
return rows, None


def _node_result_from_rows(test_rows: List[Dict[str, Any]]) -> str:
    """Map the test statuses of a job that actually ran to a node result.

    Precedence, highest first:
      * any FAIL / ERROR / MISS row  -> "fail"
      * otherwise any PASS / DONE    -> "pass"
      * otherwise any SKIP           -> "skip"
      * nothing recognised (including an empty list) -> "fail"

    "incomplete" is never returned: it is reserved for infrastructure
    failures and is chosen by the caller.
    """
    seen = set()
    for row in test_rows:
        seen.add(row.get("status"))

    # Ordered by precedence: the first rule with a matching status wins.
    precedence = (
        ("fail", ("FAIL", "ERROR", "MISS")),
        ("pass", ("PASS", "DONE")),
        ("skip", ("SKIP",)),
    )
    for verdict, triggers in precedence:
        if not seen.isdisjoint(triggers):
            return verdict
    return "fail"


# Values written to kernelci-api Node.data.error_code. They are a subset of
# the kernelci ErrorCodes enum (kernelci/api/models.py); per that enum's
# docstring, error_code is set when an infrastructure error occurs.

# Generic catch-all infrastructure error.
_ERR_INFRASTRUCTURE = "Infrastructure"
# The job itself is bad (invalid job parameters).
_ERR_INVALID_JOB_PARAMS = "invalid_job_params"


@dataclass
class NodeOutcome:
    """Final outcome to record on a job node in kernelci-api.

    ``error_code`` and ``error_msg`` are stored in the node's ``data`` and
    are populated only for an infrastructure failure
    (``result == "incomplete"``), matching the kernelci-pipeline scheduler
    convention. For a job that ran, both stay None.
    """

    # Node result string, e.g. "pass", "fail", "skip" or "incomplete".
    result: str
    # Error code for the node's data; None unless infrastructure failed.
    error_code: Optional[str] = None
    # Human-readable detail accompanying error_code.
    error_msg: Optional[str] = None


# ---------------------------------------------------------------------------
# Main poller class.
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -443,10 +507,94 @@ def resolve_build_id(self, node: Dict[str, Any]) -> Optional[str]:
return None
return None

# -- Node state updates ---------------------------------------------

def _node_url(self, node_id: str) -> str:
    """Return the kernelci-api URL of node *node_id*.

    Trailing slashes on ``self.api_base_uri`` are dropped so the result
    never contains a doubled slash before ``node``.
    """
    base = self.api_base_uri.rstrip("/")
    return f"{base}/node/{node_id}"

def _claim_node(self, node: Dict[str, Any]) -> bool:
    """Claim a job node by transitioning it to state=running.

    Re-reads the node first: if it is no longer "available", another
    poller has already taken it, so we skip it. This narrows -- but,
    without an atomic compare-and-set in kernelci-api, cannot fully
    close -- the window for two pollers claiming the same job.

    Any API failure (connection error, timeout, broken or non-JSON
    response, unexpected payload shape) is logged and reported as "not
    claimed" instead of being raised, so the caller simply skips the node.

    Args:
        node: The event's node dict; only its ``id`` is used.

    Returns:
        True only if this poller now owns the node.
    """
    # Function-scope import: only needed for the exception tuple below.
    import http.client  # pylint: disable=import-outside-toplevel

    node_id = node.get("id")
    if not node_id:
        logger.warning("Cannot claim node without an id")
        return False

    # urllib wraps only failures while *sending* the request in URLError.
    # A timeout or protocol error while reading the response surfaces as a
    # plain OSError (e.g. TimeoutError) or http.client.HTTPException.
    # URLError is itself an OSError, so it stays covered.
    api_errors = (OSError, http.client.HTTPException, json.JSONDecodeError)

    url = self._node_url(node_id)
    try:
        current = _http_get_json(url, token=self.api_token) or {}
    except api_errors as e:
        logger.error("Could not re-read node %s before claim: %s", node_id, e)
        return False
    if not isinstance(current, dict):
        # A JSON list/scalar is not a node document; do not try to use it.
        logger.error(
            "Could not re-read node %s before claim: unexpected %s response",
            node_id, type(current).__name__,
        )
        return False

    state = current.get("state")
    if state != "available":
        logger.info("Skipping node %s: already claimed (state=%s)", node_id, state)
        return False

    # PUT the whole re-read node back with only the state changed.
    current["state"] = "running"
    try:
        _http_put_json(url, current, token=self.api_token)
    except api_errors as e:
        logger.error("Failed to claim node %s (PUT state=running): %s", node_id, e)
        return False
    logger.info("Claimed node %s (state=running)", node_id)
    return True

def _finish_node(self, node_id: str, outcome: NodeOutcome) -> bool:
    """Mark a claimed job node done with the given outcome.

    Re-reads the node, sets state=done and result and -- when
    ``outcome.error_code`` is set (an infrastructure failure) -- also
    data.error_code / data.error_msg (kernelci ErrorCodes values),
    matching the kernelci-pipeline scheduler, then PUTs the node back.

    Best effort: a failure here is logged but never raised -- the job
    already ran and its results were submitted to KCIDB. This matters
    because process_event() calls this method from a ``finally`` block,
    where an escaping exception would replace the job's own return value
    or exception.

    Args:
        node_id: Id of the node to finish.
        outcome: Result (and optional error details) to record.

    Returns:
        True if the node was updated, False if the API call failed.
    """
    # Function-scope import: only needed for the exception tuple below.
    import http.client  # pylint: disable=import-outside-toplevel

    # urllib wraps only failures while *sending* the request in URLError.
    # A timeout or protocol error while reading the response surfaces as a
    # plain OSError (e.g. TimeoutError) or http.client.HTTPException.
    # URLError is itself an OSError, so it stays covered.
    api_errors = (OSError, http.client.HTTPException, json.JSONDecodeError)

    url = self._node_url(node_id)
    try:
        current = _http_get_json(url, token=self.api_token) or {}
    except api_errors as e:
        logger.error("Could not re-read node %s before finish: %s", node_id, e)
        return False
    if not isinstance(current, dict):
        # A JSON list/scalar is not a node document; do not try to use it.
        logger.error(
            "Could not re-read node %s before finish: unexpected %s response",
            node_id, type(current).__name__,
        )
        return False

    current["state"] = "done"
    current["result"] = outcome.result
    if outcome.error_code:
        # error_code/error_msg live in node.data (kernelci JobData), not
        # at the top level.
        data = current.get("data") or {}
        data["error_code"] = outcome.error_code
        data["error_msg"] = outcome.error_msg
        current["data"] = data

    try:
        _http_put_json(url, current, token=self.api_token)
    except api_errors as e:
        logger.error(
            "Failed to finish node %s (PUT state=done result=%s): %s",
            node_id, outcome.result, e,
        )
        return False
    logger.info(
        "Finished node %s (state=done, result=%s%s)",
        node_id, outcome.result,
        f", error_code={outcome.error_code}" if outcome.error_code else "",
    )
    return True

# -- Per-event processing -------------------------------------------

def process_event(self, event: Dict[str, Any]) -> bool:
"""Process one event end to end. Returns True on success."""
"""Process one event end to end. Returns True on success.

The job node is claimed (state=running) before any work starts and
finished (state=done + result, plus error_code/error_msg on an
infrastructure failure) afterwards, whatever the outcome. A node we
fail to claim -- already taken, or an API error -- is skipped
without being run or submitted.
"""
node = event.get("node") or {}
node_id = node.get("id")

Expand All @@ -463,13 +611,38 @@ def process_event(self, event: Dict[str, Any]) -> bool:
logger.debug("Skipping event %s: no job_definition artifact", node_id)
return True

if not self._claim_node(node):
return True

logger.info("Processing pull-lab job node=%s definition=%s", node_id, jobdef_url)

# We own the node now: it must be finished whatever happens below.
# This default covers an unexpected crash inside _execute_job.
node_outcome = NodeOutcome(
"incomplete", _ERR_INFRASTRUCTURE, "unexpected internal error"
)
try:
ok, node_outcome = self._execute_job(node, node_id, jobdef_url)
return ok
finally:
self._finish_node(node_id, node_outcome)

def _execute_job(
self, node: Dict[str, Any], node_id: str, jobdef_url: str
) -> Tuple[bool, NodeOutcome]:
"""Fetch, translate, run and submit one already-claimed job.

Returns (ok, outcome): *ok* is False on a recoverable failure;
*outcome* is the NodeOutcome passed to _finish_node().
"""
try:
jobdef = _http_get_json(jobdef_url, token=self.api_token)
except (urllib.error.URLError, json.JSONDecodeError) as e:
logger.error("Failed to fetch job_definition for %s: %s", node_id, e)
return False
return False, NodeOutcome(
"incomplete", _ERR_INFRASTRUCTURE,
f"failed to fetch job_definition: {e}",
)

build_id = self.resolve_build_id(node)
if not build_id:
Expand All @@ -484,14 +657,22 @@ def process_event(self, event: Dict[str, Any]) -> bool:
run_config = translate_job(jobdef, self.base_config, node_id=node_id)
except ValueError as e:
logger.error("Translation failed for node %s: %s", node_id, e)
return False
return False, NodeOutcome(
"incomplete", _ERR_INVALID_JOB_PARAMS,
f"job translation failed: {e}",
)

infra_error: Optional[NodeOutcome] = None
try:
per_test, log_url = self.job_executor(run_config)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("Job execution failed for node %s: %s", node_id, e, exc_info=True)
# Submit an ERROR row so KCIDB sees we picked it up. The boot.
# prefix makes the dashboard classify it as a (failed) boot test.
# An executor crash is an infrastructure failure: emit an ERROR
# row so KCIDB sees we picked it up, and mark the node incomplete.
# The boot. prefix has the dashboard classify it as a failed boot.
infra_error = NodeOutcome(
"incomplete", _ERR_INFRASTRUCTURE, f"job execution failed: {e}"
)
per_test = [{"name": "boot.infrastructure", "status": "ERROR"}]
log_url = None

Expand All @@ -509,6 +690,12 @@ def process_event(self, event: Dict[str, Any]) -> bool:
for idx, t in enumerate(per_test or [])
]
if not test_rows:
# No per-test results came back -> the outcome is unknown, itself
# an infrastructure failure (-> node result incomplete).
infra_error = NodeOutcome(
"incomplete", _ERR_INFRASTRUCTURE,
"executor returned no per-test results",
)
test_rows = [
build_test_row(
origin=self.kcidb_origin,
Expand All @@ -526,6 +713,10 @@ def process_event(self, event: Dict[str, Any]) -> bool:
)
]

# error_code + "incomplete" only on an infrastructure failure; a job
# that actually ran is pass/fail/skip from its tests. Independent of
# whether the KCIDB submission below succeeds.
outcome = infra_error or NodeOutcome(_node_result_from_rows(test_rows))
try:
submit_tests(
self.kcidb_submit_url,
Expand All @@ -536,9 +727,9 @@ def process_event(self, event: Dict[str, Any]) -> bool:
)
except urllib.error.URLError as e:
logger.error("KCIDB submit failed for node %s: %s", node_id, e)
return False
return False, outcome
logger.info("Submitted %d test row(s) for node %s", len(test_rows), node_id)
return True
return True, outcome

# -- Loop -----------------------------------------------------------

Expand Down
Loading
Loading