-
-
Notifications
You must be signed in to change notification settings - Fork 290
Add Alpine security advisory importer #2203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
NucleiAv
wants to merge
3
commits into
aboutcode-org:main
Choose a base branch
from
NucleiAv:feat/alpine-security-importer-2158
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
188 changes: 188 additions & 0 deletions
188
vulnerabilities/pipelines/v2_importers/alpine_security_importer.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,188 @@ | ||
| # | ||
| # Copyright (c) nexB Inc. and others. All rights reserved. | ||
| # VulnerableCode is a trademark of nexB Inc. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| # See http://www.apache.org/licenses/LICENSE-2.0 for the license text. | ||
| # See https://github.com/aboutcode-org/vulnerablecode for support or download. | ||
| # See https://aboutcode.org for more information about nexB OSS projects. | ||
| # | ||
|
|
||
| import json | ||
| import logging | ||
| from typing import Iterable | ||
|
|
||
| import requests | ||
| from packageurl import PackageURL | ||
| from univers.version_range import AlpineLinuxVersionRange | ||
| from univers.versions import InvalidVersion | ||
|
|
||
| from vulnerabilities.importer import AdvisoryDataV2 | ||
| from vulnerabilities.importer import AffectedPackageV2 | ||
| from vulnerabilities.importer import ReferenceV2 | ||
| from vulnerabilities.importer import VulnerabilitySeverity | ||
| from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 | ||
| from vulnerabilities.severity_systems import SCORING_SYSTEMS | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| ALPINE_SECURITY_ROOT = "https://security.alpinelinux.org/" | ||
| BRANCH_URL = "https://security.alpinelinux.org/branch/{branch}" | ||
| ADVISORY_HEADERS = {"Accept": "application/ld+json"} | ||
|
|
||
| # EOL branches absent from root API index; 3.13-3.16 omitted (return 0 items) | ||
| HISTORICAL_BRANCHES = [ | ||
| "3.22-community", | ||
| "3.18-main", | ||
| "3.17-main", | ||
| "3.12-main", | ||
| "3.11-main", | ||
| "3.10-main", | ||
| ] | ||
|
|
||
|
|
||
| def get_branches() -> list: | ||
| """Discover active branches from the root API and append HISTORICAL_BRANCHES.""" | ||
| try: | ||
| resp = requests.get(ALPINE_SECURITY_ROOT, headers=ADVISORY_HEADERS, timeout=30) | ||
| resp.raise_for_status() | ||
| data = resp.json() | ||
| # Branch entries have dict values; scalar values indicate non-branch keys. | ||
| active = [k for k, v in data.items() if isinstance(v, dict)] | ||
| except (requests.RequestException, ValueError) as e: | ||
| logger.error("Failed to discover branches from root API: %s", e) | ||
| active = [] | ||
|
|
||
| seen = set(active) | ||
| return active + [b for b in HISTORICAL_BRANCHES if b not in seen] | ||
|
|
||
|
|
||
| class AlpineSecurityImporterPipeline(VulnerableCodeBaseImporterPipelineV2): | ||
| """Collect Alpine Linux advisories from https://security.alpinelinux.org/.""" | ||
|
|
||
| pipeline_id = "alpine_security_importer" | ||
| spdx_license_expression = "CC-BY-SA-4.0" | ||
| license_url = "https://security.alpinelinux.org/" | ||
| precedence = 200 | ||
|
|
||
| @classmethod | ||
| def steps(cls): | ||
| return (cls.collect_and_store_advisories,) | ||
|
|
||
| def advisories_count(self) -> int: | ||
| count = 0 | ||
| for branch in get_branches(): | ||
| url = BRANCH_URL.format(branch=branch) | ||
| try: | ||
| resp = requests.get(url, headers=ADVISORY_HEADERS, timeout=30) | ||
| resp.raise_for_status() | ||
| data = resp.json() | ||
| except (requests.RequestException, ValueError) as e: | ||
| logger.error("Failed to fetch branch %s: %s", branch, e) | ||
| continue | ||
| count += len(data.get("items") or []) | ||
| return count | ||
|
|
||
| def collect_advisories(self) -> Iterable[AdvisoryDataV2]: | ||
| for branch in get_branches(): | ||
| url = BRANCH_URL.format(branch=branch) | ||
| try: | ||
| resp = requests.get(url, headers=ADVISORY_HEADERS, timeout=30) | ||
| resp.raise_for_status() | ||
| data = resp.json() | ||
| except (requests.RequestException, ValueError) as e: | ||
| logger.error("Failed to fetch branch %s: %s", branch, e) | ||
| continue | ||
| for item in data.get("items") or []: | ||
| advisory = parse_advisory(item) | ||
| if advisory: | ||
| yield advisory | ||
|
|
||
|
|
||
| def parse_advisory(data: dict): | ||
| """Parse a JSON-LD advisory; return None if the advisory ID is missing.""" | ||
| cve_url = data.get("id") or "" | ||
| cve_id = cve_url.rstrip("/").split("/")[-1] | ||
| if not cve_id: | ||
| return None | ||
|
|
||
| summary = data.get("description") or "" | ||
|
|
||
| references = [] | ||
| for ref in data.get("ref") or []: | ||
| ref_url = ref.get("rel") or "" | ||
| if ref_url: | ||
| references.append( | ||
| ReferenceV2( | ||
| url=ref_url, | ||
| reference_type=ref.get("referenceType") or "", | ||
| ) | ||
| ) | ||
| for cpe_match in data.get("cpeMatch") or []: | ||
| cpe_uri = cpe_match.get("cpeUri") or "" | ||
| cpe_id = cpe_match.get("id") or "" | ||
| if cpe_uri and cpe_id: | ||
| references.append(ReferenceV2(url=cpe_id, reference_id=cpe_uri)) | ||
|
|
||
| severities = [] | ||
| cvss3 = data.get("cvss3") or {} | ||
| cvss_score = cvss3.get("score") | ||
| cvss_vector = cvss3.get("vector") or "" | ||
| if cvss_vector and cvss_score: | ||
| if cvss_vector.startswith("CVSS:3.1/"): | ||
| system = SCORING_SYSTEMS["cvssv3.1"] | ||
| else: | ||
| system = SCORING_SYSTEMS["cvssv3"] | ||
| severities.append( | ||
| VulnerabilitySeverity( | ||
| system=system, | ||
| value=str(cvss_score), | ||
| scoring_elements=cvss_vector, | ||
| ) | ||
| ) | ||
|
|
||
| affected_packages = [] | ||
| for state in data.get("state") or []: | ||
| if not state.get("fixed"): | ||
| continue | ||
| pkg_version_url = state.get("packageVersion") or "" | ||
| repo = state.get("repo") or "" | ||
| parts = pkg_version_url.rstrip("/").split("/") | ||
| if len(parts) < 2: | ||
| continue | ||
| pkg_name = parts[-2] | ||
| version = parts[-1] | ||
| if not pkg_name or not version: | ||
| continue | ||
| repo_parts = repo.split("-", 1) | ||
| if len(repo_parts) != 2: | ||
| continue | ||
| version_tag, reponame = repo_parts | ||
| distroversion = version_tag if version_tag == "edge" else f"v{version_tag}" | ||
| purl = PackageURL( | ||
| type="apk", | ||
| namespace="alpine", | ||
| name=pkg_name, | ||
| qualifiers={"distroversion": distroversion, "reponame": reponame}, | ||
| ) | ||
| try: | ||
| fixed_version_range = AlpineLinuxVersionRange.from_versions([version]) | ||
| except InvalidVersion: | ||
| logger.warning("Cannot parse Alpine version %r in %s", version, cve_id) | ||
| continue | ||
| affected_packages.append( | ||
| AffectedPackageV2( | ||
| package=purl, | ||
| fixed_version_range=fixed_version_range, | ||
| ) | ||
| ) | ||
|
|
||
| return AdvisoryDataV2( | ||
| advisory_id=cve_id, | ||
| aliases=[], | ||
| summary=summary, | ||
| affected_packages=affected_packages, | ||
| references=references, | ||
| severities=severities, | ||
| url=cve_url, | ||
| original_advisory_text=json.dumps(data, indent=2, ensure_ascii=False), | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| # | ||
| # Copyright (c) nexB Inc. and others. All rights reserved. | ||
| # VulnerableCode is a trademark of nexB Inc. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
| # See http://www.apache.org/licenses/LICENSE-2.0 for the license text. | ||
| # See https://github.com/aboutcode-org/vulnerablecode for support or download. | ||
| # See https://aboutcode.org for more information about nexB OSS projects. | ||
| # | ||
|
|
||
| import os | ||
| from unittest import TestCase | ||
| from unittest.mock import MagicMock | ||
| from unittest.mock import patch | ||
|
|
||
| import requests | ||
|
|
||
| from vulnerabilities.pipelines.v2_importers.alpine_security_importer import ( | ||
| AlpineSecurityImporterPipeline, | ||
| ) | ||
| from vulnerabilities.pipelines.v2_importers.alpine_security_importer import parse_advisory | ||
| from vulnerabilities.tests import util_tests | ||
| from vulnerabilities.utils import load_json | ||
|
|
||
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | ||
| TEST_DATA = os.path.join(BASE_DIR, "test_data/alpine_security") | ||
|
|
||
|
|
||
| class TestAlpineSecurityImporter(TestCase): | ||
| def test_parse_advisory_with_cvss(self): | ||
| """Advisory with CVSS 3.1 score, references, and no fixed versions.""" | ||
| data = load_json(os.path.join(TEST_DATA, "alpine_security_mock1.json")) | ||
| expected = os.path.join(TEST_DATA, "expected_alpine_security_output1.json") | ||
| result = parse_advisory(data) | ||
| self.assertIsNotNone(result) | ||
| util_tests.check_results_against_json(result.to_dict(), expected) | ||
|
|
||
| def test_parse_advisory_with_fixed_states(self): | ||
| """Advisory with no CVSS but multiple fixed package versions across branches.""" | ||
| data = load_json(os.path.join(TEST_DATA, "alpine_security_mock2.json")) | ||
| expected = os.path.join(TEST_DATA, "expected_alpine_security_output2.json") | ||
| result = parse_advisory(data) | ||
| self.assertIsNotNone(result) | ||
| util_tests.check_results_against_json(result.to_dict(), expected) | ||
|
|
||
| def test_parse_advisory_missing_id_returns_none(self): | ||
| """Advisory with an empty id field must be skipped.""" | ||
| data = { | ||
| "id": "", | ||
| "description": "test", | ||
| "cvss3": {"score": 0.0, "vector": None}, | ||
| "ref": [], | ||
| "state": [], | ||
| } | ||
| self.assertIsNone(parse_advisory(data)) | ||
|
|
||
| def test_parse_advisory_skips_malformed_package_url(self): | ||
| """State entries with a packageVersion URL too short to parse must be skipped.""" | ||
| data = { | ||
| "id": "https://security.alpinelinux.org/vuln/CVE-2099-00001", | ||
| "description": "test", | ||
| "cvss3": {"score": 0.0, "vector": None}, | ||
| "ref": [], | ||
| "state": [ | ||
| { | ||
| "fixed": True, | ||
| "packageVersion": "https://security.alpinelinux.org/srcpkg/", | ||
| "repo": "edge-main", | ||
| } | ||
| ], | ||
| } | ||
| result = parse_advisory(data) | ||
| self.assertIsNotNone(result) | ||
| self.assertEqual(result.affected_packages, []) | ||
|
|
||
| def test_parse_advisory_skips_unfixed_states(self): | ||
| """State entries with fixed=False must not produce affected_packages.""" | ||
| data = { | ||
| "id": "https://security.alpinelinux.org/vuln/CVE-2099-00002", | ||
| "description": "test", | ||
| "cvss3": {"score": 0.0, "vector": None}, | ||
| "ref": [], | ||
| "state": [ | ||
| { | ||
| "fixed": False, | ||
| "packageVersion": "https://security.alpinelinux.org/srcpkg/curl/8.0.0-r0", | ||
| "repo": "edge-main", | ||
| } | ||
| ], | ||
| } | ||
| result = parse_advisory(data) | ||
| self.assertIsNotNone(result) | ||
| self.assertEqual(result.affected_packages, []) | ||
|
|
||
|
|
||
| class TestAlpineSecurityImporterPipeline(TestCase): | ||
| @patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.get_branches") | ||
| @patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.requests.get") | ||
| def test_collect_advisories_yields_advisory(self, mock_get, mock_branches): | ||
| mock_branches.return_value = ["3.19-main"] | ||
| data = load_json(os.path.join(TEST_DATA, "alpine_security_mock1.json")) | ||
| resp = MagicMock() | ||
| resp.json.return_value = {"items": [data]} | ||
| resp.raise_for_status.return_value = None | ||
| mock_get.return_value = resp | ||
| advisories = list(AlpineSecurityImporterPipeline().collect_advisories()) | ||
| self.assertGreater(len(advisories), 0) | ||
|
|
||
| @patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.get_branches") | ||
| @patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.requests.get") | ||
| def test_collect_advisories_http_error_logs_and_continues(self, mock_get, mock_branches): | ||
| mock_branches.return_value = ["3.19-main"] | ||
| mock_get.side_effect = requests.RequestException("timeout") | ||
| logger_name = "vulnerabilities.pipelines.v2_importers.alpine_security_importer" | ||
| with self.assertLogs(logger_name, level="ERROR") as cm: | ||
| advisories = list(AlpineSecurityImporterPipeline().collect_advisories()) | ||
| self.assertEqual(advisories, []) | ||
| self.assertTrue(any("timeout" in msg for msg in cm.output)) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure we only have CVSS3 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I sampled 20+ live advisories across multiple branches and the API only ever returns a cvss3 key at the top level. No cvss2 or cvss4 fields exist in the JSON-LD schema for this endpoint.