From 1a755c371c8047dd953b2074d7a93676ca18ae8b Mon Sep 17 00:00:00 2001 From: spjuhel Date: Thu, 2 Apr 2026 11:44:55 +0200 Subject: [PATCH 1/3] makes soon to be old `measures` module a legacy --- climada/engine/test/test_cost_benefit.py | 4 +- climada/entity/__init__.py | 2 +- .../__init__.py | 0 .../{measures => _legacy_measures}/base.py | 0 climada/entity/_legacy_measures/helper.py | 246 ++++++++++++++ .../entity/_legacy_measures/measure_config.py | 318 ++++++++++++++++++ .../measure_set.py | 0 .../test/__init__.py | 0 .../test/data/.gitignore | 0 .../test/test_base.py | 0 .../test/test_meas_set.py | 0 climada/entity/_legacy_measures/types.py | 10 + climada/entity/entity_def.py | 2 +- climada/entity/test/test_entity.py | 2 +- climada/trajectories/snapshot.py | 2 +- climada/trajectories/test/test_snapshot.py | 2 +- 16 files changed, 581 insertions(+), 7 deletions(-) rename climada/entity/{measures => _legacy_measures}/__init__.py (100%) rename climada/entity/{measures => _legacy_measures}/base.py (100%) create mode 100644 climada/entity/_legacy_measures/helper.py create mode 100644 climada/entity/_legacy_measures/measure_config.py rename climada/entity/{measures => _legacy_measures}/measure_set.py (100%) rename climada/entity/{measures => _legacy_measures}/test/__init__.py (100%) rename climada/entity/{measures => _legacy_measures}/test/data/.gitignore (100%) rename climada/entity/{measures => _legacy_measures}/test/test_base.py (100%) rename climada/entity/{measures => _legacy_measures}/test/test_meas_set.py (100%) create mode 100644 climada/entity/_legacy_measures/types.py diff --git a/climada/engine/test/test_cost_benefit.py b/climada/engine/test/test_cost_benefit.py index e48afec110..8b0276c665 100644 --- a/climada/engine/test/test_cost_benefit.py +++ b/climada/engine/test/test_cost_benefit.py @@ -33,10 +33,10 @@ risk_rp_100, risk_rp_250, ) +from climada.entity._legacy_measures import Measure +from climada.entity._legacy_measures.base import LOGGER as ILOG from climada.entity.disc_rates import DiscRates from climada.entity.entity_def import Entity -from climada.entity.measures import Measure -from climada.entity.measures.base import LOGGER as ILOG from climada.hazard.base import Hazard from climada.test import get_test_file from climada.util.api_client import Client diff --git a/climada/entity/__init__.py b/climada/entity/__init__.py index 7b830c2b70..ceb24ee065 100755 --- a/climada/entity/__init__.py +++ b/climada/entity/__init__.py @@ -19,8 +19,8 @@ init entity """ +from ._legacy_measures import * from .disc_rates import * from .entity_def import * from .exposures import * from .impact_funcs import * -from .measures import * diff --git a/climada/entity/measures/__init__.py b/climada/entity/_legacy_measures/__init__.py similarity index 100% rename from climada/entity/measures/__init__.py rename to climada/entity/_legacy_measures/__init__.py diff --git a/climada/entity/measures/base.py b/climada/entity/_legacy_measures/base.py similarity index 100% rename from climada/entity/measures/base.py rename to climada/entity/_legacy_measures/base.py diff --git a/climada/entity/_legacy_measures/helper.py b/climada/entity/_legacy_measures/helper.py new file mode 100644 index 0000000000..99ba0e8820 --- /dev/null +++ b/climada/entity/_legacy_measures/helper.py @@ -0,0 +1,246 @@ +""" +This file is part of CLIMADA. + +Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS. + +CLIMADA is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free +Software Foundation, version 3. + +CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along +with CLIMADA. If not, see . + +--- + +Define Measure class. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field, fields +from functools import reduce +from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, TypeVar, Union, cast + +import numpy as np +import pandas as pd + +from climada.entity.exposures.base import Exposures +from climada.entity.impact_funcs.base import ImpactFunc +from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet +from climada.entity.measures.measure_config import ( + ExposuresModifierConfig, + HazardModifierConfig, + ImpfsetModifierConfig, + MeasureConfig, +) +from climada.hazard.base import Hazard + +if TYPE_CHECKING: + from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet + from climada.entity.measures.types import ( + ExposuresChange, + HazardChange, + ImpfsetChange, + ) + from climada.hazard.base import Hazard + +LOGGER = logging.getLogger(__name__) + +T = TypeVar("T", Exposures, ImpactFuncSet, Hazard) + + +def identity_function(x: T, **_kwargs: Any) -> T: + return x + + +def composite_fun(*funcs: Callable[..., T]) -> Callable[..., T]: + """ + Composes multiple functions from right to left. + f(g(h(x))) + """ + + def compose(f: Callable[..., T], g: Callable[..., T]) -> Callable[..., T]: + def composed(x: T, **kwargs: Any) -> T: + return f(g(x, **kwargs), **kwargs) + + return composed + + return reduce(compose, funcs, identity_function) + + +def replace_hazard(new_hazard: Hazard) -> HazardChange: + """Returns a function that replaces the hazard with given new one.""" + + def hazard_change(_: Hazard) -> Hazard: + return new_hazard + + return hazard_change + + +def impact_intensity_rp_cutoff_helper( + cut_off_rp: float, +) -> HazardChange: + """Helper to generate a function removing events from a hazard for which + impacts do not exceed the impacts of a given return period. + + This helper returns a function to be applied on a hazard. + The function returned has to run an impact computation to find out which + event to remove from the hazard. + As such it has the following signature: + + ```f(hazard: Hazard, # The hazard to apply on + exposures: Exposures, # The exposure for the impact computation + impfset: ImpactFuncSet, # The impfset for the impact computation + base_hazard: Hazard, # The hazard for the impact computation + exposures_region_id: Optional[list[int]] = None, # Region id to filter to + ) -> Hazard + ``` + + Identifies events exceeding a return period and returns the hazard intensity + matrix with those event intensities zeroed out. + """ + from climada.engine.impact_calc import ImpactCalc + + def hazard_change( + hazard: Hazard, + base_exposures: Exposures, + base_impfset: ImpactFuncSet, + base_hazard: Hazard, + exposures_region_id: Optional[list[int]] = None, + ) -> Hazard: + exp_imp = base_exposures + if exposures_region_id: + # Narrowing the type for the LSP via boolean indexing + in_reg = base_exposures.gdf["region_id"].isin(exposures_region_id) + exp_imp = Exposures(base_exposures.gdf[in_reg], crs=base_exposures.crs) + + imp = ImpactCalc(exp_imp, base_impfset, base_hazard).impact(save_mat=False) + + # Calculate exceedance frequencies + sort_idxs = np.argsort(imp.at_event)[::-1] + exceed_freq = np.cumsum(imp.frequency[sort_idxs]) + events_below_cutoff = sort_idxs[exceed_freq <= cut_off_rp] + + # Modify sparse data structure + intensity_modified = base_hazard.intensity.copy() + for event in events_below_cutoff: + start, end = ( + intensity_modified.indptr[event], + intensity_modified.indptr[event + 1], + ) + intensity_modified.data[start:end] = 0 + + hazard.intensity = intensity_modified + return hazard + + return hazard_change + + +def helper_hazard(hazard_modifier: HazardModifierConfig) -> HazardChange: + """Returns a function that scales and shifts hazard intensity.""" + + def hazard_change(hazard: Hazard, **_kwargs) -> Hazard: + changed_hazard = ( + Hazard.from_hdf5(hazard_modifier.new_hazard_path) + if hazard_modifier.new_hazard_path is not None + else hazard + ) + data = cast(np.ndarray, changed_hazard.intensity.data) + data *= hazard_modifier.haz_int_mult + data += hazard_modifier.haz_int_add + data[data < 0] = 0 + changed_hazard.intensity.eliminate_zeros() + return changed_hazard + + if hazard_modifier.impact_rp_cutoff is not None: + hazard_change = composite_fun( + impact_intensity_rp_cutoff_helper(hazard_modifier.impact_rp_cutoff), + hazard_change, + ) + + return hazard_change + + +def helper_impfset(impfset_modifier: ImpfsetModifierConfig) -> ImpfsetChange: + """Returns a function that modifies impact functions (mdd, paa, intensity) by ID.""" + + def impfset_change(impfset: ImpactFuncSet, **_kwargs) -> ImpactFuncSet: + changed_impfset = ( + impfset.from_excel(impfset_modifier.new_impfset_path) + if impfset_modifier.new_impfset_path is not None + else impfset + ) + if impfset_modifier.impf_ids is None or impfset_modifier.impf_ids == "all": + ids_to_change = impfset.get_ids(haz_type=impfset_modifier.haz_type) + elif isinstance(impfset_modifier.impf_ids, list): + ids_to_change = impfset_modifier.impf_ids + elif isinstance(impfset_modifier.impf_ids, (str, int)): + ids_to_change = [impfset_modifier.impf_ids] + else: + raise ValueError( + f"Impact function ids to changes are invalid: {impfset_modifier.impf_ids}" + ) + + funcs = changed_impfset.get_func(haz_type=impfset_modifier.haz_type) + funcs = [funcs] if isinstance(funcs, ImpactFunc) else funcs + + for impf in funcs: + # Apply Intensity Mod + if impf.id in ids_to_change: + mult, add = ( + impfset_modifier.impf_int_mult, + impfset_modifier.impf_int_add, + ) + impf.intensity = impf.intensity * mult + add + + mult, add = ( + impfset_modifier.impf_mdd_mult, + impfset_modifier.impf_mdd_add, + ) + impf.mdd = impf.mdd * mult + add + + mult, add = ( + impfset_modifier.impf_paa_mult, + impfset_modifier.impf_paa_add, + ) + impf.paa = impf.paa * mult + add + + return changed_impfset + + return impfset_change + + +def change_impfset(new_impfsets: ImpactFuncSet) -> ImpfsetChange: + """Returns a function that swaps the impact function set with the given one.""" + + def impfset_change(_: ImpactFuncSet) -> ImpactFuncSet: + return new_impfsets + + return impfset_change + + +def helper_exposure(exposures_modifier: ExposuresModifierConfig) -> ExposuresChange: + """Returns a function that reassigns impact function IDs and zeros out specific values.""" + + def exposures_change(exposures: Exposures, **_kwargs) -> Exposures: + changed_exposures = ( + exposures + if exposures_modifier.new_exposures_path is None + else Exposures.from_hdf5(exposures_modifier.new_exposures_path) + ) + gdf = cast(pd.DataFrame, changed_exposures.gdf) + if exposures_modifier.reassign_impf_id is not None: + for haz_type, mapping in exposures_modifier.reassign_impf_id.items(): + gdf[f"impf_{haz_type}"] = gdf[f"impf_{haz_type}"].replace(mapping) + + if exposures_modifier.set_to_zero is not None: + gdf.loc[exposures_modifier.set_to_zero, "value"] = 0 + + return changed_exposures + + return exposures_change diff --git a/climada/entity/_legacy_measures/measure_config.py b/climada/entity/_legacy_measures/measure_config.py new file mode 100644 index 0000000000..f742851690 --- /dev/null +++ b/climada/entity/_legacy_measures/measure_config.py @@ -0,0 +1,318 @@ +""" +This file is part of CLIMADA. + +Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS. + +CLIMADA is free software: you can redistribute it and/or modify it under the +terms of the GNU General Public License as published by the Free +Software Foundation, version 3. + +CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along +with CLIMADA. If not, see . + +--- + +Define configuration dataclasses for Measure reading and writing. +""" + +from __future__ import annotations + +import dataclasses +import logging +from abc import ABC +from dataclasses import asdict, dataclass, field, fields +from datetime import datetime +from typing import TYPE_CHECKING, Dict, Optional, Tuple, Union + +import pandas as pd + +from climada.util.string_parsers import parse_color, parse_mapping_string, parse_range + +if TYPE_CHECKING: + from climada.entity.measures.base import Measure + from climada.entity.measures.cost_income import CostIncome + +LOGGER = logging.getLogger(__name__) + + +@dataclass +class _ModifierConfig(ABC): + def to_dict(self): + # 1. Get the current values as a dict + current_data = asdict(self) + + # 2. Identify fields where the current value differs from the default + non_default_data = {} + for f in fields(self): + current_value = getattr(self, f.name) + + # Logic to get the default value (handling both default and default_factory) + default_value = f.default + if ( + f.default_factory is not field().default_factory + ): # Check if factory exists + default_value = f.default_factory() + + if current_value != default_value: + non_default_data[f.name] = current_data[f.name] + + non_default_data.pop("haz_type", None) + return non_default_data + + @classmethod + def from_dict(cls, d: dict): + filtered = cls._filter_dict_to_fields(d) + return cls(**filtered) + + @classmethod + def _filter_dict_to_fields(cls, d: dict): + """Filter out values that do not match the dataclass fields.""" + filtered = dict( + filter(lambda k: k[0] in [f.name for f in fields(cls)], d.items()) + ) + return filtered + + def _filter_out_default_fields(self): + non_defaults = {} + defaults = {} + for f in fields(self): + val = getattr(self, f.name) + default = f.default + if f.default_factory is not field().default_factory: + default = f.default_factory() + + if val != default: + non_defaults[f.name] = val + else: + defaults[f.name] = val + return non_defaults, defaults + + def __repr__(self) -> str: + non_defaults, defaults = self._filter_out_default_fields() + ndf_fields_str = ( + "\n\t\t\t".join(f"{k}={v!r}" for k, v in non_defaults.items()) + if non_defaults + else None + ) + fields_str = ( + "\n\t\t\t".join(f"{k}={v!r}" for k, v in defaults.items()) + if defaults + else None + ) + fields = ( + "(" "\n\t\tNon default fields:" f"\n\t\t\t{ndf_fields_str}" + if ndf_fields_str + else "()" + ) + return f"{self.__class__.__name__}{fields}" + + +@dataclass(repr=False) +class ImpfsetModifierConfig(_ModifierConfig): + """Configuration for impact function modifiers.""" + + haz_type: str + impf_ids: Optional[Union[int, str, list[Union[int, str]]]] = None + impf_mdd_mult: float = 1.0 + impf_mdd_add: float = 0.0 + impf_paa_mult: float = 1.0 + impf_paa_add: float = 0.0 + impf_int_mult: float = 1.0 + impf_int_add: float = 0.0 + new_impfset_path: Optional[str] = None + """Excel filepath for new impfset.""" + + def __post_init__(self): + if self.new_impfset_path is not None and any( + [ + self.impf_mdd_add, + self.impf_mdd_mult, + self.impf_paa_add, + self.impf_paa_mult, + self.impf_int_add, + self.impf_int_mult, + ] + ): + LOGGER.warning( + "Both new impfset object and impfset modifiers are provided, " + "modifiers will be applied after changing the impfset." + ) + + +@dataclass(repr=False) +class HazardModifierConfig(_ModifierConfig): + """Configuration for impact function modifiers.""" + + haz_type: str + haz_int_mult: Optional[float] = 1.0 + haz_int_add: Optional[float] = 0.0 + new_hazard_path: Optional[str] = None + """HDF5 filepath for new hazard.""" + impact_rp_cutoff: Optional[float] = None + + def __post_init__(self): + if self.new_hazard_path is not None and any( + [self.haz_int_mult, self.haz_int_add, self.impact_rp_cutoff] + ): + LOGGER.warning( + "Both new hazard object and hazard modifiers are provided, " + "modifiers will be applied after changing the hazard." + ) + + +@dataclass(repr=False) +class ExposuresModifierConfig(_ModifierConfig): + """Configuration for impact function modifiers.""" + + reassign_impf_id: Optional[Dict[str, Dict[int | str, int | str]]] = None + set_to_zero: Optional[list[int]] = None + new_exposures_path: Optional[str] = None + """HDF5 filepath for new exposure""" + + def __post_init__(self): + if self.new_exposures_path is not None and any( + [self.reassign_impf_id, self.set_to_zero] + ): + LOGGER.warning( + "Both new exposures object and exposures modifiers are provided, " + "modifiers will be applied after changing the exposures." + ) + + +@dataclass(repr=False) +class CostIncomeConfig(_ModifierConfig): + """Serializable configuration for CostIncome.""" + + mkt_price_year: Optional[int] = field(default_factory=lambda: datetime.today().year) + init_cost: float = 0.0 + periodic_cost: float = 0.0 + periodic_income: float = 0.0 + cost_yearly_growth_rate: float = 0.0 + income_yearly_growth_rate: float = 0.0 + freq: str = "Y" + custom_cash_flows: Optional[list[dict]] = None + + def to_cost_income(self) -> CostIncome: + df = None + if self.custom_cash_flows is not None: + df = pd.DataFrame(self.custom_cash_flows) + df["date"] = pd.to_datetime(df["date"]) + return CostIncome( + mkt_price_year=self.mkt_price_year, + init_cost=self.init_cost, + periodic_cost=self.periodic_cost, + periodic_income=self.periodic_income, + cost_yearly_growth_rate=self.cost_yearly_growth_rate, + income_yearly_growth_rate=self.income_yearly_growth_rate, + custom_cash_flows=df, + freq=self.freq, + ) + + @classmethod + def from_cost_income(cls, ci: CostIncome) -> "CostIncomeConfig": + """Round-trip from a live CostIncome object.""" + custom = None + if ci.custom_cash_flows is not None: + custom = ( + ci.custom_cash_flows.reset_index() + .rename(columns={"index": "date"}) + .assign(date=lambda df: df["date"].dt.strftime("%Y-%m-%d")) + .to_dict(orient="records") + ) + return cls( + mkt_price_year=ci.mkt_price_year.year, # datetime → int + init_cost=abs(ci.init_cost), # stored negative → positive + periodic_cost=abs(ci.periodic_cost), + periodic_income=ci.periodic_income, + cost_yearly_growth_rate=ci.cost_growth_rate, + income_yearly_growth_rate=ci.income_growth_rate, + freq=ci.freq, + custom_cash_flows=custom, + ) + + +@dataclass(repr=False) +class MeasureConfig(_ModifierConfig): + name: str + haz_type: str + impfset_modifier: ImpfsetModifierConfig + hazard_modifier: HazardModifierConfig + exposures_modifier: ExposuresModifierConfig + cost_income: CostIncomeConfig + implementation_duration: Optional[str] = None + color_rgb: Optional[Tuple[float, float, float]] = None + + def __repr__(self) -> str: + fields_str = "\n\t".join(f"{k}={v!r}" for k, v in self.__dict__.items()) + return f"{self.__class__.__name__}(\n\t{fields_str})" + + def to_dict(self) -> dict: + return { + "name": self.name, + "haz_type": self.haz_type, + **self.impfset_modifier.to_dict(), + **self.hazard_modifier.to_dict(), + **self.exposures_modifier.to_dict(), + **self.cost_income.to_dict(), + "implementation_duration": self.implementation_duration, + "color_rgb": list(self.color_rgb) if self.color_rgb is not None else None, + } + + @classmethod + def from_dict(cls, d: dict) -> "MeasureConfig": + color = d.get("color_rgb") + return cls( + name=d["name"], + haz_type=d["haz_type"], + impfset_modifier=ImpfsetModifierConfig.from_dict(d), + hazard_modifier=HazardModifierConfig.from_dict(d), + exposures_modifier=ExposuresModifierConfig.from_dict(d), + cost_income=CostIncomeConfig.from_dict(d), + implementation_duration=d.get("implementation_duration"), + color_rgb=( + tuple(color) if color is not None and not pd.isna(color) else None + ), + ) + + def to_yaml(self, path: str) -> None: + import yaml + + with open(path, "w") as f: + yaml.dump( + {"measures": [self.to_dict()]}, + f, + default_flow_style=False, + sort_keys=False, + ) + + @classmethod + def from_yaml(cls, path: str) -> "MeasureConfig": + import yaml + + with open(path) as f: + return cls.from_dict(yaml.safe_load(f)["measures"][0]) + + @classmethod + def from_row( + cls, row: pd.Series, haz_type: Optional[str] = None + ) -> "MeasureConfig": + """Build a MeasureConfig from a legacy Excel row.""" + row_dict = row.to_dict() + return cls.from_dict(row_dict) + + +def _serialize_modifier_dict(d: dict) -> dict: + """Stringify keys, convert tuples to lists for JSON.""" + return {str(k): list(v) for k, v in d.items()} + + +def _deserialize_modifier_dict(d: dict) -> dict: + """Restore int keys where possible, values back to tuples.""" + return { + (int(k) if isinstance(k, str) and k.isdigit() else k): tuple(v) + for k, v in d.items() + } diff --git a/climada/entity/measures/measure_set.py b/climada/entity/_legacy_measures/measure_set.py similarity index 100% rename from climada/entity/measures/measure_set.py rename to climada/entity/_legacy_measures/measure_set.py diff --git a/climada/entity/measures/test/__init__.py b/climada/entity/_legacy_measures/test/__init__.py similarity index 100% rename from climada/entity/measures/test/__init__.py rename to climada/entity/_legacy_measures/test/__init__.py diff --git a/climada/entity/measures/test/data/.gitignore b/climada/entity/_legacy_measures/test/data/.gitignore similarity index 100% rename from climada/entity/measures/test/data/.gitignore rename to climada/entity/_legacy_measures/test/data/.gitignore diff --git a/climada/entity/measures/test/test_base.py b/climada/entity/_legacy_measures/test/test_base.py similarity index 100% rename from climada/entity/measures/test/test_base.py rename to climada/entity/_legacy_measures/test/test_base.py diff --git a/climada/entity/measures/test/test_meas_set.py b/climada/entity/_legacy_measures/test/test_meas_set.py similarity index 100% rename from climada/entity/measures/test/test_meas_set.py rename to climada/entity/_legacy_measures/test/test_meas_set.py diff --git a/climada/entity/_legacy_measures/types.py b/climada/entity/_legacy_measures/types.py new file mode 100644 index 0000000000..1ad90986b1 --- /dev/null +++ b/climada/entity/_legacy_measures/types.py @@ -0,0 +1,10 @@ +from collections.abc import Callable +from typing import Concatenate + +from climada.entity.exposures.base import Exposures +from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet +from climada.hazard.base import Hazard + +HazardChange = Callable[Concatenate[Hazard, ...], Hazard] +ImpfsetChange = Callable[Concatenate[ImpactFuncSet, ...], ImpactFuncSet] +ExposuresChange = Callable[Concatenate[Exposures, ...], Exposures] diff --git a/climada/entity/entity_def.py b/climada/entity/entity_def.py index d58af9efed..c1bc3b2550 100755 --- a/climada/entity/entity_def.py +++ b/climada/entity/entity_def.py @@ -26,10 +26,10 @@ import pandas as pd +from climada.entity._legacy_measures.measure_set import Measure, MeasureSet from climada.entity.disc_rates.base import DiscRates from climada.entity.exposures.base import Exposures from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet -from climada.entity.measures.measure_set import MeasureSet LOGGER = logging.getLogger(__name__) diff --git a/climada/entity/test/test_entity.py b/climada/entity/test/test_entity.py index 7805a24e70..4a88fc531a 100644 --- a/climada/entity/test/test_entity.py +++ b/climada/entity/test/test_entity.py @@ -24,11 +24,11 @@ import numpy as np from climada import CONFIG +from climada.entity._legacy_measures.measure_set import MeasureSet from climada.entity.disc_rates.base import DiscRates from climada.entity.entity_def import Entity from climada.entity.exposures.base import Exposures from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet -from climada.entity.measures.measure_set import MeasureSet from climada.util.constants import ENT_TEMPLATE_XLS ENT_TEST_MAT = CONFIG.exposures.test_data.dir().joinpath("demo_today.mat") diff --git a/climada/trajectories/snapshot.py b/climada/trajectories/snapshot.py index 1d5f778135..dbf7f1bbd6 100644 --- a/climada/trajectories/snapshot.py +++ b/climada/trajectories/snapshot.py @@ -31,9 +31,9 @@ import numpy as np import pandas as pd +from climada.entity._legacy_measures.base import Measure from climada.entity.exposures import Exposures from climada.entity.impact_funcs import ImpactFuncSet -from climada.entity.measures.base import Measure from climada.hazard import Hazard LOGGER = logging.getLogger(__name__) diff --git a/climada/trajectories/test/test_snapshot.py b/climada/trajectories/test/test_snapshot.py index 77830d3b54..0acaf148da 100644 --- a/climada/trajectories/test/test_snapshot.py +++ b/climada/trajectories/test/test_snapshot.py @@ -5,9 +5,9 @@ import pandas as pd import pytest +from climada.entity._legacy_measures.base import Measure from climada.entity.exposures import Exposures from climada.entity.impact_funcs import ImpactFunc, ImpactFuncSet -from climada.entity.measures.base import Measure from climada.hazard import Hazard from climada.trajectories.snapshot import Snapshot from climada.util.constants import EXP_DEMO_H5, HAZ_DEMO_H5 From ceaf479c800c76b3f056d106d3660bc4ae26fdc8 Mon Sep 17 00:00:00 2001 From: spjuhel Date: Thu, 2 Apr 2026 11:56:15 +0200 Subject: [PATCH 2/3] Oups these shouldn't be here! --- climada/entity/_legacy_measures/helper.py | 246 -------------- .../entity/_legacy_measures/measure_config.py | 318 ------------------ climada/entity/_legacy_measures/types.py | 10 - 3 files changed, 574 deletions(-) delete mode 100644 climada/entity/_legacy_measures/helper.py delete mode 100644 climada/entity/_legacy_measures/measure_config.py delete mode 100644 climada/entity/_legacy_measures/types.py diff --git a/climada/entity/_legacy_measures/helper.py b/climada/entity/_legacy_measures/helper.py deleted file mode 100644 index 99ba0e8820..0000000000 --- a/climada/entity/_legacy_measures/helper.py +++ /dev/null @@ -1,246 +0,0 @@ -""" -This file is part of CLIMADA. - -Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS. - -CLIMADA is free software: you can redistribute it and/or modify it under the -terms of the GNU General Public License as published by the Free -Software Foundation, version 3. - -CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY -WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -PARTICULAR PURPOSE. See the GNU General Public License for more details. - -You should have received a copy of the GNU General Public License along -with CLIMADA. If not, see . - ---- - -Define Measure class. -""" - -from __future__ import annotations - -import logging -from dataclasses import dataclass, field, fields -from functools import reduce -from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, TypeVar, Union, cast - -import numpy as np -import pandas as pd - -from climada.entity.exposures.base import Exposures -from climada.entity.impact_funcs.base import ImpactFunc -from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet -from climada.entity.measures.measure_config import ( - ExposuresModifierConfig, - HazardModifierConfig, - ImpfsetModifierConfig, - MeasureConfig, -) -from climada.hazard.base import Hazard - -if TYPE_CHECKING: - from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet - from climada.entity.measures.types import ( - ExposuresChange, - HazardChange, - ImpfsetChange, - ) - from climada.hazard.base import Hazard - -LOGGER = logging.getLogger(__name__) - -T = TypeVar("T", Exposures, ImpactFuncSet, Hazard) - - -def identity_function(x: T, **_kwargs: Any) -> T: - return x - - -def composite_fun(*funcs: Callable[..., T]) -> Callable[..., T]: - """ - Composes multiple functions from right to left. - f(g(h(x))) - """ - - def compose(f: Callable[..., T], g: Callable[..., T]) -> Callable[..., T]: - def composed(x: T, **kwargs: Any) -> T: - return f(g(x, **kwargs), **kwargs) - - return composed - - return reduce(compose, funcs, identity_function) - - -def replace_hazard(new_hazard: Hazard) -> HazardChange: - """Returns a function that replaces the hazard with given new one.""" - - def hazard_change(_: Hazard) -> Hazard: - return new_hazard - - return hazard_change - - -def impact_intensity_rp_cutoff_helper( - cut_off_rp: float, -) -> HazardChange: - """Helper to generate a function removing events from a hazard for which - impacts do not exceed the impacts of a given return period. - - This helper returns a function to be applied on a hazard. - The function returned has to run an impact computation to find out which - event to remove from the hazard. - As such it has the following signature: - - ```f(hazard: Hazard, # The hazard to apply on - exposures: Exposures, # The exposure for the impact computation - impfset: ImpactFuncSet, # The impfset for the impact computation - base_hazard: Hazard, # The hazard for the impact computation - exposures_region_id: Optional[list[int]] = None, # Region id to filter to - ) -> Hazard - ``` - - Identifies events exceeding a return period and returns the hazard intensity - matrix with those event intensities zeroed out. - """ - from climada.engine.impact_calc import ImpactCalc - - def hazard_change( - hazard: Hazard, - base_exposures: Exposures, - base_impfset: ImpactFuncSet, - base_hazard: Hazard, - exposures_region_id: Optional[list[int]] = None, - ) -> Hazard: - exp_imp = base_exposures - if exposures_region_id: - # Narrowing the type for the LSP via boolean indexing - in_reg = base_exposures.gdf["region_id"].isin(exposures_region_id) - exp_imp = Exposures(base_exposures.gdf[in_reg], crs=base_exposures.crs) - - imp = ImpactCalc(exp_imp, base_impfset, base_hazard).impact(save_mat=False) - - # Calculate exceedance frequencies - sort_idxs = np.argsort(imp.at_event)[::-1] - exceed_freq = np.cumsum(imp.frequency[sort_idxs]) - events_below_cutoff = sort_idxs[exceed_freq <= cut_off_rp] - - # Modify sparse data structure - intensity_modified = base_hazard.intensity.copy() - for event in events_below_cutoff: - start, end = ( - intensity_modified.indptr[event], - intensity_modified.indptr[event + 1], - ) - intensity_modified.data[start:end] = 0 - - hazard.intensity = intensity_modified - return hazard - - return hazard_change - - -def helper_hazard(hazard_modifier: HazardModifierConfig) -> HazardChange: - """Returns a function that scales and shifts hazard intensity.""" - - def hazard_change(hazard: Hazard, **_kwargs) -> Hazard: - changed_hazard = ( - Hazard.from_hdf5(hazard_modifier.new_hazard_path) - if hazard_modifier.new_hazard_path is not None - else hazard - ) - data = cast(np.ndarray, changed_hazard.intensity.data) - data *= hazard_modifier.haz_int_mult - data += hazard_modifier.haz_int_add - data[data < 0] = 0 - changed_hazard.intensity.eliminate_zeros() - return changed_hazard - - if hazard_modifier.impact_rp_cutoff is not None: - hazard_change = composite_fun( - impact_intensity_rp_cutoff_helper(hazard_modifier.impact_rp_cutoff), - hazard_change, - ) - - return hazard_change - - -def helper_impfset(impfset_modifier: ImpfsetModifierConfig) -> ImpfsetChange: - """Returns a function that modifies impact functions (mdd, paa, intensity) by ID.""" - - def impfset_change(impfset: ImpactFuncSet, **_kwargs) -> ImpactFuncSet: - changed_impfset = ( - impfset.from_excel(impfset_modifier.new_impfset_path) - if impfset_modifier.new_impfset_path is not None - else impfset - ) - if impfset_modifier.impf_ids is None or impfset_modifier.impf_ids == "all": - ids_to_change = impfset.get_ids(haz_type=impfset_modifier.haz_type) - elif isinstance(impfset_modifier.impf_ids, list): - ids_to_change = impfset_modifier.impf_ids - elif isinstance(impfset_modifier.impf_ids, (str, int)): - ids_to_change = [impfset_modifier.impf_ids] - else: - raise ValueError( - f"Impact function ids to changes are invalid: {impfset_modifier.impf_ids}" - ) - - funcs = changed_impfset.get_func(haz_type=impfset_modifier.haz_type) - funcs = [funcs] if isinstance(funcs, ImpactFunc) else funcs - - for impf in funcs: - # Apply Intensity Mod - if impf.id in ids_to_change: - mult, add = ( - impfset_modifier.impf_int_mult, - impfset_modifier.impf_int_add, - ) - impf.intensity = impf.intensity * mult + add - - mult, add = ( - impfset_modifier.impf_mdd_mult, - impfset_modifier.impf_mdd_add, - ) - impf.mdd = impf.mdd * mult + add - - mult, add = ( - impfset_modifier.impf_paa_mult, - impfset_modifier.impf_paa_add, - ) - impf.paa = impf.paa * mult + add - - return changed_impfset - - return impfset_change - - -def change_impfset(new_impfsets: ImpactFuncSet) -> ImpfsetChange: - """Returns a function that swaps the impact function set with the given one.""" - - def impfset_change(_: ImpactFuncSet) -> ImpactFuncSet: - return new_impfsets - - return impfset_change - - -def helper_exposure(exposures_modifier: ExposuresModifierConfig) -> ExposuresChange: - """Returns a function that reassigns impact function IDs and zeros out specific values.""" - - def exposures_change(exposures: Exposures, **_kwargs) -> Exposures: - changed_exposures = ( - exposures - if exposures_modifier.new_exposures_path is None - else Exposures.from_hdf5(exposures_modifier.new_exposures_path) - ) - gdf = cast(pd.DataFrame, changed_exposures.gdf) - if exposures_modifier.reassign_impf_id is not None: - for haz_type, mapping in exposures_modifier.reassign_impf_id.items(): - gdf[f"impf_{haz_type}"] = gdf[f"impf_{haz_type}"].replace(mapping) - - if exposures_modifier.set_to_zero is not None: - gdf.loc[exposures_modifier.set_to_zero, "value"] = 0 - - return changed_exposures - - return exposures_change diff --git a/climada/entity/_legacy_measures/measure_config.py b/climada/entity/_legacy_measures/measure_config.py deleted file mode 100644 index f742851690..0000000000 --- a/climada/entity/_legacy_measures/measure_config.py +++ /dev/null @@ -1,318 +0,0 @@ -""" -This file is part of CLIMADA. - -Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS. - -CLIMADA is free software: you can redistribute it and/or modify it under the -terms of the GNU General Public License as published by the Free -Software Foundation, version 3. - -CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY -WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -PARTICULAR PURPOSE. See the GNU General Public License for more details. - -You should have received a copy of the GNU General Public License along -with CLIMADA. If not, see . - ---- - -Define configuration dataclasses for Measure reading and writing. -""" - -from __future__ import annotations - -import dataclasses -import logging -from abc import ABC -from dataclasses import asdict, dataclass, field, fields -from datetime import datetime -from typing import TYPE_CHECKING, Dict, Optional, Tuple, Union - -import pandas as pd - -from climada.util.string_parsers import parse_color, parse_mapping_string, parse_range - -if TYPE_CHECKING: - from climada.entity.measures.base import Measure - from climada.entity.measures.cost_income import CostIncome - -LOGGER = logging.getLogger(__name__) - - -@dataclass -class _ModifierConfig(ABC): - def to_dict(self): - # 1. Get the current values as a dict - current_data = asdict(self) - - # 2. Identify fields where the current value differs from the default - non_default_data = {} - for f in fields(self): - current_value = getattr(self, f.name) - - # Logic to get the default value (handling both default and default_factory) - default_value = f.default - if ( - f.default_factory is not field().default_factory - ): # Check if factory exists - default_value = f.default_factory() - - if current_value != default_value: - non_default_data[f.name] = current_data[f.name] - - non_default_data.pop("haz_type", None) - return non_default_data - - @classmethod - def from_dict(cls, d: dict): - filtered = cls._filter_dict_to_fields(d) - return cls(**filtered) - - @classmethod - def _filter_dict_to_fields(cls, d: dict): - """Filter out values that do not match the dataclass fields.""" - filtered = dict( - filter(lambda k: k[0] in [f.name for f in fields(cls)], d.items()) - ) - return filtered - - def _filter_out_default_fields(self): - non_defaults = {} - defaults = {} - for f in fields(self): - val = getattr(self, f.name) - default = f.default - if f.default_factory is not field().default_factory: - default = f.default_factory() - - if val != default: - non_defaults[f.name] = val - else: - defaults[f.name] = val - return non_defaults, defaults - - def __repr__(self) -> str: - non_defaults, defaults = self._filter_out_default_fields() - ndf_fields_str = ( - "\n\t\t\t".join(f"{k}={v!r}" for k, v in non_defaults.items()) - if non_defaults - else None - ) - fields_str = ( - "\n\t\t\t".join(f"{k}={v!r}" for k, v in defaults.items()) - if defaults - else None - ) - fields = ( - "(" "\n\t\tNon default fields:" f"\n\t\t\t{ndf_fields_str}" - if ndf_fields_str - else "()" - ) - return f"{self.__class__.__name__}{fields}" - - -@dataclass(repr=False) -class ImpfsetModifierConfig(_ModifierConfig): - """Configuration for impact function modifiers.""" - - haz_type: str - impf_ids: Optional[Union[int, str, list[Union[int, str]]]] = None - impf_mdd_mult: float = 1.0 - impf_mdd_add: float = 0.0 - impf_paa_mult: float = 1.0 - impf_paa_add: float = 0.0 - impf_int_mult: float = 1.0 - impf_int_add: float = 0.0 - new_impfset_path: Optional[str] = None - """Excel filepath for new impfset.""" - - def __post_init__(self): - if self.new_impfset_path is not None and any( - [ - self.impf_mdd_add, - self.impf_mdd_mult, - self.impf_paa_add, - self.impf_paa_mult, - self.impf_int_add, - self.impf_int_mult, - ] - ): - LOGGER.warning( - "Both new impfset object and impfset modifiers are provided, " - "modifiers will be applied after changing the impfset." - ) - - -@dataclass(repr=False) -class HazardModifierConfig(_ModifierConfig): - """Configuration for impact function modifiers.""" - - haz_type: str - haz_int_mult: Optional[float] = 1.0 - haz_int_add: Optional[float] = 0.0 - new_hazard_path: Optional[str] = None - """HDF5 filepath for new hazard.""" - impact_rp_cutoff: Optional[float] = None - - def __post_init__(self): - if self.new_hazard_path is not None and any( - [self.haz_int_mult, self.haz_int_add, self.impact_rp_cutoff] - ): - LOGGER.warning( - "Both new hazard object and hazard modifiers are provided, " - "modifiers will be applied after changing the hazard." - ) - - -@dataclass(repr=False) -class ExposuresModifierConfig(_ModifierConfig): - """Configuration for impact function modifiers.""" - - reassign_impf_id: Optional[Dict[str, Dict[int | str, int | str]]] = None - set_to_zero: Optional[list[int]] = None - new_exposures_path: Optional[str] = None - """HDF5 filepath for new exposure""" - - def __post_init__(self): - if self.new_exposures_path is not None and any( - [self.reassign_impf_id, self.set_to_zero] - ): - LOGGER.warning( - "Both new exposures object and exposures modifiers are provided, " - "modifiers will be applied after changing the exposures." - ) - - -@dataclass(repr=False) -class CostIncomeConfig(_ModifierConfig): - """Serializable configuration for CostIncome.""" - - mkt_price_year: Optional[int] = field(default_factory=lambda: datetime.today().year) - init_cost: float = 0.0 - periodic_cost: float = 0.0 - periodic_income: float = 0.0 - cost_yearly_growth_rate: float = 0.0 - income_yearly_growth_rate: float = 0.0 - freq: str = "Y" - custom_cash_flows: Optional[list[dict]] = None - - def to_cost_income(self) -> CostIncome: - df = None - if self.custom_cash_flows is not None: - df = pd.DataFrame(self.custom_cash_flows) - df["date"] = pd.to_datetime(df["date"]) - return CostIncome( - mkt_price_year=self.mkt_price_year, - init_cost=self.init_cost, - periodic_cost=self.periodic_cost, - periodic_income=self.periodic_income, - cost_yearly_growth_rate=self.cost_yearly_growth_rate, - income_yearly_growth_rate=self.income_yearly_growth_rate, - custom_cash_flows=df, - freq=self.freq, - ) - - @classmethod - def from_cost_income(cls, ci: CostIncome) -> "CostIncomeConfig": - """Round-trip from a live CostIncome object.""" - custom = None - if ci.custom_cash_flows is not None: - custom = ( - ci.custom_cash_flows.reset_index() - .rename(columns={"index": "date"}) - .assign(date=lambda df: df["date"].dt.strftime("%Y-%m-%d")) - .to_dict(orient="records") - ) - return cls( - mkt_price_year=ci.mkt_price_year.year, # datetime → int - init_cost=abs(ci.init_cost), # stored negative → positive - periodic_cost=abs(ci.periodic_cost), - periodic_income=ci.periodic_income, - cost_yearly_growth_rate=ci.cost_growth_rate, - income_yearly_growth_rate=ci.income_growth_rate, - freq=ci.freq, - custom_cash_flows=custom, - ) - - -@dataclass(repr=False) -class MeasureConfig(_ModifierConfig): - name: str - haz_type: str - impfset_modifier: ImpfsetModifierConfig - hazard_modifier: HazardModifierConfig - exposures_modifier: ExposuresModifierConfig - cost_income: CostIncomeConfig - implementation_duration: Optional[str] = None - color_rgb: Optional[Tuple[float, float, float]] = None - - def __repr__(self) -> str: - fields_str = "\n\t".join(f"{k}={v!r}" for k, v in self.__dict__.items()) - return f"{self.__class__.__name__}(\n\t{fields_str})" - - def to_dict(self) -> dict: - return { - "name": self.name, - "haz_type": self.haz_type, - **self.impfset_modifier.to_dict(), - **self.hazard_modifier.to_dict(), - **self.exposures_modifier.to_dict(), - **self.cost_income.to_dict(), - "implementation_duration": self.implementation_duration, - "color_rgb": list(self.color_rgb) if self.color_rgb is not None else None, - } - - @classmethod - def from_dict(cls, d: dict) -> "MeasureConfig": - color = d.get("color_rgb") - return cls( - name=d["name"], - haz_type=d["haz_type"], - impfset_modifier=ImpfsetModifierConfig.from_dict(d), - hazard_modifier=HazardModifierConfig.from_dict(d), - exposures_modifier=ExposuresModifierConfig.from_dict(d), - cost_income=CostIncomeConfig.from_dict(d), - implementation_duration=d.get("implementation_duration"), - color_rgb=( - tuple(color) if color is not None and not pd.isna(color) else None - ), - ) - - def to_yaml(self, path: str) -> None: - import yaml - - with open(path, "w") as f: - yaml.dump( - {"measures": [self.to_dict()]}, - f, - default_flow_style=False, - sort_keys=False, - ) - - @classmethod - def from_yaml(cls, path: str) -> "MeasureConfig": - import yaml - - with open(path) as f: - return cls.from_dict(yaml.safe_load(f)["measures"][0]) - - @classmethod - def from_row( - cls, row: pd.Series, haz_type: Optional[str] = None - ) -> "MeasureConfig": - """Build a MeasureConfig from a legacy Excel row.""" - row_dict = row.to_dict() - return cls.from_dict(row_dict) - - -def _serialize_modifier_dict(d: dict) -> dict: - """Stringify keys, convert tuples to lists for JSON.""" - return {str(k): list(v) for k, v in d.items()} - - -def _deserialize_modifier_dict(d: dict) -> dict: - """Restore int keys where possible, values back to tuples.""" - return { - (int(k) if isinstance(k, str) and k.isdigit() else k): tuple(v) - for k, v in d.items() - } diff --git a/climada/entity/_legacy_measures/types.py b/climada/entity/_legacy_measures/types.py deleted file mode 100644 index 1ad90986b1..0000000000 --- a/climada/entity/_legacy_measures/types.py +++ /dev/null @@ -1,10 +0,0 @@ -from collections.abc import Callable -from typing import Concatenate - -from climada.entity.exposures.base import Exposures -from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet -from climada.hazard.base import Hazard - -HazardChange = Callable[Concatenate[Hazard, ...], Hazard] -ImpfsetChange = Callable[Concatenate[ImpactFuncSet, ...], ImpactFuncSet] -ExposuresChange = Callable[Concatenate[Exposures, ...], Exposures] From c176e80090c62bb2bb17ad408e9305c2c3abc7e1 Mon Sep 17 00:00:00 2001 From: spjuhel Date: Thu, 2 Apr 2026 12:04:42 +0200 Subject: [PATCH 3/3] Forgotten places --- climada/engine/unsequa/input_var.py | 4 ++-- climada/entity/_legacy_measures/measure_set.py | 3 ++- climada/entity/_legacy_measures/test/test_base.py | 4 ++-- climada/entity/_legacy_measures/test/test_meas_set.py | 10 ++++++---- climada/trajectories/calc_risk_metrics.py | 2 +- climada/trajectories/test/test_calc_risk_metrics.py | 2 +- 6 files changed, 14 insertions(+), 11 deletions(-) diff --git a/climada/engine/unsequa/input_var.py b/climada/engine/unsequa/input_var.py index 76e63d766e..9abdd8d3fa 100644 --- a/climada/engine/unsequa/input_var.py +++ b/climada/engine/unsequa/input_var.py @@ -518,7 +518,7 @@ def ent( exp_list : [climada.entity.exposures.base.Exposure] The list of base exposure. Can be one or many to uniformly sample from. - meas_set : climada.entity.measures.measure_set.MeasureSet + meas_set : climada.entity._legacy_measures.measure_set.MeasureSet The base measures. haz_id_dict : dict Dictionary of the impact functions affected by uncertainty. @@ -660,7 +660,7 @@ def entfut( exp_list : [climada.entity.exposures.base.Exposure] The list of base exposure. Can be one or many to uniformly sample from. - meas_set : climada.entity.measures.measure_set.MeasureSet + meas_set : climada.entity._legacy_measures.measure_set.MeasureSet The base measures. haz_id_dict : dict Dictionary of the impact functions affected by uncertainty. diff --git a/climada/entity/_legacy_measures/measure_set.py b/climada/entity/_legacy_measures/measure_set.py index 90a2bb43c2..228788ba15 100755 --- a/climada/entity/_legacy_measures/measure_set.py +++ b/climada/entity/_legacy_measures/measure_set.py @@ -32,7 +32,8 @@ from matplotlib import colormaps as cm import climada.util.hdf5_handler as u_hdf5 -from climada.entity.measures.base import Measure + +from .base import Measure LOGGER = logging.getLogger(__name__) diff --git a/climada/entity/_legacy_measures/test/test_base.py b/climada/entity/_legacy_measures/test/test_base.py index 6f76eb7373..430ab7d44b 100644 --- a/climada/entity/_legacy_measures/test/test_base.py +++ b/climada/entity/_legacy_measures/test/test_base.py @@ -28,12 +28,12 @@ import climada.entity.exposures.test as exposures_test import climada.util.coordinates as u_coord from climada import CONFIG +from climada.entity._legacy_measures.base import IMPF_ID_FACT, Measure +from climada.entity._legacy_measures.measure_set import MeasureSet from climada.entity.entity_def import Entity from climada.entity.exposures.base import Exposures from climada.entity.impact_funcs.base import ImpactFunc from climada.entity.impact_funcs.impact_func_set import ImpactFuncSet -from climada.entity.measures.base import IMPF_ID_FACT, Measure -from climada.entity.measures.measure_set import MeasureSet from climada.hazard.base import Hazard from climada.test import get_test_file from climada.util.constants import HAZ_DEMO_H5 diff --git a/climada/entity/_legacy_measures/test/test_meas_set.py b/climada/entity/_legacy_measures/test/test_meas_set.py index a2cbdc3f16..868510fbe8 100644 --- a/climada/entity/_legacy_measures/test/test_meas_set.py +++ b/climada/entity/_legacy_measures/test/test_meas_set.py @@ -24,8 +24,8 @@ import numpy as np from climada import CONFIG -from climada.entity.measures.base import Measure -from climada.entity.measures.measure_set import MeasureSet +from climada.entity._legacy_measures.base import Measure +from climada.entity._legacy_measures.measure_set import MeasureSet from climada.util.constants import ENT_DEMO_TODAY, ENT_TEMPLATE_XLS DATA_DIR = CONFIG.measures.test_data.dir() @@ -58,7 +58,7 @@ def test_add_wrong_error(self): """Test error is raised when wrong ImpactFunc provided.""" meas = MeasureSet() with self.assertLogs( - "climada.entity.measures.measure_set", level="WARNING" + "climada.entity._legacy_measures.measure_set", level="WARNING" ) as cm: meas.append(Measure()) self.assertIn("Input Measure's hazard type not set.", cm.output[0]) @@ -76,7 +76,9 @@ def test_remove_measure_pass(self): def test_remove_wrong_error(self): """Test error is raised when invalid inputs.""" meas = MeasureSet(measure_list=[Measure(name="Mangrove", haz_type="FL")]) - with self.assertLogs("climada.entity.measures.measure_set", level="INFO") as cm: + with self.assertLogs( + "climada.entity._legacy_measures.measure_set", level="INFO" + ) as cm: meas.remove_measure(name="Seawall") self.assertIn("No Measure with name Seawall.", cm.output[0]) diff --git a/climada/trajectories/calc_risk_metrics.py b/climada/trajectories/calc_risk_metrics.py index 902fffc2ba..8e5ae8b150 100644 --- a/climada/trajectories/calc_risk_metrics.py +++ b/climada/trajectories/calc_risk_metrics.py @@ -32,7 +32,7 @@ import pandas as pd from climada.engine.impact import Impact -from climada.entity.measures.base import Measure +from climada.entity._legacy_measures.base import Measure from climada.trajectories.constants import ( AAI_METRIC_NAME, COORD_ID_COL_NAME, diff --git a/climada/trajectories/test/test_calc_risk_metrics.py b/climada/trajectories/test/test_calc_risk_metrics.py index 9c75f78fb4..6fc530d065 100644 --- a/climada/trajectories/test/test_calc_risk_metrics.py +++ b/climada/trajectories/test/test_calc_risk_metrics.py @@ -26,10 +26,10 @@ import pandas as pd import pytest +from climada.entity._legacy_measures.base import Measure from climada.entity.exposures import Exposures from climada.entity.impact_funcs import ImpactFuncSet from climada.entity.impact_funcs.trop_cyclone import ImpfTropCyclone -from climada.entity.measures.base import Measure from climada.hazard import Hazard from climada.trajectories.calc_risk_metrics import CalcRiskMetricsPoints from climada.trajectories.constants import (