diff --git a/application/tests/cwe_parser_test.py b/application/tests/cwe_parser_test.py
index 2c0f72327..0d088884e 100644
--- a/application/tests/cwe_parser_test.py
+++ b/application/tests/cwe_parser_test.py
@@ -102,6 +102,157 @@ def iter_content(self, chunk_size=None):
self.assertCountEqual(nodes[0].todict(), expected[0].todict())
self.assertCountEqual(nodes[1].todict(), expected[1].todict())
+ @patch.object(requests, "get")
+ def test_register_CWE_inherits_mappings_transitively(self, mock_requests) -> None:
+ tmpdir = mkdtemp()
+ tmpFile = os.path.join(tmpdir, "cwe.xml")
+ tmpzip = os.path.join(tmpdir, "cwe.zip")
+ with open(tmpFile, "w") as cx:
+ cx.write(self.CWE_transitive_xml)
+ with zipfile.ZipFile(tmpzip, "w", zipfile.ZIP_DEFLATED) as zipf:
+ zipf.write(tmpFile, arcname="cwe.xml")
+
+ class fakeRequest:
+ def iter_content(self, chunk_size=None):
+ with open(tmpzip, "rb") as zipf:
+ return [zipf.read()]
+
+ mock_requests.return_value = fakeRequest()
+
+ cre = defs.CRE(id="089-089", name="CRE-Injection")
+ dbcre = self.collection.add_cre(cre=cre)
+ dbcwe = self.collection.add_node(defs.Standard(name="CWE", sectionID="89"))
+ self.collection.add_link(dbcre, dbcwe, defs.LinkTypes.LinkedTo)
+
+ entries = cwe.CWE().parse(
+ cache=self.collection,
+ ph=prompt_client.PromptHandler(database=self.collection),
+ )
+ imported_cwes = {node.sectionID: node for node in entries.results["CWE"]}
+
+ self.assertEqual(imported_cwes["2001"].links[0].document.todict(), cre.todict())
+ self.assertEqual(imported_cwes["2002"].links[0].document.todict(), cre.todict())
+
+ @patch.object(requests, "get")
+ def test_register_CWE_applies_fallback_family_mappings(self, mock_requests) -> None:
+ tmpdir = mkdtemp()
+ tmpFile = os.path.join(tmpdir, "cwe.xml")
+ tmpzip = os.path.join(tmpdir, "cwe.zip")
+ with open(tmpFile, "w") as cx:
+ cx.write(self.CWE_fallback_xml)
+ with zipfile.ZipFile(tmpzip, "w", zipfile.ZIP_DEFLATED) as zipf:
+ zipf.write(tmpFile, arcname="cwe.xml")
+
+ class fakeRequest:
+ def iter_content(self, chunk_size=None):
+ with open(tmpzip, "rb") as zipf:
+ return [zipf.read()]
+
+ mock_requests.return_value = fakeRequest()
+
+ injection_cre = defs.CRE(id="760-764", name="Injection protection")
+ xss_cre = defs.CRE(id="760-765", name="XSS protection")
+ xxe_cre = defs.CRE(id="764-507", name="Restrict XML parsing (against XXE)")
+ auth_cre = defs.CRE(
+ id="117-371", name="Use a centralized access control mechanism"
+ )
+ authn_cre = defs.CRE(
+ id="113-133", name="Use centralized authentication mechanism"
+ )
+ csrf_cre = defs.CRE(id="028-727", name="CSRF protection")
+ ssrf_cre = defs.CRE(id="028-728", name="SSRF protection")
+ hardcoded_secret_cre = defs.CRE(
+ id="774-888", name="Do not store secrets in the code"
+ )
+ password_storage_cre = defs.CRE(
+ id="622-203", name="Store passwords salted and hashed"
+ )
+ credential_storage_cre = defs.CRE(
+ id="881-321", name="Store credentials securely"
+ )
+ session_management_cre = defs.CRE(id="177-260", name="Session management")
+ secure_cookie_cre = defs.CRE(
+ id="688-081", name='Set "secure" attribute for cookie-based session tokens'
+ )
+ deserialization_cre = defs.CRE(id="836-068", name="Deserialization Prevention")
+ self.collection.add_cre(cre=injection_cre)
+ self.collection.add_cre(cre=xss_cre)
+ self.collection.add_cre(cre=xxe_cre)
+ self.collection.add_cre(cre=auth_cre)
+ self.collection.add_cre(cre=authn_cre)
+ self.collection.add_cre(cre=csrf_cre)
+ self.collection.add_cre(cre=ssrf_cre)
+ self.collection.add_cre(cre=hardcoded_secret_cre)
+ self.collection.add_cre(cre=password_storage_cre)
+ self.collection.add_cre(cre=credential_storage_cre)
+ self.collection.add_cre(cre=session_management_cre)
+ self.collection.add_cre(cre=secure_cookie_cre)
+ self.collection.add_cre(cre=deserialization_cre)
+
+ entries = cwe.CWE().parse(
+ cache=self.collection,
+ ph=prompt_client.PromptHandler(database=self.collection),
+ )
+ imported_cwes = {node.sectionID: node for node in entries.results["CWE"]}
+
+ self.assertEqual(
+ imported_cwes["89"].links[0].document.todict(), injection_cre.todict()
+ )
+ self.assertEqual(
+ imported_cwes["79"].links[0].document.todict(), xss_cre.todict()
+ )
+ self.assertEqual(
+ imported_cwes["611"].links[0].document.todict(), xxe_cre.todict()
+ )
+ self.assertEqual(
+ imported_cwes["612"].links[0].document.todict(), auth_cre.todict()
+ )
+ self.assertEqual(
+ imported_cwes["287"].links[0].document.todict(), authn_cre.todict()
+ )
+ self.assertEqual(
+ imported_cwes["352"].links[0].document.todict(), csrf_cre.todict()
+ )
+ self.assertEqual(
+ imported_cwes["918"].links[0].document.todict(), ssrf_cre.todict()
+ )
+ self.assertEqual(
+ imported_cwes["798"].links[0].document.todict(),
+ hardcoded_secret_cre.todict(),
+ )
+ self.assertEqual(
+ imported_cwes["321"].links[0].document.todict(),
+ hardcoded_secret_cre.todict(),
+ )
+ self.assertEqual(
+ imported_cwes["256"].links[0].document.todict(),
+ password_storage_cre.todict(),
+ )
+ self.assertEqual(
+ imported_cwes["257"].links[0].document.todict(),
+ password_storage_cre.todict(),
+ )
+ self.assertEqual(
+ imported_cwes["258"].links[0].document.todict(),
+ credential_storage_cre.todict(),
+ )
+ self.assertEqual(
+ imported_cwes["260"].links[0].document.todict(),
+ credential_storage_cre.todict(),
+ )
+ self.assertEqual(
+ imported_cwes["384"].links[0].document.todict(),
+ session_management_cre.todict(),
+ )
+ self.assertEqual(
+ imported_cwes["614"].links[0].document.todict(),
+ secure_cookie_cre.todict(),
+ )
+ self.assertEqual(
+ imported_cwes["502"].links[0].document.todict(),
+ deserialization_cre.todict(),
+ )
+
CWE_xml = """
"""
+
+ CWE_transitive_xml = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Padding entry so xmltodict returns a list of Weakness elements.
+
+
+
+"""
+
+ CWE_fallback_xml = """
+
+
+
+ XSS entry.
+
+
+ SQL injection entry.
+
+
+ XXE entry.
+
+
+ Authorization entry.
+
+
+ Authentication entry.
+
+
+ CSRF entry.
+
+
+ Hard-coded credentials entry.
+
+
+ Hard-coded key entry.
+
+
+ Password storage entry.
+
+
+ Recoverable password entry.
+
+
+ Password in config entry.
+
+
+ Password in config entry.
+
+
+ Session fixation entry.
+
+
+ Cookie secure attribute entry.
+
+
+ Deserialization entry.
+
+
+ SSRF entry.
+
+
+
+"""
diff --git a/application/utils/external_project_parsers/data/cwe_fallback_mappings.json b/application/utils/external_project_parsers/data/cwe_fallback_mappings.json
new file mode 100644
index 000000000..11d9d1ff8
--- /dev/null
+++ b/application/utils/external_project_parsers/data/cwe_fallback_mappings.json
@@ -0,0 +1,102 @@
+[
+ {
+ "keywords": [
+ "xml external entity",
+ "xxe"
+ ],
+ "cre_id": "764-507"
+ },
+ {
+ "keywords": [
+ "cross-site scripting",
+ " xss",
+ "(xss)"
+ ],
+ "cre_id": "760-765"
+ },
+ {
+ "keywords": [
+ "authorization",
+ "access control"
+ ],
+ "cre_id": "117-371"
+ },
+ {
+ "keywords": [
+ "improper authentication",
+ "missing authentication",
+ "authentication bypass"
+ ],
+ "cre_id": "113-133"
+ },
+ {
+ "keywords": [
+ "cross-site request forgery",
+ "(csrf)",
+ "csrf"
+ ],
+ "cre_id": "028-727"
+ },
+ {
+ "keywords": [
+ "server-side request forgery",
+ "(ssrf)",
+ "ssrf"
+ ],
+ "cre_id": "028-728"
+ },
+ {
+ "keywords": [
+ "plaintext storage of a password",
+ "storing passwords in a recoverable format"
+ ],
+ "cre_id": "622-203"
+ },
+ {
+ "keywords": [
+ "empty password in configuration file",
+ "password in configuration file"
+ ],
+ "cre_id": "881-321"
+ },
+ {
+ "keywords": [
+ "hard-coded password",
+ "hardcoded password",
+ "hard-coded credentials",
+ "hardcoded credentials",
+ "hard-coded credential",
+ "hardcoded credential",
+ "hard-coded cryptographic key",
+ "hardcoded cryptographic key",
+ "hard-coded key",
+ "hardcoded key"
+ ],
+ "cre_id": "774-888"
+ },
+ {
+ "keywords": [
+ "session fixation"
+ ],
+ "cre_id": "177-260"
+ },
+ {
+ "keywords": [
+ "sensitive cookie in https session without 'secure' attribute"
+ ],
+ "cre_id": "688-081"
+ },
+ {
+ "keywords": [
+ "deserialization of untrusted data"
+ ],
+ "cre_id": "836-068"
+ },
+ {
+ "keywords": [
+ "injection",
+ "query logic"
+ ],
+ "cre_id": "760-764"
+ }
+]
diff --git a/application/utils/external_project_parsers/parsers/cwe.py b/application/utils/external_project_parsers/parsers/cwe.py
index b0821aba5..2de35a8bb 100644
--- a/application/utils/external_project_parsers/parsers/cwe.py
+++ b/application/utils/external_project_parsers/parsers/cwe.py
@@ -1,8 +1,10 @@
import logging
import os
import tempfile
+import json
+from pathlib import Path
import requests
-from typing import Dict
+from typing import Dict, List
from application.database import db
from application.defs import cre_defs as defs
import shutil
@@ -21,6 +23,22 @@
class CWE(ParserInterface):
name = "CWE"
cwe_zip = "https://cwe.mitre.org/data/xml/cwec_latest.xml.zip"
+ # JSON file read by load_fallback_cre_mappings: the "data" directory two
+ # levels above this module, file "cwe_fallback_mappings.json".
+ fallback_mapping_path = (
+ Path(__file__).resolve().parent.parent / "data" / "cwe_fallback_mappings.json"
+ )
+
+ def __init__(self) -> None:
+ """Load the fallback keyword table and keep it on the instance."""
+ # List of (lower-cased keywords, cre_id) pairs, in file order.
+ self.fallback_cre_by_match = self.load_fallback_cre_mappings()
+
+ def load_fallback_cre_mappings(self) -> List[tuple[tuple[str, ...], str]]:
+ with self.fallback_mapping_path.open("r", encoding="utf-8") as mapping_file:
+ raw_mappings = json.load(mapping_file)
+
+ mappings = []
+ for entry in raw_mappings:
+ keywords = tuple(keyword.lower() for keyword in entry["keywords"])
+ mappings.append((keywords, entry["cre_id"]))
+ return mappings
def parse(self, cache: db.Node_collection, ph: prompt_client.PromptHandler):
response = requests.get(self.cwe_zip, stream=True)
@@ -72,17 +90,74 @@ def link_to_related_cwe(
) -> defs.Standard:
related_cwes = cache.get_nodes(name="CWE", sectionID=related_id)
if related_cwes:
- for cre in [
- c.document
- for c in related_cwes[0].links
- if c.document.doctype == defs.Credoctypes.CRE
- ]:
- logger.debug(
- f"linked CWE with id {cwe.sectionID} to CRE with ID {cre.id}"
- )
- cwe.add_link(
- defs.Link(document=cre, ltype=defs.LinkTypes.AutomaticallyLinkedTo)
- )
+ return self.link_to_related_cwe_entry(cwe, related_cwes[0])
+ return cwe
+
+ def link_to_related_cwe_entry(
+ self, cwe: defs.Standard, related_cwe: defs.Standard
+ ) -> defs.Standard:
+ for cre in [
+ link.document
+ for link in related_cwe.links
+ if link.document.doctype == defs.Credoctypes.CRE
+ ]:
+ logger.debug(f"linked CWE with id {cwe.sectionID} to CRE with ID {cre.id}")
+ autolink = defs.Link(
+ document=cre, ltype=defs.LinkTypes.AutomaticallyLinkedTo
+ )
+ if not cwe.has_link(autolink):
+ cwe.add_link(autolink)
+ return cwe
+
+ def collect_related_weakness_ids(self, weakness: Dict) -> List[str]:
+ related_ids = []
+ related_weaknesses = weakness.get("Related_Weaknesses")
+ if not related_weaknesses:
+ return related_ids
+
+ containers = (
+ related_weaknesses
+ if isinstance(related_weaknesses, list)
+ else [related_weaknesses]
+ )
+ for container in containers:
+ if not isinstance(container, Dict):
+ continue
+ related_entries = container.get("Related_Weakness")
+ if not related_entries:
+ continue
+ related_entries = (
+ related_entries
+ if isinstance(related_entries, list)
+ else [related_entries]
+ )
+ for entry in related_entries:
+ if isinstance(entry, Dict) and entry.get("@CWE_ID"):
+ related_ids.append(str(entry["@CWE_ID"]))
+ return related_ids
+
+ def apply_fallback_cre_mapping(
+ self, cwe: defs.Standard, cache: db.Node_collection
+ ) -> defs.Standard:
+ if any(link.document.doctype == defs.Credoctypes.CRE for link in cwe.links):
+ return cwe
+
+ section_text = (cwe.section or "").lower()
+ for keywords, cre_id in self.fallback_cre_by_match:
+ if not any(keyword in section_text for keyword in keywords):
+ continue
+
+ matching_cres = cache.get_CREs(external_id=cre_id)
+ if not matching_cres:
+ continue
+
+ fallback_link = defs.Link(
+ document=matching_cres[0], ltype=defs.LinkTypes.AutomaticallyLinkedTo
+ )
+ if not cwe.has_link(fallback_link):
+ cwe.add_link(fallback_link)
+ return cwe
+
return cwe
# cwe is a special case because it already partially exists in our spreadsheet
@@ -91,6 +166,8 @@ def link_to_related_cwe(
def register_cwe(self, cache: db.Node_collection, xml_file: str):
statuses = {}
entries = []
+ entries_by_id = {}
+ related_ids_by_cwe = {}
with open(xml_file, "r") as xml:
weakness_catalog = xmltodict.parse(xml.read()).get("Weakness_Catalog")
for _, weaknesses in weakness_catalog.get("Weaknesses").items():
@@ -147,23 +224,31 @@ def register_cwe(self, cache: db.Node_collection, xml_file: str):
logger.info(
f"CWE '{cwe.sectionID}-{cwe.section}' does not have any related CAPEC attack patterns, skipping automated linking"
)
- if weakness.get("Related_Weaknesses"):
- if isinstance(weakness.get("Related_Weaknesses"), list):
- for related_weakness in weakness.get("Related_Weaknesses"):
- cwe = self.parse_related_weakness(
- cache, related_weakness, cwe
- )
- else:
- cwe = self.parse_related_weakness(
- cache, weakness.get("Related_Weaknesses"), cwe
- )
entries.append(cwe)
- return entries
+ entries_by_id[cwe.sectionID] = cwe
+ related_ids_by_cwe[cwe.sectionID] = (
+ self.collect_related_weakness_ids(weakness)
+ )
- def parse_related_weakness(
- self, cache: db.Node_collection, rw: Dict[str, Dict], cwe: defs.Standard
- ) -> defs.Standard:
- cwe_entry = rw.get("Related_Weakness")
- if isinstance(cwe_entry, Dict):
- id = cwe_entry["@CWE_ID"]
- return self.link_to_related_cwe(cwe=cwe, cache=cache, related_id=id)
+ changed = True
+ while changed:
+ changed = False
+ for cwe_id, related_ids in related_ids_by_cwe.items():
+ cwe = entries_by_id[cwe_id]
+ before_count = len(cwe.links)
+ for related_id in related_ids:
+ related_cwe = entries_by_id.get(related_id)
+ if related_cwe:
+ cwe = self.link_to_related_cwe_entry(cwe, related_cwe)
+ else:
+ cwe = self.link_to_related_cwe(
+ cwe=cwe, cache=cache, related_id=related_id
+ )
+ entries_by_id[cwe_id] = cwe
+ if len(cwe.links) != before_count:
+ changed = True
+
+ for cwe_id, cwe in entries_by_id.items():
+ entries_by_id[cwe_id] = self.apply_fallback_cre_mapping(cwe, cache)
+
+ return entries
diff --git a/scripts/update-cwe.sh b/scripts/update-cwe.sh
new file mode 100755
index 000000000..7c12c92e1
--- /dev/null
+++ b/scripts/update-cwe.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+#
+# Import the latest MITRE CWE data into the standards cache.
+#
+# Usage: update-cwe.sh [CACHE_FILE]
+#   CACHE_FILE defaults to <repo root>/standards_cache.sqlite.
+#
+# CRE_NO_NEO4J and CRE_NO_GEN_EMBEDDINGS default to 1 unless already set.
+
+set -euo pipefail
+
+# Repository root is the parent of the directory holding this script.
+repo_root="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+venv_path="$repo_root/venv"
+cache_db="${1:-$repo_root/standards_cache.sqlite}"
+stamp="$(date +%Y%m%d-%H%M%S)"
+backup_db="${cache_db}.bak.${stamp}"
+
+# Create the virtual environment on first use.
+if [[ ! -d "$venv_path" ]]; then
+  echo "Creating virtual environment in $venv_path"
+  python3 -m venv "$venv_path"
+fi
+
+. "$venv_path/bin/activate"
+
+# A failed "import requests" is used as the signal that dependencies are missing.
+if ! python -c "import requests" >/dev/null 2>&1; then
+  echo "Installing Python dependencies"
+  pip install -r "$repo_root/requirements.txt"
+fi
+
+# Keep a timestamped copy of an existing cache before the import touches it.
+if [[ -f "$cache_db" ]]; then
+  cp "$cache_db" "$backup_db"
+  echo "Backed up database to $backup_db"
+fi
+
+export CRE_NO_NEO4J="${CRE_NO_NEO4J:-1}"
+export CRE_NO_GEN_EMBEDDINGS="${CRE_NO_GEN_EMBEDDINGS:-1}"
+
+echo "Importing latest MITRE CWE data into $cache_db"
+# exec replaces this shell, so the importer's exit status is the script's.
+exec python "$repo_root/cre.py" --cwe_in --cache_file "$cache_db"