From 0c114d14bb86e4b1c6c6381d60b5faef3a0d311d Mon Sep 17 00:00:00 2001 From: "Justin J. Novack" Date: Fri, 1 May 2026 21:47:55 -0400 Subject: [PATCH] feat: add ansible parsing --- code_review_graph/parser.py | 572 +++++++++++++++++- .../playbooks/sample_ansible_playbook.yml | 92 +++ tests/fixtures/roles/myrole/meta/main.yml | 5 + tests/fixtures/tasks/sample_ansible_tasks.yml | 44 ++ tests/test_multilang.py | 216 +++++++ 5 files changed, 928 insertions(+), 1 deletion(-) create mode 100644 tests/fixtures/playbooks/sample_ansible_playbook.yml create mode 100644 tests/fixtures/roles/myrole/meta/main.yml create mode 100644 tests/fixtures/tasks/sample_ansible_tasks.yml diff --git a/code_review_graph/parser.py b/code_review_graph/parser.py index f681263a..bd75b437 100644 --- a/code_review_graph/parser.py +++ b/code_review_graph/parser.py @@ -16,6 +16,15 @@ import tree_sitter_language_pack as tslp +try: + import yaml as _yaml # type: ignore[import-untyped] + from yaml import MappingNode as _YamlMapping + from yaml import ScalarNode as _YamlScalar + from yaml import SequenceNode as _YamlSequence +except ImportError: + _yaml = None # type: ignore[assignment] + _YamlMapping = _YamlSequence = _YamlScalar = None # type: ignore[assignment,misc] + from .tsconfig_resolver import TsconfigResolver @@ -125,8 +134,57 @@ class EdgeInfo: ".res": "rescript", ".resi": "rescript", ".gd": "gdscript", + ".yml": "yaml", + ".yaml": "yaml", } +# --------------------------------------------------------------------------- +# Ansible YAML constants +# --------------------------------------------------------------------------- + +# Path components that strongly suggest an Ansible project layout +_ANSIBLE_PATH_COMPONENTS: frozenset[str] = frozenset({ + "playbooks", "roles", "tasks", "handlers", "group_vars", "host_vars", +}) + +# Common top-level playbook filenames (still require content confirmation) +_ANSIBLE_PLAYBOOK_NAMES: frozenset[str] = frozenset({ + "site.yml", "site.yaml", "main.yml", "main.yaml", + "install.yml", "install.yaml", "deploy.yml", "deploy.yaml", +}) + +# Play-level keys that are ONLY valid in Ansible plays. +# `hosts:` alone is not sufficient to identify a play — require at least one of these. +_ANSIBLE_PLAY_KEYS: frozenset[str] = frozenset({ + "tasks", "handlers", "pre_tasks", "post_tasks", "roles", + "gather_facts", "become", "become_user", "become_method", + "serial", "strategy", "vars_files", "vars_prompt", + "any_errors_fatal", "max_fail_percentage", "ignore_errors", +}) + +# Bare module names for content sniffing; also used after FQCN prefix strip +_ANSIBLE_MODULE_KEYS: frozenset[str] = frozenset({ + "apt", "yum", "dnf", "package", "pip", "copy", "template", "file", + "service", "systemd", "command", "shell", "raw", "git", "user", "stat", + "include_tasks", "import_tasks", "include_role", "import_role", + "set_fact", "debug", "fail", "assert", "wait_for", "pause", + "lineinfile", "blockinfile", "get_url", "uri", "unarchive", + "add_host", "group_by", "include_vars", +}) + +# Task mapping keys that are metadata, NOT module invocations +_TASK_META_KEYS: frozenset[str] = frozenset({ + "name", "when", "loop", "loop_control", + "with_items", "with_first_found", "with_fileglob", "with_dict", + "with_subelements", "with_nested", "with_sequence", "with_indexed_items", + "register", "notify", "tags", "become", "become_user", "become_method", + "ignore_errors", "vars", "no_log", "check_mode", "environment", + "any_errors_fatal", "run_once", "delegate_to", "delegate_facts", + "block", "rescue", "always", + "changed_when", "failed_when", "retries", "delay", "until", + "listen", "connection", "timeout", +}) + # Tree-sitter node type mappings per language # Maps (language) -> dict of semantic role -> list of TS node types _CLASS_TYPES: dict[str, list[str]] = { @@ -611,6 +669,116 @@ def file_hash(path: Path) -> str: return hashlib.sha256(path.read_bytes()).hexdigest() +# --------------------------------------------------------------------------- +# Ansible YAML helpers (module-level so tests can import them directly) +# --------------------------------------------------------------------------- + + +def _is_ansible_path(path: Path) -> bool: + """Return True if the path suggests an Ansible YAML file by directory convention.""" + parts = {p.lower() for p in path.parts} + return bool(parts & _ANSIBLE_PATH_COMPONENTS) or path.name.lower() in _ANSIBLE_PLAYBOOK_NAMES + + +def _is_ansible_content(source: bytes) -> bool: + """Lightweight byte-scan: does this YAML look like an Ansible file? + + Checks that the file is a top-level list and contains at least one + Ansible-specific structural marker (hosts, tasks, handlers, import_playbook, + or a known module key / FQCN pattern). + """ + try: + text = source.decode("utf-8", errors="replace") + except Exception: + return False + for line in text.splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith("#") or stripped == "---": + continue # skip blank lines, comments, and YAML document markers + if not (line.startswith("- ") or line == "-"): + return False + break + else: + return False + has_hosts = bool(re.search(r"^\s+hosts\s*:", text, re.MULTILINE)) + has_tasks = bool(re.search(r"^\s+tasks\s*:", text, re.MULTILINE)) + has_handlers = bool(re.search(r"^\s+handlers\s*:", text, re.MULTILINE)) + has_import_pb = bool(re.search(r"^\s+import_playbook\s*:", text, re.MULTILINE)) + has_name = bool(re.search(r"^\s+-?\s*name\s*:", text, re.MULTILINE)) + has_module = any( + re.search(rf"^\s+{re.escape(k)}\s*:", text, re.MULTILINE) + for k in _ANSIBLE_MODULE_KEYS + ) or bool(re.search(r"^\s+ansible\.\w+\.\w+\s*:", text, re.MULTILINE)) + return has_hosts or has_import_pb or has_tasks or has_handlers or (has_name and has_module) + + +def _ansible_file_type(path: Path) -> str: + """Classify an Ansible file by path convention. + + Returns one of: 'playbook', 'tasks', 'handlers', 'meta', 'vars', 'unknown'. + """ + parts_lower = [p.lower() for p in path.parts] + name_lower = path.name.lower() + if "meta" in parts_lower and name_lower in ("main.yml", "main.yaml"): + return "meta" + if "handlers" in parts_lower: + return "handlers" + if "tasks" in parts_lower: + return "tasks" + if any(p in parts_lower for p in ("group_vars", "host_vars", "vars", "defaults")): + return "vars" + if "playbooks" in parts_lower or name_lower in _ANSIBLE_PLAYBOOK_NAMES: + return "playbook" + return "unknown" + + +def _ansible_fqcn_short(key: str) -> str: + """Strip FQCN prefix: 'ansible.builtin.include_tasks' → 'include_tasks'.""" + return key.rsplit(".", 1)[-1] + + +def _yaml_line(node: object) -> int: + return node.start_mark.line + 1 # type: ignore[attr-defined] + + +def _yaml_end_line(node: object) -> int: + return node.end_mark.line + 1 # type: ignore[attr-defined] + + +def _yaml_get_key(mapping_node: object, key: str) -> Optional[object]: + for k_node, v_node in mapping_node.value: # type: ignore[attr-defined] + if isinstance(k_node, _YamlScalar) and k_node.value == key: + return v_node + return None + + +def _yaml_scalar(node: object) -> Optional[str]: + if isinstance(node, _YamlScalar): + return node.value # type: ignore[attr-defined] + return None + + +def _ansible_is_play_item(item: object) -> bool: + """True if this top-level sequence item is a definitive Ansible play or playbook import. + + Requires EITHER: + - ``import_playbook:`` key (unambiguous), OR + - ``hosts:`` key AND at least one key from ``_ANSIBLE_PLAY_KEYS``. + + A bare ``hosts: all`` without any other play key is too generic and is rejected. + """ + if not isinstance(item, _YamlMapping): + return False + keys: set[Optional[str]] = { + _yaml_scalar(k) + for k, _ in item.value # type: ignore[attr-defined] + if isinstance(k, _YamlScalar) + } + if "import_playbook" in keys: + return True + return "hosts" in keys and bool(keys & _ANSIBLE_PLAY_KEYS) + + # --------------------------------------------------------------------------- # Parser # --------------------------------------------------------------------------- @@ -640,7 +808,10 @@ def _get_parser(self, language: str): # type: ignore[arg-type] return self._parsers[language] def detect_language(self, path: Path) -> Optional[str]: - return EXTENSION_TO_LANGUAGE.get(path.suffix.lower()) + lang = EXTENSION_TO_LANGUAGE.get(path.suffix.lower()) + if lang == "yaml" and _is_ansible_path(path): + return "ansible" + return lang def parse_file(self, path: Path) -> tuple[list[NodeInfo], list[EdgeInfo]]: """Parse a single file and return extracted nodes and edges.""" @@ -682,6 +853,21 @@ def parse_bytes(self, path: Path, source: bytes) -> tuple[list[NodeInfo], list[E if language == "rescript": return self._parse_rescript(path, source) + # Ansible YAML: path heuristic promoted to "ansible". + # For clearly typed paths (tasks, handlers, meta, vars) trust the path classification. + # Only content-sniff for "playbook" or "unknown" to avoid false positives. + if language == "ansible": + if _yaml is None: + return [], [] + known_type = _ansible_file_type(path) not in ("playbook", "unknown") + if known_type or _is_ansible_content(source): + return self._parse_ansible(path, source) + return [], [] + + # Generic YAML: no tree-sitter grammar bundled; skip. + if language == "yaml": + return [], [] + parser = self._get_parser(language) if not parser: return [], [] @@ -1731,6 +1917,390 @@ def _get_test_description(self, call_node, source: bytes) -> Optional[str]: return normalized return None + # ----------------------------------------------------------------------- + # Ansible YAML parser + # ----------------------------------------------------------------------- + + def _parse_ansible( + self, path: Path, source: bytes, + ) -> tuple[list[NodeInfo], list[EdgeInfo]]: + """Parse an Ansible YAML file using PyYAML's compose() node tree. + + Dispatches to sub-parsers based on the file's classified role: + playbook, tasks, handlers, meta, or vars (vars emits File node only). + """ + try: + root = _yaml.compose(source.decode("utf-8", errors="replace")) + except _yaml.YAMLError as exc: + logger.debug("Ansible YAML parse error in %s: %s", path, exc) + return [], [] + if root is None: + return [], [] + + file_path_str = str(path) + line_count = source.count(b"\n") + 1 + nodes: list[NodeInfo] = [NodeInfo( + kind="File", + name=file_path_str, + file_path=file_path_str, + line_start=1, + line_end=line_count, + language="ansible", + )] + edges: list[EdgeInfo] = [] + + file_type = _ansible_file_type(path) + if file_type == "vars": + return nodes, edges + + # Content-based override: require strong evidence for "playbook" vs "tasks". + # hosts: alone is not sufficient — require at least one _ANSIBLE_PLAY_KEYS member + # or an import_playbook: key (which is unambiguously Ansible). + if file_type in ("unknown", "playbook"): + if isinstance(root, _YamlSequence) and root.value: + is_pb = any(_ansible_is_play_item(item) for item in root.value) + file_type = "playbook" if is_pb else "tasks" + else: + file_type = "unknown" + + if file_type == "playbook": + self._parse_ansible_playbook(root, file_path_str, nodes, edges) + elif file_type in ("tasks", "handlers"): + self._parse_ansible_tasks( + root, file_path_str, nodes, edges, + is_handler=(file_type == "handlers"), + parent_play=None, + ) + elif file_type == "meta": + self._parse_ansible_meta(root, file_path_str, nodes, edges) + + return nodes, edges + + def _parse_ansible_playbook( + self, + root: object, + file_path: str, + nodes: list[NodeInfo], + edges: list[EdgeInfo], + ) -> None: + """Extract plays and import_playbook references from a top-level SequenceNode.""" + if not isinstance(root, _YamlSequence): + return + + for item in root.value: + if not isinstance(item, _YamlMapping): + continue + + # import_playbook: is unambiguously Ansible; emit IMPORTS_FROM and skip + import_pb_node = _yaml_get_key(item, "import_playbook") + if import_pb_node is not None: + target = _yaml_scalar(import_pb_node) + if target: + edges.append(EdgeInfo( + kind="IMPORTS_FROM", + source=file_path, + target=target, + file_path=file_path, + line=_yaml_line(item), + extra={"ansible_kind": "import_playbook"}, + )) + continue + + if not _ansible_is_play_item(item): + continue + + # Derive play name + name_node = _yaml_get_key(item, "name") + hosts_node = _yaml_get_key(item, "hosts") + if name_node and _yaml_scalar(name_node): + play_name = _yaml_scalar(name_node) + elif hosts_node and _yaml_scalar(hosts_node): + play_name = f"play[{_yaml_scalar(hosts_node)}]" + else: + play_name = f"play@line{_yaml_line(item)}" + + play_line_start = _yaml_line(item) + play_line_end = _yaml_end_line(item) + + nodes.append(NodeInfo( + kind="Class", + name=play_name, # type: ignore[arg-type] + file_path=file_path, + line_start=play_line_start, + line_end=play_line_end, + language="ansible", + extra={"ansible_kind": "play"}, + )) + edges.append(EdgeInfo( + kind="CONTAINS", + source=file_path, + target=play_name, # type: ignore[arg-type] + file_path=file_path, + line=play_line_start, + )) + + # vars_files: → IMPORTS_FROM + vars_files_node = _yaml_get_key(item, "vars_files") + if isinstance(vars_files_node, _YamlSequence): + for vf in vars_files_node.value: + vf_path = _yaml_scalar(vf) + if vf_path: + edges.append(EdgeInfo( + kind="IMPORTS_FROM", + source=play_name, # type: ignore[arg-type] + target=vf_path, + file_path=file_path, + line=_yaml_line(vf), + extra={"ansible_kind": "vars_files"}, + )) + + # roles: list → IMPORTS_FROM (roles are not tasks) + roles_node = _yaml_get_key(item, "roles") + if isinstance(roles_node, _YamlSequence): + for role_item in roles_node.value: + role_name = self._ansible_extract_role_name(role_item) + if role_name: + edges.append(EdgeInfo( + kind="IMPORTS_FROM", + source=play_name, # type: ignore[arg-type] + target=role_name, + file_path=file_path, + line=_yaml_line(role_item), + extra={"ansible_kind": "role_reference"}, + )) + + # pre_tasks, tasks, post_tasks, handlers → task extraction + for section_key, is_handler in ( + ("pre_tasks", False), + ("tasks", False), + ("post_tasks", False), + ("handlers", True), + ): + section_node = _yaml_get_key(item, section_key) + if isinstance(section_node, _YamlSequence): + self._parse_ansible_tasks( + section_node, file_path, nodes, edges, + is_handler=is_handler, + parent_play=play_name, # type: ignore[arg-type] + ) + + def _parse_ansible_tasks( + self, + tasks_node: object, + file_path: str, + nodes: list[NodeInfo], + edges: list[EdgeInfo], + is_handler: bool, + parent_play: Optional[str], + ) -> None: + """Extract task/handler Function nodes from a SequenceNode of task mappings.""" + if not isinstance(tasks_node, _YamlSequence): + return + + for task_node in tasks_node.value: + if not isinstance(task_node, _YamlMapping): + continue + + # Find the module key: first key that is not task metadata or a with_* loop + module_key: Optional[str] = None + module_short: Optional[str] = None + module_args_node: Optional[object] = None + for k_node, v_node in task_node.value: + k_str = _yaml_scalar(k_node) + if not k_str: + continue + if k_str in _TASK_META_KEYS or k_str.startswith("with_"): + continue + module_key = k_str + module_short = _ansible_fqcn_short(k_str) + module_args_node = v_node + break + + # Derive task name with fallback chain + name_node = _yaml_get_key(task_node, "name") + name_raw = _yaml_scalar(name_node) if name_node is not None else None + if name_raw: + task_name: str = name_raw + elif module_short or module_key: + task_name = f"{module_short or module_key}@line{_yaml_line(task_node)}" + else: + task_name = f"task@line{_yaml_line(task_node)}" + + task_extra: dict = { + "ansible_kind": "handler" if is_handler else "task", + "ansible_module": module_key or "", + } + + # handler listen: alias + if is_handler: + listen_node = _yaml_get_key(task_node, "listen") + listen_val = _yaml_scalar(listen_node) if listen_node is not None else None + if listen_val: + task_extra["ansible_listen"] = listen_val + + nodes.append(NodeInfo( + kind="Function", + name=task_name, + file_path=file_path, + line_start=_yaml_line(task_node), + line_end=_yaml_end_line(task_node), + language="ansible", + parent_name=parent_play, + extra=task_extra, + )) + + # CONTAINS edge: parent play → task, or file → task for standalone files + edges.append(EdgeInfo( + kind="CONTAINS", + source=parent_play if parent_play is not None else file_path, + target=task_name, + file_path=file_path, + line=_yaml_line(task_node), + )) + + # notify: → CALLS + notify_node = _yaml_get_key(task_node, "notify") + if notify_node is not None: + for handler_name in self._ansible_extract_notify_targets(notify_node): + edges.append(EdgeInfo( + kind="CALLS", + source=task_name, + target=handler_name, + file_path=file_path, + line=_yaml_line(notify_node), + extra={"ansible_kind": "notify"}, + )) + + # include_tasks / import_tasks → IMPORTS_FROM (filename) + if module_short in ("include_tasks", "import_tasks"): + target_file = self._ansible_module_arg_str(module_args_node) + if target_file: + edges.append(EdgeInfo( + kind="IMPORTS_FROM", + source=task_name, + target=target_file, + file_path=file_path, + line=_yaml_line(task_node), + extra={"ansible_kind": module_short}, + )) + + # include_role / import_role → IMPORTS_FROM (role name) + if module_short in ("include_role", "import_role"): + role_name = self._ansible_role_from_module_args(module_args_node) + if role_name: + edges.append(EdgeInfo( + kind="IMPORTS_FROM", + source=task_name, + target=role_name, + file_path=file_path, + line=_yaml_line(task_node), + extra={"ansible_kind": module_short}, + )) + + # include_vars → IMPORTS_FROM (file or dir) + if module_short == "include_vars": + var_target = self._ansible_module_arg_str(module_args_node) + if var_target: + edges.append(EdgeInfo( + kind="IMPORTS_FROM", + source=task_name, + target=var_target, + file_path=file_path, + line=_yaml_line(task_node), + extra={"ansible_kind": "include_vars"}, + )) + + # block / rescue / always → recurse with same parent + for block_key in ("block", "rescue", "always"): + block_node = _yaml_get_key(task_node, block_key) + if block_node is not None: + self._parse_ansible_tasks( + block_node, file_path, nodes, edges, + is_handler=is_handler, + parent_play=parent_play, + ) + + def _parse_ansible_meta( + self, + root: object, + file_path: str, + nodes: list[NodeInfo], + edges: list[EdgeInfo], + ) -> None: + """Extract role dependencies from a role meta/main.yml.""" + if not isinstance(root, _YamlMapping): + return + deps_node = _yaml_get_key(root, "dependencies") + if not isinstance(deps_node, _YamlSequence): + return + for dep_item in deps_node.value: + dep_name = self._ansible_extract_role_name(dep_item) + if dep_name: + edges.append(EdgeInfo( + kind="DEPENDS_ON", + source=file_path, + target=dep_name, + file_path=file_path, + line=_yaml_line(dep_item), + extra={"ansible_kind": "role_dependency"}, + )) + + def _ansible_extract_role_name(self, item: object) -> Optional[str]: + """Extract a role name from a roles-list item. + + Handles: plain string, ``{role: name}`` dict, ``{name: ns.role}`` dict. + """ + if isinstance(item, _YamlScalar): + return item.value or None # type: ignore[attr-defined] + if isinstance(item, _YamlMapping): + for key in ("role", "name"): + v = _yaml_get_key(item, key) + val = _yaml_scalar(v) + if val: + return val + return None + + def _ansible_extract_notify_targets(self, notify_node: object) -> list[str]: + """Extract handler names from a notify: value (scalar or sequence).""" + if isinstance(notify_node, _YamlScalar): + return [notify_node.value] if notify_node.value else [] # type: ignore[attr-defined] + if isinstance(notify_node, _YamlSequence): + return [ + item.value # type: ignore[attr-defined] + for item in notify_node.value # type: ignore[attr-defined] + if isinstance(item, _YamlScalar) and item.value # type: ignore[attr-defined] + ] + return [] + + def _ansible_module_arg_str(self, args_node: Optional[object]) -> Optional[str]: + """Extract a simple string argument from a module args node. + + Handles: bare scalar (``include_tasks: db.yml``) or mapping with ``file:`` key. + Jinja2 expressions are returned as-is. + """ + if args_node is None: + return None + if isinstance(args_node, _YamlScalar): + return args_node.value or None # type: ignore[attr-defined] + if isinstance(args_node, _YamlMapping): + for key in ("file", "_raw_params"): + v = _yaml_get_key(args_node, key) + val = _yaml_scalar(v) + if val: + return val + return None + + def _ansible_role_from_module_args(self, args_node: Optional[object]) -> Optional[str]: + """Extract role name from include_role/import_role module args.""" + if args_node is None: + return None + if isinstance(args_node, _YamlScalar): + return args_node.value or None # type: ignore[attr-defined] + if isinstance(args_node, _YamlMapping): + v = _yaml_get_key(args_node, "name") + return _yaml_scalar(v) + return None + def _extract_from_tree( self, root, diff --git a/tests/fixtures/playbooks/sample_ansible_playbook.yml b/tests/fixtures/playbooks/sample_ansible_playbook.yml new file mode 100644 index 00000000..c8ad551e --- /dev/null +++ b/tests/fixtures/playbooks/sample_ansible_playbook.yml @@ -0,0 +1,92 @@ +--- +# Sanitized fixture for Ansible parser tests + +- import_playbook: base-setup.yml + +- name: Configure web servers + hosts: webservers + become: true + gather_facts: true + vars_files: + - vars/common.yml + - vars/web.yml + pre_tasks: + - name: Verify connectivity + ansible.builtin.wait_for_connection: + timeout: 30 + roles: + - common + - role: nginx + tags: [nginx] + tasks: + - name: Install packages + ansible.builtin.package: + name: "{{ item }}" + state: present + loop: [curl, rsync] + + - name: Deploy config + template: + src: app.conf.j2 + dest: /etc/app/app.conf + notify: restart app + + - name: Run deploy tasks + ansible.builtin.include_tasks: deploy.yml + + - name: Apply hardening role + ansible.builtin.import_role: + name: security + + - name: Handle migration + block: + - name: Run migration script + command: /opt/app/migrate.sh + - name: Verify migration + ansible.builtin.stat: + path: /opt/app/.migrated + register: migration_stat + rescue: + - name: Log migration failure + debug: + msg: "Migration failed, check logs" + + - name: Restart service if needed + service: + name: app + state: restarted + when: migration_stat.stat.exists | default(false) + + post_tasks: + - name: Smoke test + uri: + url: http://localhost/health + status_code: 200 + + handlers: + - name: restart app + service: + name: app + state: restarted + listen: app restarted + +- name: Configure database servers + hosts: dbservers + become: true + tasks: + - name: Install database + package: + name: postgresql + state: present + notify: + - restart db + - run migrations + + handlers: + - name: restart db + service: + name: postgresql + state: restarted + + - name: run migrations + command: /opt/db/migrate.sh diff --git a/tests/fixtures/roles/myrole/meta/main.yml b/tests/fixtures/roles/myrole/meta/main.yml new file mode 100644 index 00000000..fa243342 --- /dev/null +++ b/tests/fixtures/roles/myrole/meta/main.yml @@ -0,0 +1,5 @@ +--- +dependencies: + - common + - role: nginx + - name: security.hardening diff --git a/tests/fixtures/tasks/sample_ansible_tasks.yml b/tests/fixtures/tasks/sample_ansible_tasks.yml new file mode 100644 index 00000000..83116c4c --- /dev/null +++ b/tests/fixtures/tasks/sample_ansible_tasks.yml @@ -0,0 +1,44 @@ +--- +# Sanitized standalone task file fixture + +- name: Create app user + user: + name: appuser + shell: /bin/bash + +- name: Clone repository + git: + repo: https://github.com/example/app.git + dest: /opt/app + register: clone_result + +- ansible.builtin.package: + name: python3-pip + state: present + +- name: Install requirements + pip: + requirements: /opt/app/requirements.txt + changed_when: false + +- name: Apply role configuration + ansible.builtin.include_role: + name: shared_config + +- name: Run deployment steps + ansible.builtin.import_tasks: deploy_steps.yml + +- name: Load environment vars + include_vars: + file: env_vars.yml + +- name: Create directories + file: + path: "{{ item }}" + state: directory + mode: "0755" + loop: + - /opt/app/logs + - /opt/app/tmp + loop_control: + label: "{{ item }}" diff --git a/tests/test_multilang.py b/tests/test_multilang.py index 9d45f434..c1753d6e 100644 --- a/tests/test_multilang.py +++ b/tests/test_multilang.py @@ -1916,3 +1916,219 @@ def test_resolver_is_idempotent(self, tmp_path): # Second run should find nothing new — all already resolved. assert second["calls_resolved"] == 0 assert second["imports_resolved"] == 0 + + +# --------------------------------------------------------------------------- +# Ansible YAML parsing tests +# --------------------------------------------------------------------------- + +try: + import yaml as _yaml_check # noqa: F401 + _YAML_AVAILABLE = True +except ImportError: + _YAML_AVAILABLE = False + +_ANSIBLE_SKIP = pytest.mark.skipif(not _YAML_AVAILABLE, reason="pyyaml not installed") + +_PLAYBOOK = FIXTURES / "playbooks" / "sample_ansible_playbook.yml" +_TASKS_FILE = FIXTURES / "tasks" / "sample_ansible_tasks.yml" +_META_FILE = FIXTURES / "roles" / "myrole" / "meta" / "main.yml" + + +@_ANSIBLE_SKIP +class TestAnsiblePlaybookParsing: + def setup_method(self): + self.parser = CodeParser() + self.nodes, self.edges = self.parser.parse_file(_PLAYBOOK) + + def test_detects_language_ansible_paths(self): + p = self.parser + assert p.detect_language(Path("playbooks/site.yml")) == "ansible" + assert p.detect_language(Path("roles/web/tasks/main.yml")) == "ansible" + assert p.detect_language(Path("handlers/main.yml")) == "ansible" + assert p.detect_language(Path("config/settings.yml")) == "yaml" + + def test_file_node_created(self): + file_nodes = [n for n in self.nodes if n.kind == "File"] + assert len(file_nodes) == 1 + assert file_nodes[0].language == "ansible" + + def test_finds_plays_as_class_nodes(self): + play_names = {n.name for n in self.nodes if n.kind == "Class"} + assert "Configure web servers" in play_names + assert "Configure database servers" in play_names + + def test_plays_have_ansible_kind_extra(self): + plays = [n for n in self.nodes if n.kind == "Class"] + assert plays, "expected at least one play" + for p in plays: + assert p.extra.get("ansible_kind") == "play" + + def test_import_playbook_produces_imports_from(self): + targets = {e.target for e in self.edges if e.kind == "IMPORTS_FROM"} + assert "base-setup.yml" in targets + + def test_pre_task_extracted(self): + func_names = {n.name for n in self.nodes if n.kind == "Function"} + assert "Verify connectivity" in func_names + + def test_post_task_extracted(self): + func_names = {n.name for n in self.nodes if n.kind == "Function"} + assert "Smoke test" in func_names + + def test_finds_tasks_as_function_nodes(self): + func_names = {n.name for n in self.nodes if n.kind == "Function"} + assert "Install packages" in func_names + assert "Deploy config" in func_names + assert "Run deploy tasks" in func_names + + def test_fqcn_module_stored_in_extra(self): + task = next( + n for n in self.nodes + if n.kind == "Function" and n.name == "Verify connectivity" + ) + assert task.extra.get("ansible_module") == "ansible.builtin.wait_for_connection" + + def test_finds_handlers(self): + handlers = [ + n for n in self.nodes + if n.kind == "Function" and n.extra.get("ansible_kind") == "handler" + ] + handler_names = {h.name for h in handlers} + assert "restart app" in handler_names + assert "restart db" in handler_names + + def test_handler_listen_stored(self): + handler = next( + n for n in self.nodes + if n.kind == "Function" and n.name == "restart app" + ) + assert handler.extra.get("ansible_listen") == "app restarted" + + def test_notify_scalar_produces_calls(self): + calls = {e.target for e in self.edges if e.kind == "CALLS"} + assert "restart app" in calls + + def test_notify_list_produces_multiple_calls(self): + calls = {e.target for e in self.edges if e.kind == "CALLS"} + assert "restart db" in calls + assert "run migrations" in calls + + def test_include_tasks_imports_from(self): + targets = {e.target for e in self.edges if e.kind == "IMPORTS_FROM"} + assert "deploy.yml" in targets + + def test_import_role_imports_from(self): + targets = {e.target for e in self.edges if e.kind == "IMPORTS_FROM"} + assert "security" in targets + + def test_roles_list_imports_from(self): + targets = {e.target for e in self.edges if e.kind == "IMPORTS_FROM"} + assert "common" in targets + assert "nginx" in targets + + def test_vars_files_imports_from(self): + targets = {e.target for e in self.edges if e.kind == "IMPORTS_FROM"} + assert "vars/common.yml" in targets + + def test_block_tasks_extracted(self): + func_names = {n.name for n in self.nodes if n.kind == "Function"} + assert "Run migration script" in func_names + assert "Verify migration" in func_names + + def test_rescue_tasks_extracted(self): + func_names = {n.name for n in self.nodes if n.kind == "Function"} + assert "Log migration failure" in func_names + + def test_block_tasks_parented_to_play(self): + block_task = next( + n for n in self.nodes + if n.kind == "Function" and n.name == "Run migration script" + ) + assert block_task.parent_name == "Configure web servers" + + def test_file_contains_plays(self): + file_path_str = str(_PLAYBOOK) + file_contains = {e.target for e in self.edges + if e.kind == "CONTAINS" and e.source == file_path_str} + assert any("Configure web servers" in t for t in file_contains) + + def test_line_numbers_positive(self): + for n in self.nodes: + assert n.line_start > 0, f"{n.name} has line_start={n.line_start}" + assert n.line_end >= n.line_start, f"{n.name} has bad line range" + + def test_all_nodes_language_ansible(self): + for n in self.nodes: + assert n.language == "ansible", f"{n.name} has language={n.language!r}" + + +@_ANSIBLE_SKIP +class TestAnsibleTasksParsing: + def setup_method(self): + self.parser = CodeParser() + self.nodes, self.edges = self.parser.parse_file(_TASKS_FILE) + + def test_file_language_ansible(self): + file_nodes = [n for n in self.nodes if n.kind == "File"] + assert file_nodes[0].language == "ansible" + + def test_named_tasks_found(self): + func_names = {n.name for n in self.nodes if n.kind == "Function"} + assert "Create app user" in func_names + assert "Clone repository" in func_names + assert "Install requirements" in func_names + + def test_nameless_task_fallback_name(self): + func_names = {n.name for n in self.nodes if n.kind == "Function"} + fallbacks = [n for n in func_names if "@line" in n and "package" in n.lower()] + assert fallbacks, "expected a fallback-named task for the nameless package task" + + def test_loop_key_not_misidentified_as_module(self): + func_names = {n.name for n in self.nodes if n.kind == "Function"} + assert not any(n.startswith("loop@") or n.startswith("with_") for n in func_names) + + def test_fqcn_include_role_imports_from(self): + targets = {e.target for e in self.edges if e.kind == "IMPORTS_FROM"} + assert "shared_config" in targets + + def test_import_tasks_imports_from(self): + targets = {e.target for e in self.edges if e.kind == "IMPORTS_FROM"} + assert "deploy_steps.yml" in targets + + def test_include_vars_imports_from(self): + targets = {e.target for e in self.edges if e.kind == "IMPORTS_FROM"} + assert "env_vars.yml" in targets + + def test_file_contains_tasks(self): + file_path_str = str(_TASKS_FILE) + sources = {e.source for e in self.edges if e.kind == "CONTAINS"} + assert file_path_str in sources + + def test_tasks_have_no_parent_play(self): + for n in self.nodes: + if n.kind == "Function": + assert n.parent_name is None, f"{n.name} should have no parent_play" + + +@_ANSIBLE_SKIP +class TestAnsibleMetaParsing: + def setup_method(self): + self.parser = CodeParser() + self.nodes, self.edges = self.parser.parse_file(_META_FILE) + + def test_file_language_ansible(self): + file_nodes = [n for n in self.nodes if n.kind == "File"] + assert file_nodes[0].language == "ansible" + + def test_depends_on_bare_string(self): + dep_targets = {e.target for e in self.edges if e.kind == "DEPENDS_ON"} + assert "common" in dep_targets + + def test_depends_on_role_key(self): + dep_targets = {e.target for e in self.edges if e.kind == "DEPENDS_ON"} + assert "nginx" in dep_targets + + def test_depends_on_name_key_collections(self): + dep_targets = {e.target for e in self.edges if e.kind == "DEPENDS_ON"} + assert "security.hardening" in dep_targets