diff --git a/src/kernel_ci_cloud_labs/pull_labs_poller.py b/src/kernel_ci_cloud_labs/pull_labs_poller.py index 572fd7d..f5c2690 100644 --- a/src/kernel_ci_cloud_labs/pull_labs_poller.py +++ b/src/kernel_ci_cloud_labs/pull_labs_poller.py @@ -13,9 +13,12 @@ Long-lived service (or one-shot job) that: 1. Polls kernelci-api /events for new pull-lab jobs. - 2. Fetches each job's PULL_LABS job_definition JSON. - 3. Translates it into a pullab_cloud run config and runs the pipeline. - 4. Submits per-test results directly to KCIDB. + 2. Claims each job node (state=running) so other pollers skip it. + 3. Fetches each job's PULL_LABS job_definition JSON. + 4. Translates it into a pullab_cloud run config and runs the pipeline. + 5. Submits per-test results directly to KCIDB. + 6. Marks the job node done in kernelci-api (state=done + result, plus + data.error_code / data.error_msg on an infrastructure failure). Generic Python only — uses stdlib urllib for HTTP, supports env-var and config-file configuration, can be invoked as a CLI, a long-running @@ -35,6 +38,7 @@ import urllib.error import urllib.parse import urllib.request +from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, Tuple from kernel_ci_cloud_labs.kcidb_submit import ( @@ -116,6 +120,27 @@ def _http_get_json(url: str, token: Optional[str] = None, timeout: float = 30.0) return json.loads(body) if body else None +def _http_put_json( + url: str, + payload: Any, + token: Optional[str] = None, + timeout: float = 30.0, +) -> Any: + """PUT a JSON body; return the parsed JSON response (or None). + + Raises urllib.error.URLError / HTTPError on transport or HTTP errors, + mirroring _http_get_json so callers can handle both with one except. + """ + headers = {"Content-Type": "application/json", "Accept": "application/json"} + if token: + headers["Authorization"] = f"Bearer {token}" + body = json.dumps(payload).encode("utf-8") + req = urllib.request.Request(url, data=body, method="PUT", headers=headers) + with urllib.request.urlopen(req, timeout=timeout) as resp: + resp_body = resp.read().decode("utf-8", errors="replace") + return json.loads(resp_body) if resp_body else None + + # --------------------------------------------------------------------------- # Cursor persistence — generic filesystem backend by default. # A deployment can swap in a custom CursorStore (e.g. backed by S3) by @@ -270,6 +295,45 @@ def _extract_test_results(summary: Dict[str, Any]) -> Tuple[List[Dict[str, Any]] return rows, None +def _node_result_from_rows(test_rows: List[Dict[str, Any]]) -> str: + """Derive a kernelci-api node result for a job that actually ran. + + "incomplete" is reserved for infrastructure failures and is decided by + the caller -- this never returns it. Any non-passing test status + (FAIL/ERROR/MISS) fails the node. + """ + statuses = {row.get("status") for row in test_rows} + if statuses & {"FAIL", "ERROR", "MISS"}: + return "fail" + if statuses & {"PASS", "DONE"}: + return "pass" + if "SKIP" in statuses: + return "skip" + return "fail" + + +# kernelci-api Node.data.error_code values (a subset of the kernelci +# ErrorCodes enum in kernelci/api/models.py). Per that enum's docstring, +# error_code is set when an infrastructure error occurs; "Infrastructure" +# is the generic catch-all and "invalid_job_params" flags a bad job. +_ERR_INFRASTRUCTURE = "Infrastructure" +_ERR_INVALID_JOB_PARAMS = "invalid_job_params" + + +@dataclass +class NodeOutcome: + """How a job node should be finished in kernelci-api. + + *error_code* / *error_msg* go into the node's ``data`` and are set only + on an infrastructure failure (result == "incomplete"), matching the + kernelci-pipeline scheduler convention. + """ + + result: str + error_code: Optional[str] = None + error_msg: Optional[str] = None + + # --------------------------------------------------------------------------- # Main poller class. # --------------------------------------------------------------------------- @@ -443,10 +507,94 @@ def resolve_build_id(self, node: Dict[str, Any]) -> Optional[str]: return None return None + # -- Node state updates --------------------------------------------- + + def _node_url(self, node_id: str) -> str: + return f"{self.api_base_uri.rstrip('/')}/node/{node_id}" + + def _claim_node(self, node: Dict[str, Any]) -> bool: + """Claim a job node by transitioning it to state=running. + + Re-reads the node first: if it is no longer "available", another + poller has already taken it, so we skip it. This narrows -- but, + without an atomic compare-and-set in kernelci-api, cannot fully + close -- the window for two pollers claiming the same job. + + Returns True only if this poller now owns the node. + """ + node_id = node.get("id") + if not node_id: + logger.warning("Cannot claim node without an id") + return False + url = self._node_url(node_id) + try: + current = _http_get_json(url, token=self.api_token) or {} + except (urllib.error.URLError, json.JSONDecodeError) as e: + logger.error("Could not re-read node %s before claim: %s", node_id, e) + return False + state = current.get("state") + if state != "available": + logger.info("Skipping node %s: already claimed (state=%s)", node_id, state) + return False + current["state"] = "running" + try: + _http_put_json(url, current, token=self.api_token) + except (urllib.error.URLError, json.JSONDecodeError) as e: + logger.error("Failed to claim node %s (PUT state=running): %s", node_id, e) + return False + logger.info("Claimed node %s (state=running)", node_id) + return True + + def _finish_node(self, node_id: str, outcome: NodeOutcome) -> bool: + """Mark a claimed job node done with the given outcome. + + Sets state=done and result; on an infrastructure failure also sets + data.error_code / data.error_msg (kernelci ErrorCodes values), + matching the kernelci-pipeline scheduler. A failure here is logged + but not fatal -- the job already ran and its results were submitted + to KCIDB. + """ + url = self._node_url(node_id) + try: + current = _http_get_json(url, token=self.api_token) or {} + except (urllib.error.URLError, json.JSONDecodeError) as e: + logger.error("Could not re-read node %s before finish: %s", node_id, e) + return False + current["state"] = "done" + current["result"] = outcome.result + if outcome.error_code: + # error_code/error_msg live in node.data (kernelci JobData), not + # at the top level. + data = current.get("data") or {} + data["error_code"] = outcome.error_code + data["error_msg"] = outcome.error_msg + current["data"] = data + try: + _http_put_json(url, current, token=self.api_token) + except (urllib.error.URLError, json.JSONDecodeError) as e: + logger.error( + "Failed to finish node %s (PUT state=done result=%s): %s", + node_id, outcome.result, e, + ) + return False + logger.info( + "Finished node %s (state=done, result=%s%s)", + node_id, outcome.result, + f", error_code={outcome.error_code}" if outcome.error_code else "", + ) + return True + # -- Per-event processing ------------------------------------------- def process_event(self, event: Dict[str, Any]) -> bool: - """Process one event end to end. Returns True on success.""" + """Process one event end to end. Returns True on success. + + The job node is claimed (state=running) before any work starts and + finished (state=done + result, plus error_code/error_msg on an + infrastructure failure) afterwards, whatever the outcome. A node we + fail to claim -- already taken, or an API error -- is skipped + without being run or submitted. + """ node = event.get("node") or {} node_id = node.get("id") @@ -463,13 +611,38 @@ def process_event(self, event: Dict[str, Any]) -> bool: logger.debug("Skipping event %s: no job_definition artifact", node_id) return True + if not self._claim_node(node): + return True + logger.info("Processing pull-lab job node=%s definition=%s", node_id, jobdef_url) + # We own the node now: it must be finished whatever happens below. + # This default covers an unexpected crash inside _execute_job. + node_outcome = NodeOutcome( + "incomplete", _ERR_INFRASTRUCTURE, "unexpected internal error" + ) + try: + ok, node_outcome = self._execute_job(node, node_id, jobdef_url) + return ok + finally: + self._finish_node(node_id, node_outcome) + + def _execute_job( + self, node: Dict[str, Any], node_id: str, jobdef_url: str + ) -> Tuple[bool, NodeOutcome]: + """Fetch, translate, run and submit one already-claimed job. + + Returns (ok, outcome): *ok* is False on a recoverable failure; + *outcome* is the NodeOutcome passed to _finish_node(). + """ try: jobdef = _http_get_json(jobdef_url, token=self.api_token) except (urllib.error.URLError, json.JSONDecodeError) as e: logger.error("Failed to fetch job_definition for %s: %s", node_id, e) - return False + return False, NodeOutcome( + "incomplete", _ERR_INFRASTRUCTURE, + f"failed to fetch job_definition: {e}", + ) build_id = self.resolve_build_id(node) if not build_id: @@ -484,14 +657,22 @@ def process_event(self, event: Dict[str, Any]) -> bool: run_config = translate_job(jobdef, self.base_config, node_id=node_id) except ValueError as e: logger.error("Translation failed for node %s: %s", node_id, e) - return False + return False, NodeOutcome( + "incomplete", _ERR_INVALID_JOB_PARAMS, + f"job translation failed: {e}", + ) + infra_error: Optional[NodeOutcome] = None try: per_test, log_url = self.job_executor(run_config) except Exception as e: # pylint: disable=broad-exception-caught logger.error("Job execution failed for node %s: %s", node_id, e, exc_info=True) - # Submit an ERROR row so KCIDB sees we picked it up. The boot. - # prefix makes the dashboard classify it as a (failed) boot test. + # An executor crash is an infrastructure failure: emit an ERROR + # row so KCIDB sees we picked it up, and mark the node incomplete. + # The boot. prefix has the dashboard classify it as a failed boot. + infra_error = NodeOutcome( + "incomplete", _ERR_INFRASTRUCTURE, f"job execution failed: {e}" + ) per_test = [{"name": "boot.infrastructure", "status": "ERROR"}] log_url = None @@ -509,6 +690,12 @@ def process_event(self, event: Dict[str, Any]) -> bool: for idx, t in enumerate(per_test or []) ] if not test_rows: + # No per-test results came back -> the outcome is unknown, itself + # an infrastructure failure (-> node result incomplete). + infra_error = NodeOutcome( + "incomplete", _ERR_INFRASTRUCTURE, + "executor returned no per-test results", + ) test_rows = [ build_test_row( origin=self.kcidb_origin, @@ -526,6 +713,10 @@ def process_event(self, event: Dict[str, Any]) -> bool: ) ] + # error_code + "incomplete" only on an infrastructure failure; a job + # that actually ran is pass/fail/skip from its tests. Independent of + # whether the KCIDB submission below succeeds. + outcome = infra_error or NodeOutcome(_node_result_from_rows(test_rows)) try: submit_tests( self.kcidb_submit_url, @@ -536,9 +727,9 @@ def process_event(self, event: Dict[str, Any]) -> bool: ) except urllib.error.URLError as e: logger.error("KCIDB submit failed for node %s: %s", node_id, e) - return False + return False, outcome logger.info("Submitted %d test row(s) for node %s", len(test_rows), node_id) - return True + return True, outcome # -- Loop ----------------------------------------------------------- diff --git a/tests/test_pull_labs_poller.py b/tests/test_pull_labs_poller.py index d349d58..1e4a1b5 100644 --- a/tests/test_pull_labs_poller.py +++ b/tests/test_pull_labs_poller.py @@ -8,6 +8,7 @@ import json import os import tempfile +import urllib.error from unittest.mock import patch import pytest @@ -16,12 +17,17 @@ from kernel_ci_cloud_labs.pull_labs_poller import ( DEFAULT_FROM_TIMESTAMP, FileCursorStore, + NodeOutcome, PullLabsPoller, _extract_test_results, + _node_result_from_rows, _parse_kcidb_rest, _test_name_to_path, ) +_GET = "kernel_ci_cloud_labs.pull_labs_poller._http_get_json" +_PUT = "kernel_ci_cloud_labs.pull_labs_poller._http_put_json" + # Capture the real validator at import time so a specific test can restore it # after the autouse fixture has stubbed it out. _REAL_VALIDATE_DEFAULT_EXECUTOR_DEPS = poller_mod._validate_default_executor_deps @@ -315,6 +321,198 @@ def test_other_names_pass_through(self, name): assert _test_name_to_path(name) == name +# --------------------------------------------------------------------------- +# Node state updates — claim (state=running) and finish (state=done + result) +# --------------------------------------------------------------------------- + + +class TestNodeResultFromRows: + """_node_result_from_rows() maps KCIDB statuses to a node result. + + It never returns "incomplete" -- that is reserved for infrastructure + failures and decided by the caller (see TestProcessEventNodeResult). + """ + + @pytest.mark.parametrize( + "statuses,expected", + [ + (["PASS"], "pass"), + (["PASS", "PASS"], "pass"), + (["DONE"], "pass"), + (["PASS", "FAIL"], "fail"), + (["FAIL", "ERROR"], "fail"), + # ERROR/MISS in a job that ran fail the node -- they do NOT make + # it "incomplete" (that is infrastructure-failure only). + (["PASS", "ERROR"], "fail"), + (["MISS"], "fail"), + (["SKIP"], "skip"), + ], + ) + def test_result_mapping(self, statuses, expected): + rows = [{"status": s} for s in statuses] + assert _node_result_from_rows(rows) == expected + + def test_never_returns_incomplete(self): + for statuses in (["PASS"], ["FAIL"], ["ERROR"], ["MISS"], ["SKIP"], []): + rows = [{"status": s} for s in statuses] + assert _node_result_from_rows(rows) != "incomplete" + + +class TestNodeStateUpdates: + """_claim_node() / _finish_node() PUT node state to kernelci-api.""" + + def test_claim_available_node_puts_running(self): + p = PullLabsPoller(_minimal_kc()) + puts = [] + with patch(_GET, return_value={"id": "n1", "state": "available"}), \ + patch(_PUT, side_effect=lambda url, payload, **kw: puts.append((url, payload))): + assert p._claim_node({"id": "n1"}) is True + assert len(puts) == 1 + assert puts[0][0].endswith("/node/n1") + assert puts[0][1]["state"] == "running" + + def test_claim_skips_already_claimed_node(self): + p = PullLabsPoller(_minimal_kc()) + with patch(_GET, return_value={"id": "n1", "state": "running"}), \ + patch(_PUT) as put: + assert p._claim_node({"id": "n1"}) is False + put.assert_not_called() + + def test_claim_skips_on_get_error(self): + p = PullLabsPoller(_minimal_kc()) + with patch(_GET, side_effect=urllib.error.URLError("boom")): + assert p._claim_node({"id": "n1"}) is False + + def test_claim_skips_on_put_error(self): + p = PullLabsPoller(_minimal_kc()) + with patch(_GET, return_value={"id": "n1", "state": "available"}), \ + patch(_PUT, side_effect=urllib.error.URLError("boom")): + assert p._claim_node({"id": "n1"}) is False + + def test_claim_skips_node_without_id(self): + p = PullLabsPoller(_minimal_kc()) + assert p._claim_node({}) is False + + def test_finish_node_puts_done_and_result(self): + p = PullLabsPoller(_minimal_kc()) + puts = [] + with patch(_GET, return_value={"id": "n1", "state": "running"}), \ + patch(_PUT, side_effect=lambda url, payload, **kw: puts.append(payload)): + assert p._finish_node("n1", NodeOutcome("pass")) is True + assert puts[0]["state"] == "done" + assert puts[0]["result"] == "pass" + # No error_code/error_msg for a clean (non-infra) result. + assert "error_code" not in puts[0].get("data", {}) + + def test_finish_node_sets_error_code_on_infra_failure(self): + p = PullLabsPoller(_minimal_kc()) + puts = [] + with patch(_GET, return_value={"id": "n1", "state": "running", "data": {}}), \ + patch(_PUT, side_effect=lambda url, payload, **kw: puts.append(payload)): + ok = p._finish_node( + "n1", NodeOutcome("incomplete", "Infrastructure", "vm did not boot") + ) + assert ok is True + assert puts[0]["result"] == "incomplete" + # error_code/error_msg go into node.data, not the top level. + assert puts[0]["data"]["error_code"] == "Infrastructure" + assert puts[0]["data"]["error_msg"] == "vm did not boot" + + def test_process_event_skips_unclaimable_job(self): + # A job we cannot claim must not be run or submitted. + executor_calls = [] + p = PullLabsPoller( + _minimal_kc(), + job_executor=lambda cfg: (executor_calls.append(cfg), ([], None))[1], + ) + event = { + "node": { + "id": "n1", + "data": {"runtime": "pull-labs-aws-ec2"}, + "artifacts": {"job_definition": "https://x/y.json"}, + } + } + with patch.object(p, "_claim_node", return_value=False), \ + patch.object(p, "_finish_node") as finish: + assert p.process_event(event) is True + assert executor_calls == [] + finish.assert_not_called() + + +def _job_event(node_id="n1"): + """A minimal claimable job event whose node resolves its own build_id.""" + return { + "node": { + "id": node_id, + "kind": "kbuild", # resolve_build_id returns directly, no HTTP + "data": {"runtime": "pull-labs-aws-ec2"}, + "artifacts": {"job_definition": "https://x/y.json"}, + } + } + + +class TestProcessEventNodeResult: + """process_event() finishes the node; "incomplete" means infra failure.""" + + def _run(self, poller, event, translate=None): + translate = translate or {"return_value": {}} + captured = {} + with patch.object(poller, "_claim_node", return_value=True), \ + patch.object( + poller, "_finish_node", + side_effect=lambda nid, outcome: captured.update(outcome=outcome), + ), \ + patch(_GET, return_value={"artifacts": {}}), \ + patch("kernel_ci_cloud_labs.pull_labs_poller.translate_job", **translate), \ + patch("kernel_ci_cloud_labs.pull_labs_poller.submit_tests", return_value={}): + poller.process_event(event) + return captured["outcome"] + + def test_passing_run_finishes_pass(self): + p = PullLabsPoller( + _minimal_kc(), + job_executor=lambda cfg: ([{"name": "ltp", "status": "PASS"}], None), + ) + outcome = self._run(p, _job_event()) + assert outcome.result == "pass" + assert outcome.error_code is None + + def test_failing_run_finishes_fail(self): + p = PullLabsPoller( + _minimal_kc(), + job_executor=lambda cfg: ([{"name": "ltp", "status": "FAIL"}], None), + ) + outcome = self._run(p, _job_event()) + assert outcome.result == "fail" + assert outcome.error_code is None + + def test_executor_crash_finishes_incomplete_infrastructure(self): + def _boom(cfg): + raise RuntimeError("vm did not boot") + + p = PullLabsPoller(_minimal_kc(), job_executor=_boom) + outcome = self._run(p, _job_event()) + assert outcome.result == "incomplete" + assert outcome.error_code == "Infrastructure" + assert "vm did not boot" in outcome.error_msg + + def test_no_results_finishes_incomplete_infrastructure(self): + p = PullLabsPoller(_minimal_kc(), job_executor=lambda cfg: ([], None)) + outcome = self._run(p, _job_event()) + assert outcome.result == "incomplete" + assert outcome.error_code == "Infrastructure" + + def test_translate_failure_finishes_invalid_job_params(self): + p = PullLabsPoller(_minimal_kc(), job_executor=lambda cfg: ([], None)) + outcome = self._run( + p, _job_event(), + translate={"side_effect": ValueError("missing artifacts.kernel")}, + ) + assert outcome.result == "incomplete" + assert outcome.error_code == "invalid_job_params" + assert "missing artifacts.kernel" in outcome.error_msg + + # --------------------------------------------------------------------------- # Default-executor dependency validation # ---------------------------------------------------------------------------