From bcdabf554c065cafdfb19b8c59d0a4be8abd5aea Mon Sep 17 00:00:00 2001 From: Dheepak Krishnamurthy <1813121+kdheepak@users.noreply.github.com> Date: Tue, 1 Apr 2025 08:23:01 -0400 Subject: [PATCH] refactor(openTEPES): extract stage initialization logic into separate function Extracted the initialization logic for load levels and cycles into a new function to improve code readability and maintainability. --- openTEPES/openTEPES.py | 145 ++++++++++++----------------------------- 1 file changed, 43 insertions(+), 102 deletions(-) diff --git a/openTEPES/openTEPES.py b/openTEPES/openTEPES.py index 71a3086a..aa5cc15e 100644 --- a/openTEPES/openTEPES.py +++ b/openTEPES/openTEPES.py @@ -17,11 +17,49 @@ from .openTEPES_OutputResults import OutputResultsParVarCon, InvestmentResults, GenerationOperationResults, GenerationOperationHeatResults, ESSOperationResults, ReservoirOperationResults, NetworkH2OperationResults, NetworkHeatOperationResults, FlexibilityResults, NetworkOperationResults, MarginalResults, OperationSummaryResults, ReliabilityResults, CostSummaryResults, EconomicResults, NetworkMapResults +def initialize_stage(mTEPES, _phase_of_simulation=None): + # activate only load levels to formulate + mTEPES.del_component(mTEPES.st) + mTEPES.del_component(mTEPES.n ) + mTEPES.del_component(mTEPES.n2) + + if _phase_of_simulation == "initial": + + mTEPES.st = Set(doc='stages', initialize=[stt for stt in mTEPES.stt if st == stt if mTEPES.pStageWeight[stt] and sum(1 for (p,sc,st,nn) in mTEPES.s2n)]) + mTEPES.n = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if (p,sc,st,nn) in mTEPES.s2n ]) + mTEPES.n2 = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if (p,sc,st,nn) in mTEPES.s2n ]) + + else: + + mTEPES.st = Set(doc='stages', initialize=[stt for stt in mTEPES.stt if mTEPES.pStageWeight[stt] and sum(1 for p,sc,stt,nn in mTEPES.s2n)]) + mTEPES.n = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if sum(1 for p,sc,st in mTEPES.ps*mTEPES.st if (p,sc,st, nn) in mTEPES.s2n)]) + mTEPES.n2 = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if sum(1 for p,sc,st in mTEPES.ps*mTEPES.st if (p,sc,st, nn) in mTEPES.s2n)]) + + # load levels multiple of cycles for each ESS/generator + mTEPES.nesc = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [es] == 0] + mTEPES.necc = [(n,ec) for n,ec in mTEPES.n*mTEPES.ec if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [ec] == 0] + mTEPES.neso = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pOutflowsTimeStep[es] == 0] + mTEPES.ngen = [(n,g ) for n,g in mTEPES.n*mTEPES.g if mTEPES.n.ord(n) % mTEPES.pEnergyTimeStep [g ] == 0] + if mTEPES.pIndHydroTopology == 1: + mTEPES.nhc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2h) == 0] + if sum(1 for h,rs in mTEPES.p2r): + mTEPES.np2c = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) == 0] + else: + mTEPES.np2c = [] + if sum(1 for rs,h in mTEPES.r2p): + mTEPES.npc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) == 0] + else: + mTEPES.npc = [] + mTEPES.nrsc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] + mTEPES.nrcc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rn if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] + mTEPES.nrso = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pWaterOutTimeStep [rs] == 0] + + def openTEPES_run(DirName, CaseName, SolverName, pIndOutputResults, pIndLogConsole): InitialTime = time.time() _path = os.path.join(DirName, CaseName) - + #%% replacing string values by numerical values idxDict = dict() idxDict[0 ] = 0 @@ -78,32 +116,7 @@ def openTEPES_run(DirName, CaseName, SolverName, pIndOutputResults, pIndLogConso # control for not repeating the stages in case of several ones mTEPES.NoRepetition = 0 - # activate only load levels to formulate - mTEPES.del_component(mTEPES.st) - mTEPES.del_component(mTEPES.n ) - mTEPES.del_component(mTEPES.n2) - mTEPES.st = Set(doc='stages', initialize=[stt for stt in mTEPES.stt if st == stt if mTEPES.pStageWeight[stt] and sum(1 for (p,sc,st,nn) in mTEPES.s2n)]) - mTEPES.n = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if (p,sc,st,nn) in mTEPES.s2n ]) - mTEPES.n2 = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if (p,sc,st,nn) in mTEPES.s2n ]) - - # load levels multiple of cycles for each ESS/generator - mTEPES.nesc = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [es] == 0] - mTEPES.necc = [(n,ec) for n,ec in mTEPES.n*mTEPES.ec if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [ec] == 0] - mTEPES.neso = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pOutflowsTimeStep[es] == 0] - mTEPES.ngen = [(n,g ) for n,g in mTEPES.n*mTEPES.g if mTEPES.n.ord(n) % mTEPES.pEnergyTimeStep [g ] == 0] - if mTEPES.pIndHydroTopology == 1: - mTEPES.nhc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2h) == 0] - if sum(1 for h,rs in mTEPES.p2r): - mTEPES.np2c = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) == 0] - else: - mTEPES.np2c = [] - if sum(1 for rs,h in mTEPES.r2p): - mTEPES.npc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) == 0] - else: - mTEPES.npc = [] - mTEPES.nrsc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] - mTEPES.nrcc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rn if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] - mTEPES.nrso = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pWaterOutTimeStep [rs] == 0] + initialize_stage(mTEPES, _phase_of_simulation="initial") if len(mTEPES.st): @@ -177,31 +190,7 @@ def openTEPES_run(DirName, CaseName, SolverName, pIndOutputResults, pIndLogConso if st == mTEPES.Last_st and mTEPES.NoRepetition == 0: - mTEPES.del_component(mTEPES.st) - mTEPES.del_component(mTEPES.n ) - mTEPES.del_component(mTEPES.n2) - mTEPES.st = Set(doc='stages', initialize=[stt for stt in mTEPES.stt if mTEPES.pStageWeight[stt] and sum(1 for p,sc,stt,nn in mTEPES.s2n)]) - mTEPES.n = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if sum(1 for p,sc,st in mTEPES.ps*mTEPES.st if (p,sc,st, nn) in mTEPES.s2n)]) - mTEPES.n2 = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if sum(1 for p,sc,st in mTEPES.ps*mTEPES.st if (p,sc,st, nn) in mTEPES.s2n)]) - - # load levels multiple of cycles for each ESS/generator - mTEPES.nesc = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [es] == 0] - mTEPES.necc = [(n,ec) for n,ec in mTEPES.n*mTEPES.ec if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [ec] == 0] - mTEPES.neso = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pOutflowsTimeStep[es] == 0] - mTEPES.ngen = [(n,g ) for n,g in mTEPES.n*mTEPES.g if mTEPES.n.ord(n) % mTEPES.pEnergyTimeStep [g ] == 0] - if mTEPES.pIndHydroTopology == 1: - mTEPES.nhc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2h) == 0] - if sum(1 for h,rs in mTEPES.p2r): - mTEPES.np2c = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) == 0] - else: - mTEPES.np2c = [] - if sum(1 for rs,h in mTEPES.r2p): - mTEPES.npc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) == 0] - else: - mTEPES.npc = [] - mTEPES.nrsc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] - mTEPES.nrcc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rn if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] - mTEPES.nrso = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pWaterOutTimeStep [rs] == 0] + initialize_stage(mTEPES) # Writing LP file if pIndLogConsole == 1: @@ -230,31 +219,7 @@ def openTEPES_run(DirName, CaseName, SolverName, pIndOutputResults, pIndLogConso if (p,sc) == mTEPES.ps.last() and st == mTEPES.Last_st and mTEPES.NoRepetition == 0: mTEPES.NoRepetition = 1 - mTEPES.del_component(mTEPES.st) - mTEPES.del_component(mTEPES.n ) - mTEPES.del_component(mTEPES.n2) - mTEPES.st = Set(doc='stages', initialize=[stt for stt in mTEPES.stt if mTEPES.pStageWeight[stt] and sum(1 for p,sc,stt,nn in mTEPES.s2n)]) - mTEPES.n = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if sum(1 for p,sc,st in mTEPES.ps*mTEPES.st if (p,sc,st, nn) in mTEPES.s2n)]) - mTEPES.n2 = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if sum(1 for p,sc,st in mTEPES.ps*mTEPES.st if (p,sc,st, nn) in mTEPES.s2n)]) - - # load levels multiple of cycles for each ESS/generator - mTEPES.nesc = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [es] == 0] - mTEPES.necc = [(n,ec) for n,ec in mTEPES.n*mTEPES.ec if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [ec] == 0] - mTEPES.neso = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pOutflowsTimeStep[es] == 0] - mTEPES.ngen = [(n,g ) for n,g in mTEPES.n*mTEPES.g if mTEPES.n.ord(n) % mTEPES.pEnergyTimeStep [g ] == 0] - if mTEPES.pIndHydroTopology == 1: - mTEPES.nhc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2h) == 0] - if sum(1 for h,rs in mTEPES.p2r): - mTEPES.np2c = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) == 0] - else: - mTEPES.np2c = [] - if sum(1 for rs,h in mTEPES.r2p): - mTEPES.npc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) == 0] - else: - mTEPES.npc = [] - mTEPES.nrsc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] - mTEPES.nrcc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rn if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] - mTEPES.nrso = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pWaterOutTimeStep [rs] == 0] + initialize_stage(mTEPES) # Writing LP file if pIndLogConsole == 1: @@ -267,31 +232,7 @@ def openTEPES_run(DirName, CaseName, SolverName, pIndOutputResults, pIndLogConso # there are investment decisions (it is an expansion and operation model) or there are system emission constraints ProblemSolving(DirName, CaseName, SolverName, mTEPES, mTEPES, pIndLogConsole, p, sc, st) - mTEPES.del_component(mTEPES.st) - mTEPES.del_component(mTEPES.n ) - mTEPES.del_component(mTEPES.n2) - mTEPES.st = Set(doc='stages', initialize=[stt for stt in mTEPES.stt if mTEPES.pStageWeight[stt] and sum(1 for p,sc,stt,nn in mTEPES.s2n)]) - mTEPES.n = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if sum(1 for p,sc,st in mTEPES.ps*mTEPES.st if (p,sc,st, nn) in mTEPES.s2n)]) - mTEPES.n2 = Set(doc='load levels', initialize=[nn for nn in mTEPES.nn if sum(1 for p,sc,st in mTEPES.ps*mTEPES.st if (p,sc,st, nn) in mTEPES.s2n)]) - - # load levels multiple of cycles for each ESS/generator - mTEPES.nesc = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [es] == 0] - mTEPES.necc = [(n,ec) for n,ec in mTEPES.n*mTEPES.ec if mTEPES.n.ord(n) % mTEPES.pStorageTimeStep [ec] == 0] - mTEPES.neso = [(n,es) for n,es in mTEPES.n*mTEPES.es if mTEPES.n.ord(n) % mTEPES.pOutflowsTimeStep[es] == 0] - mTEPES.ngen = [(n,g ) for n,g in mTEPES.n*mTEPES.g if mTEPES.n.ord(n) % mTEPES.pEnergyTimeStep [g ] == 0] - if mTEPES.pIndHydroTopology == 1: - mTEPES.nhc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2h) == 0] - if sum(1 for h,rs in mTEPES.p2r): - mTEPES.np2c = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (h,rs) in mTEPES.p2r) == 0] - else: - mTEPES.np2c = [] - if sum(1 for rs,h in mTEPES.r2p): - mTEPES.npc = [(n,h ) for n,h in mTEPES.n*mTEPES.h if sum(1 for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) and mTEPES.n.ord(n) % sum(mTEPES.pReservoirTimeStep[rs] for rs in mTEPES.rs if (rs,h) in mTEPES.r2p) == 0] - else: - mTEPES.npc = [] - mTEPES.nrsc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] - mTEPES.nrcc = [(n,rs) for n,rs in mTEPES.n*mTEPES.rn if mTEPES.n.ord(n) % mTEPES.pReservoirTimeStep[rs] == 0] - mTEPES.nrso = [(n,rs) for n,rs in mTEPES.n*mTEPES.rs if mTEPES.n.ord(n) % mTEPES.pWaterOutTimeStep [rs] == 0] + initialize_stage(mTEPES) # activate the constraints of all the periods and scenarios for c in mTEPES.component_objects(pyo.Constraint):