diff --git a/application/tests/web_main_test.py b/application/tests/web_main_test.py index 9e219b4ce..f66be64d7 100644 --- a/application/tests/web_main_test.py +++ b/application/tests/web_main_test.py @@ -7,6 +7,7 @@ import json import unittest import tempfile +from types import SimpleNamespace from unittest.mock import patch import redis @@ -688,7 +689,48 @@ def test_standards_from_db(self, node_mock, redis_conn_mock) -> None: headers={"Content-Type": "application/json"}, ) self.assertEqual(200, response.status_code) - self.assertEqual(expected, json.loads(response.data)) + self.assertEqual(expected + ["OpenCRE"], json.loads(response.data)) + + @patch.object(web_main.gap_analysis, "schedule") + @patch.object(db, "Node_collection") + def test_gap_analysis_supports_opencre_as_standard( + self, db_mock, schedule_mock + ) -> None: + shared_cre = defs.CRE(id="170-772", name="Cryptography", description="") + compare = defs.Standard( + name="OWASP Web Security Testing Guide (WSTG)", + section="WSTG-CRYP-04", + ) + compare.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=shared_cre.shallow_copy()) + ) + opencre = defs.CRE(id="170-772", name="Cryptography", description="") + opencre.add_link( + defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy()) + ) + + db_mock.return_value.get_gap_analysis_result.return_value = None + db_mock.return_value.gap_analysis_exists.return_value = False + db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: ( + [compare] if name == "OWASP Web Security Testing Guide (WSTG)" else [] + ) + db_mock.return_value.session.query.return_value.all.return_value = [ + SimpleNamespace(id="cre-internal-1") + ] + db_mock.return_value.get_CREs.return_value = [opencre] + + with self.app.test_client() as client: + response = client.get( + "/rest/v1/map_analysis?standard=OpenCRE&standard=OWASP%20Web%20Security%20Testing%20Guide%20(WSTG)", + headers={"Content-Type": "application/json"}, + ) + + payload = json.loads(response.data) + self.assertEqual(200, response.status_code) + self.assertIn("result", payload) + self.assertIn(opencre.id, payload["result"]) + self.assertIn(compare.id, payload["result"][opencre.id]["paths"]) + schedule_mock.assert_not_called() def test_gap_analysis_weak_links_no_cache(self) -> None: with self.app.test_client() as client: diff --git a/application/web/web_main.py b/application/web/web_main.py index 29567470a..b33e8fe5e 100644 --- a/application/web/web_main.py +++ b/application/web/web_main.py @@ -48,6 +48,7 @@ ITEMS_PER_PAGE = 20 +OPENCRE_STANDARD_NAME = "OpenCRE" app = Blueprint( "web", @@ -294,6 +295,129 @@ def find_document_by_tag() -> Any: abort(404, "Tag does not exist") +def _get_opencre_documents(collection: db.Node_collection) -> list[defs.CRE]: + return [ + collection.get_CREs(internal_id=cre.id)[0] + for cre in collection.session.query(db.CRE).all() + ] + + +def _get_map_analysis_documents( + standard: str, collection: db.Node_collection +) -> list[defs.Document]: + if standard == OPENCRE_STANDARD_NAME: + return _get_opencre_documents(collection) + return collection.get_nodes(name=standard) + + +def _get_document_cre_ids(document: defs.Document) -> list[str]: + if document.doctype == defs.Credoctypes.CRE: + return [document.id] + return [ + link.document.id + for link in document.links + if link.document.doctype == defs.Credoctypes.CRE + ] + + +def _build_direct_overlap_path( + base_document: defs.Document, cre_id: str, compare_document: defs.Document +) -> dict[str, Any] | None: + if base_document.doctype == defs.Credoctypes.CRE: + if compare_document.doctype == defs.Credoctypes.CRE: + return None + return { + "end": compare_document.shallow_copy(), + "path": [ + { + "start": base_document.shallow_copy(), + "end": compare_document.shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + } + ], + "score": 0, + } + + if compare_document.doctype == defs.Credoctypes.CRE: + return { + "end": compare_document.shallow_copy(), + "path": [ + { + "start": base_document.shallow_copy(), + "end": compare_document.shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + } + ], + "score": 0, + } + + return { + "end": compare_document.shallow_copy(), + "path": [ + { + "start": base_document.shallow_copy(), + "end": defs.CRE(id=cre_id).shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + }, + { + "start": defs.CRE(id=cre_id).shallow_copy(), + "end": compare_document.shallow_copy(), + "relationship": "LINKED_TO", + "score": 0, + }, + ], + "score": 0, + } + + +def _build_direct_cre_overlap_map_analysis( + standards: list[str], + standards_hash: str, + collection: db.Node_collection, +) -> dict[str, Any] | None: + if len(standards) < 2: + return None + + base_nodes = _get_map_analysis_documents(standards[0], collection) + compare_nodes = _get_map_analysis_documents(standards[1], collection) + if not base_nodes or not compare_nodes: + return None + + compare_nodes_by_cre: dict[str, list[defs.Document]] = {} + for compare_node in compare_nodes: + for cre_id in _get_document_cre_ids(compare_node): + compare_nodes_by_cre.setdefault(cre_id, []).append(compare_node) + + grouped_paths: dict[str, dict[str, Any]] = {} + for base_node in base_nodes: + shared_paths: dict[str, Any] = {} + for cre_id in _get_document_cre_ids(base_node): + for compare_node in compare_nodes_by_cre.get(cre_id, []): + path = _build_direct_overlap_path(base_node, cre_id, compare_node) + if not path: + continue + shared_paths.setdefault(compare_node.id, path) + + if shared_paths: + grouped_paths[base_node.id] = { + "start": base_node.shallow_copy(), + "paths": shared_paths, + "extra": 0, + } + + if not grouped_paths: + return None + + result = {"result": grouped_paths} + collection.add_gap_analysis_result( + cache_key=standards_hash, ga_object=flask_json.dumps(result) + ) + return result + + @app.route("/rest/v1/map_analysis", methods=["GET"]) def map_analysis() -> Any: standards = request.args.getlist("standard") @@ -304,6 +428,14 @@ def map_analysis() -> Any: standards = request.args.getlist("standard") standards_hash = gap_analysis.make_resources_key(standards) + if OPENCRE_STANDARD_NAME in standards: + direct_gap_analysis = _build_direct_cre_overlap_map_analysis( + standards, standards_hash, database + ) + if direct_gap_analysis: + return jsonify(direct_gap_analysis) + abort(404, "No direct overlap found for requested standards") + # First, check if we have cached results in the database if database.gap_analysis_exists(standards_hash): gap_analysis_result = database.get_gap_analysis_result(standards_hash) @@ -438,7 +570,9 @@ def standards() -> Any: posthog.capture(f"standards", "") database = db.Node_collection() - standards = database.standards() + standards = list(database.standards()) + if OPENCRE_STANDARD_NAME not in standards: + standards.append(OPENCRE_STANDARD_NAME) return standards