diff --git a/AGENTS.md b/AGENTS.md index 00de5ab1..7abf694f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,7 +7,7 @@ Infrahub Python SDK - async/sync client for Infrahub infrastructure management. ```bash uv sync --all-groups --all-extras # Install all deps uv run invoke format # Format code -uv run invoke lint # All linters (code + yamllint + documentation) +uv run invoke lint # Full pipeline: ruff, yamllint, ty, mypy, markdownlint, vale uv run invoke lint-code # All linters for Python code uv run pytest tests/unit/ # Unit tests uv run pytest tests/integration/ # Integration tests diff --git a/changelog/497.fixed.md b/changelog/497.fixed.md new file mode 100644 index 00000000..b32323d1 --- /dev/null +++ b/changelog/497.fixed.md @@ -0,0 +1 @@ +Fixed Python SDK query generation regarding from_pool generated attribute value diff --git a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx index a7b82ecb..d08c7fc5 100644 --- a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx +++ b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx @@ -24,3 +24,15 @@ value(self) -> Any ```python value(self, value: Any) -> None ``` + +#### `is_from_pool_attribute` + +```python +is_from_pool_attribute(self) -> bool +``` + +Check whether this attribute's value is sourced from a resource pool. + +**Returns:** + +- True if the attribute value is a resource pool node or was explicitly allocated from a pool. diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index 8043d567..54dd99aa 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -2,7 +2,7 @@ import ipaddress from collections.abc import Callable -from typing import TYPE_CHECKING, Any, get_args +from typing import TYPE_CHECKING, Any, NamedTuple, get_args from ..protocols_base import CoreNodeBase from ..uuidt import UUIDT @@ -13,6 +13,33 @@ from ..schema import AttributeSchemaAPI +class _GraphQLPayloadAttribute(NamedTuple): + """Result of resolving an attribute value for a GraphQL mutation. + + Attributes: + payload: Key/value entries to include in the mutation payload + (e.g. ``{"value": ...}`` or ``{"from_pool": ...}``). + variables: GraphQL variable bindings for unsafe string values. + needs_metadata: When ``True``, the payload needs to append property flags/objects + """ + + payload: dict[str, Any] + variables: dict[str, Any] + needs_metadata: bool + + def to_dict(self) -> dict[str, Any]: + return {"data": self.payload, "variables": self.variables} + + def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, str | None]) -> None: + if not self.needs_metadata: + return + for prop_name, prop in properties_flag.items(): + self.payload[prop_name] = prop + + for prop_name, prop in properties_object.items(): + self.payload[prop_name] = prop + + class Attribute: """Represents an attribute of a Node, including its schema, value, and properties.""" @@ -25,8 +52,12 @@ def __init__(self, name: str, schema: AttributeSchemaAPI, data: Any | dict) -> N """ self.name = name self._schema = schema + self._from_pool: dict[str, Any] | None = None - if not isinstance(data, dict) or "value" not in data: + if isinstance(data, dict) and "from_pool" in data: + self._from_pool = data.pop("from_pool") + data.setdefault("value", None) + elif not isinstance(data, dict) or "value" not in data: data = {"value": data} self._properties_flag = PROPERTIES_FLAG @@ -76,38 +107,55 @@ def value(self, value: Any) -> None: self._value = value self.value_has_been_mutated = True - def _generate_input_data(self) -> dict | None: - data: dict[str, Any] = {} - variables: dict[str, Any] = {} - - if self.value is None: - if self._schema.optional and self.value_has_been_mutated: - data["value"] = None - return data - - if isinstance(self.value, str): - if SAFE_VALUE.match(self.value): - data["value"] = self.value - else: - var_name = f"value_{UUIDT.new().hex}" - variables[var_name] = self.value - data["value"] = f"${var_name}" - elif isinstance(self.value, get_args(IP_TYPES)): - data["value"] = self.value.with_prefixlen - elif isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): - data["from_pool"] = {"id": self.value.id} - else: - data["value"] = self.value - - for prop_name in self._properties_flag: - if getattr(self, prop_name) is not None: - data[prop_name] = getattr(self, prop_name) + def _initialize_graphql_payload(self) -> _GraphQLPayloadAttribute: + """Resolve the attribute value into a GraphQL mutation payload object.""" - for prop_name in self._properties_object: - if getattr(self, prop_name) is not None: - data[prop_name] = getattr(self, prop_name)._generate_input_data() + # Pool-based allocation (dict data or resource-pool node) + if self._from_pool is not None: + return _GraphQLPayloadAttribute(payload={"from_pool": self._from_pool}, variables={}, needs_metadata=True) + if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): + return _GraphQLPayloadAttribute( + payload={"from_pool": {"id": self.value.id}}, variables={}, needs_metadata=True + ) - return {"data": data, "variables": variables} + # Null value + if self.value is None: + data = {"value": None} if (self._schema.optional and self.value_has_been_mutated) else {} + return _GraphQLPayloadAttribute(payload=data, variables={}, needs_metadata=False) + + # Unsafe strings need a variable binding to avoid injection + if isinstance(self.value, str) and not SAFE_VALUE.match(self.value): + var_name = f"value_{UUIDT.new().hex}" + return _GraphQLPayloadAttribute( + payload={"value": f"${var_name}"}, + variables={var_name: self.value}, + needs_metadata=True, + ) + + # Safe strings, IP types, and everything else + value = self.value.with_prefixlen if isinstance(self.value, get_args(IP_TYPES)) else self.value + return _GraphQLPayloadAttribute(payload={"value": value}, variables={}, needs_metadata=True) + + def _generate_input_data(self) -> _GraphQLPayloadAttribute: + """Build the input payload for a GraphQL mutation on this attribute. + + Returns a ResolvedValue object, which contains all the data required. + """ + graphql_payload = self._initialize_graphql_payload() + + properties_flag: dict[str, Any] = { + property_name: getattr(self, property_name) + for property_name in self._properties_flag + if getattr(self, property_name) is not None + } + properties_object: dict[str, str | None] = { + property_name: getattr(self, property_name)._generate_input_data() + for property_name in self._properties_object + if getattr(self, property_name) is not None + } + graphql_payload.add_properties(properties_flag, properties_object) + + return graphql_payload def _generate_query_data(self, property: bool = False, include_metadata: bool = False) -> dict | None: data: dict[str, Any] = {"value": None} @@ -128,7 +176,15 @@ def _generate_query_data(self, property: bool = False, include_metadata: bool = return data def _generate_mutation_query(self) -> dict[str, Any]: - if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): + if self.is_from_pool_attribute(): # If it points to a pool, ask for the value of the pool allocated resource return {self.name: {"value": None}} return {} + + def is_from_pool_attribute(self) -> bool: + """Check whether this attribute's value is sourced from a resource pool. + + Returns: + True if the attribute value is a resource pool node or was explicitly allocated from a pool. + """ + return (isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool()) or self._from_pool is not None diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index 0c85c3ad..9d024cbb 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -216,7 +216,7 @@ def is_resource_pool(self) -> bool: def get_raw_graphql_data(self) -> dict | None: return self._data - def _generate_input_data( # noqa: C901, PLR0915 + def _generate_input_data( # noqa: C901 self, exclude_unmodified: bool = False, exclude_hfid: bool = False, @@ -228,27 +228,18 @@ def _generate_input_data( # noqa: C901, PLR0915 dict[str, Dict]: Representation of an input data in dict format """ - data = {} - variables = {} + data: dict[str, Any] = {} + variables: dict[str, Any] = {} for item_name in self._attributes: attr: Attribute = getattr(self, item_name) if attr._schema.read_only: continue - attr_data = attr._generate_input_data() - - # NOTE, this code has been inherited when we splitted attributes and relationships - # into 2 loops, most likely it's possible to simply it - if attr_data and isinstance(attr_data, dict): - if variable_values := attr_data.get("data"): - data[item_name] = variable_values - else: - data[item_name] = attr_data - if variable_names := attr_data.get("variables"): - variables.update(variable_names) - - elif attr_data and isinstance(attr_data, list): - data[item_name] = attr_data + graphql_payload = attr._generate_input_data() + if graphql_payload.payload: + data[item_name] = graphql_payload.payload + if graphql_payload.variables: + variables.update(graphql_payload.variables) for item_name in self._relationships: allocate_from_pool = False @@ -1011,11 +1002,7 @@ async def _process_mutation_result( for attr_name in self._attributes: attr = getattr(self, attr_name) - if ( - attr_name not in object_response - or not isinstance(attr.value, InfrahubNodeBase) - or not attr.value.is_resource_pool() - ): + if attr_name not in object_response or not attr.is_from_pool_attribute(): continue # Process allocated resource from a pool and update attribute @@ -1819,11 +1806,7 @@ def _process_mutation_result( for attr_name in self._attributes: attr = getattr(self, attr_name) - if ( - attr_name not in object_response - or not isinstance(attr.value, InfrahubNodeBase) - or not attr.value.is_resource_pool() - ): + if attr_name not in object_response or not attr.is_from_pool_attribute(): continue # Process allocated resource from a pool and update attribute diff --git a/tests/AGENTS.md b/tests/AGENTS.md index f3608ead..cce67364 100644 --- a/tests/AGENTS.md +++ b/tests/AGENTS.md @@ -17,6 +17,12 @@ uv run pytest tests/unit/test_client.py # Single file ```text tests/ ├── unit/ # Fast, mocked, no external deps +│ ├── ctl/ # CLI command tests +│ └── sdk/ # SDK tests +│ ├── pool/ # Resource pool allocation tests +│ ├── spec/ # Object spec tests +│ ├── checks/ # InfrahubCheck tests +│ └── ... # Core SDK tests (client, node, schema, etc.) ├── integration/ # Real Infrahub via testcontainers ├── fixtures/ # Test data (JSON, YAML) └── helpers/ # Test utilities diff --git a/tests/unit/sdk/conftest.py b/tests/unit/sdk/conftest.py index 8fb9ecf2..d28c23b9 100644 --- a/tests/unit/sdk/conftest.py +++ b/tests/unit/sdk/conftest.py @@ -1015,115 +1015,6 @@ async def ipam_ipprefix_data() -> dict[str, Any]: } -@pytest.fixture -async def ipaddress_pool_schema() -> NodeSchemaAPI: - data = { - "name": "IPAddressPool", - "namespace": "Core", - "description": "A pool of IP address resources", - "label": "IP Address Pool", - "default_filter": "name__value", - "order_by": ["name__value"], - "display_labels": ["name__value"], - "include_in_menu": False, - "branch": BranchSupportType.AGNOSTIC.value, - "inherit_from": ["CoreResourcePool"], - "attributes": [ - { - "name": "default_address_type", - "kind": "Text", - "optional": False, - "description": "The object type to create when reserving a resource in the pool", - }, - { - "name": "default_prefix_length", - "kind": "Number", - "optional": True, - }, - ], - "relationships": [ - { - "name": "resources", - "peer": "BuiltinIPPrefix", - "kind": "Attribute", - "identifier": "ipaddresspool__resource", - "cardinality": "many", - "optional": False, - "order_weight": 4000, - }, - { - "name": "ip_namespace", - "peer": "BuiltinIPNamespace", - "kind": "Attribute", - "identifier": "ipaddresspool__ipnamespace", - "cardinality": "one", - "optional": False, - "order_weight": 5000, - }, - ], - } - return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def ipprefix_pool_schema() -> NodeSchemaAPI: - data = { - "name": "IPPrefixPool", - "namespace": "Core", - "description": "A pool of IP prefix resources", - "label": "IP Prefix Pool", - "include_in_menu": False, - "branch": BranchSupportType.AGNOSTIC.value, - "inherit_from": ["CoreResourcePool"], - "attributes": [ - { - "name": "default_prefix_length", - "kind": "Number", - "description": "The default prefix length as an integer for prefixes allocated from this pool.", - "optional": True, - "order_weight": 5000, - }, - { - "name": "default_member_type", - "kind": "Text", - "enum": ["prefix", "address"], - "default_value": "prefix", - "optional": True, - "order_weight": 3000, - }, - { - "name": "default_prefix_type", - "kind": "Text", - "optional": True, - "order_weight": 4000, - }, - ], - "relationships": [ - { - "name": "resources", - "peer": "BuiltinIPPrefix", - "kind": "Attribute", - "identifier": "prefixpool__resource", - "cardinality": "many", - "branch": BranchSupportType.AGNOSTIC.value, - "optional": False, - "order_weight": 6000, - }, - { - "name": "ip_namespace", - "peer": "BuiltinIPNamespace", - "kind": "Attribute", - "identifier": "prefixpool__ipnamespace", - "cardinality": "one", - "branch": BranchSupportType.AGNOSTIC.value, - "optional": False, - "order_weight": 7000, - }, - ], - } - return NodeSchema(**data).convert_api() - - @pytest.fixture async def address_schema() -> NodeSchemaAPI: data = { @@ -2645,3 +2536,23 @@ async def nested_device_with_interfaces_schema() -> NodeSchemaAPI: ], } return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def vlan_schema() -> NodeSchemaAPI: + data = { + "name": "VLAN", + "namespace": "Infra", + "label": "VLAN", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [ + {"name": "name", "kind": "Text", "unique": True}, + {"name": "vlan_id", "kind": "Number"}, + {"name": "role", "kind": "Text", "optional": True}, + {"name": "status", "kind": "Text", "optional": True}, + ], + "relationships": [], + } + return NodeSchema(**data).convert_api() diff --git a/tests/unit/sdk/pool/__init__.py b/tests/unit/sdk/pool/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/sdk/pool/conftest.py b/tests/unit/sdk/pool/conftest.py new file mode 100644 index 00000000..e8276be6 --- /dev/null +++ b/tests/unit/sdk/pool/conftest.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import pytest + +from infrahub_sdk.schema import BranchSupportType, NodeSchema, NodeSchemaAPI + + +@pytest.fixture +async def ipaddress_pool_schema() -> NodeSchemaAPI: + data = { + "name": "IPAddressPool", + "namespace": "Core", + "description": "A pool of IP address resources", + "label": "IP Address Pool", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "include_in_menu": False, + "branch": BranchSupportType.AGNOSTIC.value, + "inherit_from": ["CoreResourcePool"], + "attributes": [ + { + "name": "default_address_type", + "kind": "Text", + "optional": False, + "description": "The object type to create when reserving a resource in the pool", + }, + { + "name": "default_prefix_length", + "kind": "Number", + "optional": True, + }, + ], + "relationships": [ + { + "name": "resources", + "peer": "BuiltinIPPrefix", + "kind": "Attribute", + "identifier": "ipaddresspool__resource", + "cardinality": "many", + "optional": False, + "order_weight": 4000, + }, + { + "name": "ip_namespace", + "peer": "BuiltinIPNamespace", + "kind": "Attribute", + "identifier": "ipaddresspool__ipnamespace", + "cardinality": "one", + "optional": False, + "order_weight": 5000, + }, + ], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def ipprefix_pool_schema() -> NodeSchemaAPI: + data = { + "name": "IPPrefixPool", + "namespace": "Core", + "description": "A pool of IP prefix resources", + "label": "IP Prefix Pool", + "include_in_menu": False, + "branch": BranchSupportType.AGNOSTIC.value, + "inherit_from": ["CoreResourcePool"], + "attributes": [ + { + "name": "default_prefix_length", + "kind": "Number", + "description": "The default prefix length as an integer for prefixes allocated from this pool.", + "optional": True, + "order_weight": 5000, + }, + { + "name": "default_member_type", + "kind": "Text", + "enum": ["prefix", "address"], + "default_value": "prefix", + "optional": True, + "order_weight": 3000, + }, + { + "name": "default_prefix_type", + "kind": "Text", + "optional": True, + "order_weight": 4000, + }, + ], + "relationships": [ + { + "name": "resources", + "peer": "BuiltinIPPrefix", + "kind": "Attribute", + "identifier": "prefixpool__resource", + "cardinality": "many", + "branch": BranchSupportType.AGNOSTIC.value, + "optional": False, + "order_weight": 6000, + }, + { + "name": "ip_namespace", + "peer": "BuiltinIPNamespace", + "kind": "Attribute", + "identifier": "prefixpool__ipnamespace", + "cardinality": "one", + "branch": BranchSupportType.AGNOSTIC.value, + "optional": False, + "order_weight": 7000, + }, + ], + } + return NodeSchema(**data).convert_api() diff --git a/tests/unit/sdk/pool/test_allocate.py b/tests/unit/sdk/pool/test_allocate.py new file mode 100644 index 00000000..1ed53500 --- /dev/null +++ b/tests/unit/sdk/pool/test_allocate.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from pytest_httpx import HTTPXMock + + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_allocate_next_ip_address( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubIPAddressPoolGetResource": { + "ok": True, + "node": { + "id": "17da1246-54f1-a9c0-2784-179f0ec5b128", + "kind": "IpamIPAddress", + "identifier": "test", + "display_label": "192.0.2.0/32", + }, + } + } + }, + match_headers={"X-Infrahub-Tracker": "allocate-ip-loopback"}, + is_reusable=True, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPAddress": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "__typename": "IpamIPAddress", + "address": {"value": "192.0.2.0/32"}, + "description": {"value": "test"}, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, + is_reusable=True, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_address = await clients.standard.allocate_next_ip_address( + resource_pool=ip_pool, + identifier="test", + prefix_length=32, + address_type="IpamIPAddress", + data={"description": "test"}, + tracker="allocate-ip-loopback", + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_address = clients.sync.allocate_next_ip_address( + resource_pool=ip_pool, + identifier="test", + prefix_length=32, + address_type="IpamIPAddress", + data={"description": "test"}, + tracker="allocate-ip-loopback", + ) + + assert ip_address + assert str(ip_address.address.value) == "192.0.2.0/32" + assert ip_address.description.value == "test" + + +@pytest.mark.parametrize("client_type", client_types) +async def test_allocate_next_ip_prefix( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipprefix_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubIPPrefixPoolGetResource": { + "ok": True, + "node": { + "id": "7d9bd8d-8fc2-70b0-278a-179f425e25cb", + "kind": "IpamIPPrefix", + "identifier": "test", + "display_label": "192.0.2.0/31", + }, + } + } + }, + match_headers={"X-Infrahub-Tracker": "allocate-ip-interco"}, + is_reusable=True, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPPrefix": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "__typename": "IpamIPPrefix", + "prefix": {"value": "192.0.2.0/31"}, + "description": {"value": "test"}, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipprefix-page1"}, + is_reusable=True, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipprefix_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core intercos", + "default_prefix_type": "IpamIPPrefix", + "default_prefix_length": 31, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_prefix = await clients.standard.allocate_next_ip_prefix( + resource_pool=ip_pool, + identifier="test", + prefix_length=31, + prefix_type="IpamIPPrefix", + data={"description": "test"}, + tracker="allocate-ip-interco", + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipprefix_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core intercos", + "default_prefix_type": "IpamIPPrefix", + "default_prefix_length": 31, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_prefix = clients.sync.allocate_next_ip_prefix( + resource_pool=ip_pool, + identifier="test", + prefix_length=31, + prefix_type="IpamIPPrefix", + data={"description": "test"}, + tracker="allocate-ip-interco", + ) + + assert ip_prefix + assert str(ip_prefix.prefix.value) == "192.0.2.0/31" + assert ip_prefix.description.value == "test" diff --git a/tests/unit/sdk/pool/test_attribute_from_pool.py b/tests/unit/sdk/pool/test_attribute_from_pool.py new file mode 100644 index 00000000..75d63f6d --- /dev/null +++ b/tests/unit/sdk/pool/test_attribute_from_pool.py @@ -0,0 +1,204 @@ +""" +When using from_pool on a number attribute (e.g. vlan_id), the SDK should generate: + vlan_id: { from_pool: { id: "...", identifier: "..." } } + +There are two ways to request a pool allocation: +1. Dict-based: {"from_pool": {"id": "...", "identifier": "..."}} +2. Node-based: pass an InfrahubNode pool object as the attribute value +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient, InfrahubClientSync + from infrahub_sdk.schema import NodeSchemaAPI + + +POOL_ID = "185b9728-1b76-dda7-d13d-106529b1bcd9" + + +# ────────────────────────────────────────────── +# Dict-based from_pool - async client +# ────────────────────────────────────────────── + + +async def test_number_attribute_from_pool_with_identifier( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """A number attribute with from_pool and identifier should NOT be wrapped in value.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + "role": "user", + "status": "active", + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["role"] == {"value": "user"} + assert input_data["status"] == {"value": "active"} + assert input_data["vlan_id"] == {"from_pool": {"id": POOL_ID, "identifier": "test"}} + assert "value" not in input_data["vlan_id"] + + +async def test_number_attribute_regular_value( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """Regular number values should still be wrapped in value as before.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": 100, + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["vlan_id"] == {"value": 100} + + +async def test_number_attribute_from_pool_mutation_query( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """A from_pool dict attribute should request value back in the mutation query.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + mutation_query = node._generate_mutation_query() + + assert mutation_query["object"]["vlan_id"] == {"value": None} + + +# ────────────────────────────────────────────── +# Dict-based from_pool - sync client +# ────────────────────────────────────────────── + + +async def test_sync_number_attribute_from_pool_with_identifier( + client_sync: InfrahubClientSync, + vlan_schema: NodeSchemaAPI, +) -> None: + """A number attribute with from_pool and identifier should NOT be wrapped in value (sync client).""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + "role": "user", + "status": "active", + } + node = InfrahubNodeSync(client=client_sync, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["role"] == {"value": "user"} + assert input_data["status"] == {"value": "active"} + assert input_data["vlan_id"] == {"from_pool": {"id": POOL_ID, "identifier": "test"}} + assert "value" not in input_data["vlan_id"] + + +async def test_sync_number_attribute_regular_value( + client_sync: InfrahubClientSync, + vlan_schema: NodeSchemaAPI, +) -> None: + """Regular number values should still be wrapped in value as before (sync client).""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": 100, + } + node = InfrahubNodeSync(client=client_sync, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["vlan_id"] == {"value": 100} + + +# ────────────────────────────────────────────── +# Node-based from_pool - async client +# ────────────────────────────────────────────── + +NODE_POOL_ID = "185b9728-1b56-dda7-d13d-106535b1bcd9" + + +async def test_attribute_with_pool_node_generates_from_pool( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], +) -> None: + """When an attribute value is a CoreNodeBase pool node, _generate_input_data should produce from_pool.""" + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": NODE_POOL_ID, + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + vlan = InfrahubNode( + client=client, + schema=vlan_schema, + data={"name": "Example VLAN", "vlan_id": ip_pool}, + ) + + # Act + input_data = vlan._generate_input_data()["data"]["data"] + + assert input_data["vlan_id"] == {"from_pool": {"id": NODE_POOL_ID}} + assert "value" not in input_data["vlan_id"] + + +async def test_attribute_with_pool_node_generates_mutation_query( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], +) -> None: + """When an attribute value is a CoreNodeBase pool node, _generate_mutation_query should request value back.""" + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": NODE_POOL_ID, + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + vlan = InfrahubNode( + client=client, + schema=vlan_schema, + data={"name": "Example VLAN", "vlan_id": ip_pool}, + ) + + # Act + mutation_query = vlan._generate_mutation_query() + + assert mutation_query["object"]["vlan_id"] == {"value": None} diff --git a/tests/unit/sdk/pool/test_pool_queries.py b/tests/unit/sdk/pool/test_pool_queries.py new file mode 100644 index 00000000..4f27cba7 --- /dev/null +++ b/tests/unit/sdk/pool/test_pool_queries.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from pytest_httpx import HTTPXMock + + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_get_pool_allocated_resources( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubResourcePoolAllocated": { + "count": 2, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "kind": "IpamIPAddress", + "branch": "main", + "identifier": "ip-1", + } + }, + { + "node": { + "id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + "kind": "IpamIPAddress", + "branch": "main", + "identifier": "ip-2", + } + }, + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "get-allocated-resources-page1"}, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPAddress": { + "count": 2, + "edges": [ + {"node": {"id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", "__typename": "IpamIPAddress"}}, + {"node": {"id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", "__typename": "IpamIPAddress"}}, + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + resources = await ip_pool.get_pool_allocated_resources(resource=ip_prefix) + assert len(resources) == 2 + assert [resource.id for resource in resources] == [ + "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + ] + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + resources = ip_pool.get_pool_allocated_resources(resource=ip_prefix) + assert len(resources) == 2 + assert [resource.id for resource in resources] == [ + "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + ] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_get_pool_resources_utilization( + httpx_mock: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubResourcePoolUtilization": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd86-3471-a020-2782-179ff078e58f", + "utilization": 93.75, + "utilization_branches": 0, + "utilization_default_branch": 93.75, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "get-pool-utilization"}, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + utilizations = await ip_pool.get_pool_resources_utilization() + assert len(utilizations) == 1 + assert utilizations[0]["utilization"] == 93.75 + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + utilizations = ip_pool.get_pool_resources_utilization() + assert len(utilizations) == 1 + assert utilizations[0]["utilization"] == 93.75 diff --git a/tests/unit/sdk/pool/test_relationship_from_pool.py b/tests/unit/sdk/pool/test_relationship_from_pool.py new file mode 100644 index 00000000..9ce543dc --- /dev/null +++ b/tests/unit/sdk/pool/test_relationship_from_pool.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from infrahub_sdk import InfrahubClient + from infrahub_sdk.schema import NodeSchemaAPI + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_create_input_data_with_resource_pool_relationship( + client: InfrahubClient, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + simple_device_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + if client_type == "standard": + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + else: + ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNodeSync( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + + assert device._generate_input_data()["data"] == { + "data": { + "name": {"value": "device-01"}, + "primary_address": {"from_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}}, + "ip_address_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}, + }, + } + + +@pytest.mark.parametrize("client_type", client_types) +async def test_create_mutation_query_with_resource_pool_relationship( + client: InfrahubClient, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + simple_device_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + if client_type == "standard": + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + else: + ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + + assert device._generate_mutation_query() == { + "object": { + "id": None, + "primary_address": {"node": {"__typename": None, "display_label": None, "id": None}}, + "ip_address_pool": {"node": {"__typename": None, "display_label": None, "id": None}}, + }, + "ok": None, + } diff --git a/tests/unit/sdk/test_attribute_generate_input_data.py b/tests/unit/sdk/test_attribute_generate_input_data.py new file mode 100644 index 00000000..a50a2fe9 --- /dev/null +++ b/tests/unit/sdk/test_attribute_generate_input_data.py @@ -0,0 +1,395 @@ +"""Unit tests for Attribute._generate_input_data covering all code paths.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from infrahub_sdk.node.attribute import Attribute +from infrahub_sdk.protocols_base import CoreNodeBase +from infrahub_sdk.schema import AttributeSchemaAPI + +# ────────────────────────────────────────────── +# Value resolution: from_pool (dict-based) +# ────────────────────────────────────────────── + + +class TestFromPoolDict: + def test_from_pool_with_id(self) -> None: + pool_data = {"id": "pool-uuid-1"} + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": pool_data}) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "pool-uuid-1"}} + assert result.variables == {} + + def test_from_pool_with_id_and_identifier(self) -> None: + pool_data = {"id": "pool-uuid-1", "identifier": "test"} + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": pool_data}) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "pool-uuid-1", "identifier": "test"}} + assert result.variables == {} + + def test_from_pool_with_pool_name(self) -> None: + """from_pool can be a plain string (pool name), e.g. from_pool: 'VLAN ID Pool'.""" + attr = Attribute( + name="vlan_id", schema=_make_schema("Number", optional=True), data={"from_pool": "VLAN ID Pool"} + ) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": "VLAN ID Pool"} + assert result.variables == {} + assert "value" not in result.payload + + def test_from_pool_value_is_none(self) -> None: + """from_pool pops 'from_pool' and sets Attribute.value to None; value should NOT appear in payload.""" + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": {"id": "pool-uuid-1"}}) + + assert attr.value is None + result = attr._generate_input_data() + assert "value" not in result.payload + + +# ────────────────────────────────────────────── +# Value resolution: from_pool (node-based) +# ────────────────────────────────────────────── + + +class TestFromPoolNode: + def test_pool_node_generates_from_pool(self) -> None: + pool_node = _FakeNode(node_id="node-pool-uuid", is_pool=True) + + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=pool_node) + + result = attr._generate_input_data() + + assert result.payload == {"from_pool": {"id": "node-pool-uuid"}} + assert result.variables == {} + + def test_non_pool_node_treated_as_regular_value(self) -> None: + """A CoreNodeBase that is NOT a resource pool should go through the normal value path.""" + node = _FakeNode(node_id="regular-node-uuid", is_pool=False) + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=node) + + result = attr._generate_input_data() + + assert result.payload == {"value": node} + + +# ────────────────────────────────────────────── +# Value resolution: null values +# ────────────────────────────────────────────── + + +class TestNullValue: + def test_null_value_not_mutated(self) -> None: + """None value that was never mutated → empty payload, no properties.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": None}) + + result = attr._generate_input_data() + + assert result.payload == {} + assert result.variables == {} + assert result.needs_metadata is False + + def test_null_value_mutated_optional(self) -> None: + """None value on an optional attr that was mutated → explicit null.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text", optional=True), data={"value": "initial"}) + attr.value = None # triggers value_has_been_mutated + + result = attr._generate_input_data() + + assert result.payload == {"value": None} + assert result.needs_metadata is False + + def test_null_value_mutated_non_optional(self) -> None: + """None value on a non-optional attr that was mutated → empty payload (same as not mutated).""" + attr = Attribute(name="test_attr", schema=_make_schema("Text", optional=False), data={"value": "initial"}) + attr.value = None + + result = attr._generate_input_data() + + assert result.payload == {} + assert result.needs_metadata is False + + +# ────────────────────────────────────────────── +# Value resolution: strings (safe vs unsafe) +# ────────────────────────────────────────────── + + +class TestStringValues: + @pytest.mark.parametrize( + "value", + [ + pytest.param("simple", id="alphanumeric"), + pytest.param("user.name", id="dots"), + pytest.param("/opt/repos/infrahub", id="filepath"), + pytest.param("https://github.com/opsmill", id="url"), + pytest.param("", id="empty-string"), + ], + ) + def test_safe_string(self, value: str) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data=value) + + result = attr._generate_input_data() + + assert result.payload == {"value": value} + assert result.variables == {} + + @pytest.mark.parametrize( + "value", + [ + pytest.param('has "quotes"', id="quotes"), + pytest.param("has\nnewline", id="newline"), + pytest.param("special{chars}", id="braces"), + ], + ) + def test_unsafe_string_uses_variable_binding(self, value: str) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data=value) + + result = attr._generate_input_data() + + # payload["value"] should be a variable reference like "$value_" + assert "value" in result.payload + assert result.payload["value"].startswith("$value_") + # The actual string should be in variables + assert len(result.variables) == 1 + var_name = next(iter(result.variables)) + assert result.variables[var_name] == value + + +# ────────────────────────────────────────────── +# Value resolution: IP types +# ────────────────────────────────────────────── + + +class TestIPValues: + def test_ipv4_interface(self) -> None: + attr = Attribute(name="address", schema=_make_schema("IPHost"), data={"value": "10.0.0.1/24"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "10.0.0.1/24" + assert result.variables == {} + + def test_ipv6_interface(self) -> None: + attr = Attribute(name="address", schema=_make_schema("IPHost"), data={"value": "2001:db8::1/64"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "2001:db8::1/64" + + def test_ipv4_network(self) -> None: + attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "10.0.0.0/24"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "10.0.0.0/24" + + def test_ipv6_network(self) -> None: + attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "2001:db8::/32"}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "2001:db8::/32" + + +# ────────────────────────────────────────────── +# Value resolution: other scalars +# ────────────────────────────────────────────── + + +class TestScalarValues: + def test_number_value(self) -> None: + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=42) + + result = attr._generate_input_data() + + assert result.payload == {"value": 42} + assert result.variables == {} + + def test_boolean_value(self) -> None: + attr = Attribute(name="enabled", schema=_make_schema("Boolean"), data=True) + + result = attr._generate_input_data() + + assert result.payload == {"value": True} + + +# ────────────────────────────────────────────── +# Property handling +# ────────────────────────────────────────────── + + +class TestProperties: + def test_no_properties_set(self) -> None: + """When no properties are set, payload only has the value.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data="hello") + + result = attr._generate_input_data() + + assert result.payload == {"value": "hello"} + + def test_flag_property_is_protected(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": "hello", "is_protected": True}) + + result = attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["is_protected"] is True + + def test_object_property_source(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={"value": "hello", "source": {"id": "source-uuid", "display_label": "Git", "__typename": "CoreGit"}}, + ) + + result = attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["source"] == "source-uuid" + + def test_object_property_owner(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": "hello", + "owner": {"id": "owner-uuid", "display_label": "Admin", "__typename": "CoreAccount"}, + }, + ) + + result = attr._generate_input_data() + + assert result.payload["owner"] == "owner-uuid" + + def test_both_flag_and_object_properties(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": "hello", + "is_protected": True, + "source": {"id": "src-uuid", "display_label": "Git", "__typename": "CoreGit"}, + }, + ) + + result = attr._generate_input_data() + + assert result.payload["value"] == "hello" + assert result.payload["is_protected"] is True + assert result.payload["source"] == "src-uuid" + + def test_properties_not_appended_for_null_value(self) -> None: + """When need_additional_properties is False (null non-mutated), properties are ignored.""" + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": None, + "is_protected": True, + "source": {"id": "src-uuid", "display_label": "Git", "__typename": "CoreGit"}, + }, + ) + + result = attr._generate_input_data() + + # Null value, not mutated → empty payload, properties NOT appended + assert result.payload == {} + + def test_properties_appended_for_from_pool(self) -> None: + """from_pool payloads have need_additional_properties=True, so properties are included.""" + attr = Attribute( + name="vlan_id", + schema=_make_schema("Number"), + data={"from_pool": {"id": "pool-uuid"}, "is_protected": True}, + ) + + result = attr._generate_input_data() + + assert result.payload["from_pool"] == {"id": "pool-uuid"} + assert result.payload["is_protected"] is True + + +# ────────────────────────────────────────────── +# Return type: to_dict() integration +# ────────────────────────────────────────────── + + +class TestToDictIntegration: + def test_to_dict_simple_value(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data="hello") + + result = attr._generate_input_data().to_dict() + + assert result == {"data": {"value": "hello"}, "variables": {}} + + def test_to_dict_with_variables(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data='has "quotes"') + + result = attr._generate_input_data().to_dict() + + assert "data" in result + assert "variables" in result + assert len(result["variables"]) == 1 + var_name = next(iter(result["variables"])) + assert result["variables"][var_name] == 'has "quotes"' + assert result["data"]["value"] == f"${var_name}" + + +def _make_schema(kind: str = "Text", optional: bool = False) -> AttributeSchemaAPI: + return AttributeSchemaAPI(name="test_attr", kind=kind, optional=optional) + + +class _FakeNode(CoreNodeBase): + """Minimal CoreNodeBase implementation for testing.""" + + def __init__(self, node_id: str, is_pool: bool) -> None: + self.id = node_id + self._is_pool = is_pool + self._schema: Any = None + self._internal_id = "" + self.display_label = None + self.typename = None + + @property + def hfid(self) -> list[str] | None: + return None + + @property + def hfid_str(self) -> str | None: + return None + + def get_human_friendly_id(self) -> list[str] | None: + return None + + def get_human_friendly_id_as_string(self, include_kind: bool = False) -> str | None: + return None + + def get_kind(self) -> str: + return "" + + def get_all_kinds(self) -> list[str]: + return [] + + def get_branch(self) -> str: + return "" + + def is_ip_prefix(self) -> bool: + return False + + def is_ip_address(self) -> bool: + return False + + def is_resource_pool(self) -> bool: + return self._is_pool + + def get_raw_graphql_data(self) -> dict | None: + return None diff --git a/tests/unit/sdk/test_client.py b/tests/unit/sdk/test_client.py index e9cce23e..1e883f95 100644 --- a/tests/unit/sdk/test_client.py +++ b/tests/unit/sdk/test_client.py @@ -14,11 +14,9 @@ if TYPE_CHECKING: from collections.abc import Callable, Mapping from inspect import Parameter - from typing import Any from pytest_httpx import HTTPXMock - from infrahub_sdk.schema import NodeSchemaAPI from tests.unit.sdk.conftest import BothClients pytestmark = pytest.mark.httpx_mock(can_send_already_matched_responses=True) @@ -636,208 +634,6 @@ async def test_method_filters_empty( assert len(repos) == 0 -@pytest.mark.parametrize("client_type", client_types) -async def test_allocate_next_ip_address( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubIPAddressPoolGetResource": { - "ok": True, - "node": { - "id": "17da1246-54f1-a9c0-2784-179f0ec5b128", - "kind": "IpamIPAddress", - "identifier": "test", - "display_label": "192.0.2.0/32", - }, - } - } - }, - match_headers={"X-Infrahub-Tracker": "allocate-ip-loopback"}, - is_reusable=True, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPAddress": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "__typename": "IpamIPAddress", - "address": {"value": "192.0.2.0/32"}, - "description": {"value": "test"}, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, - is_reusable=True, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_address = await clients.standard.allocate_next_ip_address( - resource_pool=ip_pool, - identifier="test", - prefix_length=32, - address_type="IpamIPAddress", - data={"description": "test"}, - tracker="allocate-ip-loopback", - ) - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_address = clients.sync.allocate_next_ip_address( - resource_pool=ip_pool, - identifier="test", - prefix_length=32, - address_type="IpamIPAddress", - data={"description": "test"}, - tracker="allocate-ip-loopback", - ) - - assert ip_address - assert str(ip_address.address.value) == "192.0.2.0/32" - assert ip_address.description.value == "test" - - -@pytest.mark.parametrize("client_type", client_types) -async def test_allocate_next_ip_prefix( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipprefix_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubIPPrefixPoolGetResource": { - "ok": True, - "node": { - "id": "7d9bd8d-8fc2-70b0-278a-179f425e25cb", - "kind": "IpamIPPrefix", - "identifier": "test", - "display_label": "192.0.2.0/31", - }, - } - } - }, - match_headers={"X-Infrahub-Tracker": "allocate-ip-interco"}, - is_reusable=True, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPPrefix": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "__typename": "IpamIPPrefix", - "prefix": {"value": "192.0.2.0/31"}, - "description": {"value": "test"}, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipprefix-page1"}, - is_reusable=True, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipprefix_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core intercos", - "default_prefix_type": "IpamIPPrefix", - "default_prefix_length": 31, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_prefix = await clients.standard.allocate_next_ip_prefix( - resource_pool=ip_pool, - identifier="test", - prefix_length=31, - prefix_type="IpamIPPrefix", - data={"description": "test"}, - tracker="allocate-ip-interco", - ) - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipprefix_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core intercos", - "default_prefix_type": "IpamIPPrefix", - "default_prefix_length": 31, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_prefix = clients.sync.allocate_next_ip_prefix( - resource_pool=ip_pool, - identifier="test", - prefix_length=31, - prefix_type="IpamIPPrefix", - data={"description": "test"}, - tracker="allocate-ip-interco", - ) - - assert ip_prefix - assert str(ip_prefix.prefix.value) == "192.0.2.0/31" - assert ip_prefix.description.value == "test" - - EXPECTED_ECHO = """URL: http://mock/graphql/main QUERY: diff --git a/tests/unit/sdk/test_node.py b/tests/unit/sdk/test_node.py index 8dc18c9b..3db48edf 100644 --- a/tests/unit/sdk/test_node.py +++ b/tests/unit/sdk/test_node.py @@ -2211,289 +2211,6 @@ async def test_relationships_excluded_input_data( assert node.tags.has_update is False -@pytest.mark.parametrize("client_type", client_types) -async def test_create_input_data_with_resource_pool_relationship( - client: InfrahubClient, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - simple_device_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - - assert device._generate_input_data()["data"] == { - "data": { - "name": {"value": "device-01"}, - "primary_address": {"from_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}}, - "ip_address_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}, - }, - } - - -@pytest.mark.parametrize("client_type", client_types) -async def test_create_mutation_query_with_resource_pool_relationship( - client: InfrahubClient, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - simple_device_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - - assert device._generate_mutation_query() == { - "object": { - "id": None, - "primary_address": {"node": {"__typename": None, "display_label": None, "id": None}}, - "ip_address_pool": {"node": {"__typename": None, "display_label": None, "id": None}}, - }, - "ok": None, - } - - -@pytest.mark.parametrize("client_type", client_types) -async def test_get_pool_allocated_resources( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubResourcePoolAllocated": { - "count": 2, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "kind": "IpamIPAddress", - "branch": "main", - "identifier": "ip-1", - } - }, - { - "node": { - "id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - "kind": "IpamIPAddress", - "branch": "main", - "identifier": "ip-2", - } - }, - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "get-allocated-resources-page1"}, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPAddress": { - "count": 2, - "edges": [ - {"node": {"id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", "__typename": "IpamIPAddress"}}, - {"node": {"id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", "__typename": "IpamIPAddress"}}, - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - resources = await ip_pool.get_pool_allocated_resources(resource=ip_prefix) - assert len(resources) == 2 - assert [resource.id for resource in resources] == [ - "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - ] - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - resources = ip_pool.get_pool_allocated_resources(resource=ip_prefix) - assert len(resources) == 2 - assert [resource.id for resource in resources] == [ - "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - ] - - -@pytest.mark.parametrize("client_type", client_types) -async def test_get_pool_resources_utilization( - httpx_mock: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubResourcePoolUtilization": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd86-3471-a020-2782-179ff078e58f", - "utilization": 93.75, - "utilization_branches": 0, - "utilization_default_branch": 93.75, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "get-pool-utilization"}, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - utilizations = await ip_pool.get_pool_resources_utilization() - assert len(utilizations) == 1 - assert utilizations[0]["utilization"] == 93.75 - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - utilizations = ip_pool.get_pool_resources_utilization() - assert len(utilizations) == 1 - assert utilizations[0]["utilization"] == 93.75 - - @pytest.mark.parametrize("client_type", client_types) async def test_from_graphql( clients: BothClients, mock_schema_query_01: HTTPXMock, location_data01: dict[str, Any], client_type: str