diff --git a/application/tests/cwe_parser_test.py b/application/tests/cwe_parser_test.py index 2c0f72327..0d088884e 100644 --- a/application/tests/cwe_parser_test.py +++ b/application/tests/cwe_parser_test.py @@ -102,6 +102,157 @@ def iter_content(self, chunk_size=None): self.assertCountEqual(nodes[0].todict(), expected[0].todict()) self.assertCountEqual(nodes[1].todict(), expected[1].todict()) + @patch.object(requests, "get") + def test_register_CWE_inherits_mappings_transitively(self, mock_requests) -> None: + tmpdir = mkdtemp() + tmpFile = os.path.join(tmpdir, "cwe.xml") + tmpzip = os.path.join(tmpdir, "cwe.zip") + with open(tmpFile, "w") as cx: + cx.write(self.CWE_transitive_xml) + with zipfile.ZipFile(tmpzip, "w", zipfile.ZIP_DEFLATED) as zipf: + zipf.write(tmpFile, arcname="cwe.xml") + + class fakeRequest: + def iter_content(self, chunk_size=None): + with open(tmpzip, "rb") as zipf: + return [zipf.read()] + + mock_requests.return_value = fakeRequest() + + cre = defs.CRE(id="089-089", name="CRE-Injection") + dbcre = self.collection.add_cre(cre=cre) + dbcwe = self.collection.add_node(defs.Standard(name="CWE", sectionID="89")) + self.collection.add_link(dbcre, dbcwe, defs.LinkTypes.LinkedTo) + + entries = cwe.CWE().parse( + cache=self.collection, + ph=prompt_client.PromptHandler(database=self.collection), + ) + imported_cwes = {node.sectionID: node for node in entries.results["CWE"]} + + self.assertEqual(imported_cwes["2001"].links[0].document.todict(), cre.todict()) + self.assertEqual(imported_cwes["2002"].links[0].document.todict(), cre.todict()) + + @patch.object(requests, "get") + def test_register_CWE_applies_fallback_family_mappings(self, mock_requests) -> None: + tmpdir = mkdtemp() + tmpFile = os.path.join(tmpdir, "cwe.xml") + tmpzip = os.path.join(tmpdir, "cwe.zip") + with open(tmpFile, "w") as cx: + cx.write(self.CWE_fallback_xml) + with zipfile.ZipFile(tmpzip, "w", zipfile.ZIP_DEFLATED) as zipf: + zipf.write(tmpFile, arcname="cwe.xml") + + class fakeRequest: + def iter_content(self, chunk_size=None): + with open(tmpzip, "rb") as zipf: + return [zipf.read()] + + mock_requests.return_value = fakeRequest() + + injection_cre = defs.CRE(id="760-764", name="Injection protection") + xss_cre = defs.CRE(id="760-765", name="XSS protection") + xxe_cre = defs.CRE(id="764-507", name="Restrict XML parsing (against XXE)") + auth_cre = defs.CRE( + id="117-371", name="Use a centralized access control mechanism" + ) + authn_cre = defs.CRE( + id="113-133", name="Use centralized authentication mechanism" + ) + csrf_cre = defs.CRE(id="028-727", name="CSRF protection") + ssrf_cre = defs.CRE(id="028-728", name="SSRF protection") + hardcoded_secret_cre = defs.CRE( + id="774-888", name="Do not store secrets in the code" + ) + password_storage_cre = defs.CRE( + id="622-203", name="Store passwords salted and hashed" + ) + credential_storage_cre = defs.CRE( + id="881-321", name="Store credentials securely" + ) + session_management_cre = defs.CRE(id="177-260", name="Session management") + secure_cookie_cre = defs.CRE( + id="688-081", name='Set "secure" attribute for cookie-based session tokens' + ) + deserialization_cre = defs.CRE(id="836-068", name="Deserialization Prevention") + self.collection.add_cre(cre=injection_cre) + self.collection.add_cre(cre=xss_cre) + self.collection.add_cre(cre=xxe_cre) + self.collection.add_cre(cre=auth_cre) + self.collection.add_cre(cre=authn_cre) + self.collection.add_cre(cre=csrf_cre) + self.collection.add_cre(cre=ssrf_cre) + self.collection.add_cre(cre=hardcoded_secret_cre) + self.collection.add_cre(cre=password_storage_cre) + self.collection.add_cre(cre=credential_storage_cre) + self.collection.add_cre(cre=session_management_cre) + self.collection.add_cre(cre=secure_cookie_cre) + self.collection.add_cre(cre=deserialization_cre) + + entries = cwe.CWE().parse( + cache=self.collection, + ph=prompt_client.PromptHandler(database=self.collection), + ) + imported_cwes = {node.sectionID: node for node in entries.results["CWE"]} + + self.assertEqual( + imported_cwes["89"].links[0].document.todict(), injection_cre.todict() + ) + self.assertEqual( + imported_cwes["79"].links[0].document.todict(), xss_cre.todict() + ) + self.assertEqual( + imported_cwes["611"].links[0].document.todict(), xxe_cre.todict() + ) + self.assertEqual( + imported_cwes["612"].links[0].document.todict(), auth_cre.todict() + ) + self.assertEqual( + imported_cwes["287"].links[0].document.todict(), authn_cre.todict() + ) + self.assertEqual( + imported_cwes["352"].links[0].document.todict(), csrf_cre.todict() + ) + self.assertEqual( + imported_cwes["918"].links[0].document.todict(), ssrf_cre.todict() + ) + self.assertEqual( + imported_cwes["798"].links[0].document.todict(), + hardcoded_secret_cre.todict(), + ) + self.assertEqual( + imported_cwes["321"].links[0].document.todict(), + hardcoded_secret_cre.todict(), + ) + self.assertEqual( + imported_cwes["256"].links[0].document.todict(), + password_storage_cre.todict(), + ) + self.assertEqual( + imported_cwes["257"].links[0].document.todict(), + password_storage_cre.todict(), + ) + self.assertEqual( + imported_cwes["258"].links[0].document.todict(), + credential_storage_cre.todict(), + ) + self.assertEqual( + imported_cwes["260"].links[0].document.todict(), + credential_storage_cre.todict(), + ) + self.assertEqual( + imported_cwes["384"].links[0].document.todict(), + session_management_cre.todict(), + ) + self.assertEqual( + imported_cwes["614"].links[0].document.todict(), + secure_cookie_cre.todict(), + ) + self.assertEqual( + imported_cwes["502"].links[0].document.todict(), + deserialization_cre.todict(), + ) + CWE_xml = """ """ + + CWE_transitive_xml = """ + + + + + + + + + + + + + + Padding entry so xmltodict returns a list of Weakness elements. + + + +""" + + CWE_fallback_xml = """ + + + + XSS entry. + + + SQL injection entry. + + + XXE entry. + + + Authorization entry. + + + Authentication entry. + + + CSRF entry. + + + Hard-coded credentials entry. + + + Hard-coded key entry. + + + Password storage entry. + + + Recoverable password entry. + + + Password in config entry. + + + Password in config entry. + + + Session fixation entry. + + + Cookie secure attribute entry. + + + Deserialization entry. + + + SSRF entry. + + + +""" diff --git a/application/utils/external_project_parsers/data/cwe_fallback_mappings.json b/application/utils/external_project_parsers/data/cwe_fallback_mappings.json new file mode 100644 index 000000000..11d9d1ff8 --- /dev/null +++ b/application/utils/external_project_parsers/data/cwe_fallback_mappings.json @@ -0,0 +1,102 @@ +[ + { + "keywords": [ + "xml external entity", + "xxe" + ], + "cre_id": "764-507" + }, + { + "keywords": [ + "cross-site scripting", + " xss", + "(xss)" + ], + "cre_id": "760-765" + }, + { + "keywords": [ + "authorization", + "access control" + ], + "cre_id": "117-371" + }, + { + "keywords": [ + "improper authentication", + "missing authentication", + "authentication bypass" + ], + "cre_id": "113-133" + }, + { + "keywords": [ + "cross-site request forgery", + "(csrf)", + "csrf" + ], + "cre_id": "028-727" + }, + { + "keywords": [ + "server-side request forgery", + "(ssrf)", + "ssrf" + ], + "cre_id": "028-728" + }, + { + "keywords": [ + "plaintext storage of a password", + "storing passwords in a recoverable format" + ], + "cre_id": "622-203" + }, + { + "keywords": [ + "empty password in configuration file", + "password in configuration file" + ], + "cre_id": "881-321" + }, + { + "keywords": [ + "hard-coded password", + "hardcoded password", + "hard-coded credentials", + "hardcoded credentials", + "hard-coded credential", + "hardcoded credential", + "hard-coded cryptographic key", + "hardcoded cryptographic key", + "hard-coded key", + "hardcoded key" + ], + "cre_id": "774-888" + }, + { + "keywords": [ + "session fixation" + ], + "cre_id": "177-260" + }, + { + "keywords": [ + "sensitive cookie in https session without 'secure' attribute" + ], + "cre_id": "688-081" + }, + { + "keywords": [ + "deserialization of untrusted data" + ], + "cre_id": "836-068" + }, + { + "keywords": [ + "injection", + "query logic" + ], + "cre_id": "760-764" + } +] diff --git a/application/utils/external_project_parsers/parsers/cwe.py b/application/utils/external_project_parsers/parsers/cwe.py index b0821aba5..2de35a8bb 100644 --- a/application/utils/external_project_parsers/parsers/cwe.py +++ b/application/utils/external_project_parsers/parsers/cwe.py @@ -1,8 +1,10 @@ import logging import os import tempfile +import json +from pathlib import Path import requests -from typing import Dict +from typing import Dict, List from application.database import db from application.defs import cre_defs as defs import shutil @@ -21,6 +23,22 @@ class CWE(ParserInterface): name = "CWE" cwe_zip = "https://cwe.mitre.org/data/xml/cwec_latest.xml.zip" + fallback_mapping_path = ( + Path(__file__).resolve().parent.parent / "data" / "cwe_fallback_mappings.json" + ) + + def __init__(self) -> None: + self.fallback_cre_by_match = self.load_fallback_cre_mappings() + + def load_fallback_cre_mappings(self) -> List[tuple[tuple[str, ...], str]]: + with self.fallback_mapping_path.open("r", encoding="utf-8") as mapping_file: + raw_mappings = json.load(mapping_file) + + mappings = [] + for entry in raw_mappings: + keywords = tuple(keyword.lower() for keyword in entry["keywords"]) + mappings.append((keywords, entry["cre_id"])) + return mappings def parse(self, cache: db.Node_collection, ph: prompt_client.PromptHandler): response = requests.get(self.cwe_zip, stream=True) @@ -72,17 +90,74 @@ def link_to_related_cwe( ) -> defs.Standard: related_cwes = cache.get_nodes(name="CWE", sectionID=related_id) if related_cwes: - for cre in [ - c.document - for c in related_cwes[0].links - if c.document.doctype == defs.Credoctypes.CRE - ]: - logger.debug( - f"linked CWE with id {cwe.sectionID} to CRE with ID {cre.id}" - ) - cwe.add_link( - defs.Link(document=cre, ltype=defs.LinkTypes.AutomaticallyLinkedTo) - ) + return self.link_to_related_cwe_entry(cwe, related_cwes[0]) + return cwe + + def link_to_related_cwe_entry( + self, cwe: defs.Standard, related_cwe: defs.Standard + ) -> defs.Standard: + for cre in [ + link.document + for link in related_cwe.links + if link.document.doctype == defs.Credoctypes.CRE + ]: + logger.debug(f"linked CWE with id {cwe.sectionID} to CRE with ID {cre.id}") + autolink = defs.Link( + document=cre, ltype=defs.LinkTypes.AutomaticallyLinkedTo + ) + if not cwe.has_link(autolink): + cwe.add_link(autolink) + return cwe + + def collect_related_weakness_ids(self, weakness: Dict) -> List[str]: + related_ids = [] + related_weaknesses = weakness.get("Related_Weaknesses") + if not related_weaknesses: + return related_ids + + containers = ( + related_weaknesses + if isinstance(related_weaknesses, list) + else [related_weaknesses] + ) + for container in containers: + if not isinstance(container, Dict): + continue + related_entries = container.get("Related_Weakness") + if not related_entries: + continue + related_entries = ( + related_entries + if isinstance(related_entries, list) + else [related_entries] + ) + for entry in related_entries: + if isinstance(entry, Dict) and entry.get("@CWE_ID"): + related_ids.append(str(entry["@CWE_ID"])) + return related_ids + + def apply_fallback_cre_mapping( + self, cwe: defs.Standard, cache: db.Node_collection + ) -> defs.Standard: + if any(link.document.doctype == defs.Credoctypes.CRE for link in cwe.links): + return cwe + + section_text = (cwe.section or "").lower() + for keywords, cre_id in self.fallback_cre_by_match: + if not any(keyword in section_text for keyword in keywords): + continue + + matching_cres = cache.get_CREs(external_id=cre_id) + if not matching_cres: + continue + + fallback_link = defs.Link( + document=matching_cres[0], ltype=defs.LinkTypes.AutomaticallyLinkedTo + ) + if not cwe.has_link(fallback_link): + cwe.add_link(fallback_link) + return cwe + return cwe # cwe is a special case because it already partially exists in our spreadsheet @@ -91,6 +166,8 @@ def link_to_related_cwe( def register_cwe(self, cache: db.Node_collection, xml_file: str): statuses = {} entries = [] + entries_by_id = {} + related_ids_by_cwe = {} with open(xml_file, "r") as xml: weakness_catalog = xmltodict.parse(xml.read()).get("Weakness_Catalog") for _, weaknesses in weakness_catalog.get("Weaknesses").items(): @@ -147,23 +224,31 @@ def register_cwe(self, cache: db.Node_collection, xml_file: str): logger.info( f"CWE '{cwe.sectionID}-{cwe.section}' does not have any related CAPEC attack patterns, skipping automated linking" ) - if weakness.get("Related_Weaknesses"): - if isinstance(weakness.get("Related_Weaknesses"), list): - for related_weakness in weakness.get("Related_Weaknesses"): - cwe = self.parse_related_weakness( - cache, related_weakness, cwe - ) - else: - cwe = self.parse_related_weakness( - cache, weakness.get("Related_Weaknesses"), cwe - ) entries.append(cwe) - return entries + entries_by_id[cwe.sectionID] = cwe + related_ids_by_cwe[cwe.sectionID] = ( + self.collect_related_weakness_ids(weakness) + ) - def parse_related_weakness( - self, cache: db.Node_collection, rw: Dict[str, Dict], cwe: defs.Standard - ) -> defs.Standard: - cwe_entry = rw.get("Related_Weakness") - if isinstance(cwe_entry, Dict): - id = cwe_entry["@CWE_ID"] - return self.link_to_related_cwe(cwe=cwe, cache=cache, related_id=id) + changed = True + while changed: + changed = False + for cwe_id, related_ids in related_ids_by_cwe.items(): + cwe = entries_by_id[cwe_id] + before_count = len(cwe.links) + for related_id in related_ids: + related_cwe = entries_by_id.get(related_id) + if related_cwe: + cwe = self.link_to_related_cwe_entry(cwe, related_cwe) + else: + cwe = self.link_to_related_cwe( + cwe=cwe, cache=cache, related_id=related_id + ) + entries_by_id[cwe_id] = cwe + if len(cwe.links) != before_count: + changed = True + + for cwe_id, cwe in entries_by_id.items(): + entries_by_id[cwe_id] = self.apply_fallback_cre_mapping(cwe, cache) + + return entries diff --git a/scripts/update-cwe.sh b/scripts/update-cwe.sh new file mode 100755 index 000000000..7c12c92e1 --- /dev/null +++ b/scripts/update-cwe.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +VENV_DIR="$ROOT_DIR/venv" +CACHE_FILE="${1:-$ROOT_DIR/standards_cache.sqlite}" +TIMESTAMP="$(date +%Y%m%d-%H%M%S)" +BACKUP_FILE="${CACHE_FILE}.bak.${TIMESTAMP}" + +if [[ ! -d "$VENV_DIR" ]]; then + echo "Creating virtual environment in $VENV_DIR" + python3 -m venv "$VENV_DIR" +fi + +source "$VENV_DIR/bin/activate" + +if ! python -c "import requests" >/dev/null 2>&1; then + echo "Installing Python dependencies" + pip install -r "$ROOT_DIR/requirements.txt" +fi + +if [[ -f "$CACHE_FILE" ]]; then + cp "$CACHE_FILE" "$BACKUP_FILE" + echo "Backed up database to $BACKUP_FILE" +fi + +export CRE_NO_NEO4J="${CRE_NO_NEO4J:-1}" +export CRE_NO_GEN_EMBEDDINGS="${CRE_NO_GEN_EMBEDDINGS:-1}" + +echo "Importing latest MITRE CWE data into $CACHE_FILE" +exec python "$ROOT_DIR/cre.py" --cwe_in --cache_file "$CACHE_FILE"