-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
321 lines (294 loc) · 11.5 KB
/
utils.py
File metadata and controls
321 lines (294 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# utils.py
import os
import sys
from time import sleep
import psutil
import argparse
import importlib.util
from enum import Enum
from gen_data import *
from pyomo.environ import Objective, ConcreteModel, value
from pyomo.opt import SolverFactory, SolverResults
from rosepy.pyomo.pyomo_interface import PyomoInterface
# Utility functions
def is_package_installed(package_name):
"""
Check if a package is installed (used for solvers).
"""
return importlib.util.find_spec(package_name) is not None
def validate_state_code(state_code: str) -> str:
"""
Validate the state code provided by the user.
"""
if state_code.upper() not in VALID_STATE_CODES:
raise argparse.ArgumentTypeError(
f"Invalid state code: {state_code}. Must be one of {', '.join(VALID_STATE_CODES)}."
)
return state_code.upper()
def positive_int(val) -> int:
"""
Validate that the value provided by the user is a positive integer.
"""
ivalue = int(val)
if ivalue <= 0:
raise argparse.ArgumentTypeError(f"{val} is an invalid positive int value")
return ivalue
def non_negative_int(val) -> int:
"""
Validate that the value provided by the user is a non-negative integer.
"""
ivalue = int(val)
if ivalue < 0:
raise argparse.ArgumentTypeError(f"{val} is an invalid non-negative int value")
return ivalue
def positive_float(val) -> float:
"""
Validate that the value provided by the user is a positive float.
"""
fvalue = float(val)
if fvalue <= 0:
raise argparse.ArgumentTypeError(f"{val} is an invalid positive float value")
return fvalue
def get_physical_cores() -> int | None:
"""
Get the number of physical CPU cores available on the machine.
"""
try:
return psutil.cpu_count(logical=False)
except Exception as e:
print(f"Error retrieving physical cores: {e}")
return None
def get_solver(solver_name: str, callback: bool = False) -> SolverFactory:
"""
Get the solver factory for the specified solver name.
"""
if not is_package_installed(f"{solver_name}py"):
raise RuntimeError(f"Solver '{solver_name}' is not installed.")
if solver_name == "gurobi":
if callback:
solver = SolverFactory(f"appsi_{solver_name}")
else:
solver = SolverFactory(f"{solver_name}_persistent")
elif solver_name == "highs":
solver = SolverFactory(f"appsi_{solver_name}")
elif solver_name == "rose":
solver = SolverFactory(f"{solver_name}")
else:
raise RuntimeError(
f"Invalid solver '{solver_name}' please choose gurobi, highs or rose."
)
return solver
def setup_benders_sub_solver(
model: ConcreteModel,
solver: SolverFactory,
callback: callable = None,
options: dict = None,
solver_threads: int = 0,
verbose: bool = False,
) -> Dict:
"""
Prep the solver for Benders using the specified solver.
Returns the updated options dictionary.
"""
if not isinstance(model, ConcreteModel):
raise ValueError("The model must be a ConcreteModel instance.")
if not hasattr(solver, "solve") or not callable(getattr(solver, "solve")):
raise ValueError("The solver object must have callable attribute 'solve'.")
# Try to determine the solver interface name
solver_iface_name = (
getattr(solver.__class__, "__module__", "")
or getattr(solver, "__name__", "")
or ""
)
if hasattr(
solver, "set_instance"
): # For some solver interfaces you have to set the instance first before setting callbacks or options
solver.set_instance(model)
if hasattr(solver, "set_callback") and callback:
solver.set_callback(callback)
if hasattr(solver, "set_gurobi_param"):
solver.set_gurobi_param("OutputFlag", verbose)
solver.set_gurobi_param("Threads", solver_threads)
if options:
options["OutputFlag"] = verbose
options["Threads"] = solver_threads
for key, val in options.items():
solver.set_gurobi_param(key, val)
else:
options = {"OutputFlag": verbose, "Threads": solver_threads}
return options
elif hasattr(solver, "gurobi_options"):
solver.config.stream_solver = verbose
solver.gurobi_options["OutputFlag"] = verbose
solver.gurobi_options["Threads"] = solver_threads
if options:
options["OutputFlag"] = verbose
options["Threads"] = solver_threads
for key, val in options.items():
solver.gurobi_options[key] = val
else:
options = {"OutputFlag": verbose, "Threads": solver_threads}
return options
elif hasattr(solver, "highs_options"):
solver.config.stream_solver = verbose
solver.highs_options["output_flag"] = verbose
solver.highs_options["threads"] = solver_threads
if options:
options["output_flag"] = verbose
options["threads"] = solver_threads
for key, val in options.items():
solver.highs_options[key] = val
else:
options = {"output_flag": verbose, "threads": solver_threads}
return options
elif "rose" in solver_iface_name:
if options:
options["rank_burls"] = solver_threads
options["solver_engine"] = "rose_experimental_with_default_presolve"
else:
options = {
"rank_burls": solver_threads,
"solver_engine": "rose_experimental_with_default_presolve",
}
return options
else:
raise RuntimeError(f"Solver interface error for: '{solver_iface_name}'.")
def solve_model(
model: ConcreteModel,
solver: SolverFactory,
callback: callable = None,
options: dict = None,
solver_threads: int = 0,
verbose: bool = False,
) -> SolverResults:
"""
Solve the Pyomo model using the specified solver and return the results.
"""
if not isinstance(model, ConcreteModel):
raise ValueError("The model must be a ConcreteModel instance.")
if not hasattr(solver, "solve") or not callable(getattr(solver, "solve")):
raise ValueError("The solver object must have callable attribute 'solve'.")
# Try to determine the solver interface name
solver_iface_name = (
getattr(solver.__class__, "__module__", "")
or getattr(solver, "__name__", "")
or ""
)
if hasattr(
solver, "set_instance"
): # For some solver interfaces you have to set the instance first before setting callbacks or options
solver.set_instance(model)
if hasattr(solver, "set_callback") and callback:
solver.set_callback(callback)
if hasattr(solver, "set_gurobi_param"):
solver.set_gurobi_param("OutputFlag", bool(verbose))
solver.set_gurobi_param("Threads", solver_threads)
if options:
for key, val in options.items():
solver.set_gurobi_param(key, val)
return solver.solve(model)
elif hasattr(solver, "gurobi_options"):
solver.gurobi_options["OutputFlag"] = verbose
solver.gurobi_options["Threads"] = solver_threads
if options:
for key, val in options.items():
solver.gurobi_options[key] = val
solver.config.stream_solver = verbose
return solver.solve(model)
elif hasattr(solver, "highs_options"):
solver.config.stream_solver = verbose
solver.highs_options["output_flag"] = verbose
solver.highs_options["threads"] = verbose
if options:
for key, val in options.items():
solver.highs_options[key] = val
return solver.solve(model)
elif "rose" in solver_iface_name:
if options:
options["rank_burls"] = solver_threads
#options["solver_engine"] = "rose_experimental_with_default_presolve"
else:
options = {
"rank_burls": solver_threads,
#"solver_engine": "rose_experimental_with_default_presolve",
}
return solver.solve(model, options=options)
else:
raise RuntimeError(f"Solver interface error for: '{solver_iface_name}'.")
def get_objective_value(model: ConcreteModel) -> float:
"""
Get the value of the objective function from the Pyomo model.
"""
# Find the active objective(s) in the model
objs = [obj for obj in model.component_objects(Objective, active=True)]
if not objs:
raise RuntimeError("No active objective found in the model.")
if len(objs) > 1:
print("Warning: Multiple active objectives found, returning the first one.")
obj = objs[0]
return value(obj)
class CapacityRule(Enum):
"""
Enumeration of capacity rules for the required (sufficient) production capacity constraint in the Facility Location problem.
MIN: Ensure there is enough capacity to meet minimum demand (not robust).
MAX: Ensure there is enough capacity to meet demand under any circumstance (robust solution).
AVERAGE: Ensure there is enough capacity to meet average demand (not robust).
EXPECTED: Ensure there is enough capacity to meet expected demand (not robust).
"""
MIN = "min"
MAX = "max"
AVERAGE = "average"
EXPECTED = "expected"
def calculate_capacity_threshold(
model: ConcreteModel, capacity_rule: CapacityRule
) -> float:
"""
Calculate the capacity threshold for the required (sufficient) production capacity constraint based on the specified capacity rule.
MIN: Ensure there is enough capacity to meet minimum demand (not robust).
MAX: Ensure there is enough capacity to meet demand under any circumstance (robust solution).
AVERAGE: Ensure there is enough capacity to meet average demand (not robust).
EXPECTED: Ensure there is enough capacity to meet expected demand (not robust).
"""
if capacity_rule == CapacityRule.MIN:
return min(
sum(value(model.customer_demand[j, s]) for j in model.CUSTOMERS)
for s in model.SCENARIOS
)
elif capacity_rule == CapacityRule.MAX:
return max(
sum(value(model.customer_demand[j, s]) for j in model.CUSTOMERS)
for s in model.SCENARIOS
)
elif capacity_rule == CapacityRule.AVERAGE:
return sum(
sum(value(model.customer_demand[j, s]) for j in model.CUSTOMERS)
for s in model.SCENARIOS
) / len(model.SCENARIOS)
elif capacity_rule == CapacityRule.EXPECTED:
return sum(
sum(
value(model.customer_demand[j, s]) * value(model.prob[s])
for j in model.CUSTOMERS
)
for s in model.SCENARIOS
)
else:
raise ValueError(f"Unsupported capacity rule: {capacity_rule}")
def sanity_check_benders_solution(
master: ConcreteModel, expected_operating_cost: float, tol=1e-6
):
"""
Sanity check for the Benders decomposition solution.
Compares the objective value of the master problem with the fixed cost + expected operating cost coming from the subproblems.
"""
obj = get_objective_value(master)
fixed = sum(
value(master.fixed_cost[i]) * value(master.facility_open[i])
for i in master.FACILITIES
)
rhs = fixed + expected_operating_cost
gap = abs(obj - rhs) / abs(obj) if abs(obj) >= 1.0 else abs(obj - rhs)
if gap > tol:
raise RuntimeError(
f"Decomposition failed: gap={gap} > tol={tol}, master objective:{obj}"
)