From 411bcede0f9d74a2de17cfc1e4017a3850ee4138 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sun, 21 Jul 2024 20:38:49 +0800 Subject: [PATCH] Added: - Defined explicit data types for inputs and outputs of functions for better type checking and readability. - Added a test script for `prepshot.utils`. Fixed: - Added `pyoptinterface._src.core_ext` to Pylint's extension package allow list to resolve cpp-extension-no-member warning. Changed: - Updated `model.py` to keep necessary decision variables and use expressions for intermediate variables instead of direct determination. - Refactored `extract_results_non_hydro` in `output_data.py` to extract common features for different variables, simplifying the code. - Removed definitions of complex sets and opted for simple sets wherever possible to streamline the code. --- .pylintrc | 2 + prepshot/_model/co2.py | 51 ++++-- prepshot/_model/cost.py | 84 ++++++--- prepshot/_model/demand.py | 24 ++- prepshot/_model/generation.py | 43 +++-- prepshot/_model/head_iteration.py | 49 +++-- prepshot/_model/hydro.py | 118 +++++++------ prepshot/_model/investment.py | 86 +++++---- prepshot/_model/nondispatchable.py | 18 +- prepshot/_model/storage.py | 74 +++++--- prepshot/_model/transmission.py | 71 +++++--- prepshot/load_data.py | 29 ++- prepshot/logs.py | 11 +- prepshot/model.py | 223 +++++++++++++---------- prepshot/output_data.py | 275 ++++++++++++----------------- prepshot/set_up.py | 57 ++++-- prepshot/solver.py | 48 ++++- prepshot/utils.py | 127 +++++++++---- tests/test_utils.py | 121 +++++++++++++ 19 files changed, 970 insertions(+), 541 deletions(-) create mode 100644 .pylintrc create mode 100644 tests/test_utils.py diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..f8a2c51 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,2 @@ +[MASTER] +extension-pkg-allow-list=pyoptinterface._src.core_ext \ No newline at end of file diff --git a/prepshot/_model/co2.py b/prepshot/_model/co2.py index fb1747c..a3377f5 100644 --- a/prepshot/_model/co2.py +++ b/prepshot/_model/co2.py @@ -4,29 +4,41 @@ """This module contains constraints related to carbon emissions. """ +from typing import Union + import pyoptinterface as poi import numpy as np class AddCo2EmissionConstraints: """Class for carbon emission constraints and calculations. """ - def __init__(self, model): + def __init__(self, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ], + ) -> None: """Initialize the class. Parameters ---------- - model : pyoptinterface._src.solver.Model - Model index. - params : dict - Dictionary containing parameters. + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + Model object depending on the solver. """ self.model = model model.carbon_breakdown = poi.make_tupledict( - model.year_zone_tech_tuples, + model.year, model.zone, model.tech, rule=self.carbon_breakdown ) model.carbon_capacity = poi.make_tupledict( - model.year_zone_tuples, + model.year, model.zone, rule=self.emission_calc_by_zone_rule ) model.carbon = poi.make_tupledict( @@ -38,7 +50,10 @@ def __init__(self, model): rule=self.emission_limit_rule ) - def emission_limit_rule(self, y): + def emission_limit_rule( + self, + y : int + ) -> poi._src.core_ext.ConstraintIndex: """Annual carbon emission limits across all zones and technologies. Parameters @@ -58,7 +73,10 @@ def emission_limit_rule(self, y): lhs = model.carbon[y] - limit[y] return model.add_linear_constraint(lhs, poi.Leq, 0) - def emission_calc_rule(self, y): + def emission_calc_rule( + self, + y : int + ) -> poi._src.core_ext.ConstraintIndex: """Calculation of annual carbon emission across all zones and technologies. @@ -78,7 +96,11 @@ def emission_calc_rule(self, y): for z in model.zone ) - def emission_calc_by_zone_rule(self, y, z): + def emission_calc_by_zone_rule( + self, + y : int, + z : str + ) -> poi._src.core_ext.ConstraintIndex: """Calculation of annual carbon emissions by zone. Parameters @@ -99,7 +121,12 @@ def emission_calc_by_zone_rule(self, y, z): for te in model.tech ) - def carbon_breakdown(self, y, z, te): + def carbon_breakdown( + self, + y : int, + z : str, + te : str + ) -> poi._src.core_ext.ExprBuilder: """Carbon emission cost breakdown. Parameters @@ -121,5 +148,5 @@ def carbon_breakdown(self, y, z, te): dt = model.params['dt'] return poi.quicksum( ef * model.gen[h, m, y, z, te] * dt - for h, m in model.hour_month_tuples + for h in model.hour for m in model.month ) diff --git a/prepshot/_model/cost.py b/prepshot/_model/cost.py index 07aa48f..52607f5 100644 --- a/prepshot/_model/cost.py +++ b/prepshot/_model/cost.py @@ -4,12 +4,21 @@ """This module contains objective functions for the model. """ +from typing import Union + import pyoptinterface as poi class AddCostObjective: """Objective function class to determine the total cost of the model. """ - def __init__(self, model): + def __init__( + self, model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + ) -> None: """The constructor for objective functions class. Parameters @@ -20,7 +29,7 @@ def __init__(self, model): self.model = model self.define_objective() - def define_objective(self): + def define_objective(self) -> None: """Objective function of the model, to minimize total cost. """ model = self.model @@ -33,7 +42,11 @@ def define_objective(self): + model.cost_fix + model.cost_newline - model.income model.set_objective(model.cost, sense=poi.ObjectiveSense.Minimize) - def fuel_cost_breakdown(self, y, z, te): + def fuel_cost_breakdown(self, + y : int, + z : str, + te : str + ) -> poi._src.core_ext.ExprBuilder: """Fuel cost breakdown of technologies. Parameters @@ -56,9 +69,13 @@ def fuel_cost_breakdown(self, y, z, te): vf = model.params['var_factor'][y] w = model.params['weight'] return 1 / w * fp * sum(model.gen[h, m, y, z, te] - for (h,m) in model.hour_month_tuples) * dt * vf + for h in model.hour for m in model.month) * dt * vf - def cost_var_line_breakdown(self, y, z, z1): + def cost_var_line_breakdown(self, + y : int, + z : str, + z1 : str + ) -> poi._src.core_ext.ExprBuilder: """Variable operation and maintenance cost breakdown of transmission lines. @@ -84,9 +101,13 @@ def cost_var_line_breakdown(self, y, z, z1): w = model.params['weight'] return 0.5 / w * lvc * dt * vf * \ sum(model.trans_export[h, m, y, z, z1] - for (h,m) in model.hour_month_tuples) + for h in model.hour for m in model.month) - def cost_var_tech_breakdown(self, y, z, te): + def cost_var_tech_breakdown(self, + y : int, + z : str, + te : str + ) -> poi._src.core_ext.ExprBuilder: """Variable operation and maintenance cost breakdown. Parameters @@ -110,9 +131,13 @@ def cost_var_tech_breakdown(self, y, z, te): vf = model.params['var_factor'][y] w = model.params['weight'] return 1 / w * tvc * sum(model.gen[h, m, y, z, te] - for (h,m) in model.hour_month_tuples) * dt * vf + for h in model.hour for m in model.month) * dt * vf - def cost_fix_line_breakdown(self, y, z, z1): + def cost_fix_line_breakdown(self, + y : int, + z : str, + z1 : str + ) -> poi._src.core_ext.ExprBuilder: """Fixed operation and maintenance cost breakdown of transmission lines. @@ -136,7 +161,9 @@ def cost_fix_line_breakdown(self, y, z, z1): ff = model.params['fix_factor'] return lfc[z, z1] * model.cap_lines_existing[y, z, z1] * ff[y] * 0.5 - def cost_fix_tech_breakdown(self, y, z, te): + def cost_fix_tech_breakdown(self, + y : int, z : str, te : str + ) -> poi._src.core_ext.ExprBuilder: """Fixed operation and maintenance cost breakdown. Parameters @@ -159,7 +186,9 @@ def cost_fix_tech_breakdown(self, y, z, te): ff = model.params['fix_factor'][y] return tfc * model.cap_existing[y, z, te] * ff - def cost_newtech_breakdown(self, y, z, te): + def cost_newtech_breakdown(self, + y : int, z : str, te : str + ) -> poi._src.core_ext.ExprBuilder: """New technology investment cost breakdown. Parameters @@ -182,7 +211,9 @@ def cost_newtech_breakdown(self, y, z, te): ivf = model.params['inv_factor'][te, y] return tic * model.cap_newtech[y, z, te] * ivf - def cost_newline_breakdown(self, y, z, z1): + def cost_newline_breakdown( + self, y : int, z : str, z1 : str + ) -> poi._src.core_ext.ExprBuilder: """New transmission line investment cost breakdown. Parameters @@ -207,7 +238,7 @@ def cost_newline_breakdown(self, y, z, z1): capacity_invested_line = model.cap_newline[y, z, z1] return lic * capacity_invested_line * d * ivf * 0.5 - def income_rule(self): + def income_rule(self) -> poi._src.core_ext.ExprBuilder: """Income from water withdrawal. Reference: https://www.nature.com/articles/s44221-023-00126-0 @@ -221,13 +252,16 @@ def income_rule(self): coef = 3600 * model.params['dt'] * model.params['price'] income = sum( model.withdraw[s, h, m, y] * coef - for s, h, m, y in model.station_hour_month_year_tuples + for s in model.station + for h in model.hour + for m in model.month + for y in model.year ) return income return poi.ExprBuilder(0) - def var_cost_rule(self): + def var_cost_rule(self) -> poi._src.core_ext.ExprBuilder: """Calculate total variable cost, which is sum of the fuel cost of technologies and variable Operation and maintenance (O&M) cost of technologies and transmission lines. @@ -239,17 +273,17 @@ def var_cost_rule(self): """ model = self.model model.cost_var_tech_breakdown = poi.make_tupledict( - model.year_zone_tech_tuples, + model.year, model.zone, model.tech, rule=self.cost_var_tech_breakdown ) model.cost_fuel_breakdown = poi.make_tupledict( - model.year_zone_tech_tuples, + model.year, model.zone, model.tech, rule=self.fuel_cost_breakdown ) model.cost_var_line_breakdown = poi.make_tupledict( - model.year_zone_zone_tuples, + model.year, model.zone, model.zone, rule=self.cost_var_line_breakdown ) cost_var = poi.ExprBuilder() @@ -258,7 +292,7 @@ def var_cost_rule(self): cost_var += poi.quicksum(model.cost_var_line_breakdown) return cost_var - def newtech_cost_rule(self): + def newtech_cost_rule(self) -> poi._src.core_ext.ExprBuilder: """Total investment cost of new technologies. Returns @@ -269,12 +303,12 @@ def newtech_cost_rule(self): """ model = self.model model.cost_newtech_breakdown = poi.make_tupledict( - model.year_zone_tech_tuples, + model.year, model.zone, model.tech, rule=self.cost_newtech_breakdown ) return poi.quicksum(model.cost_newtech_breakdown) - def newline_cost_rule(self): + def newline_cost_rule(self) -> poi._src.core_ext.ExprBuilder: """Total investment cost of new transmission lines. Returns @@ -285,12 +319,12 @@ def newline_cost_rule(self): """ model = self.model model.cost_newline_breakdown = poi.make_tupledict( - model.year_zone_zone_tuples, + model.year, model.zone, model.zone, rule=self.cost_newline_breakdown ) return poi.quicksum(model.cost_newline_breakdown) - def fix_cost_rule(self): + def fix_cost_rule(self) -> poi._src.core_ext.ExprBuilder: """Fixed O&M cost of technologies and transmission lines. Returns @@ -301,11 +335,11 @@ def fix_cost_rule(self): """ model = self.model model.cost_fix_tech_breakdown = poi.make_tupledict( - model.year_zone_tech_tuples, + model.year, model.zone, model.tech, rule=self.cost_fix_tech_breakdown ) model.cost_fix_line_breakdown = poi.make_tupledict( - model.year_zone_zone_tuples, + model.year, model.zone, model.zone, rule=self.cost_fix_line_breakdown ) return poi.quicksum(model.cost_fix_tech_breakdown) + \ diff --git a/prepshot/_model/demand.py b/prepshot/_model/demand.py index 6668c9b..6e3fc16 100644 --- a/prepshot/_model/demand.py +++ b/prepshot/_model/demand.py @@ -3,20 +3,31 @@ """This module contains constraints related to demand. """ +from typing import Union import pyoptinterface as poi class AddDemandConstraints: """This class contains demand constraints. """ - def __init__(self, model): + def __init__(self, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + ) -> None: """Initialize the class and add constraints. """ self.model = model model.power_balance_cons = poi.make_tupledict( - model.hour_month_year_zone_tuples, rule=self.power_balance_rule + model.hour, model.month, model.year, model.zone, + rule=self.power_balance_rule ) - def power_balance_rule(self, h, m, y, z): + def power_balance_rule(self, + h : int, m : int, y : int, z : str + ) -> poi._src.core_ext.ConstraintIndex: """Nodal power balance. The total electricity demand for each time period and in each zone should be met by the following. @@ -42,15 +53,12 @@ def power_balance_rule(self, h, m, y, z): Constraint index of the model. """ model = self.model - lc = model.params['transmission_line_existing_capacity'] load = model.params['demand'] imp_z = poi.quicksum( - model.trans_import[h, m, y, z1, z] - for z1 in model.zone if (z, z1) in lc.keys() + model.trans_import[h, m, y, z1, z] for z1 in model.zone ) exp_z = poi.quicksum( - model.trans_export[h, m, y, z, z1] - for z1 in model.zone if (z, z1) in lc.keys() + model.trans_export[h, m, y, z, z1] for z1 in model.zone ) gen_z = poi.quicksum( model.gen[h, m, y, z, te] for te in model.tech diff --git a/prepshot/_model/generation.py b/prepshot/_model/generation.py index a2f20f6..9e40366 100644 --- a/prepshot/_model/generation.py +++ b/prepshot/_model/generation.py @@ -4,26 +4,40 @@ """This module contains constraints related to technology generation. """ +from typing import Union + import pyoptinterface as poi class AddGenerationConstraints: """Add constraints for generation in the model. """ - def __init__(self, model): + def __init__(self, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + ) -> None: """Initialize the class and add constraints. """ self.model = model model.gen_up_bound_cons = poi.make_tupledict( - model.hour_month_year_zone_tech_tuples, rule=self.gen_up_bound_rule + model.hour, model.month, model.year, model.zone, model.tech, + rule=self.gen_up_bound_rule ) model.ramping_up_cons = poi.make_tupledict( - model.hour_month_year_zone_tech_tuples, rule=self.ramping_up_rule + model.hour, model.month, model.year, model.zone, model.tech, + rule=self.ramping_up_rule ) model.ramping_down_cons = poi.make_tupledict( - model.hour_month_year_zone_tech_tuples, rule=self.ramping_down_rule + model.hour, model.month, model.year, model.zone, model.tech, + rule=self.ramping_down_rule ) - - def gen_up_bound_rule(self, h, m, y, z, te): + + def gen_up_bound_rule(self, + h : int, m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Generation is less than or equal to the existing capacity. Parameters @@ -48,7 +62,10 @@ def gen_up_bound_rule(self, h, m, y, z, te): lhs = model.gen[h, m, y, z, te] - model.cap_existing[y, z, te] return model.add_linear_constraint(lhs, poi.Leq, 0) - def ramping_up_rule(self, h, m, y, z, te): + + def ramping_up_rule(self, + h : int, m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Ramping up limits. Parameters @@ -71,17 +88,17 @@ def ramping_up_rule(self, h, m, y, z, te): """ model = self.model rp = model.params['ramp_up'][te] * model.params['dt'] - if h > 1 and rp < 1: + if rp < 1 < h: lhs = ( model.gen[h, m, y, z, te] - model.gen[h-1, m, y, z, te] - rp * model.cap_existing[y, z, te] ) return model.add_linear_constraint(lhs, poi.Leq, 0) - else: - return None - def ramping_down_rule(self, h, m, y, z, te): + def ramping_down_rule(self, + h : int, m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Ramping down limits. Parameters @@ -104,11 +121,9 @@ def ramping_down_rule(self, h, m, y, z, te): """ model = self.model rd = model.params['ramp_down'][te] * model.params['dt'] - if h > 1 and rd < 1: + if rd < 1 < h: lhs = ( model.gen[h-1, m, y, z, te] - model.gen[h, m, y, z, te] - rd * model.cap_existing[y, z, te] ) return model.add_linear_constraint(lhs, poi.Leq, 0) - else: - return None diff --git a/prepshot/_model/head_iteration.py b/prepshot/_model/head_iteration.py index a0d9950..328381c 100644 --- a/prepshot/_model/head_iteration.py +++ b/prepshot/_model/head_iteration.py @@ -6,6 +6,7 @@ import datetime import logging +from typing import Union, Tuple, List, Dict, Any import numpy as np import pandas as pd @@ -13,26 +14,31 @@ import pyoptinterface as poi from prepshot.utils import interpolate_z_by_q_or_s +from prepshot.utils import cartesian_product -def initialize_waterhead(stations, year, month, hour, params): +def initialize_waterhead( + stations : List[str], year : List[int], + month : List[int], hour : List[int], + params : Dict[str, Any] +) -> Tuple[pd.DataFrame, pd.DataFrame]: """Initialize water head. Parameters ---------- - stations : list + stations : List[str] List of stations. - year : list + year : List[int] List of years. - month : list + month : List[int] List of months. - hour : list + hour : List[int] List of hours. - params : dict + params : Dict[str, Any] Dictionary of parameters for the model. Returns ------- - tuple + Tuple[pd.DataFrame, pd.DataFrame] A tuple of two pandas.DataFrame objects, the first one is the old water head, the second one is the new water head. """ @@ -50,7 +56,9 @@ def initialize_waterhead(stations, year, month, hour, params): ] * (len(hour) * len(month) * len(year)) return old_waterhead, new_waterhead -def compute_error(old_waterhead, new_waterhead): +def compute_error( + old_waterhead : pd.DataFrame, new_waterhead : pd.DataFrame +) -> float: """Calculate the error of the water head. Parameters @@ -73,8 +81,16 @@ def compute_error(old_waterhead, new_waterhead): return error def process_model_solution( - model, stations, year, month, hour, params, old_waterhead, new_waterhead -): + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ], + stations : List[str], year : List[int], month : List[int], + hour : List[int], params : Dict[str, Any], + old_waterhead : pd.DataFrame, new_waterhead : pd.DataFrame +) -> bool: """Process the solution of the model, updating the water head data. Parameters @@ -82,7 +98,7 @@ def process_model_solution( model : pyoptinterface._src.solver.Model Model to be solved. stations : list - List of hydropower stations. + List f hydropower stations. year : list List of years. month : list @@ -102,7 +118,7 @@ def process_model_solution( True if the model is solved, False otherwise. """ idx = pd.IndexSlice - for s, h, m, y in model.station_hour_month_year_tuples: + for s, h, m, y in cartesian_product(stations, hour, month, year): efficiency = params['reservoir_characteristics']['coeff', s] model.set_normalized_coefficient( model.output_calc_cons[s, h, m, y], @@ -145,8 +161,13 @@ def process_model_solution( return True def run_model_iteration( - model, params, error_threshold=0.001, max_iterations=5 -): + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + ], params : Dict[str, Any], + error_threshold=0.001, max_iterations=5 +) -> bool: """Run the model iteratively. Parameters diff --git a/prepshot/_model/hydro.py b/prepshot/_model/hydro.py index 89a402b..0441d96 100644 --- a/prepshot/_model/hydro.py +++ b/prepshot/_model/hydro.py @@ -4,12 +4,21 @@ """This module contains functions related to hydropower technologies. """ +from typing import Union + import pyoptinterface as poi class AddHydropowerConstraints: """Class for hydropower constraints and calculations. """ - def __init__(self, model): + def __init__(self, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + ) -> None: """Initialize the class. Here I define the variables needed and the constraints for the hydropower model. @@ -21,81 +30,62 @@ def __init__(self, model): """ self.model = model if model.params['isinflow']: - self.define_variable() model.outflow = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.outflow_rule ) model.inflow = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.inflow_rule ) model.water_balance_cons = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.water_balance_rule ) model.init_storage_cons = poi.make_tupledict( - model.station_month_year_tuples, + model.station, model.month, model.year, rule=self.init_storage_rule ) model.end_storage_cons = poi.make_tupledict( - model.station_month_year_tuples, + model.station, model.month, model.year, rule=self.end_storage_rule ) model.output_calc_cons = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.output_calc_rule ) model.outflow_low_bound_cons = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.outflow_low_bound_rule ) model.outflow_up_bound_cons = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.outflow_up_bound_rule ) model.storage_low_bound_cons = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.storage_low_bound_rule ) model.storage_up_bound_cons = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.storage_up_bound_rule ) model.output_low_bound_cons = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.output_low_bound_rule ) model.output_up_bound_cons = poi.make_tupledict( - model.station_hour_month_year_tuples, + model.station, model.hour, model.month, model.year, rule=self.output_up_bound_rule ) model.hydro_output_cons = poi.make_tupledict( - model.hour_month_year_zone_tuples, + model.hour, model.month, model.year, model.zone, rule=self.hydro_output_rule ) - def define_variable(self): - """Define variables for hydropower production constraints. - """ - model = self.model - model.genflow = model.add_variables( - model.station_hour_month_year_tuples, lb=0 - ) - model.spillflow = model.add_variables( - model.station_hour_month_year_tuples, lb=0 - ) - model.withdraw = model.add_variables( - model.station_hour_month_year_tuples, lb=0 - ) - model.storage_reservoir = model.add_variables( - model.station_hour_p_month_year_tuples, lb=0 - ) - model.output = model.add_variables( - model.station_hour_month_year_tuples, lb=0 - ) - - def inflow_rule(self, s, h, m, y): + def inflow_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ExprBuilder: """Define hydrolic connnect between cascade reservoirs, total inflow of downsteam reservoir = natural inflow + upstream outflow from upsteam reservoir(s). @@ -129,14 +119,16 @@ def inflow_rule(self, s, h, m, y): wdt[wdt['NEXTPOWER_ID'] == s].delay ): delay = int(int(delay)/dt) - if (h - delay >= hour[0]): + if h - delay >= hour[0]: t = h - delay else: t = hour[-1] + h - delay up_stream_outflow += model.outflow[ups, t, m, y] return up_stream_outflow + model.params['inflow'][s, y, m, h] - def outflow_rule(self, s, h, m, y): + def outflow_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ExprBuilder: """Total outflow of reservoir is equal to the sum of generation and spillage. @@ -159,7 +151,9 @@ def outflow_rule(self, s, h, m, y): model = self.model return model.genflow[s, h, m, y] + model.spillflow[s, h, m, y] - def water_balance_rule(self, s, h, m, y): + def water_balance_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Water balance of reservoir, i.e., storage[t] = storage[t-1] + net_storage[t]. @@ -190,7 +184,9 @@ def water_balance_rule(self, s, h, m, y): lhs -= model.storage_reservoir[s, h, m, y] return model.add_linear_constraint(lhs, poi.Eq, 0) - def init_storage_rule(self, s, m, y): + def init_storage_rule(self, + s : str, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Determine storage of reservoir in the initial hour of each month. Parameters @@ -213,7 +209,9 @@ def init_storage_rule(self, s, m, y): lhs = model.storage_reservoir[s, hour_period[0], m, y] - init_storage return model.add_linear_constraint(lhs, poi.Eq, 0) - def end_storage_rule(self, s, m, y): + def end_storage_rule(self, + s : str, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Determine storage of reservoir in the terminal hour of each month. Parameters @@ -236,7 +234,9 @@ def end_storage_rule(self, s, m, y): lhs = model.storage_reservoir[s, hour_period[-1], m, y] - final_storage return model.add_linear_constraint(lhs, poi.Eq, 0) - def outflow_low_bound_rule(self, s, h, m, y): + def outflow_low_bound_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Lower bound of total outflow. Parameters @@ -256,11 +256,14 @@ def outflow_low_bound_rule(self, s, h, m, y): Constraint index of the model. """ model = self.model - min_outflow = model.params['reservoir_characteristics']['outflow_min', s] + rc = model.params['reservoir_characteristics'] + min_outflow = rc['outflow_min', s] lhs = model.outflow[s, h, m, y] - min_outflow return model.add_linear_constraint(lhs, poi.Geq, 0) - def outflow_up_bound_rule(self, s, h, m, y): + def outflow_up_bound_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Upper bound of total outflow. Parameters @@ -280,11 +283,14 @@ def outflow_up_bound_rule(self, s, h, m, y): Constraint index of the model. """ model = self.model - max_outflow = model.params['reservoir_characteristics']['outflow_max', s] + rc = model.params['reservoir_characteristics'] + max_outflow = rc['outflow_max', s] lhs = model.outflow[s, h, m, y] - max_outflow return model.add_linear_constraint(lhs, poi.Leq, 0) - def storage_low_bound_rule(self, s, h, m, y): + def storage_low_bound_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Lower bound of reservoir storage. Parameters @@ -308,7 +314,9 @@ def storage_low_bound_rule(self, s, h, m, y): lhs = model.storage_reservoir[s, h, m, y] - min_storage return model.add_linear_constraint(lhs, poi.Geq, 0) - def storage_up_bound_rule(self, s, h, m, y): + def storage_up_bound_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Upper bound of reservoir storage. Parameters @@ -332,7 +340,9 @@ def storage_up_bound_rule(self, s, h, m, y): lhs = model.storage_reservoir[s, h, m, y] - max_storage return model.add_linear_constraint(lhs, poi.Leq, 0) - def output_low_bound_rule(self, s, h, m, y): + def output_low_bound_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Lower bound of hydropower output. Parameters @@ -356,7 +366,9 @@ def output_low_bound_rule(self, s, h, m, y): lhs = model.output[s, h, m, y] - min_output return model.add_linear_constraint(lhs, poi.Geq, 0) - def output_up_bound_rule(self, s, h, m, y): + def output_up_bound_rule(self, + s : str, h : int, m : int, y : int + ) -> poi._src.core_ext.ConstraintIndex: """Upper bound of hydropower output. Parameters @@ -380,7 +392,9 @@ def output_up_bound_rule(self, s, h, m, y): lhs = model.output[s, h, m, y] - max_output return model.add_linear_constraint(lhs, poi.Leq, 0) - def output_calc_rule(self, s, h, m, y): + def output_calc_rule(self, + s : str, h : int, m : int, y :int + ) -> poi._src.core_ext.ConstraintIndex: """Hydropower production calculation. Head parameter is specified after building the model. @@ -408,7 +422,9 @@ def output_calc_rule(self, s, h, m, y): ) return model.add_linear_constraint(lhs, poi.Eq, 0) - def hydro_output_rule(self, h, m, y, z): + def hydro_output_rule(self, + h : int, m : int, y : int, z : str + ) -> poi._src.core_ext.ConstraintIndex: """Hydropower output of all hydropower plants across each zone. Parameters diff --git a/prepshot/_model/investment.py b/prepshot/_model/investment.py index fd330ec..97d57c9 100644 --- a/prepshot/_model/investment.py +++ b/prepshot/_model/investment.py @@ -4,34 +4,53 @@ """This module is used to determine investment-related constraints. """ +from typing import Union + import numpy as np import pyoptinterface as poi class AddInvestmentConstraints: """Add constraints for investment in the model. """ - def __init__(self, model): + def __init__(self, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + ) -> None: """Initialize the class and add constraints. """ self.model = model + model.remaining_technology = poi.make_tupledict( + model.year, model.zone, model.tech, + rule=self.tech_lifetime_rule + ) + model.cap_existing = poi.make_tupledict( + model.year, model.zone, model.tech, + rule=self.remaining_capacity_rule + ) model.tech_up_bound_cons = poi.make_tupledict( - model.year_zone_tech_tuples, rule=self.tech_up_bound_rule + model.year, model.zone, model.tech, + rule=self.tech_up_bound_rule ) model.new_tech_up_bound_cons = poi.make_tupledict( - model.year_zone_tech_tuples, rule=self.new_tech_up_bound_rule + model.year, model.zone, model.tech, + rule=self.new_tech_up_bound_rule ) model.new_tech_low_bound_cons = poi.make_tupledict( - model.year_zone_tech_tuples, rule=self.new_tech_low_bound_rule + model.year, model.zone, model.tech, + rule=self.new_tech_low_bound_rule ) model.tech_lifetime_cons = poi.make_tupledict( - model.year_zone_tech_tuples, rule=self.tech_lifetime_rule - ) - model.remaining_capacity_cons = poi.make_tupledict( model.year, model.zone, model.tech, - rule=self.remaining_capacity_rule + rule=self.tech_lifetime_rule ) - def tech_up_bound_rule(self, y, z, te): + def tech_up_bound_rule(self, + y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Allowed capacity of commercial operation technology is less than or equal to the predefined upper bound. @@ -54,9 +73,10 @@ def tech_up_bound_rule(self, y, z, te): if tub != np.Inf: lhs = model.cap_existing[y, z, te] - tub return model.add_linear_constraint(lhs, poi.Leq, 0) - return None - def new_tech_up_bound_rule(self, y, z, te): + def new_tech_up_bound_rule(self, + y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """New investment technology upper bound in specific year and zone. Parameters @@ -75,13 +95,13 @@ def new_tech_up_bound_rule(self, y, z, te): """ model = self.model ntub = model.params['new_technology_upper_bound'][te, z] - if ntub == np.Inf: - return None - else: + if ntub != np.Inf: lhs = model.cap_newtech[y, z, te] - ntub return model.add_linear_constraint(lhs, poi.Leq, 0) - def new_tech_low_bound_rule(self, y, z, te): + def new_tech_low_bound_rule(self, + y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """New investment technology lower bound. Parameters @@ -103,7 +123,9 @@ def new_tech_low_bound_rule(self, y, z, te): lhs = model.cap_newtech[y, z, te] - ntlb return model.add_linear_constraint(lhs, poi.Geq, 0) - def tech_lifetime_rule(self, y, z, te): + def tech_lifetime_rule(self, + y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Caculation of remaining technology capacity based on lifetime constraints. @@ -118,25 +140,21 @@ def tech_lifetime_rule(self, y, z, te): Returns ------- - pyoptinterface._src.core_ext.ConstraintIndex - Constraint index of the model. + pyoptinterface._src.core_ext.ExprBuilder + Index of expression of the model. """ model = self.model lifetime = model.params['lifetime'][te, y] service_time = y - model.params['year'][0] hcap = model.params['historical_capacity'] - rt = model.remaining_technology[y, z, te] remaining_time = int(lifetime - service_time) if remaining_time <= 0: - lhs = model.remaining_technology[y, z, te] - else: - lhs = poi.quicksum( - hcap[z, te, a] for a in range(0, remaining_time) - ) - lhs -= rt - return model.add_linear_constraint(lhs, poi.Eq, 0) - - def remaining_capacity_rule(self, y, z, te): + return 0 + return poi.quicksum(hcap[z, te, a] for a in range(0, remaining_time)) + + def remaining_capacity_rule(self, + y : int, z : str, te : str + ) -> poi._src.core_ext.ExprBuilder: """Remaining capacity of initial technology due to lifetime restrictions. Where in modeled year y, the available technology consists of the following. @@ -157,18 +175,18 @@ def remaining_capacity_rule(self, y, z, te): Returns ------- - pyoptinterface._src.core_ext.ConstraintIndex - Constraint index of the model. + pyoptinterface._src.core_ext.ExprBuilder + Index of expression of the model. """ model = self.model year = model.params['year'] lt = model.params['lifetime'] + cap_existing = poi.ExprBuilder() new_tech = poi.quicksum( model.cap_newtech[yy, z, te] for yy in year[:year.index(y) + 1] if y - yy < lt[te, y] ) - lhs = new_tech - lhs += model.remaining_technology[y, z, te] - lhs -= model.cap_existing[y, z, te] - return model.add_linear_constraint(lhs, poi.Eq, 0) + cap_existing += new_tech + cap_existing += model.remaining_technology[y, z, te] + return cap_existing diff --git a/prepshot/_model/nondispatchable.py b/prepshot/_model/nondispatchable.py index d5e4b88..de52b54 100644 --- a/prepshot/_model/nondispatchable.py +++ b/prepshot/_model/nondispatchable.py @@ -3,21 +3,31 @@ """This module contains functions related to nondispatchable technologies. """ +from typing import Union import pyoptinterface as poi class AddNondispatchableConstraints: """Add constraints for nondispatchable technologies. """ - def __init__(self, model): + def __init__(self, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + ) -> None: self.model = model if model.nondispatchable_tech != 0: model.renew_gen_cons = poi.make_tupledict( - model.hour_month_year_zone_nondispatchable_tuples, - rule=self.renew_gen_rule + model.hour, model.month, model.year, model.zone, + model.nondispatchable_tech, rule=self.renew_gen_rule ) - def renew_gen_rule(self, h, m, y, z, te): + def renew_gen_rule(self, + h : int, m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Renewable generation is determined by the capacity factor and existing capacity. diff --git a/prepshot/_model/storage.py b/prepshot/_model/storage.py index 5628277..48776af 100644 --- a/prepshot/_model/storage.py +++ b/prepshot/_model/storage.py @@ -6,36 +6,46 @@ system has the same power capacity for charging and discharging. """ +from typing import Union + import pyoptinterface as poi class AddStorageConstraints: """Energy storage class. """ - def __init__(self, model): + def __init__(self, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + ) -> None: self.model = model - if model.storage_tech != 0: - model.energy_storage_balance_cons = poi.make_tupledict( - model.hour_month_year_zone_storage_tuples, - rule=self.energy_storage_balance_rule - ) - model.init_energy_storage_cons = poi.make_tupledict( - model.month_year_zone_storage_tuples, - rule=self.init_energy_storage_rule - ) - model.end_energy_storage_cons = poi.make_tupledict( - model.month_year_zone_storage_tuples, - rule=self.end_energy_storage_rule - ) - model.energy_storage_up_bound_cons = poi.make_tupledict( - model.hour_month_year_zone_storage_tuples, - rule=self.energy_storage_up_bound_rule - ) - model.energy_storage_gen_cons = poi.make_tupledict( - model.hour_month_year_zone_storage_tuples, - rule=self.energy_storage_gen_rule - ) - - def energy_storage_balance_rule(self, h, m, y, z, te): + model.energy_storage_balance_cons = poi.make_tupledict( + model.hour, model.month, model.year, model.zone, + model.storage_tech, rule=self.energy_storage_balance_rule + ) + model.init_energy_storage_cons = poi.make_tupledict( + model.month, model.year, model.zone, + model.storage_tech, rule=self.init_energy_storage_rule + ) + model.end_energy_storage_cons = poi.make_tupledict( + model.month, model.year, model.zone, model.storage_tech, + rule=self.end_energy_storage_rule + ) + model.energy_storage_up_bound_cons = poi.make_tupledict( + model.hour, model.month, model.year, model.zone, + model.storage_tech, rule=self.energy_storage_up_bound_rule + ) + model.energy_storage_gen_cons = poi.make_tupledict( + model.hour, model.month, model.year, model.zone, + model.storage_tech, rule=self.energy_storage_gen_rule + ) + + def energy_storage_balance_rule(self, + h : int, m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Energy storage balance. Parameters @@ -69,7 +79,9 @@ def energy_storage_balance_rule(self, h, m, y, z, te): 0 ) - def init_energy_storage_rule(self, m, y, z, te): + def init_energy_storage_rule(self, + m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Initial energy storage. Parameters @@ -98,7 +110,9 @@ def init_energy_storage_rule(self, m, y, z, te): ) return model.add_linear_constraint(lhs, poi.Eq, 0) - def end_energy_storage_rule(self, m, y, z, te): + def end_energy_storage_rule(self, + m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """End energy storage. Parameters @@ -125,7 +139,9 @@ def end_energy_storage_rule(self, m, y, z, te): ) return model.add_linear_constraint(lhs, poi.Eq, 0) - def energy_storage_up_bound_rule(self, h, m, y, z, te): + def energy_storage_up_bound_rule(self, + h : int, m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Energy storage upper bound. Parameters @@ -155,7 +171,9 @@ def energy_storage_up_bound_rule(self, h, m, y, z, te): ) return model.add_linear_constraint(lhs, poi.Leq, 0) - def energy_storage_gen_rule(self, h, m, y, z, te): + def energy_storage_gen_rule(self, + h : int, m : int, y : int, z : str, te : str + ) -> poi._src.core_ext.ConstraintIndex: """Energy storage generation. Parameters diff --git a/prepshot/_model/transmission.py b/prepshot/_model/transmission.py index cf788f7..f2f421d 100644 --- a/prepshot/_model/transmission.py +++ b/prepshot/_model/transmission.py @@ -3,31 +3,46 @@ """This module contains transmission related functions. """ +from typing import Union + import pyoptinterface as poi class AddTransmissionConstraints: """Add constraints for transmission lines while considering multiple zones. """ - def __init__(self, model): + def __init__(self, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + ): """Initialize the class and add constraints. """ self.model = model - model.trans_capacity_cons = poi.make_tupledict( - model.year_zone_zone_tuples, rule=self.trans_capacity_rule - ) - model.trans_physical_cons = poi.make_tupledict( - model.year_zone_zone_tuples, rule=self.trans_physical_rule + model.cap_lines_existing = poi.make_tupledict( + model.year, model.zone, model.zone, + rule=self.trans_capacity_rule ) - model.trans_balance_cons = poi.make_tupledict( - model.hour_month_year_zone_zone_tuples, + model.trans_import = poi.make_tupledict( + model.hour, model.month, model.year, model.zone, model.zone, rule=self.trans_balance_rule ) + model.trans_physical_cons = poi.make_tupledict( + model.year, model.zone, model.zone, + rule=self.trans_physical_rule + ) model.trans_up_bound_cons = poi.make_tupledict( - model.hour_month_year_zone_zone_tuples, + model.hour, model.month, model.year, model.zone, model.zone, rule=self.trans_up_bound_rule ) - def trans_physical_rule(self, y, z, z1): + + + def trans_physical_rule(self, + y : int, z : str, z1 : str + ) -> poi._src.core_ext.ConstraintIndex: """Physical transmission lines. Parameters @@ -45,11 +60,14 @@ def trans_physical_rule(self, y, z, z1): Constraint index of the model. """ model = self.model - lhs = model.cap_newline[y, z, z1] - model.cap_newline[y, z1, z] - return model.add_linear_constraint(lhs, poi.Eq, 0) + if z != z1: + lhs = model.cap_newline[y, z, z1] - model.cap_newline[y, z1, z] + return model.add_linear_constraint(lhs, poi.Eq, 0) - def trans_capacity_rule(self, y, z, z1): + def trans_capacity_rule(self, + y : int, z : str, z1 : str + ) -> poi._src.core_ext.ExprBuilder: """Transmission capacity equal to the sum of the existing capacity and the new capacity in previous planned years. @@ -64,22 +82,25 @@ def trans_capacity_rule(self, y, z, z1): Returns ------- - pyoptinterface._src.core_ext.ConstraintIndex - Constraint index of the model. + pyoptinterface._src.core_ext.ExprBuilder + Index of expression of the model. """ model = self.model year = model.params['year'] lc = model.params['transmission_line_existing_capacity'] remaining_capacity_line = lc[z, z1] + cap_lines_existing = poi.ExprBuilder() new_capacity_line = poi.quicksum( model.cap_newline[yy, z, z1] for yy in year[:year.index(y) + 1] ) - lhs = new_capacity_line - lhs += remaining_capacity_line - lhs -= model.cap_lines_existing[y, z, z1] - return model.add_linear_constraint(lhs, poi.Eq, 0) + cap_lines_existing += new_capacity_line + cap_lines_existing += remaining_capacity_line + return cap_lines_existing + - def trans_balance_rule(self, h, m, y, z, z1): + def trans_balance_rule(self, + h : int, m : int, y : int, z : str, z1 : str + ) -> poi._src.core_ext.ConstraintIndex: """Transmission balance, i.e., the electricity imported from zone z1 to zone z should be equal to the electricity exported from zone z to zone z1 multiplied by the transmission line efficiency. @@ -104,12 +125,12 @@ def trans_balance_rule(self, h, m, y, z, z1): """ model = self.model eff = model.params['transmission_line_efficiency'][z, z1] - lhs = model.trans_import[h, m, y, z, z1] \ - - eff * model.trans_export[h, m, y, z, z1] - return model.add_linear_constraint(lhs, poi.Eq, 0) + return eff * model.trans_export[h, m, y, z, z1] - def trans_up_bound_rule(self, h, m, y, z, z1): + def trans_up_bound_rule(self, + h : int, m : int, y : int, z : str, z1 : str + ) -> poi._src.core_ext.ConstraintIndex: """Transmitted power is less than or equal to the transmission line capacity. @@ -132,6 +153,6 @@ def trans_up_bound_rule(self, h, m, y, z, z1): Constraint index of the model. """ model = self.model - lhs = model.trans_export[h, m, y, z, z1] \ + lhs = model.trans_export[h, m, y, z, z1] \ - model.cap_lines_existing[y, z, z1] return model.add_linear_constraint(lhs, poi.Leq, 0) diff --git a/prepshot/load_data.py b/prepshot/load_data.py index 29ce4bb..3e25860 100644 --- a/prepshot/load_data.py +++ b/prepshot/load_data.py @@ -15,7 +15,9 @@ from prepshot.utils import calc_inv_cost_factor, calc_cost_factor -def load_json(file_path): +def load_json( + file_path : str +) -> dict: """Load data from a JSON file. Parameters @@ -32,7 +34,9 @@ def load_json(file_path): return json.load(f) -def extract_config_data(config_data): +def extract_config_data( + config_data : dict +) -> dict: """Extract necessary data from configuration settings. Parameters @@ -74,7 +78,11 @@ def extract_config_data(config_data): return required_config_data -def load_excel_data(input_folder, params_info, data_store): +def load_excel_data( + input_folder : str, + params_info : dict, + data_store : dict +) -> None: """Load data from Excel files based on the provided parameters. Parameters @@ -103,7 +111,9 @@ def load_excel_data(input_folder, params_info, data_store): sys.exit(1) -def extract_sets(data_store): +def extract_sets( + data_store : dict +) -> None: """Extract simple sets from loaded parameters. Parameters @@ -126,7 +136,9 @@ def extract_sets(data_store): data_store["tech"] = list(data_store["technology_type"].keys()) -def compute_cost_factors(data_store): +def compute_cost_factors( + data_store : dict +) -> None: """Calculate cost factors for various transmission investment and operational costs. @@ -171,7 +183,7 @@ def compute_cost_factors(data_store): def read_excel( filename, index_cols, header_rows, unstack_levels=None, first_col_only=False, dropna=True -): +) -> pd.DataFrame: """Read data from an Excel file into a pandas DataFrame. Parameters @@ -208,7 +220,10 @@ def read_excel( return df -def process_data(params_info, input_folder): +def process_data( + params_info : dict, + input_folder : str +) -> dict: """Load and process data from input folder based on parameters settings. Parameters diff --git a/prepshot/logs.py b/prepshot/logs.py index 1484d59..26a54a2 100644 --- a/prepshot/logs.py +++ b/prepshot/logs.py @@ -6,11 +6,12 @@ """ import logging +from typing import Any import time from pathlib import Path -def setup_logging(): +def setup_logging() -> None: """Set up logging configuration for the model run. """ # Ensure the log directory exists. @@ -37,7 +38,9 @@ def setup_logging(): )) logging.getLogger().addHandler(console) -def log_parameter_info(config_data): +def log_parameter_info( + config_data : dict +) -> None: """Log key parameters used for the model. Parameters @@ -62,7 +65,9 @@ def log_parameter_info(config_data): config_data['general_parameters']['hour'] ) -def timer(func): +def timer( + func : object +) -> Any: """Decorator to log the start and end of a function and its runtime. Parameters diff --git a/prepshot/model.py b/prepshot/model.py index 820153b..0f33041 100644 --- a/prepshot/model.py +++ b/prepshot/model.py @@ -5,6 +5,10 @@ the pyoptinterface library. """ +from typing import Union + +import pyoptinterface as poi + from prepshot.utils import cartesian_product from prepshot._model.demand import AddDemandConstraints from prepshot._model.generation import AddGenerationConstraints @@ -19,7 +23,15 @@ from prepshot.solver import get_solver from prepshot.solver import set_solver_parameters -def define_model(para): +def define_model( + para : dict +) -> Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + +]: """This function creates the model class depending on predefined solver. Parameters @@ -29,7 +41,12 @@ def define_model(para): Returns ------- - pyoptinterface._src.solver.Model + Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] A pyoptinterface Model object depending on the solver Raises @@ -44,12 +61,24 @@ def define_model(para): return model -def define_basic_sets(model): +def define_basic_sets( + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] +) -> None: """Define sets for the model. Parameters ---------- - model : pyoptinterface._src.solver.Model + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] Model to be solved. """ params = model.params @@ -73,7 +102,14 @@ def define_basic_sets(model): if params['isinflow']: model.station = params['stcd'] -def define_complex_sets(model): +def define_complex_sets( + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] +) -> None: """Create complex sets based on simple sets and some conditations. The existing capacity between two zones is set to empty (i.e., No value is filled in the Excel cell), which means that these two zones cannot have @@ -83,132 +119,118 @@ def define_complex_sets(model): Parameters ---------- - model : pyoptinterface._src.solver.Model + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] Model to be solved. """ - params = model.params - h = model.hour - hp = model.hour_p - m = model.month - y = model.year - z = model.zone - te = model.tech - st = model.storage_tech - nd = model.nondispatchable_tech - - model.hour_month_year_tuples = cartesian_product(h, m, y) - model.hour_month_tuples = cartesian_product(h, m) - model.hour_month_year_zone_storage_tuples = \ - cartesian_product(h, m, y, z, st) - model.hour_month_year_zone_nondispatchable_tuples = \ - cartesian_product(h, m, y, z, nd) - model.hour_month_year_zone_tech_tuples = cartesian_product(h, m, y, z, te) - model.hour_month_year_zone_tuples = cartesian_product(h, m, y, z) - trans_sets = params['transmission_line_existing_capacity'].keys() - model.year_zone_zone_tuples = [ - (y_i, z_i, z1_i) for y_i, z_i, z1_i in cartesian_product(y, z, z) - if (z_i, z1_i) in trans_sets - ] - model.hour_month_year_zone_zone_tuples = [ - (h_i, m_i, y_i, z_i, z1_i) - for h_i, m_i, y_i, z_i, z1_i in cartesian_product(h, m, y, z, z) - if (z_i, z1_i) in trans_sets - ] - model.hour_month_tech_tuples = cartesian_product(h, m, te) - model.hour_p_month_year_zone_tuples = cartesian_product(hp, m, y, z) - model.hour_p_month_year_zone_tech_tuples = \ - cartesian_product(hp, m, y, z, te) - model.hour_p_month_year_zone_storage_tuples = \ - cartesian_product(hp, m, y, z, st) - model.month_year_zone_tuples = cartesian_product(m, y, z) - model.month_year_zone_storage_tuples = cartesian_product(m, y, z, st) - model.year_zone_tuples = cartesian_product(y, z) - model.year_zone_tech_tuples = cartesian_product(y, z, te) - model.year_tech_tuples = cartesian_product(y, te) - if params['isinflow']: - s = model.station - model.station_hour_month_year_tuples = cartesian_product(s, h, m, y) - model.station_hour_p_month_year_tuples = cartesian_product(s, hp, m, y) - model.station_month_year_tuples = cartesian_product(s, m, y) + trans_sets = model.params['transmission_line_existing_capacity'].keys() + for z_i, z1_i in cartesian_product(model.zone, model.zone): + if (z_i, z1_i) not in trans_sets: + model.params['transmission_line_existing_capacity'][z_i, z1_i] = 0 + model.params['transmission_line_efficiency'][z_i, z1_i] = 0 + # TODO: Set the capacity of new transmission lines to 0 + -def define_variables(model): +def define_variables( + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] +) -> None: """Define variables for the model. Parameters ---------- - model : pyoptinterface._src.solver.Model + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] Model to be solved. """ - model.cost = model.add_variable(lb=0) - model.cost_var = model.add_variable(lb=0) - model.cost_fix = model.add_variable(lb=0) - model.cost_newtech = model.add_variable(lb=0) - model.cost_newline = model.add_variable(lb=0) - model.income = model.add_variable(lb=0) - model.cap_existing = model.add_variables(model.year_zone_tech_tuples, lb=0) - model.cap_newtech = model.add_variables(model.year_zone_tech_tuples, lb=0) - model.cap_newline = model.add_variables(model.year_zone_zone_tuples, lb=0) - model.cap_lines_existing = model.add_variables( - model.year_zone_zone_tuples, lb=0 + + model.cap_newtech = model.add_variables( + model.year, model.zone, model.tech, lb=0 + ) + model.cap_newline = model.add_variables( + model.year, model.zone, model.zone, lb=0 ) - model.carbon = model.add_variables(model.year, lb=0) - model.carbon_capacity = model.add_variables(model.year_zone_tuples, lb=0) model.gen = model.add_variables( - model.hour_month_year_zone_tech_tuples, lb=0 + model.hour, model.month, model.year, model.zone, model.tech, lb=0 ) model.storage = model.add_variables( - model.hour_p_month_year_zone_tech_tuples, lb=0 + model.hour_p, model.month, model.year, model.zone, model.tech, lb=0 ) model.charge = model.add_variables( - model.hour_month_year_zone_tech_tuples, lb=0 + model.hour, model.month, model.year, model.zone, model.tech, lb=0 ) model.trans_export = model.add_variables( - model.hour_month_year_zone_zone_tuples, lb=0 - ) - model.trans_import = model.add_variables( - model.hour_month_year_zone_zone_tuples, lb=0 - ) - model.remaining_technology = model.add_variables( - model.year_zone_tech_tuples, lb=0 + model.hour, model.month, model.year, model.zone, model.zone, lb=0 ) -# if params['isinflow']: -# model.genflow = model.add_variables( -# model.station_hour_month_year_tuples, lb=0 -# ) -# model.spillflow = model.add_variables( -# model.station_hour_month_year_tuples, lb=0 -# ) -# model.withdraw = model.add_variables( -# model.station_hour_month_year_tuples, lb=0 -# ) -# model.storage_reservoir = model.add_variables( -# model.station_hour_p_month_year_tuples, lb=0 -# ) -# model.output = model.add_variables( -# model.station_hour_month_year_tuples, lb=0 -# ) - -def define_constraints(model): + if model.params['isinflow']: + model.genflow = model.add_variables( + model.station, model.hour, model.month, model.year, lb=0 + ) + model.spillflow = model.add_variables( + model.station, model.hour, model.month, model.year, lb=0 + ) + model.withdraw = model.add_variables( + model.station, model.hour, model.month, model.year, lb=0 + ) + model.storage_reservoir = model.add_variables( + model.station, model.hour_p, model.month, model.year, lb=0 + ) + model.output = model.add_variables( + model.station, model.hour, model.month, model.year, lb=0 + ) + +def define_constraints( + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] +) -> None: """Define constraints for the model. Parameters ---------- - model : pyoptinterface._src.solver.Model + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] Model to be solved. """ - AddDemandConstraints(model) + AddInvestmentConstraints(model) AddGenerationConstraints(model) AddTransmissionConstraints(model) - AddInvestmentConstraints(model) AddCo2EmissionConstraints(model) AddNondispatchableConstraints(model) AddStorageConstraints(model) AddHydropowerConstraints(model) + AddDemandConstraints(model) @timer -def create_model(params): +def create_model( + params : dict +) -> Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model +]: """Create the PREP-SHOT model. Parameters @@ -218,7 +240,12 @@ def create_model(params): Returns ------- - pyoptinterface._src.solver.Model + Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] A pyoptinterface Model object. """ model = define_model(params) diff --git a/prepshot/output_data.py b/prepshot/output_data.py index 9b59c42..4b75b02 100644 --- a/prepshot/output_data.py +++ b/prepshot/output_data.py @@ -5,26 +5,38 @@ """ import logging +import argparse +from typing import Union import numpy as np import xarray as xr import pandas as pd +import pyoptinterface as poi from prepshot.logs import timer - - -def create_data_array(data, dims, coords, unit): +from prepshot.utils import cartesian_product + + +def create_data_array( + data : dict, + dims : dict, + unit : str, + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ], +) -> xr.DataArray: """Create a xarray DataArray with specified data, dimensions, coordinates and units. Parameters ---------- - data : list + data : dict The data to be included in the DataArray. dims : list - The dimensions of the data. - coords : dict - The coordinates of the data. + The list of dimentions of the data. unit : str The unit of the data. @@ -33,6 +45,13 @@ def create_data_array(data, dims, coords, unit): xr.DataArray A DataArray with the specified data, dimensions, coordinates and units. """ + coords = {dim:getattr(model, dim) for dim in dims} + index_tuple = cartesian_product(*coords.values()) + if len(dims) == 1: + index_tuple = [i[0] for i in index_tuple] + data = np.array( + [model.get_value(data[tuple_]) for tuple_ in index_tuple] + ).reshape([len(coord) for coord in coords.values()]) return xr.DataArray(data=data, dims=dims, coords=coords, @@ -40,7 +59,14 @@ def create_data_array(data, dims, coords, unit): @timer -def extract_results_non_hydro(model): +def extract_results_non_hydro( + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] +) -> xr.Dataset: """Extracts results for non-hydro models. Parameters @@ -54,149 +80,79 @@ def extract_results_non_hydro(model): A Dataset containing DataArrays for each attribute of the model. """ - hour = model.hour - month = model.month - year = model.year - zone = model.zone - tech = model.tech - - cost_var_values = model.get_value(model.cost_var) - cost_fix_values = model.get_value(model.cost_fix) - cost_newtech_values = model.get_value(model.cost_newtech) - cost_newline_values = model.get_value(model.cost_newline) - income_values = model.get_value(model.income) - - trans_import_v = create_data_array( - [[[[[model.get_value(model.trans_import[h, m, y, z1, z2]) / 1e6 - if (h, m, y, z1, z2) in model.hour_month_year_zone_zone_tuples - else np.nan for h in hour] for m in month] - for y in year] for z1 in zone] for z2 in zone], - ['zone2', 'zone1', 'year', 'month', 'hour'], - { - 'month': month, 'hour': hour, 'year': year, - 'zone1': zone, 'zone2': zone - }, - 'TWh' + model.zone1 = model.zone + model.zone2 = model.zone + + data_vars = {} + data_vars['trans_export'] = create_data_array( + model.trans_export, ['hour', 'month', 'year', 'zone1', 'zone2'], + 'TWh', + model) + data_vars['gen'] = create_data_array( + model.gen, ['hour', 'month', 'year', 'zone', 'tech'], 'TWh', model) + data_vars['install'] = create_data_array( + model.cap_existing, ['year', 'zone', 'tech'], 'MW', model ) - trans_export_v = create_data_array( - [[[[[model.get_value(model.trans_export[h, m, y, z1, z2]) / 1e6 - if (h, m, y, z1, z2) in model.hour_month_year_zone_zone_tuples - else np.nan - for h in hour] for m in month] - for y in year] for z2 in zone] for z1 in zone], - ['zone2', 'zone1', 'year', 'month', 'hour'], - { - 'month': month, 'hour': hour, 'year': year, - 'zone1': zone, 'zone2': zone - }, - 'TWh' + data_vars['carbon'] = create_data_array( + model.carbon, ['year'], 'Ton', model ) - gen_v = create_data_array( - [[[[[model.get_value(model.gen[h, m, y, z, te]) / 1e6 - for h in hour] for m in month] for y in year] - for z in zone] for te in tech], - ['tech', 'zone', 'year', 'month', 'hour'], - { - 'month': month, 'hour': hour, 'year': year, - 'zone': zone, 'tech': tech - }, - 'TWh' + data_vars['charge'] = create_data_array( + model.charge, ['hour', 'month', 'year', 'zone', 'tech'], 'MWh', model ) - install_v = create_data_array( - [[[model.get_value(model.cap_existing[y, z, te]) for y in year] - for z in zone] for te in tech], - ['tech', 'zone', 'year'], - {'zone': zone, 'tech': tech, 'year': year}, - 'MW' + data_vars['cost_var_breakdown'] = create_data_array( + model.cost_var_tech_breakdown, ['year', 'zone', 'tech'], + 'dollar', model ) - carbon_v = create_data_array( - [model.get_value(model.carbon[y]) for y in year], - ['year'], - {'year': year}, - 'Ton' + data_vars['cost_fix_breakdown'] = create_data_array( + model.cost_fix_tech_breakdown, ['year', 'zone', 'tech'], + 'dollar', model ) - charge_v = create_data_array( - [[[[[model.get_value(model.charge[h, m, y, z, te]) for h in hour] - for m in month] for y in year] for z in zone] for te in tech], - ['tech', 'zone', 'year', 'month', 'hour'], - { - 'tech': tech, 'zone': zone, 'year': year, - 'month': month, 'hour': hour - }, - 'MW' + data_vars['cost_newtech_breakdown'] = create_data_array( + model.cost_newtech_breakdown, ['year', 'zone', 'tech'], + 'dollar', model ) - cost_var_breakdown_v = create_data_array( - [[[model.get_value(model.cost_var_tech_breakdown[y, z, te]) - for y in year] for z in zone] for te in tech], - ['tech', 'zone', 'year'], - {'tech': tech, 'zone': zone, 'year': year}, - 'dollar' + data_vars['cost_newline_breakdown'] = create_data_array( + model.cost_newline_breakdown, ['year', 'zone1', 'zone2'], + 'dollar', model ) - cost_fix_breakdown_v = create_data_array( - [[[model.get_value(model.cost_fix_tech_breakdown[y, z, te]) - for y in year] for z in zone] for te in tech], - ['tech', 'zone', 'year'], - {'tech': tech, 'zone': zone, 'year': year}, 'dollar' + data_vars['carbon_breakdown'] = create_data_array( + model.carbon_breakdown, ['year', 'zone', 'tech'], 'Ton', model) + data_vars['cost_var'] = xr.DataArray(model.get_value(model.cost_var)) + data_vars['cost_fix'] = xr.DataArray(model.get_value(model.cost_fix)) + data_vars['cost_newtech'] = xr.DataArray( + data=model.get_value(model.cost_newtech) ) - cost_newtech_breakdown_v = create_data_array( - [[[model.get_value(model.cost_newtech_breakdown[y, z, te]) - for y in year] for z in zone] for te in tech], - ['tech', 'zone', 'year'], - {'tech': tech, 'zone': zone, 'year': year}, 'dollar' + data_vars['cost_newline'] = xr.DataArray( + data=model.get_value(model.cost_newline) ) - cost_newline_breakdown_v = create_data_array( - [[[model.get_value(model.cost_newline_breakdown[y, z1, z]) - if (y, z1, z) in model.year_zone_zone_tuples - else np.nan for y in year] for z1 in zone] for z in zone], - ['zone2', 'zone1', 'year'], - {'zone2': zone, 'zone1': zone, 'year': year}, 'dollar') - carbon_breakdown_v = create_data_array( - [[[model.get_value(model.carbon_breakdown[y, z, te]) - for y in year] for z in zone] for te in tech], - ['tech', 'zone', 'year'], - {'tech': tech, 'zone': zone, 'year': year}, 'ton' + data_vars['income'] = xr.DataArray( + data=model.get_value(model.income) ) - cost_v = xr.DataArray( - data = cost_var_values - + cost_fix_values - + cost_newtech_values - + cost_newline_values - - income_values + data_vars['cost'] = xr.DataArray( + data = model.get_value(model.cost) ) - cost_var_v = xr.DataArray(data=cost_var_values) - cost_fix_v = xr.DataArray(data=cost_fix_values) - cost_newtech_v = xr.DataArray(data=cost_newtech_values) - cost_newline_v = xr.DataArray(data=cost_newline_values) - income_v = xr.DataArray(data=income_values) - ds = xr.Dataset(data_vars={ - 'trans_import': trans_import_v, - 'trans_export': trans_export_v, - 'gen': gen_v, - 'carbon': carbon_v, - 'install': install_v, - 'cost': cost_v, - 'cost_var': cost_var_v, - 'cost_var_breakdown': cost_var_breakdown_v, - 'cost_fix_breakdown': cost_fix_breakdown_v, - 'cost_newtech_breakdown': cost_newtech_breakdown_v, - 'cost_newline_breakdown': cost_newline_breakdown_v, - 'carbon_breakdown': carbon_breakdown_v, - 'cost_fix': cost_fix_v, - 'charge': charge_v, - 'cost_newtech': cost_newtech_v, - 'cost_newline': cost_newline_v, - 'income': income_v - }) - return ds + return xr.Dataset(data_vars) @timer -def extract_results_hydro(model): +def extract_results_hydro( + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] +) -> xr.Dataset: """Extracts results for hydro models. Parameters ---------- - model : pyoptinterface._src.solver.Model + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] Model to be solved. Returns @@ -206,34 +162,24 @@ def extract_results_hydro(model): of the model. """ ds = extract_results_non_hydro(model) - - stations = model.station - hour = model.hour - month = model.month - year = model.year - - genflow_v = create_data_array( - [[[[model.get_value(model.genflow[s, h, m, y]) - for h in hour] for m in month] for y in year] for s in stations], - ['station', 'year', 'month', 'hour'], - {'station': stations, 'year': year, 'month': month, 'hour': hour}, - 'm**3s**-1' + data_vars = {} + data_vars['genflow'] = create_data_array( + model.genflow, ['station', 'hour', 'month', 'year'], 'm**3s**-1', + model ) - spillflow_v = create_data_array( - [[[[model.get_value(model.spillflow[s, h, m, y]) for h in hour] - for m in month] for y in year] for s in stations], - ['station', 'year', 'month', 'hour'], - {'station': stations, 'year': year, 'month': month, 'hour': hour}, - 'm**3s**-1' + data_vars['spillflow'] = create_data_array( + model.spillflow, ['station', 'hour', 'month', 'year'], 'm**3s**-1', + model ) - ds = ds.assign({'genflow': genflow_v, 'spillflow': spillflow_v}) - - return ds + return ds.assign(data_vars) @timer -def save_to_excel(ds, output_filename): +def save_to_excel( + ds : xr.Dataset, + output_filename : str +) -> None: """Save the results to an Excel file. Parameters @@ -243,7 +189,10 @@ def save_to_excel(ds, output_filename): output_filename : str The name of the output file. """ - with pd.ExcelWriter(f'{output_filename}.xlsx') as writer: + # pylint: disable=abstract-class-instantiated + with pd.ExcelWriter( + f'{output_filename}.xlsx', engine='xlsxwriter' + ) as writer: for key in ds.data_vars: if len(ds[key].shape) == 0: df = pd.DataFrame([ds[key].values.max()], columns=[key]) @@ -252,7 +201,14 @@ def save_to_excel(ds, output_filename): df.to_excel(writer, sheet_name=key, merge_cells=False) -def save_result(model): +def save_result( + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] +) -> None: """Extracts results from the provided model. Parameters @@ -277,7 +233,10 @@ def save_result(model): logging.info("Results are written to separate excel files") -def update_output_filename(output_filename, args): +def update_output_filename( + output_filename : str, + args : argparse.Namespace +) -> str: """Update the output filename based on the arguments. Parameters diff --git a/prepshot/set_up.py b/prepshot/set_up.py index bbc8ede..dd42966 100644 --- a/prepshot/set_up.py +++ b/prepshot/set_up.py @@ -1,24 +1,27 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -"""This module contains functions for setting parameters and configuring +"""This module contains functions for setting params and configuring logging. """ import logging import argparse from os import path, makedirs +from typing import Dict, List, Union, Tuple + +import pandas as pd from prepshot.load_data import load_json, extract_config_data, process_data from prepshot.logs import setup_logging, log_parameter_info -def parse_cli_arguments(params_list): +def parse_cli_arguments(params_list : List[str]) -> argparse.Namespace: """Parse command-line arguments from a list of parameter names. Parameters ---------- - params_list : list + params_list : List[str] List of parameter names. Returns @@ -37,18 +40,40 @@ def parse_cli_arguments(params_list): return parser.parse_args() -def initialize_environment(config_files): - """Load configuration data, set up logging, and process input parameters. +def initialize_environment( + config_files : Dict[str, str] +) -> Dict[ + str, + Union[ + int, float, bool, str, argparse.Namespace, pd.DataFrame, pd.Series, + List[Union[str, int]], + Dict[ + Union[str, int, Tuple[Union[str, int]]], + Union[str, float] + ] + ] + ]: + """Load configuration data, set up logging, and process input params. Parameters ---------- config_files : dict - Dictionary containing paths to parameters and configuration files. + Dictionary containing paths to params and configuration files. Returns ------- - tuple - A tuple containing the parameters dictionary and the output filename. + Dict[ + str, + Union[ + int, float, bool, str, argparse.Namespace, pd.DataFrame, pd.Series, + List[Union[str, int]], + Dict[ + Union[str, int, Tuple[Union[str, int]]], + Union[str, float] + ] + ] + ] + Dictionary containing the global params. """ params_filename = config_files['params_filename'] config_filename = config_files['config_filename'] @@ -68,18 +93,18 @@ def initialize_environment(config_files): input_filename = str(config_data['general_parameters']['input_folder']) input_filepath = path.join(filepath, input_filename) - # Update parameters with command-line arguments if provided. + # Update params with command-line arguments if provided. for param in params.keys(): if getattr(args, param) is not None: params[param]["file_name"] = params[param]["file_name"] \ + f"_{getattr(args, param)}" - # Load and process parameters data - parameters = process_data(params, input_filepath) - parameters['command_line_args'] = args + # Load and process params data + params = process_data(params, input_filepath) + params['command_line_args'] = args # Combine the configuration data with processed parameter data. - parameters.update(required_config_data) + params.update(required_config_data) # Determine the output folder path. output_folder = './' \ @@ -93,6 +118,6 @@ def initialize_environment(config_files): # Determine the output filename. output_filename = output_folder + '/' \ + str(config_data['general_parameters']['output_filename']) - - parameters['output_filename'] = output_filename - return parameters + + params['output_filename'] = output_filename + return params diff --git a/prepshot/solver.py b/prepshot/solver.py index cd63256..6dda308 100644 --- a/prepshot/solver.py +++ b/prepshot/solver.py @@ -5,6 +5,7 @@ """ import logging +from typing import Union import pyoptinterface as poi from pyoptinterface import mosek @@ -15,7 +16,12 @@ from prepshot.logs import timer from prepshot._model.head_iteration import run_model_iteration -def get_solver(params): +def get_solver(params : dict) -> Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ]: """Retrieve the solver object based on parameters. Parameters @@ -25,8 +31,13 @@ def get_solver(params): Returns ------- - pyoptinterface._src.solver - Solver object. + Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] + Type object of the solver module. """ solver_map = { 'mosek': mosek, @@ -57,12 +68,22 @@ def get_solver(params): return poi_solver -def set_solver_parameters(model): +def set_solver_parameters(model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ]) -> None: """Set the solver-specific parameters for the model. Parameters ---------- - model : pyoptinterface._src.solver.Model + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] Model to configurable. """ for key, value in model.params['solver'].items(): @@ -70,12 +91,25 @@ def set_solver_parameters(model): model.set_raw_parameter(key, value) @timer -def solve_model(model, params): +def solve_model( + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ], + params : dict +) -> bool: """Solve the model using the provided parameters. Parameters ---------- - model : pyoptinterface._src.solver.Model + model : Union[ + poi._src.highs.Model, + poi._src.gurobi.Model, + poi._src.mosek.Model, + poi._src.copt.Model + ] Model to solve. params : dict Configuration parameters for solving the model. diff --git a/prepshot/utils.py b/prepshot/utils.py index 6c2fe73..8da276f 100644 --- a/prepshot/utils.py +++ b/prepshot/utils.py @@ -3,17 +3,19 @@ """This module contains utility functions for the model. """ - +from typing import Union, Tuple, List from itertools import product from scipy import interpolate +import pandas as pd +import numpy as np -def check_positive(*values): +def check_positive(*values : Union[int, float]) -> None: """Ensure all values are greater than 0. Parameters ---------- - values : int or float + values : Union[int, float] Values to be checked. Raises @@ -26,24 +28,31 @@ def check_positive(*values): raise ValueError("All arguments must be greater than 0.") def calc_inv_cost_factor( - dep_period, interest_rate, year_built, discount_rate, year_min, year_max -): - """Compute the investment cost factor. + dep_period : int, + interest_rate : float, + year_built : int, + discount_rate : int, + year_min : int, + year_max : int +) -> float: + """Compute the investment cost factor. When the depreciation period is + greater than the planning horizon, the investment cost factor is calculated + by only considering the period within the planning horizon. Parameters ---------- dep_period : int - Depreciation period. + Depreciation period, in years, i.e., lifetime of the infrastructure. interest_rate : float Interest rate. year_built : int - Year of construction. + Year of investment. discount_rate : float Discount rate. year_min : int - Minimum year. + Minimum year, i.e., the first year of the planning horizon. year_max : int - Maximum year. + Maximum year, i.e., the last year of the planning horizon. Returns ------- @@ -55,6 +64,21 @@ def calc_inv_cost_factor( ValueError If year_max <= year_min, year_max < year_built, or year_built < year_min. + + Examples + -------- + Given a depreciation period of 20 years, interest rate of 0.05, year of + investment in 2025, discount rate of 0.05, planning horizon from 2020 to + 2050, compute the investment cost factor: + >>> calc_inv_cost_factor(20, 0.05, 2025, 0.05, 2020, 2050) + 0.783526 + + If the depreciation perios is 100 years, compute the investment cost factor + for the same scenario: + + >>> calc_inv_cost_factor(100, 0.05, 2025, 0.05, 2020, 2050) + 0.567482 + """ check_positive(dep_period, interest_rate, year_built, year_min, year_max) if (year_max <= year_min) or (year_max < year_built) \ @@ -67,32 +91,49 @@ def calc_inv_cost_factor( * (1 - (1 + discount_rate) ** (-min(dep_period, years_to_max))) / (discount_rate * (1 + discount_rate) ** years_since_min)) -def calc_cost_factor(discount_rate, modeled_year, year_min, next_modeled_year): - """Compute the cost factor. +def calc_cost_factor( + discount_rate : float, + modeled_year : int, + year_min : int, + next_modeled_year : int +) -> float: + """Compute the variable and fixed cost factor while considering the + multi-stage planning horizon. Parameters ---------- discount_rate : float - Discount rate. + The discount rate to apply. modeled_year : int - Modeled year. + The year in which the cost occurs. year_min : int - Minimum year. + The first year of the planning horizon. All costs are discounted to + this year. next_modeled_year : int - Next year. + The subsequent modeled year. The cost incurred bewteen modeled_year + and modeled_year and next_modeled_year is calculated. Returns ------- float - Cost factor. + The computed cost factor. Raises ------ ValueError if next_modeled_year < modeled_year. + + Examples + -------- + Given annual cost incurred in 2025, next_modeled_year = 2030, and starting + year = 2020, compute present value in 2020 of the cost incurred in + 2025-2029: + >>> calc_cost_factor(0.05, 2025, 2020, 2030) + 3.561871 + """ check_positive(discount_rate, modeled_year, year_min, next_modeled_year) - if next_modeled_year < modeled_year: + if next_modeled_year <= modeled_year: raise ValueError( "Next modeled year must be greater than or" + "equal to the current modeled year." @@ -103,42 +144,54 @@ def calc_cost_factor(discount_rate, modeled_year, year_min, next_modeled_year): return (1 - (1 + discount_rate) ** (-years_to_next)) \ / (discount_rate * (1 + discount_rate) ** (years_since_min - 1)) -def interpolate_z_by_q_or_s(name, qs, zqv): +def interpolate_z_by_q_or_s( + name : str, + qs : Union[np.ndarray, float], + zqv : pd.DataFrame +) -> float: """Interpolate forebay water level (Z) by reservoir storage (S) or tailrace water level (Z) by the reservoir outflow (Q). Parameters ---------- - name : str - Name of the hydropower station. - qs : numpy.ndarray - Array of Q or S values. + name : str + Code of the hydropower station. + qs : Union[np.ndarray, float] + Reservoir storage or outflow values. zqv : pandas.DataFrame DataFrame of ZQ or ZV values. Returns ------- - scipy.interpolate.interp1d - Array of interpolated values. + Union[np.ndarray, float] + Interpolated values. """ - zqv_temp = zqv[(zqv.name == int(name)) | (zqv.name == str(name))] - x = zqv_temp.Q if 'Q' in zqv_temp.columns else zqv_temp.V - f_zqv = interpolate.interp1d(x, zqv_temp.Z, fill_value='extrapolate') + zqv_station = zqv[(zqv.name == int(name)) | (zqv.name == str(name))] + x = zqv_station.Q if 'Q' in zqv_station.columns else zqv_station.V + f_zqv = interpolate.interp1d(x, zqv_station.Z, fill_value='extrapolate') return f_zqv(qs) - -def cartesian_product(*args): +def cartesian_product( + *args : List[Union[int, str]] +) -> List[Tuple[Union[int, str]]]: """Generate Cartesian product of input iterables. - + Parameters ---------- - args : iterable - Input iterables. - + args : List[Union[int, str]] + Iterables to be combined. + Returns ------- - list - List of tuples representing the Cartesian product. + List[Tuple[Union[int, str]]] + List of tuples representing the Cartesian product. + + Examples + -------- + Combine two lists [1, 2] and [7, 8]: + + >>> cartesian_product([1, 2], [7, 8]) + [(1, 7), (1, 8), (2, 7), (2, 8)] + """ - # [1, 2], [7, 8] -> [(1, 7), (1, 8), (2, 7), (2, 8)] return list(product(*args)) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..3d781f1 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,121 @@ +"""This module contains tests for the utils module. +""" + +import unittest + +import numpy as np +import pandas as pd + +from prepshot.utils import calc_cost_factor +from prepshot.utils import calc_inv_cost_factor +from prepshot.utils import check_positive +from prepshot.utils import interpolate_z_by_q_or_s +from prepshot.utils import cartesian_product + +class TestUtils(unittest.TestCase): + """Tests for the utils module. + """ + + def test_calc_cost_factor(self): + """Test the prepshot.utils.calc_cost_factor function. + """ + self.assertAlmostEqual( + calc_cost_factor(0.05, 2025, 2020, 2030), + 3.561871, places=6 + ) + with self.assertRaises(ValueError): + calc_cost_factor(0.05, 2025, 2020, 2024) + with self.assertRaises(ValueError): + calc_cost_factor(0.05, 2025, 2026, 2025) + with self.assertRaises(ValueError): + calc_cost_factor(-0.05, 2025, 2020, 2030) + + def test_calc_inv_cost_factor(self): + """Test the prepshot.utils.calc_inv_cost_factor function. + """ + self.assertAlmostEqual( + calc_inv_cost_factor(20, 0.05, 2025, 0.05, 2020, 2050), + 0.783526, places=6 + ) + self.assertAlmostEqual( + calc_inv_cost_factor(100, 0.05, 2025, 0.05, 2020, 2050), + 0.567482, places=6 + ) + + def test_check_positive(self): + """Test the prepshot.utils.check_positive function. + """ + with self.assertRaises(ValueError): + check_positive(-0.01) + with self.assertRaises(ValueError): + check_positive(0) + + def test_interpolate_z_by_q_or_s(self): + """Test the prepshot.utils.interpolate_z_by_q_or_s function. + """ + name = '001' + qs_val = 100 + qs_arr = [0, 200, 250] + qs_beyond_val = 400 + qs_beyoud_arr = [-50, 200, 400] + zq = pd.DataFrame({ + 'name' : [name] * 3, + 'Q' : [0, 200, 300], + 'Z' : [0, 100, 200] + }) + zs = pd.DataFrame({ + 'name' : [name] * 3, + 'V' : [0, 200, 300], + 'Z' : [0, 100, 200] + }) + self.assertAlmostEqual( + interpolate_z_by_q_or_s(name, qs_val, zq), + 50 + ) + self.assertAlmostEqual( + interpolate_z_by_q_or_s(name, qs_val, zs), + 50 + ) + np.testing.assert_allclose( + interpolate_z_by_q_or_s(name, qs_arr, zq), + np.array([0, 100, 150]) + ) + np.testing.assert_allclose( + interpolate_z_by_q_or_s(name, qs_arr, zs), + np.array([0, 100, 150]) + ) + self.assertAlmostEqual( + interpolate_z_by_q_or_s(name, qs_beyond_val, zq), + 300 + ) + self.assertAlmostEqual( + interpolate_z_by_q_or_s(name, qs_beyond_val, zs), + 300 + ) + np.testing.assert_allclose( + interpolate_z_by_q_or_s(name, qs_beyoud_arr, zq), + np.array([-25, 100, 300]) + ) + np.testing.assert_allclose( + interpolate_z_by_q_or_s(name, qs_beyoud_arr, zs), + np.array([-25, 100, 300]) + ) + + def test_cartestian(self): + """Test the prepshot.utils.cartestian function. + """ + self.assertListEqual( + cartesian_product([1, 2], [3, 4]), + [(1, 3), (1, 4), (2, 3), (2, 4)] + ) + self.assertListEqual( + cartesian_product([1], [3, 4]), + [(1, 3), (1, 4)] + ) + self.assertListEqual( + cartesian_product([1, 2]), + [(1,), (2,)] + ) + +if __name__ == '__main__': + unittest.main()