Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 157 additions & 74 deletions mpt_api_client/models/model.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,184 @@
from typing import Any, ClassVar, Self, override

from box import Box
from box.box import _camel_killer # type: ignore[attr-defined] # noqa: PLC2701
import re
from collections import UserList
from collections.abc import Iterable
from typing import Any, ClassVar, Self, get_args, get_origin, override

from mpt_api_client.http.types import Response
from mpt_api_client.models.meta import Meta

ResourceData = dict[str, Any]

_box_safe_attributes: list[str] = ["_box_config", "_attribute_mapping"]

_SNAKE_CASE_BOUNDARY = re.compile(r"([a-z0-9])([A-Z])")
_SNAKE_CASE_ACRONYM = re.compile(r"(?<=[A-Z])(?=[A-Z][a-z0-9])")

class MptBox(Box):
"""python-box that preserves camelCase keys when converted to json."""

def __init__(self, *args, attribute_mapping: dict[str, str] | None = None, **_): # type: ignore[no-untyped-def]
attribute_mapping = attribute_mapping or {}
self._attribute_mapping = attribute_mapping
super().__init__(
*args,
camel_killer_box=False,
default_box=False,
default_box_create_on_get=False,
)
def to_snake_case(key: str) -> str:
"""Converts a camelCase string to snake_case."""
if "_" in key and key.islower():
return key
# Common pattern for PascalCase/camelCase conversion
snake = _SNAKE_CASE_BOUNDARY.sub(r"\1_\2", key)
snake = _SNAKE_CASE_ACRONYM.sub(r"_", snake)
return snake.lower().replace("__", "_")


def to_camel_case(key: str) -> str:
"""Converts a snake_case string to camelCase."""
parts = key.split("_")
return parts[0] + "".join(x.title() for x in parts[1:]) # noqa: WPS111 WPS221


class ModelList(UserList[Any]):
"""A list that automatically converts dictionaries to BaseModel objects."""

def __init__(
self,
iterable: Iterable[Any] | None = None,
model_class: type["BaseModel"] | None = None, # noqa: WPS221
) -> None:
self._model_class = model_class or BaseModel
iterable = iterable or []
super().__init__([self._process_item(item) for item in iterable])

@override
def __setitem__(self, key, value): # type: ignore[no-untyped-def]
mapped_key = self._prep_key(key)
super().__setitem__(mapped_key, value) # type: ignore[no-untyped-call]
def append(self, item: Any) -> None:
self.data.append(self._process_item(item))

@override
def __setattr__(self, item: str, value: Any) -> None:
if item in _box_safe_attributes:
return object.__setattr__(self, item, value)
def extend(self, iterable: Iterable[Any]) -> None:
self.data.extend(self._process_item(item) for item in iterable)

super().__setattr__(item, value) # type: ignore[no-untyped-call]
return None
@override
def insert(self, index: Any, item: Any) -> None:
self.data.insert(index, self._process_item(item))

@override
def __getattr__(self, item: str) -> Any:
if item in _box_safe_attributes:
return object.__getattribute__(self, item)
return super().__getattr__(item) # type: ignore[no-untyped-call]
def __setitem__(self, index: Any, item: Any) -> None:
self.data[index] = self._process_item(item)

def _process_item(self, item: Any) -> Any:
if isinstance(item, dict) and not isinstance(item, BaseModel):
return self._model_class(**item)
if isinstance(item, (list, UserList)) and not isinstance(item, ModelList):
return ModelList(item, model_class=self._model_class)
return item


class BaseModel:
"""Base dataclass for models providing object-only access and case conversion."""

def __init__(self, **kwargs: Any) -> None: # noqa: WPS210
"""Processes resource data to convert keys and handle nested structures."""
# Get type hints for field mapping
hints = getattr(self, "__annotations__", {})

for key, value in kwargs.items():
mapped_key = to_snake_case(key)

# Check if there's a type hint for this key
target_class = hints.get(mapped_key)
processed_value = self._process_value(value, target_class=target_class)
object.__setattr__(self, mapped_key, processed_value)

def __getattr__(self, name: str) -> Any:
# 1. Try to find the attribute in __dict__ (includes attributes set in __init__)
if name in self.__dict__:
return self.__dict__[name] # noqa: WPS420 WPS529

# 2. Check for methods or properties
try:
return object.__getattribute__(self, name)
except AttributeError:
pass # noqa: WPS420

raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{name}'", # noqa: WPS237
)

@override
def to_dict(self) -> dict[str, Any]: # noqa: WPS210
reverse_mapping = {
mapped_key: original_key for original_key, mapped_key in self._attribute_mapping.items()
}
def __setattr__(self, name: str, value: Any) -> None:
if name.startswith("_"):
object.__setattr__(self, name, value)
return

snake_name = to_snake_case(name)

# Get target class for value processing if it's a known attribute
hints = getattr(self, "__annotations__", {})
target_class = hints.get(snake_name) or hints.get(name)

processed_value = self._process_value(value, target_class=target_class)
object.__setattr__(self, snake_name, processed_value)

def to_dict(self) -> dict[str, Any]:
"""Returns the resource as a dictionary with original API keys."""
out_dict = {}
for parsed_key, item_value in super().to_dict().items():
original_key = reverse_mapping[parsed_key]
out_dict[original_key] = item_value
return out_dict

def _prep_key(self, key: str) -> str:
try:
return self._attribute_mapping[key]
except KeyError:
self._attribute_mapping[key] = _camel_killer(key)
return self._attribute_mapping[key]
# Iterate over all attributes in __dict__ that aren't internal
for key, value in self.__dict__.items():
if key.startswith("_"):
continue
if key == "meta":
continue

original_key = to_camel_case(key)
out_dict[original_key] = self._serialize_value(value)

class Model: # noqa: WPS214
return out_dict

def _serialize_value(self, value: Any) -> Any:
"""Recursively serializes values back to dicts."""
if isinstance(value, BaseModel):
return value.to_dict()
if isinstance(value, (list, UserList)):
return [self._serialize_value(item) for item in value]
return value

def _process_value(self, value: Any, target_class: Any = None) -> Any: # noqa: WPS231 C901

Check failure on line 138 in mpt_api_client/models/model.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=softwareone-platform_mpt-api-python-client&issues=AZzDxkBk4txuQOfr3kFv&open=AZzDxkBk4txuQOfr3kFv&pullRequest=222
"""Recursively processes values to ensure nested dicts are BaseModels."""
if isinstance(value, dict) and not isinstance(value, BaseModel):
# If a target class is provided and it's a subclass of BaseModel, use it
if (
target_class
and isinstance(target_class, type)
and issubclass(target_class, BaseModel)
):
return target_class(**value)
return BaseModel(**value)

if isinstance(value, (list, UserList)) and not isinstance(value, ModelList):
# Try to determine the model class for the list elements from type hints
model_class = BaseModel
if target_class:
# Handle list[ModelClass]

origin = get_origin(target_class)
if origin is list:
args = get_args(target_class)
if args and isinstance(args[0], type) and issubclass(args[0], BaseModel): # noqa: WPS221
model_class = args[0] # noqa: WPS220

return ModelList(value, model_class=model_class)
# Recursively handle BaseModel if it's already one
if isinstance(value, BaseModel):
return value
return value


class Model(BaseModel):
"""Provides a resource to interact with api data using fluent interfaces."""

_data_key: ClassVar[str | None] = None
_safe_attributes: ClassVar[list[str]] = ["meta", "_box"]
_attribute_mapping: ClassVar[dict[str, str]] = {}

def __init__(self, resource_data: ResourceData | None = None, meta: Meta | None = None) -> None:
self.meta = meta
self._box = MptBox(
resource_data or {},
attribute_mapping=self._attribute_mapping,
)
id: str

def __init__(
self, resource_data: ResourceData | None = None, meta: Meta | None = None, **kwargs: Any
) -> None:
object.__setattr__(self, "meta", meta)
data = dict(resource_data or {})
data.update(kwargs)
super().__init__(**data)

@override
def __repr__(self) -> str:
Expand All @@ -84,19 +188,7 @@
@classmethod
def new(cls, resource_data: ResourceData | None = None, meta: Meta | None = None) -> Self:
"""Creates a new resource from ResourceData and Meta."""
return cls(resource_data, meta)

def __getattr__(self, attribute: str) -> Box | Any:
"""Returns the resource data."""
return self._box.__getattr__(attribute)

@override
def __setattr__(self, attribute: str, attribute_value: Any) -> None:
if attribute in self._safe_attributes:
object.__setattr__(self, attribute, attribute_value)
return

self._box.__setattr__(attribute, attribute_value)
return cls(resource_data, meta=meta)

@classmethod
def from_response(cls, response: Response) -> Self:
Expand All @@ -114,12 +206,3 @@
raise TypeError("Response data must be a dict.")
meta = Meta.from_response(response)
return cls.new(response_data, meta)

@property
def id(self) -> str:
"""Returns the resource ID."""
return str(self._box.get("id", "")) # type: ignore[no-untyped-call]

def to_dict(self) -> dict[str, Any]:
"""Returns the resource as a dictionary."""
return self._box.to_dict()
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ classifiers = [
]
dependencies = [
"httpx==0.28.*",
"python-box==7.4.*",
]

[dependency-groups]
Expand Down
Loading