Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion application/tests/web_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import unittest
import tempfile
from types import SimpleNamespace
from unittest.mock import patch

import redis
Expand Down Expand Up @@ -688,7 +689,48 @@ def test_standards_from_db(self, node_mock, redis_conn_mock) -> None:
headers={"Content-Type": "application/json"},
)
self.assertEqual(200, response.status_code)
self.assertEqual(expected, json.loads(response.data))
self.assertEqual(expected + ["OpenCRE"], json.loads(response.data))

@patch.object(web_main.gap_analysis, "schedule")
@patch.object(db, "Node_collection")
def test_gap_analysis_supports_opencre_as_standard(
self, db_mock, schedule_mock
) -> None:
shared_cre = defs.CRE(id="170-772", name="Cryptography", description="")
compare = defs.Standard(
name="OWASP Web Security Testing Guide (WSTG)",
section="WSTG-CRYP-04",
)
compare.add_link(
defs.Link(ltype=defs.LinkTypes.LinkedTo, document=shared_cre.shallow_copy())
)
opencre = defs.CRE(id="170-772", name="Cryptography", description="")
opencre.add_link(
defs.Link(ltype=defs.LinkTypes.LinkedTo, document=compare.shallow_copy())
)

db_mock.return_value.get_gap_analysis_result.return_value = None
db_mock.return_value.gap_analysis_exists.return_value = False
db_mock.return_value.get_nodes.side_effect = lambda name=None, **kwargs: (
[compare] if name == "OWASP Web Security Testing Guide (WSTG)" else []
)
db_mock.return_value.session.query.return_value.all.return_value = [
SimpleNamespace(id="cre-internal-1")
]
db_mock.return_value.get_CREs.return_value = [opencre]

with self.app.test_client() as client:
response = client.get(
"/rest/v1/map_analysis?standard=OpenCRE&standard=OWASP%20Web%20Security%20Testing%20Guide%20(WSTG)",
headers={"Content-Type": "application/json"},
)

payload = json.loads(response.data)
self.assertEqual(200, response.status_code)
self.assertIn("result", payload)
self.assertIn(opencre.id, payload["result"])
self.assertIn(compare.id, payload["result"][opencre.id]["paths"])
schedule_mock.assert_not_called()

def test_gap_analysis_weak_links_no_cache(self) -> None:
with self.app.test_client() as client:
Expand Down
136 changes: 135 additions & 1 deletion application/web/web_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@


ITEMS_PER_PAGE = 20
OPENCRE_STANDARD_NAME = "OpenCRE"

app = Blueprint(
"web",
Expand Down Expand Up @@ -294,6 +295,129 @@ def find_document_by_tag() -> Any:
abort(404, "Tag does not exist")


def _get_opencre_documents(collection: db.Node_collection) -> list[defs.CRE]:
return [
collection.get_CREs(internal_id=cre.id)[0]
for cre in collection.session.query(db.CRE).all()
]


def _get_map_analysis_documents(
standard: str, collection: db.Node_collection
) -> list[defs.Document]:
if standard == OPENCRE_STANDARD_NAME:
return _get_opencre_documents(collection)
return collection.get_nodes(name=standard)


def _get_document_cre_ids(document: defs.Document) -> list[str]:
if document.doctype == defs.Credoctypes.CRE:
return [document.id]
return [
link.document.id
for link in document.links
if link.document.doctype == defs.Credoctypes.CRE
]


def _build_direct_overlap_path(
base_document: defs.Document, cre_id: str, compare_document: defs.Document
) -> dict[str, Any] | None:
if base_document.doctype == defs.Credoctypes.CRE:
if compare_document.doctype == defs.Credoctypes.CRE:
return None
return {
"end": compare_document.shallow_copy(),
"path": [
{
"start": base_document.shallow_copy(),
"end": compare_document.shallow_copy(),
"relationship": "LINKED_TO",
"score": 0,
}
],
"score": 0,
}

if compare_document.doctype == defs.Credoctypes.CRE:
return {
"end": compare_document.shallow_copy(),
"path": [
{
"start": base_document.shallow_copy(),
"end": compare_document.shallow_copy(),
"relationship": "LINKED_TO",
"score": 0,
}
],
"score": 0,
}

return {
"end": compare_document.shallow_copy(),
"path": [
{
"start": base_document.shallow_copy(),
"end": defs.CRE(id=cre_id).shallow_copy(),
"relationship": "LINKED_TO",
"score": 0,
},
{
"start": defs.CRE(id=cre_id).shallow_copy(),
"end": compare_document.shallow_copy(),
"relationship": "LINKED_TO",
"score": 0,
},
],
"score": 0,
}


def _build_direct_cre_overlap_map_analysis(
standards: list[str],
standards_hash: str,
collection: db.Node_collection,
) -> dict[str, Any] | None:
if len(standards) < 2:
return None

base_nodes = _get_map_analysis_documents(standards[0], collection)
compare_nodes = _get_map_analysis_documents(standards[1], collection)
if not base_nodes or not compare_nodes:
return None

compare_nodes_by_cre: dict[str, list[defs.Document]] = {}
for compare_node in compare_nodes:
for cre_id in _get_document_cre_ids(compare_node):
compare_nodes_by_cre.setdefault(cre_id, []).append(compare_node)

grouped_paths: dict[str, dict[str, Any]] = {}
for base_node in base_nodes:
shared_paths: dict[str, Any] = {}
for cre_id in _get_document_cre_ids(base_node):
for compare_node in compare_nodes_by_cre.get(cre_id, []):
path = _build_direct_overlap_path(base_node, cre_id, compare_node)
if not path:
continue
shared_paths.setdefault(compare_node.id, path)

if shared_paths:
grouped_paths[base_node.id] = {
"start": base_node.shallow_copy(),
"paths": shared_paths,
"extra": 0,
}

if not grouped_paths:
return None

result = {"result": grouped_paths}
collection.add_gap_analysis_result(
cache_key=standards_hash, ga_object=flask_json.dumps(result)
)
return result


@app.route("/rest/v1/map_analysis", methods=["GET"])
def map_analysis() -> Any:
standards = request.args.getlist("standard")
Expand All @@ -304,6 +428,14 @@ def map_analysis() -> Any:
standards = request.args.getlist("standard")
standards_hash = gap_analysis.make_resources_key(standards)

if OPENCRE_STANDARD_NAME in standards:
direct_gap_analysis = _build_direct_cre_overlap_map_analysis(
standards, standards_hash, database
)
if direct_gap_analysis:
return jsonify(direct_gap_analysis)
abort(404, "No direct overlap found for requested standards")

# First, check if we have cached results in the database
if database.gap_analysis_exists(standards_hash):
gap_analysis_result = database.get_gap_analysis_result(standards_hash)
Expand Down Expand Up @@ -438,7 +570,9 @@ def standards() -> Any:
posthog.capture(f"standards", "")

database = db.Node_collection()
standards = database.standards()
standards = list(database.standards())
if OPENCRE_STANDARD_NAME not in standards:
standards.append(OPENCRE_STANDARD_NAME)
return standards


Expand Down