From 76d96d8727cbab979c4c565c2af28c2b434e0a5e Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Mon, 23 Feb 2026 09:22:55 +0100 Subject: [PATCH 01/14] IHS-156 fix SDK from_pool attribute management before querying GraphQL API --- infrahub_sdk/node/attribute.py | 32 ++- .../test_node_from_pool_number_attribute.py | 240 ++++++++++++++++++ 2 files changed, 263 insertions(+), 9 deletions(-) create mode 100644 tests/unit/sdk/test_node_from_pool_number_attribute.py diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index 8043d567..88173c4c 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -25,8 +25,12 @@ def __init__(self, name: str, schema: AttributeSchemaAPI, data: Any | dict) -> N """ self.name = name self._schema = schema + self._from_pool: dict[str, Any] | None = None - if not isinstance(data, dict) or "value" not in data: + if isinstance(data, dict) and "from_pool" in data: + self._from_pool = data.pop("from_pool") + data.setdefault("value", None) + elif not isinstance(data, dict) or "value" not in data: data = {"value": data} self._properties_flag = PROPERTIES_FLAG @@ -76,16 +80,14 @@ def value(self, value: Any) -> None: self._value = value self.value_has_been_mutated = True - def _generate_input_data(self) -> dict | None: - data: dict[str, Any] = {} - variables: dict[str, Any] = {} - - if self.value is None: + def _generate_value_data(self, data: dict[str, Any], variables: dict[str, Any]) -> dict | None: + if self._from_pool is not None: + data["from_pool"] = self._from_pool + elif self.value is None: if self._schema.optional and self.value_has_been_mutated: data["value"] = None return data - - if isinstance(self.value, str): + elif isinstance(self.value, str): if SAFE_VALUE.match(self.value): data["value"] = self.value else: @@ -98,6 +100,15 @@ def _generate_input_data(self) -> dict | None: data["from_pool"] = {"id": self.value.id} else: data["value"] = self.value + return None + + def _generate_input_data(self) -> dict | None: + data: dict[str, Any] = {} + variables: dict[str, Any] = {} + + early_return = self._generate_value_data(data, variables) + if early_return is not None: + return early_return for prop_name in self._properties_flag: if getattr(self, prop_name) is not None: @@ -128,7 +139,10 @@ def _generate_query_data(self, property: bool = False, include_metadata: bool = return data def _generate_mutation_query(self) -> dict[str, Any]: - if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): + if self._from_pool_attribute(): # If it points to a pool, ask for the value of the pool allocated resource return {self.name: {"value": None}} return {} + + def _from_pool_attribute(self) -> bool | Any: + return (isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool()) or self._from_pool is not None diff --git a/tests/unit/sdk/test_node_from_pool_number_attribute.py b/tests/unit/sdk/test_node_from_pool_number_attribute.py new file mode 100644 index 00000000..0ac57945 --- /dev/null +++ b/tests/unit/sdk/test_node_from_pool_number_attribute.py @@ -0,0 +1,240 @@ +""" +When using from_pool on a number attribute (e.g. vlan_id), the SDK should generate: + vlan_id: { from_pool: { id: "...", identifier: "..." } } + +Instead, it currently wraps it in value: + vlan_id: { value: { from_pool: { id: "...", identifier: "..." } } } + +This causes a GraphQL error: 'Expected value of type BigInt, found {from_pool: ...}' + +There are two ways to request a pool allocation: +1. Dict-based: {"from_pool": {"id": "...", "identifier": "..."}} +2. Node-based: pass an InfrahubNode pool object as the attribute value +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync +from infrahub_sdk.schema import NodeSchema + +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient, InfrahubClientSync + from infrahub_sdk.schema import NodeSchemaAPI + + +POOL_ID = "185b9728-1b76-dda7-d13d-106529b1bcd9" + + +@pytest.fixture +async def vlan_schema() -> NodeSchemaAPI: + data = { + "name": "VLAN", + "namespace": "Infra", + "label": "VLAN", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [ + {"name": "name", "kind": "Text", "unique": True}, + {"name": "vlan_id", "kind": "Number"}, + {"name": "role", "kind": "Text", "optional": True}, + {"name": "status", "kind": "Text", "optional": True}, + ], + "relationships": [], + } + return NodeSchema(**data).convert_api() + + +# ────────────────────────────────────────────── +# Dict-based from_pool – async client +# ────────────────────────────────────────────── + + +async def test_number_attribute_from_pool_with_identifier( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """A number attribute with from_pool and identifier should NOT be wrapped in value. + + This is the exact scenario from the IHS-156 bug report where a user wants + to use an identifier for idempotent number pool allocation. + """ + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + "role": "user", + "status": "active", + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["role"] == {"value": "user"} + assert input_data["status"] == {"value": "active"} + assert input_data["vlan_id"] == {"from_pool": {"id": POOL_ID, "identifier": "test"}} + assert "value" not in input_data["vlan_id"] + + +async def test_number_attribute_regular_value( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """Regular number values should still be wrapped in value as before.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": 100, + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["vlan_id"] == {"value": 100} + + +async def test_number_attribute_from_pool_mutation_query( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, +) -> None: + """A from_pool dict attribute should request value back in the mutation query.""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + } + node = InfrahubNode(client=client, schema=vlan_schema, data=data) + + # Act + mutation_query = node._generate_mutation_query() + + assert mutation_query["object"]["vlan_id"] == {"value": None} + + +# ────────────────────────────────────────────── +# Dict-based from_pool – sync client +# ────────────────────────────────────────────── + + +async def test_sync_number_attribute_from_pool_with_identifier( + client_sync: InfrahubClientSync, + vlan_schema: NodeSchemaAPI, +) -> None: + """A number attribute with from_pool and identifier should NOT be wrapped in value (sync client). + + This is the exact scenario from the IHS-156 bug report where a user wants + to use an identifier for idempotent number pool allocation. + """ + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, + "role": "user", + "status": "active", + } + node = InfrahubNodeSync(client=client_sync, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["role"] == {"value": "user"} + assert input_data["status"] == {"value": "active"} + assert input_data["vlan_id"] == {"from_pool": {"id": POOL_ID, "identifier": "test"}} + assert "value" not in input_data["vlan_id"] + + +async def test_sync_number_attribute_regular_value( + client_sync: InfrahubClientSync, + vlan_schema: NodeSchemaAPI, +) -> None: + """Regular number values should still be wrapped in value as before (sync client).""" + data: dict[str, Any] = { + "name": "Example VLAN", + "vlan_id": 100, + } + node = InfrahubNodeSync(client=client_sync, schema=vlan_schema, data=data) + + # Act + input_data = node._generate_input_data()["data"]["data"] + + assert input_data["name"] == {"value": "Example VLAN"} + assert input_data["vlan_id"] == {"value": 100} + + +# ────────────────────────────────────────────── +# Node-based from_pool – async client +# ────────────────────────────────────────────── + +NODE_POOL_ID = "185b9728-1b56-dda7-d13d-106535b1bcd9" + + +async def test_attribute_with_pool_node_generates_from_pool( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], +) -> None: + """When an attribute value is a CoreNodeBase pool node, _generate_input_data should produce from_pool.""" + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": NODE_POOL_ID, + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + vlan = InfrahubNode( + client=client, + schema=vlan_schema, + data={"name": "Example VLAN", "vlan_id": ip_pool}, + ) + + # Act + input_data = vlan._generate_input_data()["data"]["data"] + + assert input_data["vlan_id"] == {"from_pool": {"id": NODE_POOL_ID}} + assert "value" not in input_data["vlan_id"] + + +async def test_attribute_with_pool_node_generates_mutation_query( + client: InfrahubClient, + vlan_schema: NodeSchemaAPI, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], +) -> None: + """When an attribute value is a CoreNodeBase pool node, _generate_mutation_query should request value back.""" + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": NODE_POOL_ID, + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + vlan = InfrahubNode( + client=client, + schema=vlan_schema, + data={"name": "Example VLAN", "vlan_id": ip_pool}, + ) + + # Act + mutation_query = vlan._generate_mutation_query() + + assert mutation_query["object"]["vlan_id"] == {"value": None} From 531a1a6b812bed56754536a8e70d293e4f5e3ae2 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Mon, 23 Feb 2026 10:10:27 +0100 Subject: [PATCH 02/14] IHS-156 refactor the test cases to have a better view on the tests perimeter --- tests/unit/sdk/conftest.py | 227 -------------- tests/unit/sdk/pool/__init__.py | 0 tests/unit/sdk/pool/conftest.py | 259 ++++++++++++++++ tests/unit/sdk/pool/test_allocate.py | 219 ++++++++++++++ .../test_attribute_from_pool.py} | 34 +-- tests/unit/sdk/pool/test_pool_queries.py | 185 ++++++++++++ .../sdk/pool/test_relationship_from_pool.py | 130 ++++++++ tests/unit/sdk/test_client.py | 204 ------------- tests/unit/sdk/test_node.py | 283 ------------------ 9 files changed, 796 insertions(+), 745 deletions(-) create mode 100644 tests/unit/sdk/pool/__init__.py create mode 100644 tests/unit/sdk/pool/conftest.py create mode 100644 tests/unit/sdk/pool/test_allocate.py rename tests/unit/sdk/{test_node_from_pool_number_attribute.py => pool/test_attribute_from_pool.py} (88%) create mode 100644 tests/unit/sdk/pool/test_pool_queries.py create mode 100644 tests/unit/sdk/pool/test_relationship_from_pool.py diff --git a/tests/unit/sdk/conftest.py b/tests/unit/sdk/conftest.py index 8fb9ecf2..70ddd1de 100644 --- a/tests/unit/sdk/conftest.py +++ b/tests/unit/sdk/conftest.py @@ -907,223 +907,6 @@ async def ipnetwork_schema() -> NodeSchemaAPI: return NodeSchema(**data).convert_api() -@pytest.fixture -async def ipam_ipprefix_schema() -> NodeSchemaAPI: - data = { - "name": "IPNetwork", - "namespace": "Ipam", - "default_filter": "prefix__value", - "display_labels": ["prefix_value"], - "order_by": ["prefix_value"], - "inherit_from": ["BuiltinIPAddress"], - } - return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def simple_device_schema() -> NodeSchemaAPI: - data = { - "name": "Device", - "namespace": "Infra", - "label": "Device", - "default_filter": "name__value", - "order_by": ["name__value"], - "display_labels": ["name__value"], - "attributes": [{"name": "name", "kind": "Text", "unique": True}], - "relationships": [ - { - "name": "primary_address", - "peer": "IpamIPAddress", - "label": "Primary IP Address", - "optional": True, - "cardinality": "one", - "kind": "Attribute", - }, - { - "name": "ip_address_pool", - "peer": "CoreIPAddressPool", - "label": "Address allocator", - "optional": True, - "cardinality": "one", - "kind": "Attribute", - }, - ], - } - return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def ipam_ipprefix_data() -> dict[str, Any]: - return { - "node": { - "__typename": "IpamIPPrefix", - "id": "llllllll-llll-llll-llll-llllllllllll", - "display_label": "192.0.2.0/24", - "prefix": { - "is_protected": True, - "owner": None, - "source": { - "__typename": "Account", - "display_label": "CRM", - "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", - }, - "value": "192.0.2.0/24", - }, - "description": { - "is_protected": False, - "owner": None, - "source": None, - "value": None, - }, - "member_type": { - "is_protected": True, - "owner": None, - "source": { - "__typename": "Account", - "display_label": "CRM", - "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", - }, - "value": "address", - }, - "is_pool": { - "is_protected": True, - "owner": None, - "source": { - "__typename": "Account", - "display_label": "CRM", - "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", - }, - "value": False, - }, - "ip_namespace": { - "properties": { - "is_protected": True, - "owner": None, - "source": { - "__typename": "Account", - "display_label": "CRM", - "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", - }, - }, - "node": { - "id": "rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr", - "display_label": "default", - "__typename": "IpamNamespace", - }, - }, - } - } - - -@pytest.fixture -async def ipaddress_pool_schema() -> NodeSchemaAPI: - data = { - "name": "IPAddressPool", - "namespace": "Core", - "description": "A pool of IP address resources", - "label": "IP Address Pool", - "default_filter": "name__value", - "order_by": ["name__value"], - "display_labels": ["name__value"], - "include_in_menu": False, - "branch": BranchSupportType.AGNOSTIC.value, - "inherit_from": ["CoreResourcePool"], - "attributes": [ - { - "name": "default_address_type", - "kind": "Text", - "optional": False, - "description": "The object type to create when reserving a resource in the pool", - }, - { - "name": "default_prefix_length", - "kind": "Number", - "optional": True, - }, - ], - "relationships": [ - { - "name": "resources", - "peer": "BuiltinIPPrefix", - "kind": "Attribute", - "identifier": "ipaddresspool__resource", - "cardinality": "many", - "optional": False, - "order_weight": 4000, - }, - { - "name": "ip_namespace", - "peer": "BuiltinIPNamespace", - "kind": "Attribute", - "identifier": "ipaddresspool__ipnamespace", - "cardinality": "one", - "optional": False, - "order_weight": 5000, - }, - ], - } - return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def ipprefix_pool_schema() -> NodeSchemaAPI: - data = { - "name": "IPPrefixPool", - "namespace": "Core", - "description": "A pool of IP prefix resources", - "label": "IP Prefix Pool", - "include_in_menu": False, - "branch": BranchSupportType.AGNOSTIC.value, - "inherit_from": ["CoreResourcePool"], - "attributes": [ - { - "name": "default_prefix_length", - "kind": "Number", - "description": "The default prefix length as an integer for prefixes allocated from this pool.", - "optional": True, - "order_weight": 5000, - }, - { - "name": "default_member_type", - "kind": "Text", - "enum": ["prefix", "address"], - "default_value": "prefix", - "optional": True, - "order_weight": 3000, - }, - { - "name": "default_prefix_type", - "kind": "Text", - "optional": True, - "order_weight": 4000, - }, - ], - "relationships": [ - { - "name": "resources", - "peer": "BuiltinIPPrefix", - "kind": "Attribute", - "identifier": "prefixpool__resource", - "cardinality": "many", - "branch": BranchSupportType.AGNOSTIC.value, - "optional": False, - "order_weight": 6000, - }, - { - "name": "ip_namespace", - "peer": "BuiltinIPNamespace", - "kind": "Attribute", - "identifier": "prefixpool__ipnamespace", - "cardinality": "one", - "branch": BranchSupportType.AGNOSTIC.value, - "optional": False, - "order_weight": 7000, - }, - ], - } - return NodeSchema(**data).convert_api() - - @pytest.fixture async def address_schema() -> NodeSchemaAPI: data = { @@ -2504,16 +2287,6 @@ def query_introspection() -> str: """ -@pytest.fixture -async def mock_schema_query_ipam(httpx_mock: HTTPXMock) -> HTTPXMock: - response_text = (get_fixtures_dir() / "schema_ipam.json").read_text(encoding="UTF-8") - - httpx_mock.add_response( - method="GET", url="http://mock/api/schema?branch=main", json=ujson.loads(response_text), is_reusable=True - ) - return httpx_mock - - @pytest.fixture async def mock_query_location_batch_count( httpx_mock: HTTPXMock, client: InfrahubClient, mock_schema_query_01: HTTPXMock diff --git a/tests/unit/sdk/pool/__init__.py b/tests/unit/sdk/pool/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/sdk/pool/conftest.py b/tests/unit/sdk/pool/conftest.py new file mode 100644 index 00000000..0508063b --- /dev/null +++ b/tests/unit/sdk/pool/conftest.py @@ -0,0 +1,259 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import pytest +import ujson + +from infrahub_sdk.schema import BranchSupportType, NodeSchema, NodeSchemaAPI +from infrahub_sdk.utils import get_fixtures_dir + +if TYPE_CHECKING: + from pytest_httpx import HTTPXMock + + +@pytest.fixture +async def ipam_ipprefix_schema() -> NodeSchemaAPI: + data = { + "name": "IPNetwork", + "namespace": "Ipam", + "default_filter": "prefix__value", + "display_labels": ["prefix_value"], + "order_by": ["prefix_value"], + "inherit_from": ["BuiltinIPAddress"], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def simple_device_schema() -> NodeSchemaAPI: + data = { + "name": "Device", + "namespace": "Infra", + "label": "Device", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [{"name": "name", "kind": "Text", "unique": True}], + "relationships": [ + { + "name": "primary_address", + "peer": "IpamIPAddress", + "label": "Primary IP Address", + "optional": True, + "cardinality": "one", + "kind": "Attribute", + }, + { + "name": "ip_address_pool", + "peer": "CoreIPAddressPool", + "label": "Address allocator", + "optional": True, + "cardinality": "one", + "kind": "Attribute", + }, + ], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def ipam_ipprefix_data() -> dict[str, Any]: + return { + "node": { + "__typename": "IpamIPPrefix", + "id": "llllllll-llll-llll-llll-llllllllllll", + "display_label": "192.0.2.0/24", + "prefix": { + "is_protected": True, + "owner": None, + "source": { + "__typename": "Account", + "display_label": "CRM", + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + }, + "value": "192.0.2.0/24", + }, + "description": { + "is_protected": False, + "owner": None, + "source": None, + "value": None, + }, + "member_type": { + "is_protected": True, + "owner": None, + "source": { + "__typename": "Account", + "display_label": "CRM", + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + }, + "value": "address", + }, + "is_pool": { + "is_protected": True, + "owner": None, + "source": { + "__typename": "Account", + "display_label": "CRM", + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + }, + "value": False, + }, + "ip_namespace": { + "properties": { + "is_protected": True, + "owner": None, + "source": { + "__typename": "Account", + "display_label": "CRM", + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + }, + }, + "node": { + "id": "rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr", + "display_label": "default", + "__typename": "IpamNamespace", + }, + }, + } + } + + +@pytest.fixture +async def ipaddress_pool_schema() -> NodeSchemaAPI: + data = { + "name": "IPAddressPool", + "namespace": "Core", + "description": "A pool of IP address resources", + "label": "IP Address Pool", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "include_in_menu": False, + "branch": BranchSupportType.AGNOSTIC.value, + "inherit_from": ["CoreResourcePool"], + "attributes": [ + { + "name": "default_address_type", + "kind": "Text", + "optional": False, + "description": "The object type to create when reserving a resource in the pool", + }, + { + "name": "default_prefix_length", + "kind": "Number", + "optional": True, + }, + ], + "relationships": [ + { + "name": "resources", + "peer": "BuiltinIPPrefix", + "kind": "Attribute", + "identifier": "ipaddresspool__resource", + "cardinality": "many", + "optional": False, + "order_weight": 4000, + }, + { + "name": "ip_namespace", + "peer": "BuiltinIPNamespace", + "kind": "Attribute", + "identifier": "ipaddresspool__ipnamespace", + "cardinality": "one", + "optional": False, + "order_weight": 5000, + }, + ], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def ipprefix_pool_schema() -> NodeSchemaAPI: + data = { + "name": "IPPrefixPool", + "namespace": "Core", + "description": "A pool of IP prefix resources", + "label": "IP Prefix Pool", + "include_in_menu": False, + "branch": BranchSupportType.AGNOSTIC.value, + "inherit_from": ["CoreResourcePool"], + "attributes": [ + { + "name": "default_prefix_length", + "kind": "Number", + "description": "The default prefix length as an integer for prefixes allocated from this pool.", + "optional": True, + "order_weight": 5000, + }, + { + "name": "default_member_type", + "kind": "Text", + "enum": ["prefix", "address"], + "default_value": "prefix", + "optional": True, + "order_weight": 3000, + }, + { + "name": "default_prefix_type", + "kind": "Text", + "optional": True, + "order_weight": 4000, + }, + ], + "relationships": [ + { + "name": "resources", + "peer": "BuiltinIPPrefix", + "kind": "Attribute", + "identifier": "prefixpool__resource", + "cardinality": "many", + "branch": BranchSupportType.AGNOSTIC.value, + "optional": False, + "order_weight": 6000, + }, + { + "name": "ip_namespace", + "peer": "BuiltinIPNamespace", + "kind": "Attribute", + "identifier": "prefixpool__ipnamespace", + "cardinality": "one", + "branch": BranchSupportType.AGNOSTIC.value, + "optional": False, + "order_weight": 7000, + }, + ], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def mock_schema_query_ipam(httpx_mock: HTTPXMock) -> HTTPXMock: + response_text = (get_fixtures_dir() / "schema_ipam.json").read_text(encoding="UTF-8") + + httpx_mock.add_response( + method="GET", url="http://mock/api/schema?branch=main", json=ujson.loads(response_text), is_reusable=True + ) + return httpx_mock + + +@pytest.fixture +async def vlan_schema() -> NodeSchemaAPI: + data = { + "name": "VLAN", + "namespace": "Infra", + "label": "VLAN", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [ + {"name": "name", "kind": "Text", "unique": True}, + {"name": "vlan_id", "kind": "Number"}, + {"name": "role", "kind": "Text", "optional": True}, + {"name": "status", "kind": "Text", "optional": True}, + ], + "relationships": [], + } + return NodeSchema(**data).convert_api() diff --git a/tests/unit/sdk/pool/test_allocate.py b/tests/unit/sdk/pool/test_allocate.py new file mode 100644 index 00000000..1ed53500 --- /dev/null +++ b/tests/unit/sdk/pool/test_allocate.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from pytest_httpx import HTTPXMock + + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_allocate_next_ip_address( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubIPAddressPoolGetResource": { + "ok": True, + "node": { + "id": "17da1246-54f1-a9c0-2784-179f0ec5b128", + "kind": "IpamIPAddress", + "identifier": "test", + "display_label": "192.0.2.0/32", + }, + } + } + }, + match_headers={"X-Infrahub-Tracker": "allocate-ip-loopback"}, + is_reusable=True, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPAddress": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "__typename": "IpamIPAddress", + "address": {"value": "192.0.2.0/32"}, + "description": {"value": "test"}, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, + is_reusable=True, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_address = await clients.standard.allocate_next_ip_address( + resource_pool=ip_pool, + identifier="test", + prefix_length=32, + address_type="IpamIPAddress", + data={"description": "test"}, + tracker="allocate-ip-loopback", + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_address = clients.sync.allocate_next_ip_address( + resource_pool=ip_pool, + identifier="test", + prefix_length=32, + address_type="IpamIPAddress", + data={"description": "test"}, + tracker="allocate-ip-loopback", + ) + + assert ip_address + assert str(ip_address.address.value) == "192.0.2.0/32" + assert ip_address.description.value == "test" + + +@pytest.mark.parametrize("client_type", client_types) +async def test_allocate_next_ip_prefix( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipprefix_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubIPPrefixPoolGetResource": { + "ok": True, + "node": { + "id": "7d9bd8d-8fc2-70b0-278a-179f425e25cb", + "kind": "IpamIPPrefix", + "identifier": "test", + "display_label": "192.0.2.0/31", + }, + } + } + }, + match_headers={"X-Infrahub-Tracker": "allocate-ip-interco"}, + is_reusable=True, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPPrefix": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "__typename": "IpamIPPrefix", + "prefix": {"value": "192.0.2.0/31"}, + "description": {"value": "test"}, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipprefix-page1"}, + is_reusable=True, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipprefix_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core intercos", + "default_prefix_type": "IpamIPPrefix", + "default_prefix_length": 31, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_prefix = await clients.standard.allocate_next_ip_prefix( + resource_pool=ip_pool, + identifier="test", + prefix_length=31, + prefix_type="IpamIPPrefix", + data={"description": "test"}, + tracker="allocate-ip-interco", + ) + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipprefix_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core intercos", + "default_prefix_type": "IpamIPPrefix", + "default_prefix_length": 31, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + ip_prefix = clients.sync.allocate_next_ip_prefix( + resource_pool=ip_pool, + identifier="test", + prefix_length=31, + prefix_type="IpamIPPrefix", + data={"description": "test"}, + tracker="allocate-ip-interco", + ) + + assert ip_prefix + assert str(ip_prefix.prefix.value) == "192.0.2.0/31" + assert ip_prefix.description.value == "test" diff --git a/tests/unit/sdk/test_node_from_pool_number_attribute.py b/tests/unit/sdk/pool/test_attribute_from_pool.py similarity index 88% rename from tests/unit/sdk/test_node_from_pool_number_attribute.py rename to tests/unit/sdk/pool/test_attribute_from_pool.py index 0ac57945..88b32fa7 100644 --- a/tests/unit/sdk/test_node_from_pool_number_attribute.py +++ b/tests/unit/sdk/pool/test_attribute_from_pool.py @@ -2,11 +2,6 @@ When using from_pool on a number attribute (e.g. vlan_id), the SDK should generate: vlan_id: { from_pool: { id: "...", identifier: "..." } } -Instead, it currently wraps it in value: - vlan_id: { value: { from_pool: { id: "...", identifier: "..." } } } - -This causes a GraphQL error: 'Expected value of type BigInt, found {from_pool: ...}' - There are two ways to request a pool allocation: 1. Dict-based: {"from_pool": {"id": "...", "identifier": "..."}} 2. Node-based: pass an InfrahubNode pool object as the attribute value @@ -16,10 +11,7 @@ from typing import TYPE_CHECKING, Any -import pytest - from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync -from infrahub_sdk.schema import NodeSchema if TYPE_CHECKING: from infrahub_sdk import InfrahubClient, InfrahubClientSync @@ -29,28 +21,8 @@ POOL_ID = "185b9728-1b76-dda7-d13d-106529b1bcd9" -@pytest.fixture -async def vlan_schema() -> NodeSchemaAPI: - data = { - "name": "VLAN", - "namespace": "Infra", - "label": "VLAN", - "default_filter": "name__value", - "order_by": ["name__value"], - "display_labels": ["name__value"], - "attributes": [ - {"name": "name", "kind": "Text", "unique": True}, - {"name": "vlan_id", "kind": "Number"}, - {"name": "role", "kind": "Text", "optional": True}, - {"name": "status", "kind": "Text", "optional": True}, - ], - "relationships": [], - } - return NodeSchema(**data).convert_api() - - # ────────────────────────────────────────────── -# Dict-based from_pool – async client +# Dict-based from_pool - async client # ────────────────────────────────────────────── @@ -117,7 +89,7 @@ async def test_number_attribute_from_pool_mutation_query( # ────────────────────────────────────────────── -# Dict-based from_pool – sync client +# Dict-based from_pool - sync client # ────────────────────────────────────────────── @@ -167,7 +139,7 @@ async def test_sync_number_attribute_regular_value( # ────────────────────────────────────────────── -# Node-based from_pool – async client +# Node-based from_pool - async client # ────────────────────────────────────────────── NODE_POOL_ID = "185b9728-1b56-dda7-d13d-106535b1bcd9" diff --git a/tests/unit/sdk/pool/test_pool_queries.py b/tests/unit/sdk/pool/test_pool_queries.py new file mode 100644 index 00000000..4f27cba7 --- /dev/null +++ b/tests/unit/sdk/pool/test_pool_queries.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from pytest_httpx import HTTPXMock + + from infrahub_sdk.schema import NodeSchemaAPI + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_get_pool_allocated_resources( + httpx_mock: HTTPXMock, + mock_schema_query_ipam: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubResourcePoolAllocated": { + "count": 2, + "edges": [ + { + "node": { + "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "kind": "IpamIPAddress", + "branch": "main", + "identifier": "ip-1", + } + }, + { + "node": { + "id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + "kind": "IpamIPAddress", + "branch": "main", + "identifier": "ip-2", + } + }, + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "get-allocated-resources-page1"}, + ) + httpx_mock.add_response( + method="POST", + json={ + "data": { + "IpamIPAddress": { + "count": 2, + "edges": [ + {"node": {"id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", "__typename": "IpamIPAddress"}}, + {"node": {"id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", "__typename": "IpamIPAddress"}}, + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + resources = await ip_pool.get_pool_allocated_resources(resource=ip_prefix) + assert len(resources) == 2 + assert [resource.id for resource in resources] == [ + "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + ] + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + resources = ip_pool.get_pool_allocated_resources(resource=ip_prefix) + assert len(resources) == 2 + assert [resource.id for resource in resources] == [ + "17d9bd8d-8fc2-70b0-278a-179f425e25cb", + "17d9bd8e-31ee-acf0-2786-179fb76f2f67", + ] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_get_pool_resources_utilization( + httpx_mock: HTTPXMock, + clients: BothClients, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + httpx_mock.add_response( + method="POST", + json={ + "data": { + "InfrahubResourcePoolUtilization": { + "count": 1, + "edges": [ + { + "node": { + "id": "17d9bd86-3471-a020-2782-179ff078e58f", + "utilization": 93.75, + "utilization_branches": 0, + "utilization_default_branch": 93.75, + } + } + ], + } + } + }, + match_headers={"X-Infrahub-Tracker": "get-pool-utilization"}, + ) + + if client_type == "standard": + ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=clients.standard, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + utilizations = await ip_pool.get_pool_resources_utilization() + assert len(utilizations) == 1 + assert utilizations[0]["utilization"] == 93.75 + else: + ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=clients.sync, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + + utilizations = ip_pool.get_pool_resources_utilization() + assert len(utilizations) == 1 + assert utilizations[0]["utilization"] == 93.75 diff --git a/tests/unit/sdk/pool/test_relationship_from_pool.py b/tests/unit/sdk/pool/test_relationship_from_pool.py new file mode 100644 index 00000000..ec4e265d --- /dev/null +++ b/tests/unit/sdk/pool/test_relationship_from_pool.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + +if TYPE_CHECKING: + from typing import Any + + from infrahub_sdk import InfrahubClient + from infrahub_sdk.schema import NodeSchemaAPI + +client_types = ["standard", "sync"] + + +@pytest.mark.parametrize("client_type", client_types) +async def test_create_input_data_with_resource_pool_relationship( + client: InfrahubClient, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + simple_device_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + if client_type == "standard": + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + else: + ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + + assert device._generate_input_data()["data"] == { + "data": { + "name": {"value": "device-01"}, + "primary_address": {"from_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}}, + "ip_address_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}, + }, + } + + +@pytest.mark.parametrize("client_type", client_types) +async def test_create_mutation_query_with_resource_pool_relationship( + client: InfrahubClient, + ipaddress_pool_schema: NodeSchemaAPI, + ipam_ipprefix_schema: NodeSchemaAPI, + simple_device_schema: NodeSchemaAPI, + ipam_ipprefix_data: dict[str, Any], + client_type: str, +) -> None: + if client_type == "standard": + ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNode( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + else: + ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) + ip_pool = InfrahubNodeSync( + client=client, + schema=ipaddress_pool_schema, + data={ + "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", + "name": "Core loopbacks", + "default_address_type": "IpamIPAddress", + "default_prefix_length": 32, + "ip_namespace": "ip_namespace", + "resources": [ip_prefix], + }, + ) + device = InfrahubNode( + client=client, + schema=simple_device_schema, + data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, + ) + + assert device._generate_mutation_query() == { + "object": { + "id": None, + "primary_address": {"node": {"__typename": None, "display_label": None, "id": None}}, + "ip_address_pool": {"node": {"__typename": None, "display_label": None, "id": None}}, + }, + "ok": None, + } diff --git a/tests/unit/sdk/test_client.py b/tests/unit/sdk/test_client.py index e9cce23e..1e883f95 100644 --- a/tests/unit/sdk/test_client.py +++ b/tests/unit/sdk/test_client.py @@ -14,11 +14,9 @@ if TYPE_CHECKING: from collections.abc import Callable, Mapping from inspect import Parameter - from typing import Any from pytest_httpx import HTTPXMock - from infrahub_sdk.schema import NodeSchemaAPI from tests.unit.sdk.conftest import BothClients pytestmark = pytest.mark.httpx_mock(can_send_already_matched_responses=True) @@ -636,208 +634,6 @@ async def test_method_filters_empty( assert len(repos) == 0 -@pytest.mark.parametrize("client_type", client_types) -async def test_allocate_next_ip_address( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubIPAddressPoolGetResource": { - "ok": True, - "node": { - "id": "17da1246-54f1-a9c0-2784-179f0ec5b128", - "kind": "IpamIPAddress", - "identifier": "test", - "display_label": "192.0.2.0/32", - }, - } - } - }, - match_headers={"X-Infrahub-Tracker": "allocate-ip-loopback"}, - is_reusable=True, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPAddress": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "__typename": "IpamIPAddress", - "address": {"value": "192.0.2.0/32"}, - "description": {"value": "test"}, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, - is_reusable=True, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_address = await clients.standard.allocate_next_ip_address( - resource_pool=ip_pool, - identifier="test", - prefix_length=32, - address_type="IpamIPAddress", - data={"description": "test"}, - tracker="allocate-ip-loopback", - ) - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_address = clients.sync.allocate_next_ip_address( - resource_pool=ip_pool, - identifier="test", - prefix_length=32, - address_type="IpamIPAddress", - data={"description": "test"}, - tracker="allocate-ip-loopback", - ) - - assert ip_address - assert str(ip_address.address.value) == "192.0.2.0/32" - assert ip_address.description.value == "test" - - -@pytest.mark.parametrize("client_type", client_types) -async def test_allocate_next_ip_prefix( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipprefix_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubIPPrefixPoolGetResource": { - "ok": True, - "node": { - "id": "7d9bd8d-8fc2-70b0-278a-179f425e25cb", - "kind": "IpamIPPrefix", - "identifier": "test", - "display_label": "192.0.2.0/31", - }, - } - } - }, - match_headers={"X-Infrahub-Tracker": "allocate-ip-interco"}, - is_reusable=True, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPPrefix": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "__typename": "IpamIPPrefix", - "prefix": {"value": "192.0.2.0/31"}, - "description": {"value": "test"}, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipprefix-page1"}, - is_reusable=True, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipprefix_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core intercos", - "default_prefix_type": "IpamIPPrefix", - "default_prefix_length": 31, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_prefix = await clients.standard.allocate_next_ip_prefix( - resource_pool=ip_pool, - identifier="test", - prefix_length=31, - prefix_type="IpamIPPrefix", - data={"description": "test"}, - tracker="allocate-ip-interco", - ) - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipprefix_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core intercos", - "default_prefix_type": "IpamIPPrefix", - "default_prefix_length": 31, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - ip_prefix = clients.sync.allocate_next_ip_prefix( - resource_pool=ip_pool, - identifier="test", - prefix_length=31, - prefix_type="IpamIPPrefix", - data={"description": "test"}, - tracker="allocate-ip-interco", - ) - - assert ip_prefix - assert str(ip_prefix.prefix.value) == "192.0.2.0/31" - assert ip_prefix.description.value == "test" - - EXPECTED_ECHO = """URL: http://mock/graphql/main QUERY: diff --git a/tests/unit/sdk/test_node.py b/tests/unit/sdk/test_node.py index 8dc18c9b..3db48edf 100644 --- a/tests/unit/sdk/test_node.py +++ b/tests/unit/sdk/test_node.py @@ -2211,289 +2211,6 @@ async def test_relationships_excluded_input_data( assert node.tags.has_update is False -@pytest.mark.parametrize("client_type", client_types) -async def test_create_input_data_with_resource_pool_relationship( - client: InfrahubClient, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - simple_device_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - - assert device._generate_input_data()["data"] == { - "data": { - "name": {"value": "device-01"}, - "primary_address": {"from_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}}, - "ip_address_pool": {"id": "pppppppp-pppp-pppp-pppp-pppppppppppp"}, - }, - } - - -@pytest.mark.parametrize("client_type", client_types) -async def test_create_mutation_query_with_resource_pool_relationship( - client: InfrahubClient, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - simple_device_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - if client_type == "standard": - ip_prefix = InfrahubNode(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - else: - ip_prefix = InfrahubNodeSync(client=client, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=client, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - device = InfrahubNode( - client=client, - schema=simple_device_schema, - data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, - ) - - assert device._generate_mutation_query() == { - "object": { - "id": None, - "primary_address": {"node": {"__typename": None, "display_label": None, "id": None}}, - "ip_address_pool": {"node": {"__typename": None, "display_label": None, "id": None}}, - }, - "ok": None, - } - - -@pytest.mark.parametrize("client_type", client_types) -async def test_get_pool_allocated_resources( - httpx_mock: HTTPXMock, - mock_schema_query_ipam: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubResourcePoolAllocated": { - "count": 2, - "edges": [ - { - "node": { - "id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "kind": "IpamIPAddress", - "branch": "main", - "identifier": "ip-1", - } - }, - { - "node": { - "id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - "kind": "IpamIPAddress", - "branch": "main", - "identifier": "ip-2", - } - }, - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "get-allocated-resources-page1"}, - ) - httpx_mock.add_response( - method="POST", - json={ - "data": { - "IpamIPAddress": { - "count": 2, - "edges": [ - {"node": {"id": "17d9bd8d-8fc2-70b0-278a-179f425e25cb", "__typename": "IpamIPAddress"}}, - {"node": {"id": "17d9bd8e-31ee-acf0-2786-179fb76f2f67", "__typename": "IpamIPAddress"}}, - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "query-ipamipaddress-page1"}, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - resources = await ip_pool.get_pool_allocated_resources(resource=ip_prefix) - assert len(resources) == 2 - assert [resource.id for resource in resources] == [ - "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - ] - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - resources = ip_pool.get_pool_allocated_resources(resource=ip_prefix) - assert len(resources) == 2 - assert [resource.id for resource in resources] == [ - "17d9bd8d-8fc2-70b0-278a-179f425e25cb", - "17d9bd8e-31ee-acf0-2786-179fb76f2f67", - ] - - -@pytest.mark.parametrize("client_type", client_types) -async def test_get_pool_resources_utilization( - httpx_mock: HTTPXMock, - clients: BothClients, - ipaddress_pool_schema: NodeSchemaAPI, - ipam_ipprefix_schema: NodeSchemaAPI, - ipam_ipprefix_data: dict[str, Any], - client_type: str, -) -> None: - httpx_mock.add_response( - method="POST", - json={ - "data": { - "InfrahubResourcePoolUtilization": { - "count": 1, - "edges": [ - { - "node": { - "id": "17d9bd86-3471-a020-2782-179ff078e58f", - "utilization": 93.75, - "utilization_branches": 0, - "utilization_default_branch": 93.75, - } - } - ], - } - } - }, - match_headers={"X-Infrahub-Tracker": "get-pool-utilization"}, - ) - - if client_type == "standard": - ip_prefix = InfrahubNode(client=clients.standard, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNode( - client=clients.standard, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - utilizations = await ip_pool.get_pool_resources_utilization() - assert len(utilizations) == 1 - assert utilizations[0]["utilization"] == 93.75 - else: - ip_prefix = InfrahubNodeSync(client=clients.sync, schema=ipam_ipprefix_schema, data=ipam_ipprefix_data) - ip_pool = InfrahubNodeSync( - client=clients.sync, - schema=ipaddress_pool_schema, - data={ - "id": "pppppppp-pppp-pppp-pppp-pppppppppppp", - "name": "Core loopbacks", - "default_address_type": "IpamIPAddress", - "default_prefix_length": 32, - "ip_namespace": "ip_namespace", - "resources": [ip_prefix], - }, - ) - - utilizations = ip_pool.get_pool_resources_utilization() - assert len(utilizations) == 1 - assert utilizations[0]["utilization"] == 93.75 - - @pytest.mark.parametrize("client_type", client_types) async def test_from_graphql( clients: BothClients, mock_schema_query_01: HTTPXMock, location_data01: dict[str, Any], client_type: str From b1f51afbc80115dff56c3790853bea7c9120b0f3 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Mon, 23 Feb 2026 14:33:39 +0100 Subject: [PATCH 03/14] towncrier regarding Github issue #497 --- changelog/497.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/497.fixed.md diff --git a/changelog/497.fixed.md b/changelog/497.fixed.md new file mode 100644 index 00000000..b32323d1 --- /dev/null +++ b/changelog/497.fixed.md @@ -0,0 +1 @@ +Fixed Python SDK query generation regarding from_pool generated attribute value From d831b3aba22227b66a81db9d000165eb06de2db6 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Mon, 23 Feb 2026 15:00:17 +0100 Subject: [PATCH 04/14] IHS-156 update AGENTS doc using the command /feedback --- AGENTS.md | 2 +- tests/AGENTS.md | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/AGENTS.md b/AGENTS.md index 00de5ab1..7abf694f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -7,7 +7,7 @@ Infrahub Python SDK - async/sync client for Infrahub infrastructure management. ```bash uv sync --all-groups --all-extras # Install all deps uv run invoke format # Format code -uv run invoke lint # All linters (code + yamllint + documentation) +uv run invoke lint # Full pipeline: ruff, yamllint, ty, mypy, markdownlint, vale uv run invoke lint-code # All linters for Python code uv run pytest tests/unit/ # Unit tests uv run pytest tests/integration/ # Integration tests diff --git a/tests/AGENTS.md b/tests/AGENTS.md index f3608ead..cce67364 100644 --- a/tests/AGENTS.md +++ b/tests/AGENTS.md @@ -17,6 +17,12 @@ uv run pytest tests/unit/test_client.py # Single file ```text tests/ ├── unit/ # Fast, mocked, no external deps +│ ├── ctl/ # CLI command tests +│ └── sdk/ # SDK tests +│ ├── pool/ # Resource pool allocation tests +│ ├── spec/ # Object spec tests +│ ├── checks/ # InfrahubCheck tests +│ └── ... # Core SDK tests (client, node, schema, etc.) ├── integration/ # Real Infrahub via testcontainers ├── fixtures/ # Test data (JSON, YAML) └── helpers/ # Test utilities From ac1db9e027e26dff5188c00407e29919776cdd91 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Wed, 25 Feb 2026 19:42:40 +0100 Subject: [PATCH 05/14] IHS-156 is_from_pool_attribute typing and naming --- infrahub_sdk/node/attribute.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index 88173c4c..07e7c425 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -139,10 +139,10 @@ def _generate_query_data(self, property: bool = False, include_metadata: bool = return data def _generate_mutation_query(self) -> dict[str, Any]: - if self._from_pool_attribute(): + if self.is_from_pool_attribute(): # If it points to a pool, ask for the value of the pool allocated resource return {self.name: {"value": None}} return {} - def _from_pool_attribute(self) -> bool | Any: + def is_from_pool_attribute(self) -> bool: return (isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool()) or self._from_pool is not None From 86a8a880bed39129d890dc82a2c24bbda13a4899 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Wed, 25 Feb 2026 21:32:02 +0100 Subject: [PATCH 06/14] IHS-156 refactor generate_input_data --- infrahub_sdk/node/attribute.py | 120 ++++++++++++++++++++++----------- infrahub_sdk/node/node.py | 25 +++---- 2 files changed, 87 insertions(+), 58 deletions(-) diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index 07e7c425..ba43fc58 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -2,7 +2,7 @@ import ipaddress from collections.abc import Callable -from typing import TYPE_CHECKING, Any, get_args +from typing import TYPE_CHECKING, Any, NamedTuple, get_args from ..protocols_base import CoreNodeBase from ..uuidt import UUIDT @@ -80,45 +80,57 @@ def value(self, value: Any) -> None: self._value = value self.value_has_been_mutated = True - def _generate_value_data(self, data: dict[str, Any], variables: dict[str, Any]) -> dict | None: - if self._from_pool is not None: - data["from_pool"] = self._from_pool - elif self.value is None: - if self._schema.optional and self.value_has_been_mutated: - data["value"] = None - return data - elif isinstance(self.value, str): - if SAFE_VALUE.match(self.value): - data["value"] = self.value - else: - var_name = f"value_{UUIDT.new().hex}" - variables[var_name] = self.value - data["value"] = f"${var_name}" - elif isinstance(self.value, get_args(IP_TYPES)): - data["value"] = self.value.with_prefixlen - elif isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): - data["from_pool"] = {"id": self.value.id} - else: - data["value"] = self.value - return None - - def _generate_input_data(self) -> dict | None: - data: dict[str, Any] = {} - variables: dict[str, Any] = {} - - early_return = self._generate_value_data(data, variables) - if early_return is not None: - return early_return - - for prop_name in self._properties_flag: - if getattr(self, prop_name) is not None: - data[prop_name] = getattr(self, prop_name) - - for prop_name in self._properties_object: - if getattr(self, prop_name) is not None: - data[prop_name] = getattr(self, prop_name)._generate_input_data() + def _initialize_graphql_payload(self) -> _GraphQLPayloadAttribute: + """Resolve the attribute value into a GraphQL mutation payload object.""" - return {"data": data, "variables": variables} + # Pool-based allocation (dict data or resource-pool node) + if self._from_pool is not None: + return _GraphQLPayloadAttribute( + payload_dict={"from_pool": self._from_pool}, variables={}, need_additional_properties=True + ) + if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): + return _GraphQLPayloadAttribute( + payload_dict={"from_pool": {"id": self.value.id}}, variables={}, need_additional_properties=True + ) + + # Null value + if self.value is None: + data = {"value": None} if (self._schema.optional and self.value_has_been_mutated) else {} + return _GraphQLPayloadAttribute(payload_dict=data, variables={}, need_additional_properties=False) + + # Unsafe strings need a variable binding to avoid injection + if isinstance(self.value, str) and not SAFE_VALUE.match(self.value): + var_name = f"value_{UUIDT.new().hex}" + return _GraphQLPayloadAttribute( + payload_dict={"value": f"${var_name}"}, + variables={var_name: self.value}, + need_additional_properties=True, + ) + + # Safe strings, IP types, and everything else + value = self.value.with_prefixlen if isinstance(self.value, get_args(IP_TYPES)) else self.value + return _GraphQLPayloadAttribute(payload_dict={"value": value}, variables={}, need_additional_properties=True) + + def _generate_input_data(self) -> _GraphQLPayloadAttribute: + """Build the input payload for a GraphQL mutation on this attribute. + + Returns a ResolvedValue object, which contains all the data required. + """ + graphql_payload = self._initialize_graphql_payload() + + properties_flag: dict[str, Any] = { + property_name: getattr(self, property_name) + for property_name in self._properties_flag + if getattr(self, property_name) is not None + } + properties_object: dict[str, dict] = { + property_name: getattr(self, property_name)._generate_input_data() + for property_name in self._properties_object + if getattr(self, property_name) is not None + } + graphql_payload.add_properties(properties_flag, properties_object) + + return graphql_payload def _generate_query_data(self, property: bool = False, include_metadata: bool = False) -> dict | None: data: dict[str, Any] = {"value": None} @@ -139,10 +151,36 @@ def _generate_query_data(self, property: bool = False, include_metadata: bool = return data def _generate_mutation_query(self) -> dict[str, Any]: - if self.is_from_pool_attribute(): + if self._is_from_pool_attribute(): # If it points to a pool, ask for the value of the pool allocated resource return {self.name: {"value": None}} return {} - def is_from_pool_attribute(self) -> bool: + def _is_from_pool_attribute(self) -> bool: return (isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool()) or self._from_pool is not None + + +class _GraphQLPayloadAttribute(NamedTuple): + """Result of resolving an attribute value for a GraphQL mutation. + + Attributes: + payload_dict: Key/value entries to include in the mutation payload + (e.g. ``{"value": ...}`` or ``{"from_pool": ...}``). + variables: GraphQL variable bindings for unsafe string values. + need_additional_properties: When ``True``, the payload needs to append property flags/objects + """ + + payload_dict: dict[str, Any] + variables: dict[str, Any] + need_additional_properties: bool + + def to_dict(self) -> dict[str, Any]: + return {"data": self.payload_dict, "variables": self.variables} + + def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, dict]) -> None: + if self.need_additional_properties: + for prop_name, prop in properties_flag.items(): + self.payload_dict[prop_name] = prop + + for prop_name, prop in properties_object.items(): + self.payload_dict[prop_name] = prop diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index 0c85c3ad..e723f55a 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -216,7 +216,7 @@ def is_resource_pool(self) -> bool: def get_raw_graphql_data(self) -> dict | None: return self._data - def _generate_input_data( # noqa: C901, PLR0915 + def _generate_input_data( # noqa: C901 self, exclude_unmodified: bool = False, exclude_hfid: bool = False, @@ -228,27 +228,18 @@ def _generate_input_data( # noqa: C901, PLR0915 dict[str, Dict]: Representation of an input data in dict format """ - data = {} - variables = {} + data: dict[str, Any] = {} + variables: dict[str, Any] = {} for item_name in self._attributes: attr: Attribute = getattr(self, item_name) if attr._schema.read_only: continue - attr_data = attr._generate_input_data() - - # NOTE, this code has been inherited when we splitted attributes and relationships - # into 2 loops, most likely it's possible to simply it - if attr_data and isinstance(attr_data, dict): - if variable_values := attr_data.get("data"): - data[item_name] = variable_values - else: - data[item_name] = attr_data - if variable_names := attr_data.get("variables"): - variables.update(variable_names) - - elif attr_data and isinstance(attr_data, list): - data[item_name] = attr_data + graphql_payload = attr._generate_input_data() + if graphql_payload.payload_dict: + data[item_name] = graphql_payload.payload_dict + if graphql_payload.variables: + variables.update(graphql_payload.variables) for item_name in self._relationships: allocate_from_pool = False From 1c5c452e775077b9d03c106ef8fd55575e103e56 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Wed, 25 Feb 2026 22:19:25 +0100 Subject: [PATCH 07/14] IHS-156 remove Jira issue related comment in the tests --- tests/unit/sdk/pool/test_attribute_from_pool.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/tests/unit/sdk/pool/test_attribute_from_pool.py b/tests/unit/sdk/pool/test_attribute_from_pool.py index 88b32fa7..75d63f6d 100644 --- a/tests/unit/sdk/pool/test_attribute_from_pool.py +++ b/tests/unit/sdk/pool/test_attribute_from_pool.py @@ -30,11 +30,7 @@ async def test_number_attribute_from_pool_with_identifier( client: InfrahubClient, vlan_schema: NodeSchemaAPI, ) -> None: - """A number attribute with from_pool and identifier should NOT be wrapped in value. - - This is the exact scenario from the IHS-156 bug report where a user wants - to use an identifier for idempotent number pool allocation. - """ + """A number attribute with from_pool and identifier should NOT be wrapped in value.""" data: dict[str, Any] = { "name": "Example VLAN", "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, @@ -97,11 +93,7 @@ async def test_sync_number_attribute_from_pool_with_identifier( client_sync: InfrahubClientSync, vlan_schema: NodeSchemaAPI, ) -> None: - """A number attribute with from_pool and identifier should NOT be wrapped in value (sync client). - - This is the exact scenario from the IHS-156 bug report where a user wants - to use an identifier for idempotent number pool allocation. - """ + """A number attribute with from_pool and identifier should NOT be wrapped in value (sync client).""" data: dict[str, Any] = { "name": "Example VLAN", "vlan_id": {"from_pool": {"id": POOL_ID, "identifier": "test"}}, From 6b33e0d97d196f08f6b5d8a4c6dc3e2efc2152bd Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Wed, 25 Feb 2026 22:46:27 +0100 Subject: [PATCH 08/14] IHS-156 removed a part of fixtures to upper level --- tests/unit/sdk/conftest.py | 138 ++++++++++++++++++++++++++++++ tests/unit/sdk/pool/conftest.py | 145 -------------------------------- 2 files changed, 138 insertions(+), 145 deletions(-) diff --git a/tests/unit/sdk/conftest.py b/tests/unit/sdk/conftest.py index 70ddd1de..d28c23b9 100644 --- a/tests/unit/sdk/conftest.py +++ b/tests/unit/sdk/conftest.py @@ -907,6 +907,114 @@ async def ipnetwork_schema() -> NodeSchemaAPI: return NodeSchema(**data).convert_api() +@pytest.fixture +async def ipam_ipprefix_schema() -> NodeSchemaAPI: + data = { + "name": "IPNetwork", + "namespace": "Ipam", + "default_filter": "prefix__value", + "display_labels": ["prefix_value"], + "order_by": ["prefix_value"], + "inherit_from": ["BuiltinIPAddress"], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def simple_device_schema() -> NodeSchemaAPI: + data = { + "name": "Device", + "namespace": "Infra", + "label": "Device", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [{"name": "name", "kind": "Text", "unique": True}], + "relationships": [ + { + "name": "primary_address", + "peer": "IpamIPAddress", + "label": "Primary IP Address", + "optional": True, + "cardinality": "one", + "kind": "Attribute", + }, + { + "name": "ip_address_pool", + "peer": "CoreIPAddressPool", + "label": "Address allocator", + "optional": True, + "cardinality": "one", + "kind": "Attribute", + }, + ], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def ipam_ipprefix_data() -> dict[str, Any]: + return { + "node": { + "__typename": "IpamIPPrefix", + "id": "llllllll-llll-llll-llll-llllllllllll", + "display_label": "192.0.2.0/24", + "prefix": { + "is_protected": True, + "owner": None, + "source": { + "__typename": "Account", + "display_label": "CRM", + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + }, + "value": "192.0.2.0/24", + }, + "description": { + "is_protected": False, + "owner": None, + "source": None, + "value": None, + }, + "member_type": { + "is_protected": True, + "owner": None, + "source": { + "__typename": "Account", + "display_label": "CRM", + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + }, + "value": "address", + }, + "is_pool": { + "is_protected": True, + "owner": None, + "source": { + "__typename": "Account", + "display_label": "CRM", + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + }, + "value": False, + }, + "ip_namespace": { + "properties": { + "is_protected": True, + "owner": None, + "source": { + "__typename": "Account", + "display_label": "CRM", + "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", + }, + }, + "node": { + "id": "rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr", + "display_label": "default", + "__typename": "IpamNamespace", + }, + }, + } + } + + @pytest.fixture async def address_schema() -> NodeSchemaAPI: data = { @@ -2287,6 +2395,16 @@ def query_introspection() -> str: """ +@pytest.fixture +async def mock_schema_query_ipam(httpx_mock: HTTPXMock) -> HTTPXMock: + response_text = (get_fixtures_dir() / "schema_ipam.json").read_text(encoding="UTF-8") + + httpx_mock.add_response( + method="GET", url="http://mock/api/schema?branch=main", json=ujson.loads(response_text), is_reusable=True + ) + return httpx_mock + + @pytest.fixture async def mock_query_location_batch_count( httpx_mock: HTTPXMock, client: InfrahubClient, mock_schema_query_01: HTTPXMock @@ -2418,3 +2536,23 @@ async def nested_device_with_interfaces_schema() -> NodeSchemaAPI: ], } return NodeSchema(**data).convert_api() + + +@pytest.fixture +async def vlan_schema() -> NodeSchemaAPI: + data = { + "name": "VLAN", + "namespace": "Infra", + "label": "VLAN", + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [ + {"name": "name", "kind": "Text", "unique": True}, + {"name": "vlan_id", "kind": "Number"}, + {"name": "role", "kind": "Text", "optional": True}, + {"name": "status", "kind": "Text", "optional": True}, + ], + "relationships": [], + } + return NodeSchema(**data).convert_api() diff --git a/tests/unit/sdk/pool/conftest.py b/tests/unit/sdk/pool/conftest.py index 0508063b..e8276be6 100644 --- a/tests/unit/sdk/pool/conftest.py +++ b/tests/unit/sdk/pool/conftest.py @@ -1,123 +1,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any - import pytest -import ujson from infrahub_sdk.schema import BranchSupportType, NodeSchema, NodeSchemaAPI -from infrahub_sdk.utils import get_fixtures_dir - -if TYPE_CHECKING: - from pytest_httpx import HTTPXMock - - -@pytest.fixture -async def ipam_ipprefix_schema() -> NodeSchemaAPI: - data = { - "name": "IPNetwork", - "namespace": "Ipam", - "default_filter": "prefix__value", - "display_labels": ["prefix_value"], - "order_by": ["prefix_value"], - "inherit_from": ["BuiltinIPAddress"], - } - return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def simple_device_schema() -> NodeSchemaAPI: - data = { - "name": "Device", - "namespace": "Infra", - "label": "Device", - "default_filter": "name__value", - "order_by": ["name__value"], - "display_labels": ["name__value"], - "attributes": [{"name": "name", "kind": "Text", "unique": True}], - "relationships": [ - { - "name": "primary_address", - "peer": "IpamIPAddress", - "label": "Primary IP Address", - "optional": True, - "cardinality": "one", - "kind": "Attribute", - }, - { - "name": "ip_address_pool", - "peer": "CoreIPAddressPool", - "label": "Address allocator", - "optional": True, - "cardinality": "one", - "kind": "Attribute", - }, - ], - } - return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def ipam_ipprefix_data() -> dict[str, Any]: - return { - "node": { - "__typename": "IpamIPPrefix", - "id": "llllllll-llll-llll-llll-llllllllllll", - "display_label": "192.0.2.0/24", - "prefix": { - "is_protected": True, - "owner": None, - "source": { - "__typename": "Account", - "display_label": "CRM", - "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", - }, - "value": "192.0.2.0/24", - }, - "description": { - "is_protected": False, - "owner": None, - "source": None, - "value": None, - }, - "member_type": { - "is_protected": True, - "owner": None, - "source": { - "__typename": "Account", - "display_label": "CRM", - "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", - }, - "value": "address", - }, - "is_pool": { - "is_protected": True, - "owner": None, - "source": { - "__typename": "Account", - "display_label": "CRM", - "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", - }, - "value": False, - }, - "ip_namespace": { - "properties": { - "is_protected": True, - "owner": None, - "source": { - "__typename": "Account", - "display_label": "CRM", - "id": "cccccccc-cccc-cccc-cccc-cccccccccccc", - }, - }, - "node": { - "id": "rrrrrrrr-rrrr-rrrr-rrrr-rrrrrrrrrrrr", - "display_label": "default", - "__typename": "IpamNamespace", - }, - }, - } - } @pytest.fixture @@ -227,33 +112,3 @@ async def ipprefix_pool_schema() -> NodeSchemaAPI: ], } return NodeSchema(**data).convert_api() - - -@pytest.fixture -async def mock_schema_query_ipam(httpx_mock: HTTPXMock) -> HTTPXMock: - response_text = (get_fixtures_dir() / "schema_ipam.json").read_text(encoding="UTF-8") - - httpx_mock.add_response( - method="GET", url="http://mock/api/schema?branch=main", json=ujson.loads(response_text), is_reusable=True - ) - return httpx_mock - - -@pytest.fixture -async def vlan_schema() -> NodeSchemaAPI: - data = { - "name": "VLAN", - "namespace": "Infra", - "label": "VLAN", - "default_filter": "name__value", - "order_by": ["name__value"], - "display_labels": ["name__value"], - "attributes": [ - {"name": "name", "kind": "Text", "unique": True}, - {"name": "vlan_id", "kind": "Number"}, - {"name": "role", "kind": "Text", "optional": True}, - {"name": "status", "kind": "Text", "optional": True}, - ], - "relationships": [], - } - return NodeSchema(**data).convert_api() From eaf4d4d88900bc3039531117c699e7cdcc29fd3e Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Wed, 25 Feb 2026 22:46:43 +0100 Subject: [PATCH 09/14] IHS-156 tested all cases of generated_input_data --- .../sdk/test_attribute_generate_input_data.py | 395 ++++++++++++++++++ 1 file changed, 395 insertions(+) create mode 100644 tests/unit/sdk/test_attribute_generate_input_data.py diff --git a/tests/unit/sdk/test_attribute_generate_input_data.py b/tests/unit/sdk/test_attribute_generate_input_data.py new file mode 100644 index 00000000..8b1426bb --- /dev/null +++ b/tests/unit/sdk/test_attribute_generate_input_data.py @@ -0,0 +1,395 @@ +"""Unit tests for Attribute._generate_input_data covering all code paths.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from infrahub_sdk.node.attribute import Attribute +from infrahub_sdk.protocols_base import CoreNodeBase +from infrahub_sdk.schema import AttributeSchemaAPI + +# ────────────────────────────────────────────── +# Value resolution: from_pool (dict-based) +# ────────────────────────────────────────────── + + +class TestFromPoolDict: + def test_from_pool_with_id(self) -> None: + pool_data = {"id": "pool-uuid-1"} + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": pool_data}) + + result = attr._generate_input_data() + + assert result.payload_dict == {"from_pool": {"id": "pool-uuid-1"}} + assert result.variables == {} + + def test_from_pool_with_id_and_identifier(self) -> None: + pool_data = {"id": "pool-uuid-1", "identifier": "test"} + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": pool_data}) + + result = attr._generate_input_data() + + assert result.payload_dict == {"from_pool": {"id": "pool-uuid-1", "identifier": "test"}} + assert result.variables == {} + + def test_from_pool_with_pool_name(self) -> None: + """from_pool can be a plain string (pool name), e.g. from_pool: 'VLAN ID Pool'.""" + attr = Attribute( + name="vlan_id", schema=_make_schema("Number", optional=True), data={"from_pool": "VLAN ID Pool"} + ) + + result = attr._generate_input_data() + + assert result.payload_dict == {"from_pool": "VLAN ID Pool"} + assert result.variables == {} + assert "value" not in result.payload_dict + + def test_from_pool_value_is_none(self) -> None: + """from_pool pops 'from_pool' and sets Attribute.value to None; value should NOT appear in payload.""" + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data={"from_pool": {"id": "pool-uuid-1"}}) + + assert attr.value is None + result = attr._generate_input_data() + assert "value" not in result.payload_dict + + +# ────────────────────────────────────────────── +# Value resolution: from_pool (node-based) +# ────────────────────────────────────────────── + + +class TestFromPoolNode: + def test_pool_node_generates_from_pool(self) -> None: + pool_node = _FakeNode(node_id="node-pool-uuid", is_pool=True) + + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=pool_node) + + result = attr._generate_input_data() + + assert result.payload_dict == {"from_pool": {"id": "node-pool-uuid"}} + assert result.variables == {} + + def test_non_pool_node_treated_as_regular_value(self) -> None: + """A CoreNodeBase that is NOT a resource pool should go through the normal value path.""" + node = _FakeNode(node_id="regular-node-uuid", is_pool=False) + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=node) + + result = attr._generate_input_data() + + assert result.payload_dict == {"value": node} + + +# ────────────────────────────────────────────── +# Value resolution: null values +# ────────────────────────────────────────────── + + +class TestNullValue: + def test_null_value_not_mutated(self) -> None: + """None value that was never mutated → empty payload, no properties.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": None}) + + result = attr._generate_input_data() + + assert result.payload_dict == {} + assert result.variables == {} + assert result.need_additional_properties is False + + def test_null_value_mutated_optional(self) -> None: + """None value on an optional attr that was mutated → explicit null.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text", optional=True), data={"value": "initial"}) + attr.value = None # triggers value_has_been_mutated + + result = attr._generate_input_data() + + assert result.payload_dict == {"value": None} + assert result.need_additional_properties is False + + def test_null_value_mutated_non_optional(self) -> None: + """None value on a non-optional attr that was mutated → empty payload (same as not mutated).""" + attr = Attribute(name="test_attr", schema=_make_schema("Text", optional=False), data={"value": "initial"}) + attr.value = None + + result = attr._generate_input_data() + + assert result.payload_dict == {} + assert result.need_additional_properties is False + + +# ────────────────────────────────────────────── +# Value resolution: strings (safe vs unsafe) +# ────────────────────────────────────────────── + + +class TestStringValues: + @pytest.mark.parametrize( + "value", + [ + pytest.param("simple", id="alphanumeric"), + pytest.param("user.name", id="dots"), + pytest.param("/opt/repos/infrahub", id="filepath"), + pytest.param("https://github.com/opsmill", id="url"), + pytest.param("", id="empty-string"), + ], + ) + def test_safe_string(self, value: str) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data=value) + + result = attr._generate_input_data() + + assert result.payload_dict == {"value": value} + assert result.variables == {} + + @pytest.mark.parametrize( + "value", + [ + pytest.param('has "quotes"', id="quotes"), + pytest.param("has\nnewline", id="newline"), + pytest.param("special{chars}", id="braces"), + ], + ) + def test_unsafe_string_uses_variable_binding(self, value: str) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data=value) + + result = attr._generate_input_data() + + # payload_dict["value"] should be a variable reference like "$value_" + assert "value" in result.payload_dict + assert result.payload_dict["value"].startswith("$value_") + # The actual string should be in variables + assert len(result.variables) == 1 + var_name = next(iter(result.variables)) + assert result.variables[var_name] == value + + +# ────────────────────────────────────────────── +# Value resolution: IP types +# ────────────────────────────────────────────── + + +class TestIPValues: + def test_ipv4_interface(self) -> None: + attr = Attribute(name="address", schema=_make_schema("IPHost"), data={"value": "10.0.0.1/24"}) + + result = attr._generate_input_data() + + assert result.payload_dict["value"] == "10.0.0.1/24" + assert result.variables == {} + + def test_ipv6_interface(self) -> None: + attr = Attribute(name="address", schema=_make_schema("IPHost"), data={"value": "2001:db8::1/64"}) + + result = attr._generate_input_data() + + assert result.payload_dict["value"] == "2001:db8::1/64" + + def test_ipv4_network(self) -> None: + attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "10.0.0.0/24"}) + + result = attr._generate_input_data() + + assert result.payload_dict["value"] == "10.0.0.0/24" + + def test_ipv6_network(self) -> None: + attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "2001:db8::/32"}) + + result = attr._generate_input_data() + + assert result.payload_dict["value"] == "2001:db8::/32" + + +# ────────────────────────────────────────────── +# Value resolution: other scalars +# ────────────────────────────────────────────── + + +class TestScalarValues: + def test_number_value(self) -> None: + attr = Attribute(name="vlan_id", schema=_make_schema("Number"), data=42) + + result = attr._generate_input_data() + + assert result.payload_dict == {"value": 42} + assert result.variables == {} + + def test_boolean_value(self) -> None: + attr = Attribute(name="enabled", schema=_make_schema("Boolean"), data=True) + + result = attr._generate_input_data() + + assert result.payload_dict == {"value": True} + + +# ────────────────────────────────────────────── +# Property handling +# ────────────────────────────────────────────── + + +class TestProperties: + def test_no_properties_set(self) -> None: + """When no properties are set, payload only has the value.""" + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data="hello") + + result = attr._generate_input_data() + + assert result.payload_dict == {"value": "hello"} + + def test_flag_property_is_protected(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": "hello", "is_protected": True}) + + result = attr._generate_input_data() + + assert result.payload_dict["value"] == "hello" + assert result.payload_dict["is_protected"] is True + + def test_object_property_source(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={"value": "hello", "source": {"id": "source-uuid", "display_label": "Git", "__typename": "CoreGit"}}, + ) + + result = attr._generate_input_data() + + assert result.payload_dict["value"] == "hello" + assert result.payload_dict["source"] == "source-uuid" + + def test_object_property_owner(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": "hello", + "owner": {"id": "owner-uuid", "display_label": "Admin", "__typename": "CoreAccount"}, + }, + ) + + result = attr._generate_input_data() + + assert result.payload_dict["owner"] == "owner-uuid" + + def test_both_flag_and_object_properties(self) -> None: + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": "hello", + "is_protected": True, + "source": {"id": "src-uuid", "display_label": "Git", "__typename": "CoreGit"}, + }, + ) + + result = attr._generate_input_data() + + assert result.payload_dict["value"] == "hello" + assert result.payload_dict["is_protected"] is True + assert result.payload_dict["source"] == "src-uuid" + + def test_properties_not_appended_for_null_value(self) -> None: + """When need_additional_properties is False (null non-mutated), properties are ignored.""" + attr = Attribute( + name="test_attr", + schema=_make_schema("Text"), + data={ + "value": None, + "is_protected": True, + "source": {"id": "src-uuid", "display_label": "Git", "__typename": "CoreGit"}, + }, + ) + + result = attr._generate_input_data() + + # Null value, not mutated → empty payload, properties NOT appended + assert result.payload_dict == {} + + def test_properties_appended_for_from_pool(self) -> None: + """from_pool payloads have need_additional_properties=True, so properties are included.""" + attr = Attribute( + name="vlan_id", + schema=_make_schema("Number"), + data={"from_pool": {"id": "pool-uuid"}, "is_protected": True}, + ) + + result = attr._generate_input_data() + + assert result.payload_dict["from_pool"] == {"id": "pool-uuid"} + assert result.payload_dict["is_protected"] is True + + +# ────────────────────────────────────────────── +# Return type: to_dict() integration +# ────────────────────────────────────────────── + + +class TestToDictIntegration: + def test_to_dict_simple_value(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data="hello") + + result = attr._generate_input_data().to_dict() + + assert result == {"data": {"value": "hello"}, "variables": {}} + + def test_to_dict_with_variables(self) -> None: + attr = Attribute(name="test_attr", schema=_make_schema("Text"), data='has "quotes"') + + result = attr._generate_input_data().to_dict() + + assert "data" in result + assert "variables" in result + assert len(result["variables"]) == 1 + var_name = next(iter(result["variables"])) + assert result["variables"][var_name] == 'has "quotes"' + assert result["data"]["value"] == f"${var_name}" + + +def _make_schema(kind: str = "Text", optional: bool = False) -> AttributeSchemaAPI: + return AttributeSchemaAPI(name="test_attr", kind=kind, optional=optional) + + +class _FakeNode(CoreNodeBase): + """Minimal CoreNodeBase implementation for testing.""" + + def __init__(self, node_id: str, is_pool: bool) -> None: + self.id = node_id + self._is_pool = is_pool + self._schema: Any = None + self._internal_id = "" + self.display_label = None + self.typename = None + + @property + def hfid(self) -> list[str] | None: + return None + + @property + def hfid_str(self) -> str | None: + return None + + def get_human_friendly_id(self) -> list[str] | None: + return None + + def get_human_friendly_id_as_string(self, include_kind: bool = False) -> str | None: + return None + + def get_kind(self) -> str: + return "" + + def get_all_kinds(self) -> list[str]: + return [] + + def get_branch(self) -> str: + return "" + + def is_ip_prefix(self) -> bool: + return False + + def is_ip_address(self) -> bool: + return False + + def is_resource_pool(self) -> bool: + return self._is_pool + + def get_raw_graphql_data(self) -> dict | None: + return None From c871d420fae923dcf3455672b6822bc320893c55 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Wed, 25 Feb 2026 23:11:11 +0100 Subject: [PATCH 10/14] IHS-156 fixed a bug in test_relationship_from_pool.py --- tests/unit/sdk/pool/test_relationship_from_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/sdk/pool/test_relationship_from_pool.py b/tests/unit/sdk/pool/test_relationship_from_pool.py index ec4e265d..9ce543dc 100644 --- a/tests/unit/sdk/pool/test_relationship_from_pool.py +++ b/tests/unit/sdk/pool/test_relationship_from_pool.py @@ -57,7 +57,7 @@ async def test_create_input_data_with_resource_pool_relationship( "resources": [ip_prefix], }, ) - device = InfrahubNode( + device = InfrahubNodeSync( client=client, schema=simple_device_schema, data={"name": "device-01", "primary_address": ip_pool, "ip_address_pool": ip_pool}, From 0ca55cbb9e378e08d2b25a7825624be64c85822a Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Wed, 25 Feb 2026 23:18:16 +0100 Subject: [PATCH 11/14] IHS-156 side effect bugs regarding from_pool attributes --- .../sdk_ref/infrahub_sdk/node/attribute.mdx | 13 +++++++++++++ infrahub_sdk/node/attribute.py | 10 ++++++++-- infrahub_sdk/node/node.py | 12 ++---------- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx index a7b82ecb..572a2d50 100644 --- a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx +++ b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx @@ -24,3 +24,16 @@ value(self) -> Any ```python value(self, value: Any) -> None ``` + +#### `is_from_pool_attribute` + +```python +is_from_pool_attribute(self) -> bool +``` + +Check whether this attribute's value is sourced from a resource pool. + +**Returns:** + +- True if the attribute value is a resource pool node or was explicitly +- allocated from a pool. diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index ba43fc58..b9153f1e 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -151,12 +151,18 @@ def _generate_query_data(self, property: bool = False, include_metadata: bool = return data def _generate_mutation_query(self) -> dict[str, Any]: - if self._is_from_pool_attribute(): + if self.is_from_pool_attribute(): # If it points to a pool, ask for the value of the pool allocated resource return {self.name: {"value": None}} return {} - def _is_from_pool_attribute(self) -> bool: + def is_from_pool_attribute(self) -> bool: + """Check whether this attribute's value is sourced from a resource pool. + + Returns: + True if the attribute value is a resource pool node or was explicitly + allocated from a pool. + """ return (isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool()) or self._from_pool is not None diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index e723f55a..32665b60 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -1002,11 +1002,7 @@ async def _process_mutation_result( for attr_name in self._attributes: attr = getattr(self, attr_name) - if ( - attr_name not in object_response - or not isinstance(attr.value, InfrahubNodeBase) - or not attr.value.is_resource_pool() - ): + if attr_name not in object_response or not attr.is_from_pool_attribute(): continue # Process allocated resource from a pool and update attribute @@ -1810,11 +1806,7 @@ def _process_mutation_result( for attr_name in self._attributes: attr = getattr(self, attr_name) - if ( - attr_name not in object_response - or not isinstance(attr.value, InfrahubNodeBase) - or not attr.value.is_resource_pool() - ): + if attr_name not in object_response or not attr.is_from_pool_attribute(): continue # Process allocated resource from a pool and update attribute From 8cde1bc0160cce15374fad5548048c63a68b13f6 Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Thu, 26 Feb 2026 08:39:14 +0100 Subject: [PATCH 12/14] IHS-156 last feedbacks regarding documentation and variable naming --- .../sdk_ref/infrahub_sdk/node/attribute.mdx | 3 +- infrahub_sdk/node/attribute.py | 66 +++++++++---------- .../sdk/test_attribute_generate_input_data.py | 6 +- 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx index 572a2d50..d08c7fc5 100644 --- a/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx +++ b/docs/docs/python-sdk/sdk_ref/infrahub_sdk/node/attribute.mdx @@ -35,5 +35,4 @@ Check whether this attribute's value is sourced from a resource pool. **Returns:** -- True if the attribute value is a resource pool node or was explicitly -- allocated from a pool. +- True if the attribute value is a resource pool node or was explicitly allocated from a pool. diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index b9153f1e..e9d672d8 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -13,6 +13,33 @@ from ..schema import AttributeSchemaAPI +class _GraphQLPayloadAttribute(NamedTuple): + """Result of resolving an attribute value for a GraphQL mutation. + + Attributes: + payload_dict: Key/value entries to include in the mutation payload + (e.g. ``{"value": ...}`` or ``{"from_pool": ...}``). + variables: GraphQL variable bindings for unsafe string values. + needs_metadata: When ``True``, the payload needs to append property flags/objects + """ + + payload_dict: dict[str, Any] + variables: dict[str, Any] + needs_metadata: bool + + def to_dict(self) -> dict[str, Any]: + return {"data": self.payload_dict, "variables": self.variables} + + def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, dict]) -> None: + if not self.needs_metadata: + return + for prop_name, prop in properties_flag.items(): + self.payload_dict[prop_name] = prop + + for prop_name, prop in properties_object.items(): + self.payload_dict[prop_name] = prop + + class Attribute: """Represents an attribute of a Node, including its schema, value, and properties.""" @@ -86,17 +113,17 @@ def _initialize_graphql_payload(self) -> _GraphQLPayloadAttribute: # Pool-based allocation (dict data or resource-pool node) if self._from_pool is not None: return _GraphQLPayloadAttribute( - payload_dict={"from_pool": self._from_pool}, variables={}, need_additional_properties=True + payload_dict={"from_pool": self._from_pool}, variables={}, needs_metadata=True ) if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): return _GraphQLPayloadAttribute( - payload_dict={"from_pool": {"id": self.value.id}}, variables={}, need_additional_properties=True + payload_dict={"from_pool": {"id": self.value.id}}, variables={}, needs_metadata=True ) # Null value if self.value is None: data = {"value": None} if (self._schema.optional and self.value_has_been_mutated) else {} - return _GraphQLPayloadAttribute(payload_dict=data, variables={}, need_additional_properties=False) + return _GraphQLPayloadAttribute(payload_dict=data, variables={}, needs_metadata=False) # Unsafe strings need a variable binding to avoid injection if isinstance(self.value, str) and not SAFE_VALUE.match(self.value): @@ -104,12 +131,12 @@ def _initialize_graphql_payload(self) -> _GraphQLPayloadAttribute: return _GraphQLPayloadAttribute( payload_dict={"value": f"${var_name}"}, variables={var_name: self.value}, - need_additional_properties=True, + needs_metadata=True, ) # Safe strings, IP types, and everything else value = self.value.with_prefixlen if isinstance(self.value, get_args(IP_TYPES)) else self.value - return _GraphQLPayloadAttribute(payload_dict={"value": value}, variables={}, need_additional_properties=True) + return _GraphQLPayloadAttribute(payload_dict={"value": value}, variables={}, needs_metadata=True) def _generate_input_data(self) -> _GraphQLPayloadAttribute: """Build the input payload for a GraphQL mutation on this attribute. @@ -160,33 +187,6 @@ def is_from_pool_attribute(self) -> bool: """Check whether this attribute's value is sourced from a resource pool. Returns: - True if the attribute value is a resource pool node or was explicitly - allocated from a pool. + True if the attribute value is a resource pool node or was explicitly allocated from a pool. """ return (isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool()) or self._from_pool is not None - - -class _GraphQLPayloadAttribute(NamedTuple): - """Result of resolving an attribute value for a GraphQL mutation. - - Attributes: - payload_dict: Key/value entries to include in the mutation payload - (e.g. ``{"value": ...}`` or ``{"from_pool": ...}``). - variables: GraphQL variable bindings for unsafe string values. - need_additional_properties: When ``True``, the payload needs to append property flags/objects - """ - - payload_dict: dict[str, Any] - variables: dict[str, Any] - need_additional_properties: bool - - def to_dict(self) -> dict[str, Any]: - return {"data": self.payload_dict, "variables": self.variables} - - def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, dict]) -> None: - if self.need_additional_properties: - for prop_name, prop in properties_flag.items(): - self.payload_dict[prop_name] = prop - - for prop_name, prop in properties_object.items(): - self.payload_dict[prop_name] = prop diff --git a/tests/unit/sdk/test_attribute_generate_input_data.py b/tests/unit/sdk/test_attribute_generate_input_data.py index 8b1426bb..f7da6850 100644 --- a/tests/unit/sdk/test_attribute_generate_input_data.py +++ b/tests/unit/sdk/test_attribute_generate_input_data.py @@ -95,7 +95,7 @@ def test_null_value_not_mutated(self) -> None: assert result.payload_dict == {} assert result.variables == {} - assert result.need_additional_properties is False + assert result.needs_metadata is False def test_null_value_mutated_optional(self) -> None: """None value on an optional attr that was mutated → explicit null.""" @@ -105,7 +105,7 @@ def test_null_value_mutated_optional(self) -> None: result = attr._generate_input_data() assert result.payload_dict == {"value": None} - assert result.need_additional_properties is False + assert result.needs_metadata is False def test_null_value_mutated_non_optional(self) -> None: """None value on a non-optional attr that was mutated → empty payload (same as not mutated).""" @@ -115,7 +115,7 @@ def test_null_value_mutated_non_optional(self) -> None: result = attr._generate_input_data() assert result.payload_dict == {} - assert result.need_additional_properties is False + assert result.needs_metadata is False # ────────────────────────────────────────────── From 61c6ccc1fb82bf44118dd6cd10059df7a6587aef Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Thu, 26 Feb 2026 08:49:17 +0100 Subject: [PATCH 13/14] IHS-156 fix typing error --- infrahub_sdk/node/attribute.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index e9d672d8..06fa7cd5 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -30,7 +30,7 @@ class _GraphQLPayloadAttribute(NamedTuple): def to_dict(self) -> dict[str, Any]: return {"data": self.payload_dict, "variables": self.variables} - def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, dict]) -> None: + def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, str | None]) -> None: if not self.needs_metadata: return for prop_name, prop in properties_flag.items(): @@ -150,7 +150,7 @@ def _generate_input_data(self) -> _GraphQLPayloadAttribute: for property_name in self._properties_flag if getattr(self, property_name) is not None } - properties_object: dict[str, dict] = { + properties_object: dict[str, str | None] = { property_name: getattr(self, property_name)._generate_input_data() for property_name in self._properties_object if getattr(self, property_name) is not None From f34027be350d7c199860a9b3a1729598e8c519fe Mon Sep 17 00:00:00 2001 From: Pol Michel Date: Thu, 26 Feb 2026 11:27:09 +0100 Subject: [PATCH 14/14] IHS-156 renamed payload_dict -> payload --- infrahub_sdk/node/attribute.py | 22 +++---- infrahub_sdk/node/node.py | 4 +- .../sdk/test_attribute_generate_input_data.py | 64 +++++++++---------- 3 files changed, 44 insertions(+), 46 deletions(-) diff --git a/infrahub_sdk/node/attribute.py b/infrahub_sdk/node/attribute.py index 06fa7cd5..54dd99aa 100644 --- a/infrahub_sdk/node/attribute.py +++ b/infrahub_sdk/node/attribute.py @@ -17,27 +17,27 @@ class _GraphQLPayloadAttribute(NamedTuple): """Result of resolving an attribute value for a GraphQL mutation. Attributes: - payload_dict: Key/value entries to include in the mutation payload + payload: Key/value entries to include in the mutation payload (e.g. ``{"value": ...}`` or ``{"from_pool": ...}``). variables: GraphQL variable bindings for unsafe string values. needs_metadata: When ``True``, the payload needs to append property flags/objects """ - payload_dict: dict[str, Any] + payload: dict[str, Any] variables: dict[str, Any] needs_metadata: bool def to_dict(self) -> dict[str, Any]: - return {"data": self.payload_dict, "variables": self.variables} + return {"data": self.payload, "variables": self.variables} def add_properties(self, properties_flag: dict[str, Any], properties_object: dict[str, str | None]) -> None: if not self.needs_metadata: return for prop_name, prop in properties_flag.items(): - self.payload_dict[prop_name] = prop + self.payload[prop_name] = prop for prop_name, prop in properties_object.items(): - self.payload_dict[prop_name] = prop + self.payload[prop_name] = prop class Attribute: @@ -112,31 +112,29 @@ def _initialize_graphql_payload(self) -> _GraphQLPayloadAttribute: # Pool-based allocation (dict data or resource-pool node) if self._from_pool is not None: - return _GraphQLPayloadAttribute( - payload_dict={"from_pool": self._from_pool}, variables={}, needs_metadata=True - ) + return _GraphQLPayloadAttribute(payload={"from_pool": self._from_pool}, variables={}, needs_metadata=True) if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool(): return _GraphQLPayloadAttribute( - payload_dict={"from_pool": {"id": self.value.id}}, variables={}, needs_metadata=True + payload={"from_pool": {"id": self.value.id}}, variables={}, needs_metadata=True ) # Null value if self.value is None: data = {"value": None} if (self._schema.optional and self.value_has_been_mutated) else {} - return _GraphQLPayloadAttribute(payload_dict=data, variables={}, needs_metadata=False) + return _GraphQLPayloadAttribute(payload=data, variables={}, needs_metadata=False) # Unsafe strings need a variable binding to avoid injection if isinstance(self.value, str) and not SAFE_VALUE.match(self.value): var_name = f"value_{UUIDT.new().hex}" return _GraphQLPayloadAttribute( - payload_dict={"value": f"${var_name}"}, + payload={"value": f"${var_name}"}, variables={var_name: self.value}, needs_metadata=True, ) # Safe strings, IP types, and everything else value = self.value.with_prefixlen if isinstance(self.value, get_args(IP_TYPES)) else self.value - return _GraphQLPayloadAttribute(payload_dict={"value": value}, variables={}, needs_metadata=True) + return _GraphQLPayloadAttribute(payload={"value": value}, variables={}, needs_metadata=True) def _generate_input_data(self) -> _GraphQLPayloadAttribute: """Build the input payload for a GraphQL mutation on this attribute. diff --git a/infrahub_sdk/node/node.py b/infrahub_sdk/node/node.py index 32665b60..9d024cbb 100644 --- a/infrahub_sdk/node/node.py +++ b/infrahub_sdk/node/node.py @@ -236,8 +236,8 @@ def _generate_input_data( # noqa: C901 if attr._schema.read_only: continue graphql_payload = attr._generate_input_data() - if graphql_payload.payload_dict: - data[item_name] = graphql_payload.payload_dict + if graphql_payload.payload: + data[item_name] = graphql_payload.payload if graphql_payload.variables: variables.update(graphql_payload.variables) diff --git a/tests/unit/sdk/test_attribute_generate_input_data.py b/tests/unit/sdk/test_attribute_generate_input_data.py index f7da6850..a50a2fe9 100644 --- a/tests/unit/sdk/test_attribute_generate_input_data.py +++ b/tests/unit/sdk/test_attribute_generate_input_data.py @@ -22,7 +22,7 @@ def test_from_pool_with_id(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"from_pool": {"id": "pool-uuid-1"}} + assert result.payload == {"from_pool": {"id": "pool-uuid-1"}} assert result.variables == {} def test_from_pool_with_id_and_identifier(self) -> None: @@ -31,7 +31,7 @@ def test_from_pool_with_id_and_identifier(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"from_pool": {"id": "pool-uuid-1", "identifier": "test"}} + assert result.payload == {"from_pool": {"id": "pool-uuid-1", "identifier": "test"}} assert result.variables == {} def test_from_pool_with_pool_name(self) -> None: @@ -42,9 +42,9 @@ def test_from_pool_with_pool_name(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"from_pool": "VLAN ID Pool"} + assert result.payload == {"from_pool": "VLAN ID Pool"} assert result.variables == {} - assert "value" not in result.payload_dict + assert "value" not in result.payload def test_from_pool_value_is_none(self) -> None: """from_pool pops 'from_pool' and sets Attribute.value to None; value should NOT appear in payload.""" @@ -52,7 +52,7 @@ def test_from_pool_value_is_none(self) -> None: assert attr.value is None result = attr._generate_input_data() - assert "value" not in result.payload_dict + assert "value" not in result.payload # ────────────────────────────────────────────── @@ -68,7 +68,7 @@ def test_pool_node_generates_from_pool(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"from_pool": {"id": "node-pool-uuid"}} + assert result.payload == {"from_pool": {"id": "node-pool-uuid"}} assert result.variables == {} def test_non_pool_node_treated_as_regular_value(self) -> None: @@ -78,7 +78,7 @@ def test_non_pool_node_treated_as_regular_value(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"value": node} + assert result.payload == {"value": node} # ────────────────────────────────────────────── @@ -93,7 +93,7 @@ def test_null_value_not_mutated(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {} + assert result.payload == {} assert result.variables == {} assert result.needs_metadata is False @@ -104,7 +104,7 @@ def test_null_value_mutated_optional(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"value": None} + assert result.payload == {"value": None} assert result.needs_metadata is False def test_null_value_mutated_non_optional(self) -> None: @@ -114,7 +114,7 @@ def test_null_value_mutated_non_optional(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {} + assert result.payload == {} assert result.needs_metadata is False @@ -139,7 +139,7 @@ def test_safe_string(self, value: str) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"value": value} + assert result.payload == {"value": value} assert result.variables == {} @pytest.mark.parametrize( @@ -155,9 +155,9 @@ def test_unsafe_string_uses_variable_binding(self, value: str) -> None: result = attr._generate_input_data() - # payload_dict["value"] should be a variable reference like "$value_" - assert "value" in result.payload_dict - assert result.payload_dict["value"].startswith("$value_") + # payload["value"] should be a variable reference like "$value_" + assert "value" in result.payload + assert result.payload["value"].startswith("$value_") # The actual string should be in variables assert len(result.variables) == 1 var_name = next(iter(result.variables)) @@ -175,7 +175,7 @@ def test_ipv4_interface(self) -> None: result = attr._generate_input_data() - assert result.payload_dict["value"] == "10.0.0.1/24" + assert result.payload["value"] == "10.0.0.1/24" assert result.variables == {} def test_ipv6_interface(self) -> None: @@ -183,21 +183,21 @@ def test_ipv6_interface(self) -> None: result = attr._generate_input_data() - assert result.payload_dict["value"] == "2001:db8::1/64" + assert result.payload["value"] == "2001:db8::1/64" def test_ipv4_network(self) -> None: attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "10.0.0.0/24"}) result = attr._generate_input_data() - assert result.payload_dict["value"] == "10.0.0.0/24" + assert result.payload["value"] == "10.0.0.0/24" def test_ipv6_network(self) -> None: attr = Attribute(name="network", schema=_make_schema("IPNetwork"), data={"value": "2001:db8::/32"}) result = attr._generate_input_data() - assert result.payload_dict["value"] == "2001:db8::/32" + assert result.payload["value"] == "2001:db8::/32" # ────────────────────────────────────────────── @@ -211,7 +211,7 @@ def test_number_value(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"value": 42} + assert result.payload == {"value": 42} assert result.variables == {} def test_boolean_value(self) -> None: @@ -219,7 +219,7 @@ def test_boolean_value(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"value": True} + assert result.payload == {"value": True} # ────────────────────────────────────────────── @@ -234,15 +234,15 @@ def test_no_properties_set(self) -> None: result = attr._generate_input_data() - assert result.payload_dict == {"value": "hello"} + assert result.payload == {"value": "hello"} def test_flag_property_is_protected(self) -> None: attr = Attribute(name="test_attr", schema=_make_schema("Text"), data={"value": "hello", "is_protected": True}) result = attr._generate_input_data() - assert result.payload_dict["value"] == "hello" - assert result.payload_dict["is_protected"] is True + assert result.payload["value"] == "hello" + assert result.payload["is_protected"] is True def test_object_property_source(self) -> None: attr = Attribute( @@ -253,8 +253,8 @@ def test_object_property_source(self) -> None: result = attr._generate_input_data() - assert result.payload_dict["value"] == "hello" - assert result.payload_dict["source"] == "source-uuid" + assert result.payload["value"] == "hello" + assert result.payload["source"] == "source-uuid" def test_object_property_owner(self) -> None: attr = Attribute( @@ -268,7 +268,7 @@ def test_object_property_owner(self) -> None: result = attr._generate_input_data() - assert result.payload_dict["owner"] == "owner-uuid" + assert result.payload["owner"] == "owner-uuid" def test_both_flag_and_object_properties(self) -> None: attr = Attribute( @@ -283,9 +283,9 @@ def test_both_flag_and_object_properties(self) -> None: result = attr._generate_input_data() - assert result.payload_dict["value"] == "hello" - assert result.payload_dict["is_protected"] is True - assert result.payload_dict["source"] == "src-uuid" + assert result.payload["value"] == "hello" + assert result.payload["is_protected"] is True + assert result.payload["source"] == "src-uuid" def test_properties_not_appended_for_null_value(self) -> None: """When need_additional_properties is False (null non-mutated), properties are ignored.""" @@ -302,7 +302,7 @@ def test_properties_not_appended_for_null_value(self) -> None: result = attr._generate_input_data() # Null value, not mutated → empty payload, properties NOT appended - assert result.payload_dict == {} + assert result.payload == {} def test_properties_appended_for_from_pool(self) -> None: """from_pool payloads have need_additional_properties=True, so properties are included.""" @@ -314,8 +314,8 @@ def test_properties_appended_for_from_pool(self) -> None: result = attr._generate_input_data() - assert result.payload_dict["from_pool"] == {"id": "pool-uuid"} - assert result.payload_dict["is_protected"] is True + assert result.payload["from_pool"] == {"id": "pool-uuid"} + assert result.payload["is_protected"] is True # ──────────────────────────────────────────────