import logging
from math import exp, log, pi
from typing import Dict
import scipy.spatial
import numpy as np
import pandas as pd
logging.basicConfig(format='%(asctime)s\t\t%(message)s', level=logging.ERROR)
logger = logging.getLogger(__name__)
# Columns in settlements file must match these exactly
SET_COUNTRY = 'Country' # This cannot be changed, lots of code will break
SET_X_DEG = 'X_deg' # Coordinates in degrees
SET_Y_DEG = 'Y_deg'
SET_POP = 'Pop' # Population in people per point (equally, people per km2)
SET_POP_CALIB = 'PopStartYear' # Calibrated population to reference year, same units
SET_GRID_DIST_CURRENT = 'GridDistCurrent' # Distance in km from current grid
SET_GRID_DIST_PLANNED = 'GridDistPlan' # Distance in km from current and future grid
SET_ROAD_DIST = 'RoadDist' # Distance in km from road network
SET_NIGHT_LIGHTS = 'NightLights' # Intensity of night time lights (from NASA), range 0 - 63
SET_TRAVEL_HOURS = 'TravelHours' # Travel time to large city in hours
SET_GHI = 'GHI' # Global horizontal irradiance in kWh/m2/day
SET_WINDVEL = 'WindVel' # Wind velocity in m/s
SET_WINDCF = 'WindCF' # Wind capacity factor as percentage (range 0 - 1)
SET_HYDRO = 'Hydropower' # Hydropower potential in kW
SET_HYDRO_DIST = 'HydropowerDist' # Distance to hydropower site in km
SET_HYDRO_FID = 'HydropowerFID' # the unique tag for eah hydropower, to not over-utilise
SET_SUBSTATION_DIST = 'SubstationDist'
SET_ELEVATION = 'Elevation' # in metres
SET_SLOPE = 'Slope' # in degrees
SET_LAND_COVER = 'LandCover'
SET_road_dist_classified = 'RoadDistClassified'
SET_substation_dist_classified = 'SubstationDistClassified'
SET_elevation_classified = 'ElevationClassified'
SET_slope_classified = 'SlopeClassified'
SET_land_cover_classified = 'LandCoverClassified'
SET_combined_classification = 'GridClassification'
SET_GRID_PENALTY = 'GridPenalty'
SET_URBAN = 'IsUrban' # Whether the site is urban (0 or 1)
SET_ENERGY_PER_CELL = 'EnergyPerSettlement'
SET_NUM_PEOPLE_PER_HH = 'NumPeoplePerHH'
SET_ELEC_CURRENT = 'ElecStart' # If the site is currently electrified (0 or 1)
SET_NEW_CONNECTIONS = 'NewConnections' # Number of new people with electricity connections
SET_MIN_GRID_DIST = 'MinGridDist'
SET_LCOE_GRID = 'Grid' # All LCOE's in USD/kWh
SET_LCOE_SA_PV = 'SA_PV'
SET_LCOE_SA_DIESEL = 'SA_Diesel'
SET_LCOE_MG_WIND = 'MG_Wind'
SET_LCOE_MG_DIESEL = 'MG_Diesel'
SET_LCOE_MG_PV = 'MG_PV'
SET_LCOE_MG_HYDRO = 'MG_Hydro'
SET_MIN_OFFGRID = 'Minimum_Tech_Off_grid' # The technology with lowest lcoe (excluding grid)
SET_MIN_OVERALL = 'MinimumOverall' # Same as above, but including grid
SET_MIN_OFFGRID_LCOE = 'Minimum_LCOE_Off_grid' # The lcoe value for minimum tech
SET_MIN_OVERALL_LCOE = 'MinimumOverallLCOE' # The lcoe value for overall minimum
SET_MIN_OVERALL_CODE = 'MinimumOverallCode' # And a code from 1 - 7 to represent that option
SET_MIN_CATEGORY = 'MinimumCategory' # The category with minimum lcoe (grid, minigrid or standalone)
SET_NEW_CAPACITY = 'NewCapacity' # Capacity in kW
SET_INVESTMENT_COST = 'InvestmentCost' # The investment cost in USD
SET_ELEC_ORDER = "ElectrificationOrder"
SET_LIMIT = "ElecStatusIn"
SET_MIN_OFFGRID_CODE = "Off_Grid_Code"
SET_ELEC_FINAL_CODE = "FinalElecCode"
SET_DIST_TO_TRANS = "TransformerDist"
SET_TOTAL_ENERGY_PER_CELL = "TotalEnergyPerCell" # all previous + current timestep
SET_RESIDENTIAL_DEMAND = "ResidentialDemand"
SET_AGRI_DEMAND = 'AgriDemand'
SET_HEALTH_DEMAND = 'HealthDemand'
SET_EDU_DEMAND = 'EducationDemand'
SET_COMMERCIAL_DEMAND = 'CommercialDemand'
SET_GRID_CELL_AREA = 'GridCellArea'
SET_MV_CONNECT_DIST = 'MVConnectDist'
SET_HV_DIST_CURRENT = 'CurrentHVLineDist'
SET_HV_DIST_PLANNED = 'PlannedHVLineDist'
SET_MV_DIST_CURRENT = 'CurrentMVLineDist'
SET_MV_DIST_PLANNED = 'PlannedMVLineDist'
SET_ELEC_POP = 'ElecPop'
SET_ELEC_POP_CALIB = 'ElecPopCalib'
SET_WTFtier = "ResidentialDemandTier"
SET_TIER = 'Tier'
SET_INVEST_PER_CAPITA = "InvestmentCapita"
SET_CALIB_GRID_DIST = 'GridDistCalibElec'
SET_CAPITA_DEMAND = 'PerCapitaDemand'
SET_RESIDENTIAL_TIER = 'ResidentialDemandTier'
SET_MIN_TD_DIST = 'minTDdist'
SET_SA_DIESEL_FUEL = 'SADieselFuelCost'
SET_MG_DIESEL_FUEL = 'MGDieselFuelCost'
# General
LHV_DIESEL = 9.9445485 # (kWh/l) lower heating value
HOURS_PER_YEAR = 8760
[docs]class Technology:
"""
Used to define the parameters for each electricity access technology, and to calculate the LCOE depending on
input parameters.
"""
def __init__(self,
tech_life=0, # in years
base_to_peak_load_ratio=0,
distribution_losses=0, # percentage
connection_cost_per_hh=0, # USD/hh
om_costs=0.0, # OM costs as percentage of capital costs
capital_cost={float("inf"): 0}, # USD/kW
capacity_factor=0.9, # percentage
grid_penalty_ratio=1, # multiplier
efficiency=1.0, # percentage
diesel_price=0.0, # USD/litre
grid_price=0.0, # USD/kWh for grid electricity
standalone=False,
mini_grid=False,
existing_grid_cost_ratio=0.1, # percentage
grid_capacity_investment=0.0, # USD/kW for on-grid capacity investments (excluding grid itself)
diesel_truck_consumption=0, # litres/hour
diesel_truck_volume=0, # litres
om_of_td_lines=0): # percentage
self.distribution_losses = distribution_losses
self.connection_cost_per_hh = connection_cost_per_hh
self.base_to_peak_load_ratio = base_to_peak_load_ratio
self.tech_life = tech_life
self.om_costs = om_costs
self.capital_cost = capital_cost
self.capacity_factor = capacity_factor
self.grid_penalty_ratio = grid_penalty_ratio
self.efficiency = efficiency
self.diesel_price = diesel_price
self.grid_price = grid_price
self.standalone = standalone
self.mini_grid = mini_grid
self.existing_grid_cost_ratio = existing_grid_cost_ratio
self.grid_capacity_investment = grid_capacity_investment
self.diesel_truck_consumption = diesel_truck_consumption
self.diesel_truck_volume = diesel_truck_volume
self.om_of_td_lines = om_of_td_lines
[docs] @classmethod
def set_default_values(cls, base_year, start_year, end_year, discount_rate, hv_line_type=69, hv_line_cost=53000,
mv_line_type=33, mv_line_amperage_limit=8.0, mv_line_cost=7000, lv_line_type=0.240,
lv_line_cost=4250, lv_line_max_length=0.5, service_transf_type=50, service_transf_cost=4250,
max_nodes_per_serv_trans=300, mv_lv_sub_station_type=400, mv_lv_sub_station_cost=10000,
mv_mv_sub_station_cost=10000, hv_lv_sub_station_type=1000, hv_lv_sub_station_cost=25000,
hv_mv_sub_station_cost=25000, power_factor=0.9, load_moment=9643):
"""Initialises the class with parameter values common to all Technologies
"""
cls.base_year = base_year
cls.start_year = start_year
cls.end_year = end_year
# RUN_PARAM: Here are the assumptions related to cost and physical properties of grid extension elements
cls.discount_rate = discount_rate
cls.hv_line_type = hv_line_type # kV
cls.hv_line_cost = hv_line_cost # $/km for 69kV
cls.mv_line_type = mv_line_type # kV
cls.mv_line_amperage_limit = mv_line_amperage_limit # Ampere (A)
cls.mv_line_cost = mv_line_cost # $/km for 11-33 kV
cls.lv_line_type = lv_line_type # kV
cls.lv_line_cost = lv_line_cost # $/km
cls.lv_line_max_length = lv_line_max_length # km
cls.service_transf_type = service_transf_type # kVa
cls.service_transf_cost = service_transf_cost # $/unit
cls.max_nodes_per_serv_trans = max_nodes_per_serv_trans # max number of nodes served by a service transformer
cls.mv_lv_sub_station_type = mv_lv_sub_station_type # kVa
cls.mv_lv_sub_station_cost = mv_lv_sub_station_cost # $/unit
cls.mv_mv_sub_station_cost = mv_mv_sub_station_cost # $/unit
cls.hv_lv_sub_station_type = hv_lv_sub_station_type # kVa
cls.hv_lv_sub_station_cost = hv_lv_sub_station_cost # $/unit
cls.hv_mv_sub_station_cost = hv_mv_sub_station_cost # $/unit
cls.power_factor = power_factor
cls.load_moment = load_moment # for 50mm aluminum conductor under 5% voltage drop (kW m)
[docs] def get_lcoe(self, energy_per_cell, people, num_people_per_hh, start_year, end_year, new_connections,
total_energy_per_cell, prev_code, grid_cell_area, additional_mv_line_length=0.0,
capacity_factor=0.9, grid_penalty_ratio=1, fuel_cost=0, elec_loop=0, productive_nodes=0,
additional_transformer=0, penalty=1):
"""Calculates the LCOE depending on the parameters.
Parameters
----------
people : float or pandas.Series
Number of people in settlement
new_connections : float or pandas.Series
Number of new people in settlement to connect
prev_code : int or pandas.Series
Code representation of previous supply technology in settlement
total_energy_per_cell : float or pandas.Series
Total annual energy demand in cell, including already met demand
energy_per_cell : float or pandas.Series
Annual energy demand in cell, excluding already met demand
num_people_per_hh : float or pandas.Series
Number of people per household in settlement
grid_cell_area : float or pandas.Series
Area of settlement (km2)
additional_mv_line_length : float or pandas.Series
Distance to connect the settlement
additional_transformer : int
If a transformer is needed on other end to connect to HV line
productive_nodes : int or pandas.Series
Additional connections (schools, health facilities, shops)
elec_loop : int or pandas.Series
Round of extension in grid extension algorithm
penalty : float or pandas.Series
Cost penalty factor for T&D network, e.g. https://www.mdpi.com/2071-1050/12/3/777
start_year : int
end_year : int
capacity_factor : float or pandas.Series
grid_penalty_ratio : float or pandas.Series
fuel_cost : float or pandas.Series
Returns
-------
lcoe or discounted investment cost
"""
if type(people) == int or type(people) == float or type(people) == np.float64:
if people == 0:
# If there are no people, set the people low (prevent div/0 error) and continue.
people = 0.00001
else:
people = np.maximum(people, 0.00001)
if type(energy_per_cell) == int or type(energy_per_cell) == float or type(energy_per_cell) == np.float64:
if energy_per_cell == 0:
# If there is no demand, set the demand low (prevent div/0 error) and continue.
energy_per_cell = 0.000000000001
else:
energy_per_cell = np.maximum(energy_per_cell, 0.000000000001)
grid_penalty_ratio = np.maximum(1, grid_penalty_ratio)
generation_per_year, peak_load, td_investment_cost = self.td_network_cost(people,
new_connections,
prev_code,
total_energy_per_cell,
energy_per_cell,
num_people_per_hh,
grid_cell_area,
additional_mv_line_length,
additional_transformer,
productive_nodes,
elec_loop,
penalty)
generation_per_year = pd.Series(generation_per_year)
peak_load = pd.Series(peak_load)
td_investment_cost = pd.Series(td_investment_cost)
td_investment_cost = td_investment_cost * grid_penalty_ratio
td_om_cost = td_investment_cost * self.om_of_td_lines * penalty
installed_capacity = peak_load / capacity_factor
cap_cost = td_investment_cost * 0
cost_dict_list = self.capital_cost.keys()
cost_dict_list = sorted(cost_dict_list)
for key in cost_dict_list:
if self.standalone:
cap_cost.loc[((installed_capacity / (people / num_people_per_hh)) < key) & (cap_cost == 0)] = \
self.capital_cost[key]
else:
cap_cost.loc[(installed_capacity < key) & (cap_cost == 0)] = self.capital_cost[key]
capital_investment = installed_capacity * cap_cost * penalty
total_om_cost = td_om_cost + (cap_cost * penalty * self.om_costs * installed_capacity)
total_investment_cost = td_investment_cost + capital_investment
if self.grid_price > 0:
fuel_cost = self.grid_price
# Perform the time-value LCOE calculation
project_life = end_year - self.base_year + 1
reinvest_year = 0
step = start_year - self.base_year
# If the technology life is less than the project life, we will have to invest twice to buy it again
if self.tech_life + step < project_life:
reinvest_year = self.tech_life + step
year = np.arange(project_life)
el_gen = np.outer(np.asarray(generation_per_year), np.ones(project_life))
for s in range(step):
el_gen[:, s] = 0
discount_factor = (1 + self.discount_rate) ** year
investments = np.zeros(project_life)
investments[step] = 1
# Calculate the year of re-investment if tech_life is smaller than project life
if reinvest_year:
investments[reinvest_year] = 1
investments = np.outer(total_investment_cost, investments)
grid_capacity_investments = np.zeros(project_life)
grid_capacity_investments[step] = 1
# Calculate the year of re-investment if tech_life is smaller than project life
if reinvest_year:
grid_capacity_investments[reinvest_year] = 1
grid_capacity_investments = np.outer(peak_load * self.grid_capacity_investment, grid_capacity_investments)
# Calculate salvage value if tech_life is bigger than project life
salvage = np.zeros(project_life)
if reinvest_year > 0:
used_life = (project_life - step) - self.tech_life
else:
used_life = project_life - step - 1
salvage[-1] = 1
salvage = np.outer(total_investment_cost * (1 - used_life / self.tech_life), salvage)
operation_and_maintenance = np.ones(project_life)
for s in range(step):
operation_and_maintenance[s] = 0
operation_and_maintenance = np.outer(total_om_cost, operation_and_maintenance)
fuel = np.outer(np.asarray(generation_per_year), np.zeros(project_life))
for p in range(project_life):
fuel[:, p] = el_gen[:, p] * fuel_cost
discounted_investments = investments / discount_factor
dicounted_grid_capacity_investments = grid_capacity_investments / discount_factor
investment_cost = np.sum(discounted_investments, axis=1) + np.sum(dicounted_grid_capacity_investments, axis=1)
discounted_costs = (investments + operation_and_maintenance + fuel - salvage) / discount_factor
discounted_generation = el_gen / discount_factor
lcoe = np.sum(discounted_costs, axis=1) / np.sum(discounted_generation, axis=1)
#lcoe = pd.DataFrame(lcoe[:, np.newaxis])
#investment_cost = pd.DataFrame(investment_cost[:, np.newaxis])
#installed_capacity = pd.DataFrame(installed_capacity[:, np.newaxis])
lcoe = pd.DataFrame(lcoe)
investment_cost = pd.DataFrame(investment_cost)
installed_capacity = pd.DataFrame(installed_capacity)
return lcoe, investment_cost, installed_capacity
[docs] def transmission_network(self, peak_load, additional_mv_line_length=0, additional_transformer=0,
mv_distribution=False):
"""This method calculates the required components for connecting the settlement
Settlements can be connected to grid or a hydropower source
This includes potentially HV lines, MV lines and substations
Arguments
---------
peak_load : float
Peak load in the settlement (kW)
additional_mv_line_length : float
Distance to connect the settlement
additional_transformer : int
If a transformer is needed on other end to connect to HV line
mv_distribution : bool
True if distribution network in settlement contains MV lines
Notes
-----
Based on: https://www.mdpi.com/1996-1073/12/7/1395
"""
mv_km = 0
hv_km = 0
no_of_hv_mv_subs = 0
no_of_mv_mv_subs = 0
no_of_hv_lv_subs = 0
no_of_mv_lv_subs = 0
if not self.standalone:
# Sizing HV/MV
hv_to_mv_lines = self.hv_line_cost / self.mv_line_cost
max_mv_load = self.mv_line_amperage_limit * self.mv_line_type * hv_to_mv_lines
mv_amperage = self.mv_lv_sub_station_type / self.mv_line_type
no_of_mv_lines = np.ceil(peak_load / (mv_amperage * self.mv_line_type))
hv_amperage = self.hv_lv_sub_station_type / self.hv_line_type
no_of_hv_lines = np.ceil(peak_load / (hv_amperage * self.hv_line_type))
if additional_transformer > 0:
mv_km = 0
hv_km = additional_mv_line_length * no_of_hv_lines
else:
mv_km = np.where((peak_load <= max_mv_load) & (additional_mv_line_length < 50),
additional_mv_line_length * no_of_mv_lines,
0)
hv_km = np.where((peak_load <= max_mv_load) & (additional_mv_line_length < 50),
0,
additional_mv_line_length * no_of_hv_lines)
no_of_hv_mv_subs = np.where(mv_distribution & (hv_km > 0),
np.ceil(peak_load / self.mv_lv_sub_station_type),
0)
no_of_mv_mv_subs = np.where(mv_distribution & (mv_km > 0),
np.ceil(peak_load / self.mv_lv_sub_station_type),
0)
no_of_hv_lv_subs = np.where(mv_distribution,
0,
np.where(hv_km > 0, np.ceil(peak_load / self.hv_lv_sub_station_type), 0))
if self.mini_grid:
no_of_mv_lv_subs = np.where(mv_km > 0,
np.ceil(peak_load / self.mv_lv_sub_station_type),
0)
else:
no_of_mv_lv_subs = np.where(mv_distribution,
np.where(hv_km == 0, np.where(
mv_km == 0, np.ceil(peak_load / self.mv_lv_sub_station_type), 0), 0),
np.ceil(peak_load / self.mv_lv_sub_station_type))
no_of_hv_mv_subs += additional_transformer # to connect the MV line to the HV grid
return hv_km, mv_km, no_of_hv_mv_subs, no_of_mv_mv_subs, no_of_hv_lv_subs, no_of_mv_lv_subs
[docs] def distribution_network(self, people, energy_per_cell, num_people_per_hh, grid_cell_area,
productive_nodes=0):
"""This method calculates the required components for the distribution network
This includes potentially MV lines, LV lines and service transformers
Arguments
---------
people : float
Number of people in settlement
energy_per_cell : float
Annual energy demand in settlement (kWh)
num_people_per_hh : float
Number of people per household in settlement
grid_cell_area : float
Area of settlement (km2)
productive_nodes : int
Additional connections (schools, health facilities, shops)
Notes
-----
Based on: https://www.mdpi.com/1996-1073/12/7/1395
"""
consumption = energy_per_cell # kWh/year
average_load = consumption / (1 - self.distribution_losses) / HOURS_PER_YEAR # kW
peak_load = average_load / self.base_to_peak_load_ratio # kW
if self.standalone:
cluster_mv_lines_length = 0
lv_km = 0
no_of_service_transf = 0
total_nodes = 0
else:
s_max = peak_load / self.power_factor
max_transformer_area = pi * self.lv_line_max_length ** 2
total_nodes = (people / num_people_per_hh) + productive_nodes
no_of_service_transf = np.ceil(
np.maximum(s_max / self.service_transf_type, np.maximum(total_nodes / self.max_nodes_per_serv_trans,
grid_cell_area / max_transformer_area)))
transformer_radius = ((grid_cell_area / no_of_service_transf) / pi) ** 0.5
transformer_load = peak_load / no_of_service_transf
cluster_radius = (grid_cell_area / pi) ** 0.5
# Sizing lv lines in settlement
cluster_lv_lines_length = np.where(2 / 3 * cluster_radius * transformer_load * 1000 < self.load_moment,
2 / 3 * cluster_radius * no_of_service_transf,
0)
cluster_mv_lines_length = np.where(2 / 3 * cluster_radius * transformer_load * 1000 >= self.load_moment,
2 * transformer_radius * no_of_service_transf,
0)
hh_area = grid_cell_area / total_nodes
hh_diameter = 2 * ((hh_area / pi) ** 0.5)
transformer_lv_lines_length = hh_diameter * total_nodes
lv_km = cluster_lv_lines_length + transformer_lv_lines_length
return cluster_mv_lines_length, lv_km, no_of_service_transf, consumption, peak_load, total_nodes
[docs] def td_network_cost(self, people, new_connections, prev_code, total_energy_per_cell, energy_per_cell,
num_people_per_hh, grid_cell_area, additional_mv_line_length=0, additional_transformer=0,
productive_nodes=0, elec_loop=0, penalty=1):
"""Calculates all the transmission and distribution network components
Parameters
----------
people : float
Number of people in settlement
new_connections : float
Number of new people in settlement to connect
prev_code : int
Code representation of previous supply technology in settlement
total_energy_per_cell : float
Total annual energy demand in cell, including already met demand
energy_per_cell : float
Annual energy demand in cell, excluding already met demand
num_people_per_hh : float
Number of people per household in settlement
grid_cell_area : float
Area of settlement (km2)
additional_mv_line_length : float
Distance to connect the settlement
additional_transformer : int
If a transformer is needed on other end to connect to HV line
productive_nodes : int
Additional connections (schools, health facilities, shops)
elec_loop : int
Round of extension in grid extension algorithm
penalty : float
Cost penalty factor for T&D network, e.g. https://www.mdpi.com/2071-1050/12/3/777
"""
# Start by calculating the distribution network required to meet all of the demand
cluster_mv_lines_length_total, cluster_lv_lines_length_total, no_of_service_transf_total, \
generation_per_year_total, peak_load_total, total_nodes_total = \
self.distribution_network(people, total_energy_per_cell, num_people_per_hh, grid_cell_area,
productive_nodes)
# Next calculate the network that is already there
cluster_mv_lines_length_existing, cluster_lv_lines_length_existing, no_of_service_transf_existing, \
generation_per_year_existing, peak_load_existing, total_nodes_existing = \
self.distribution_network(np.maximum((people - new_connections), 1),
(total_energy_per_cell - energy_per_cell),
num_people_per_hh, grid_cell_area, productive_nodes)
# Then calculate the difference between the two
mv_lines_distribution_length_additional = \
np.maximum(cluster_lv_lines_length_total - cluster_lv_lines_length_existing, 0)
total_lv_lines_length_additional = \
np.maximum(cluster_lv_lines_length_total - cluster_lv_lines_length_existing, 0)
num_transformers_additional = np.maximum(no_of_service_transf_total - no_of_service_transf_existing, 0)
generation_per_year_additional = np.maximum(generation_per_year_total - generation_per_year_existing, 0)
peak_load_additional = np.maximum(peak_load_total - peak_load_existing, 0)
total_nodes_additional = np.maximum(total_nodes_total - total_nodes_existing, 0)
# Examine if there are any MV lines in the distribution network, used to determine transformer type
mv_distribution = np.where(mv_lines_distribution_length_additional > 0, True, False)
# Then calculate the transmission network (HV or MV lines plus transformers) using the same methodology
hv_lines_total_length_total, mv_lines_connection_length_total, no_of_hv_mv_substation_total, \
no_of_mv_mv_substation_total, no_of_hv_lv_substation_total, no_of_mv_lv_substation_total = \
self.transmission_network(peak_load_total, additional_mv_line_length, additional_transformer,
mv_distribution=mv_distribution)
hv_lines_total_length_existing, mv_lines_connection_length_existing, no_of_hv_mv_substation_existing, \
no_of_mv_mv_substation_existing, no_of_hv_lv_substation_existing, no_of_mv_lv_substation_existing = \
self.transmission_network(peak_load_existing, additional_mv_line_length, additional_transformer,
mv_distribution=mv_distribution)
hv_lines_total_length_additional = np.maximum(hv_lines_total_length_total - hv_lines_total_length_existing, 0)
mv_lines_connection_length_additional = \
np.maximum(mv_lines_connection_length_total - mv_lines_connection_length_existing, 0)
no_of_hv_lv_substation_additional = \
np.maximum(no_of_hv_lv_substation_total - no_of_hv_lv_substation_existing, 0)
no_of_hv_mv_substation_additional = \
np.maximum(no_of_hv_mv_substation_total - no_of_hv_mv_substation_existing, 0)
no_of_mv_mv_substation_additional = \
np.maximum(no_of_mv_mv_substation_total - no_of_mv_mv_substation_existing, 0)
no_of_mv_lv_substation_additional = \
np.maximum(no_of_mv_lv_substation_total - no_of_mv_lv_substation_existing, 0)
# If no distribution network is present, perform the calculations only once
mv_lines_distribution_length_new, total_lv_lines_length_new, num_transformers_new, generation_per_year_new, \
peak_load_new, total_nodes_new = self.distribution_network(people, energy_per_cell, num_people_per_hh,
grid_cell_area, productive_nodes)
mv_distribution = np.where(mv_lines_distribution_length_new > 0, True, False)
hv_lines_total_length_new, mv_lines_connection_length_new, no_of_hv_mv_substation_new, \
no_of_mv_mv_substation_new, no_of_hv_lv_substation_new, no_of_mv_lv_substation_new = \
self.transmission_network(peak_load_new, additional_mv_line_length, additional_transformer,
mv_distribution=mv_distribution)
mv_lines_distribution_length = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
mv_lines_distribution_length_additional,
mv_lines_distribution_length_new)
hv_lines_total_length = np.where((people != new_connections) & (prev_code < 2),
hv_lines_total_length_additional,
hv_lines_total_length_new)
mv_lines_connection_length = np.where((people != new_connections) & (prev_code < 2),
mv_lines_connection_length_additional,
mv_lines_connection_length_new)
total_lv_lines_length = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
total_lv_lines_length_additional,
total_lv_lines_length_new)
num_transformers = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
num_transformers_additional,
num_transformers_new)
total_nodes = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
total_nodes_additional,
total_nodes_new)
no_of_hv_lv_substation = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
no_of_hv_lv_substation_additional,
no_of_hv_lv_substation_new)
no_of_hv_mv_substation = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
no_of_hv_mv_substation_additional,
no_of_hv_mv_substation_new)
no_of_mv_mv_substation = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
no_of_mv_mv_substation_additional,
no_of_mv_mv_substation_new)
no_of_mv_lv_substation = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
no_of_mv_lv_substation_additional,
no_of_mv_lv_substation_new)
generation_per_year = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
generation_per_year_additional,
generation_per_year_new)
peak_load = np.where((people != new_connections) & ((prev_code < 2) | (prev_code > 3)),
peak_load_additional,
peak_load_new)
td_investment_cost = (hv_lines_total_length * self.hv_line_cost * (
1 + self.existing_grid_cost_ratio * elec_loop) +
mv_lines_connection_length * self.mv_line_cost * (
1 + self.existing_grid_cost_ratio * elec_loop) +
total_lv_lines_length * self.lv_line_cost +
mv_lines_distribution_length * self.mv_line_cost +
num_transformers * self.service_transf_cost +
total_nodes * self.connection_cost_per_hh +
no_of_hv_lv_substation * self.hv_lv_sub_station_cost +
no_of_hv_mv_substation * self.hv_mv_sub_station_cost +
no_of_mv_mv_substation * self.mv_mv_sub_station_cost +
no_of_mv_lv_substation * self.mv_lv_sub_station_cost) * penalty
return generation_per_year, peak_load, td_investment_cost
[docs]class SettlementProcessor:
"""
Processes the DataFrame and adds all the columns to determine the cheapest option and the final costs and summaries
"""
def __init__(self, path):
try:
self.df = pd.read_csv(path)
except FileNotFoundError:
print("Please make sure that the country name you provided and the .csv file, both have the same name")
raise
try:
self.df[SET_GHI]
except KeyError:
self.df = pd.read_csv(path, sep=';')
try:
self.df[SET_GHI]
except ValueError:
print('Column "GHI" not found, check column names in calibrated csv-file')
raise
@staticmethod
def _diesel_fuel_cost_calculator(diesel_price: float,
diesel_truck_consumption: float,
diesel_truck_volume: float,
traveltime: np.ndarray,
efficiency: float):
"""We apply the Szabo formula to calculate the transport cost for the diesel
Formulae is::
p = (p_d + 2*p_d*consumption*time/volume)*(1/mu)*(1/LHVd)
Arguments
---------
diesel_price: float
diesel_truck_consumption: float
diesel_truck_volume: float
traveltime: numpy.ndarray
efficiency: float
Returns
-------
numpy.ndarray
"""
return (diesel_price + 2 * diesel_price * diesel_truck_consumption *
traveltime / diesel_truck_volume) / LHV_DIESEL / efficiency
[docs] def compute_diesel_cost(self,
dataframe: pd.DataFrame,
sa_diesel_cost: Dict,
mg_diesel_cost: Dict,
year: int):
"""Calculate diesel fuel cost
Arguments
---------
dataframe: pandas.DataFrame
sa_diesel_cost: Dict
mg_diesel_cost: Dict
year: int
Returns
-------
pandas.DataFrame
"""
df = dataframe.copy(deep=True)
travel_time = df[SET_TRAVEL_HOURS].values
df[SET_SA_DIESEL_FUEL + "{}".format(year)] = self._diesel_fuel_cost_calculator(
diesel_price=sa_diesel_cost['diesel_price'],
diesel_truck_volume=sa_diesel_cost['diesel_truck_volume'],
diesel_truck_consumption=sa_diesel_cost['diesel_truck_consumption'],
efficiency=sa_diesel_cost['efficiency'],
traveltime=travel_time)
df[SET_MG_DIESEL_FUEL + "{}".format(year)] = self._diesel_fuel_cost_calculator(
diesel_price=mg_diesel_cost['diesel_price'],
diesel_truck_volume=mg_diesel_cost['diesel_truck_volume'],
diesel_truck_consumption=mg_diesel_cost['diesel_truck_consumption'],
efficiency=mg_diesel_cost['efficiency'],
traveltime=travel_time)
return df.drop(columns=SET_TRAVEL_HOURS)
[docs] def diesel_cost_columns(self, sa_diesel_cost: Dict[str, float],
mg_diesel_cost: Dict[str, float], year: int):
"""Calculate diesel fuel cost based on TravelHours column
Arguments
---------
sa_diesel_cost: Dict
mg_diesel_cost: Dict
year: int
Returns
-------
"""
diesel_cost = self.compute_diesel_cost(self.df[[SET_TRAVEL_HOURS]],
sa_diesel_cost, mg_diesel_cost, year)
self.df = self.df.join(diesel_cost)
[docs] def condition_df(self):
"""
Do any initial data conditioning that may be required.
"""
logging.info('Ensure that columns that are supposed to be numeric are numeric')
columns = [SET_NIGHT_LIGHTS, SET_POP, SET_GRID_CELL_AREA, SET_ELEC_POP, SET_GHI, SET_WINDVEL, SET_TRAVEL_HOURS,
SET_ELEVATION, SET_SLOPE, SET_LAND_COVER, SET_SUBSTATION_DIST, SET_HV_DIST_CURRENT,
SET_HV_DIST_PLANNED, SET_MV_DIST_CURRENT, SET_MV_DIST_PLANNED, SET_ROAD_DIST, SET_X_DEG, SET_Y_DEG,
SET_DIST_TO_TRANS, SET_HYDRO_DIST, SET_HYDRO, SET_HYDRO_FID, SET_URBAN, SET_CAPITA_DEMAND,
SET_AGRI_DEMAND, SET_HEALTH_DEMAND, SET_EDU_DEMAND, SET_COMMERCIAL_DEMAND, SET_ELEC_ORDER,
'ResidentialDemandTierCustom', 'ResidentialDemandTier1', 'ResidentialDemandTier2',
'ResidentialDemandTier3', 'ResidentialDemandTier4', 'ResidentialDemandTier5']
for column in columns:
self.df[column] = pd.to_numeric(self.df[column], errors='coerce')
logging.info('Replace null values with zero')
self.df.fillna(0, inplace=True)
logging.info('Sort by country, Y and X')
self.df.sort_values(by=[SET_Y_DEG, SET_X_DEG], inplace=True)
[docs] @staticmethod
def classify_road_distance(road_distance):
"""Classify the road distance according to bins and labels
Arguments
---------
road_distance : list
"""
# Define bins
road_distance_bins = [0, 5, 10, 25, 50, float("inf")]
# Define classifiers
road_distance_labels = [5, 4, 3, 2, 1]
return pd.cut(road_distance, road_distance_bins, labels=road_distance_labels, include_lowest=True).astype(float)
[docs] @staticmethod
def classify_substation_distance(substation_distance):
"""Classify the substation distance according to bins and labels
Arguments
---------
substation_distance : list
"""
# Define bins
substation_distance_bins = [0, 0.5, 1, 5, 10, float("inf")]
# Define classifiers
substation_distance_labels = [5, 4, 3, 2, 1]
return pd.cut(substation_distance, substation_distance_bins, labels=substation_distance_labels).astype(float)
[docs] @staticmethod
def classify_elevation(elevation):
"""Classify the elevation distance according to bins and labels
Arguments
---------
elevation : list
"""
# Define bins
elevation_bins = [float("-inf"), 500, 1000, 2000, 3000, float("inf")]
# Define classifiers
elevation_labels = [5, 4, 3, 2, 1]
return pd.cut(elevation, elevation_bins, labels=elevation_labels).astype(float)
[docs] @staticmethod
def classify_slope(slope):
"""Classify the slope according to bins and labels
Arguments
---------
slope : list
"""
# Define bins
slope_bins = [0, 10, 20, 30, 40, float("inf")]
# Define classifiers
slope_labels = [5, 4, 3, 2, 1]
return pd.cut(slope, slope_bins, labels=slope_labels, include_lowest=True).astype(float)
[docs] @staticmethod
def classify_land_cover(column):
"""this is a different method employed to classify land cover and create new columns with the classification
Arguments
---------
column : series
Notes
-----
0, 11 = 1
6, 8 = 2
1, 3, 5, 12, 13, 15 = 3
2, 4 = 4
7, 9, 10, 14, 16 = 5
"""
land_cover_labels = [1, 3, 4, 3, 4, 3, 2, 5, 2, 5, 5, 1, 3, 3, 5, 3, 5]
return column.apply(lambda x: land_cover_labels[int(x)])
[docs] def grid_penalties(self, data_frame):
"""this method calculates the grid penalties in each settlement
First step classifies the parameters and creates new columns with classification
Second step adds the grid penalty to increase grid cost in areas that higher road distance, higher substation
distance, unsuitable land cover, high slope angle or high elevation
"""
logging.info('Classify road dist')
road_dist_classified = self.classify_road_distance(data_frame[SET_ROAD_DIST])
logging.info('Classify substation dist')
substation_dist_classified = self.classify_substation_distance(data_frame[SET_SUBSTATION_DIST])
logging.info('Classify elevation')
elevation_classified = self.classify_elevation(data_frame[SET_ELEVATION])
logging.info('Classify slope')
slope_classified = self.classify_slope(data_frame[SET_SLOPE])
logging.info('Classify land cover')
land_cover_classified = self.classify_land_cover(data_frame[SET_LAND_COVER])
logging.info('Combined classification')
combined_classification = (0.15 * road_dist_classified +
0.20 * substation_dist_classified +
0.15 * elevation_classified +
0.30 * slope_classified +
0.20 * land_cover_classified)
logging.info('Grid penalty')
"""this calculates the penalty from the results obtained from the combined classifications"""
classification = combined_classification.astype(float)
c = 1 + (np.exp(.85 * np.abs(1 - classification)) - 1) / 100
return c
[docs] @staticmethod
def calc_wind_cfs(wind_vel):
logging.info('Calculate Wind CF')
mu = 0.97 # availability factor
t = 8760
p_rated = 600
z = 55 # hub height
zr = 80 # velocity measurement height
es = 0.85 # losses in wind electricity
u_arr = range(1, 26)
p_curve = [0, 0, 0, 0, 30, 77, 135, 208, 287, 371, 450, 514, 558,
582, 594, 598, 600, 600, 600, 600, 600, 600, 600, 600, 600]
wind_speed = np.where(wind_vel > 0, wind_vel, 99)
# Adjust for the correct hub height
alpha = (0.37 - 0.088 * np.log(wind_speed)) / (1 - 0.088 * log(zr / 10))
u_z = wind_speed * (z / zr) ** alpha
# Rayleigh distribution and sum of series
rayleigh = [(pi / 2) * (u / u_z ** 2) * np.exp((-pi / 4) * (u / u_z) ** 2) for u in u_arr]
energy_produced = sum([mu * es * t * p * r for p, r in zip(p_curve, rayleigh)])
cf = np.where(wind_vel > 0, energy_produced / (p_rated * t), 0)
return cf
[docs] def prepare_wtf_tier_columns(self, num_people_per_hh_rural, num_people_per_hh_urban,
tier_1, tier_2, tier_3, tier_4, tier_5):
""" Prepares the five Residential Demand Tier Targets based customized for each country
"""
# The MTF approach is given as per yearly household consumption
# (BEYOND CONNECTIONS Energy Access Redefined, ESMAP, 2015).
# Tiers in kWh/capita/year depends on the average ppl/hh which is different in every country
logging.info('Populate ResidentialDemandTier columns')
tier_num = [1, 2, 3, 4, 5]
ppl_hh_average = (num_people_per_hh_urban + num_people_per_hh_rural) / 2
tier_1 = tier_1 / ppl_hh_average # 38.7 refers to kWh/household/year (mean value between Tier 1 and Tier 2)
tier_2 = tier_2 / ppl_hh_average
tier_3 = tier_3 / ppl_hh_average
tier_4 = tier_4 / ppl_hh_average
tier_5 = tier_5 / ppl_hh_average
wb_tiers_all = {1: tier_1, 2: tier_2, 3: tier_3, 4: tier_4, 5: tier_5}
for num in tier_num:
self.df[SET_WTFtier + "{}".format(num)] = wb_tiers_all[num]
[docs] def calibrate_current_pop_and_urban(self, pop_actual, urban_current):
"""
The function calibrates population values and urban/rural split (as estimated from GIS layers) based
on actual values provided by the user for the start year.
"""
logging.info('Population calibration process')
# First, calculate ratio between GIS retrieved and user provided population
pop_ratio = pop_actual / self.df[SET_POP].sum()
# Use above ratio to calibrate the population in a new column
self.df[SET_POP_CALIB] = self.df.apply(lambda row: row[SET_POP] * pop_ratio, axis=1)
pop_modelled = self.df[SET_POP_CALIB].sum()
self.df[SET_ELEC_POP_CALIB] = self.df[SET_ELEC_POP] * pop_ratio
logging.info('Urban/rural calibration process')
# RUN_PARAM: This is where manual calibration of urban/rural population takes place.
# The model uses 0, 1, 2 as follows; 0 = rural, 1 = peri-urban, 2 = urban.
# The calibration build into the model only classifies into urban/rural
self.df.sort_values(by=[SET_POP_CALIB], inplace=True, ascending=False)
cumulative_urban_pop = self.df[SET_POP_CALIB].cumsum()
self.df[SET_URBAN] = np.where(cumulative_urban_pop < (urban_current * self.df[SET_POP_CALIB].sum()), 2, 0)
self.df.sort_index(inplace=True)
# Get the calculated urban ratio and compare to the actual ratio
pop_urb = self.df.loc[self.df[SET_URBAN] > 1, SET_POP_CALIB].sum()
urban_modelled = pop_urb / pop_actual
if abs(urban_modelled - urban_current) > 0.01:
print('The modelled urban ratio is {:.2f}. '
'In case this is not acceptable please revise this part of the code'.format(urban_modelled))
return pop_modelled, urban_modelled
[docs] def project_pop_and_urban(self, pop_future, urban_future, start_year, years_of_analysis):
"""
This function projects population and urban/rural ratio for the different years of the analysis
"""
project_life = years_of_analysis[-1] - start_year
# Project future population, with separate growth rates for urban and rural
logging.info('Population projection process')
start_year_pop = self.df[SET_POP_CALIB].sum()
start_year_urban_ratio = self.df.loc[self.df[SET_URBAN] > 1, SET_POP_CALIB].sum() / start_year_pop
urban_growth = (urban_future * pop_future) / (start_year_urban_ratio * start_year_pop)
rural_growth = ((1 - urban_future) * pop_future) / ((1 - start_year_urban_ratio) * start_year_pop)
yearly_urban_growth_rate = urban_growth ** (1 / project_life)
yearly_rural_growth_rate = rural_growth ** (1 / project_life)
for year in years_of_analysis:
self.df[SET_POP + "{}".format(year)] = \
self.df.apply(lambda row: row[SET_POP_CALIB] * (yearly_urban_growth_rate ** (year - start_year))
if row[SET_URBAN] > 1
else row[SET_POP_CALIB] * (yearly_rural_growth_rate ** (year - start_year)), axis=1)
self.df[SET_POP + "{}".format(start_year)] = self.df.apply(lambda row: row[SET_POP_CALIB], axis=1)
[docs] def calibrate_elec_current(self, elec_actual, elec_actual_urban, elec_actual_rural, start_year, min_night_lights=0,
min_pop=50, max_transformer_dist=2, max_mv_dist=2, max_hv_dist=5, buffer=True):
"""
Calibrate the current electrification status
"""
self.df[SET_ELEC_POP_CALIB] = self.df[SET_ELEC_POP] * (self.df[SET_POP_CALIB].sum() / self.df[SET_POP].sum())
urban_pop = (self.df.loc[self.df[SET_URBAN] > 1, SET_POP_CALIB].sum()) # Total start year urban population
rural_pop = (self.df.loc[self.df[SET_URBAN] <= 1, SET_POP_CALIB].sum()) # Total start year rural population
total_pop = self.df[SET_POP_CALIB].sum() # Total start year population
factor = (total_pop * elec_actual) / (urban_pop * elec_actual_urban + rural_pop * elec_actual_rural)
elec_actual_urban *= factor
elec_actual_rural *= factor
urban_electrified = urban_pop * elec_actual_urban # Total start year electrified urban population
rural_electrified = rural_pop * elec_actual_rural # Total start year electrified rural population
total_electrified = urban_electrified + rural_electrified
# Ensure initially considered electrified population in settlement !> calibrated pop in settlement
self.df.loc[self.df[SET_ELEC_POP] > self.df[SET_POP_CALIB], SET_ELEC_POP] = self.df[SET_POP]
# REVIEW: The way this works now, for all urban or rural settlements that fit the conditioning.
# The population SET_ELEC_POP is reduced by equal amount to match urban/rural national statistics respectively.
# TODO We might need to update with off-grid electrified in future versions
elec_modelled = 0
self.df.loc[self.df[SET_NIGHT_LIGHTS] <= 0, [SET_ELEC_POP_CALIB]] = 0
logging.info('Calibrate current electrification')
self.df[SET_ELEC_CURRENT] = 0 # 0 = unelectrified, 1 = electrified. Initially all settlements set to 0
# This if function here skims through T&D columns in csv to identify which GIS grid distance information exists;
# Then it defines calibration method accordingly.
if max(self.df[SET_DIST_TO_TRANS]) < 9999:
self.df[SET_CALIB_GRID_DIST] = self.df[SET_DIST_TO_TRANS]
dist_limit = max_transformer_dist
grid_data = 'service transformers'
elif max(self.df[SET_MV_DIST_CURRENT]) < 9999:
self.df[SET_CALIB_GRID_DIST] = self.df[SET_MV_DIST_CURRENT]
dist_limit = max_mv_dist
grid_data = 'MV lines'
else:
self.df[SET_CALIB_GRID_DIST] = self.df[SET_HV_DIST_CURRENT]
dist_limit = max_hv_dist
grid_data = 'HV lines'
print('We have identified the existence of {} as input data; therefore we proceed using '
'those for the calibration'.format(grid_data))
# Update which settlements are considered to be electrified according to the conditions
self.df.loc[(self.df[SET_CALIB_GRID_DIST] < dist_limit) & (self.df[SET_NIGHT_LIGHTS] > min_night_lights) &
(self.df[SET_POP_CALIB] > min_pop), SET_ELEC_CURRENT] = 1
max_rural_ntl_elec = self.df.loc[(self.df[SET_ELEC_CURRENT] == 1) &
(self.df[SET_URBAN] < 2), SET_POP_CALIB].sum()
max_urban_ntl_elec = self.df.loc[(self.df[SET_ELEC_CURRENT] == 1) &
(self.df[SET_URBAN] == 2), SET_POP_CALIB].sum()
max_total_ntl_elec = max_rural_ntl_elec + max_urban_ntl_elec
if max_total_ntl_elec < elec_actual:
if buffer:
print('Not enough people meet the criteria. '
'Proceeding with calibration using buffer around existing grid infrastructure.')
else:
print('Not enough people meet the criteria. Buffer option is not chosen, '
'therefore calibrated elec pop is smaller than actual elec pop.')
print('If this is not okay, consider revising the starting conditions')
self.df.loc[self.df[SET_ELEC_CURRENT] == 1, SET_ELEC_POP_CALIB] = self.df[SET_POP_CALIB]
elif (max_urban_ntl_elec < urban_electrified) or (max_rural_ntl_elec < rural_electrified):
print('Not enough urban or rural population meet the criteria. '
'Calibrating to match total electrified population only')
total_elec_modelled = self.df.loc[self.df[SET_ELEC_CURRENT] == 1, SET_ELEC_POP_CALIB].sum()
total_elec_factor = total_elec_modelled / (urban_electrified + rural_electrified)
if total_elec_factor > 1:
self.df.loc[(self.df[SET_ELEC_CURRENT] == 1), SET_ELEC_POP_CALIB] *= (1 / total_elec_factor)
else:
i = 0
while total_elec_factor <= 1:
if i < 10:
self.df.loc[
(self.df[SET_ELEC_CURRENT] == 1), SET_ELEC_POP_CALIB] *= 1.1
self.df[SET_ELEC_POP_CALIB] = np.minimum(self.df[SET_ELEC_POP_CALIB], self.df[SET_POP_CALIB])
total_elec_modelled = self.df.loc[self.df[SET_ELEC_CURRENT] == 1, SET_ELEC_POP_CALIB].sum()
total_elec_factor = total_elec_modelled / (urban_electrified + rural_electrified)
i += 1
else:
break
else:
print('Calibrating to match both urban and rural electrified population')
urban_elec_modelled = self.df.loc[(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] == 2),
SET_ELEC_POP_CALIB].sum()
rural_elec_modelled = self.df.loc[(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] < 2),
SET_ELEC_POP_CALIB].sum()
urban_elec_factor = urban_elec_modelled / urban_electrified
rural_elec_factor = rural_elec_modelled / rural_electrified
if urban_elec_factor > 1:
self.df.loc[(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] == 2), SET_ELEC_POP_CALIB] *= (
1 / urban_elec_factor)
else:
i = 0
while urban_elec_factor <= 1:
if i < 10:
self.df.loc[
(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] == 2), SET_ELEC_POP_CALIB] *= 1.1
self.df[SET_ELEC_POP_CALIB] = np.minimum(self.df[SET_ELEC_POP_CALIB], self.df[SET_POP_CALIB])
urban_elec_modelled = self.df.loc[
(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] == 2), SET_ELEC_POP_CALIB].sum()
urban_elec_factor = urban_elec_modelled / urban_electrified
i += 1
else:
break
if rural_elec_factor > 1:
self.df.loc[(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] < 2), SET_ELEC_POP_CALIB] *= (
1 / rural_elec_factor)
else:
i = 0
while rural_elec_factor <= 1:
if i < 10:
self.df.loc[
(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] < 2), SET_ELEC_POP_CALIB] *= 1.1
self.df[SET_ELEC_POP_CALIB] = np.minimum(self.df[SET_ELEC_POP_CALIB],
self.df[SET_POP_CALIB])
rural_elec_modelled = self.df.loc[
(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] < 2), SET_ELEC_POP_CALIB].sum()
rural_elec_factor = rural_elec_modelled / rural_electrified
i += 1
else:
break
pop_elec = self.df.loc[self.df[SET_ELEC_CURRENT] == 1, SET_ELEC_POP_CALIB].sum()
elec_modelled = pop_elec / total_pop
buffer_used = 'No'
td_dist_2 = 0
if buffer:
# If not enough people live where there is NTL, and buffer option set to True,
# electrify people around buffer from existing grid
i = 0
while abs(elec_actual - elec_modelled) > 0.01:
buffer_used = 'Yes'
pop_elec_2 = self.df.loc[(self.df[SET_ELEC_CURRENT] == 0) & (self.df[SET_POP_CALIB] > min_pop) &
(self.df[SET_CALIB_GRID_DIST] < td_dist_2), SET_POP_CALIB].sum()
if i < 50:
if (pop_elec + pop_elec_2) / total_pop > elec_actual:
elec_modelled = (pop_elec + pop_elec_2) / total_pop
self.df.loc[(self.df[SET_ELEC_CURRENT] == 0) & (self.df[SET_POP_CALIB] > min_pop) &
(self.df[SET_CALIB_GRID_DIST] < td_dist_2), SET_ELEC_POP_CALIB] = self.df[
SET_POP_CALIB]
self.df.loc[(self.df[SET_ELEC_CURRENT] == 0) & (self.df[SET_POP_CALIB] > min_pop) &
(self.df[SET_CALIB_GRID_DIST] < td_dist_2), SET_ELEC_CURRENT] = 1
else:
i += 1
td_dist_2 += 0.1
else:
self.df.loc[(self.df[SET_ELEC_CURRENT] == 0) & (self.df[SET_POP_CALIB] > min_pop) &
(self.df[SET_CALIB_GRID_DIST] < td_dist_2), SET_ELEC_POP_CALIB] = self.df[
SET_POP_CALIB]
self.df.loc[(self.df[SET_ELEC_CURRENT] == 0) & (self.df[SET_POP_CALIB] > min_pop) &
(self.df[SET_CALIB_GRID_DIST] < td_dist_2), SET_ELEC_CURRENT] = 1
elec_modelled = (pop_elec + pop_elec_2) / total_pop
break
if elec_modelled > elec_actual:
self.df[SET_ELEC_POP_CALIB] *= elec_actual / elec_modelled
pop_elec = self.df.loc[self.df[SET_ELEC_CURRENT] == 1, SET_ELEC_POP_CALIB].sum()
elec_modelled = pop_elec / total_pop
urban_elec_modelled = self.df.loc[(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] == 2),
SET_ELEC_POP_CALIB].sum()
rural_elec_modelled = self.df.loc[(self.df[SET_ELEC_CURRENT] == 1) & (self.df[SET_URBAN] < 2),
SET_ELEC_POP_CALIB].sum()
print('The modelled electrification rate differ by {0:.2f}. '
'Urban elec. rate differ by {1:.2f} and Rural elec. rate differ by {2:.2f}. \n'
'If this is not acceptable please '
'revise this part of the algorithm'.format(elec_modelled - elec_actual,
(urban_elec_modelled / urban_pop - elec_actual_urban),
(rural_elec_modelled / rural_pop - elec_actual_rural)))
self.df[SET_ELEC_FINAL_CODE + "{}".format(start_year)] = 99
self.df.loc[self.df[SET_ELEC_CURRENT] == 1, SET_ELEC_FINAL_CODE + "{}".format(start_year)] = 1
return elec_modelled, rural_elec_modelled / rural_pop, urban_elec_modelled / urban_pop, grid_data, dist_limit, \
min_night_lights, min_pop, buffer_used, td_dist_2
[docs] def current_mv_line_dist(self):
logging.info('Determine current MV line length')
self.df[SET_MV_CONNECT_DIST] = 0
self.df.loc[self.df[SET_ELEC_CURRENT] == 1, SET_MV_CONNECT_DIST] = self.df[SET_HV_DIST_CURRENT]
self.df[SET_MIN_TD_DIST] = self.df[[SET_MV_DIST_PLANNED, SET_HV_DIST_PLANNED]].min(axis=1)
[docs] def pre_electrification(self, grid_price, year, time_step, end_year, grid_calc, grid_capacity_limit,
grid_connect_limit):
"""" ... """
logging.info('Define the initial electrification status')
grid_investment = np.zeros(len(self.df[SET_X_DEG]))
grid_capacity = np.zeros(len(self.df[SET_X_DEG]))
prev_code = self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)].copy(deep=True)
# Grid-electrified settlements
electrified_loce, electrified_investment, electrified_capacity = self.get_grid_lcoe(0, 0, 0, year, time_step,
end_year, grid_calc)
electrified_investment = electrified_investment[0]
electrified_capacity = electrified_capacity[0]
grid_investment = np.where(self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] == 1,
electrified_investment, grid_investment)
grid_capacity = np.where(self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] == 1,
electrified_capacity, grid_capacity)
self.df[SET_LCOE_GRID + "{}".format(year)] = 99
self.df.loc[self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] == 1,
SET_LCOE_GRID + "{}".format(year)] = grid_price
# Two restrictions may be imposed on the grid. The new grid generation capacity that can be added and the
# number of new households that can be connected. The next step calculates how much of that will be used up due
# to demand (population) growth in already electrified settlements
grid_capacity_limit -= grid_capacity.sum()
self.df['Densification_connections'] = self.df[SET_NEW_CONNECTIONS + "{}".format(year)] / self.df[
SET_NUM_PEOPLE_PER_HH]
grid_connect_limit -= sum(self.df.loc[prev_code == 1]['Densification_connections'])
del self.df['Densification_connections']
return pd.Series(grid_investment), pd.Series(grid_capacity), grid_capacity_limit, grid_connect_limit
[docs] def elec_extension(self, grid_calc, max_dist, year, start_year, end_year, time_step, grid_capacity_limit,
grid_connect_limit, new_investment, new_capacity, auto_intensification=0, prioritization=0):
"""
Iterate through all electrified settlements and find which settlements can be economically connected to the grid
Repeat with newly electrified settlements until no more are added
"""
prio = int(prioritization)
prev_code = self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)].copy(deep=True)
if year - time_step == start_year:
elecorder = self.df[SET_ELEC_ORDER].copy(deep=True)
else:
elecorder = self.df[SET_ELEC_ORDER + "{}".format(year - time_step)].copy(deep=True)
grid_penalty_ratio = self.df[SET_GRID_PENALTY].copy(deep=True)
min_code_lcoes = self.df[SET_MIN_OFFGRID_LCOE + "{}".format(year)].copy(deep=True)
new_lcoes = self.df[SET_LCOE_GRID + "{}".format(year)].copy(deep=True)
cell_path_real = self.df[SET_MV_CONNECT_DIST].copy(deep=True)
cell_path_adjusted = list(np.zeros(len(prev_code)).tolist())
mv_planned = self.df[SET_MV_DIST_PLANNED].copy(deep=True)
# Start by identifying which settlements are grid-connected already
electrified = np.where(prev_code == 1, 1, 0)
# The grid may be forced to expand around existing MV lines if this option has been selected, regardless
# off-grid alternatives are less costly. The following section implements that
if (prio == 2) or (prio == 4):
mv_dist_adjusted = np.nan_to_num(grid_penalty_ratio * mv_planned)
intensification_lcoe, intensification_investment, intensification_capacity = \
self.get_grid_lcoe(dist_adjusted=mv_dist_adjusted, elecorder=0, additional_transformer=0, year=year,
time_step=time_step, end_year=end_year, grid_calc=grid_calc)
intensification_lcoe = new_lcoes.copy(deep=True)
intensification_lcoe.loc[(mv_planned < auto_intensification) & (prev_code != 1)] = 0.01
intensification_lcoe = pd.DataFrame(intensification_lcoe)
intensification_lcoe.columns = [0]
grid_capacity_limit, grid_connect_limit, cell_path_real, cell_path_adjusted, elecorder, electrified, \
new_lcoes, new_investment, new_capacity \
= self.update_grid_extension_info(grid_lcoe=intensification_lcoe, dist=mv_planned,
dist_adjusted=mv_dist_adjusted, prev_dist=0, elecorder=elecorder,
new_elec_order=1, max_dist=max_dist, new_lcoes=new_lcoes,
grid_capacity_limit=grid_capacity_limit,
grid_connect_limit=grid_connect_limit, cell_path_real=cell_path_real,
cell_path_adjusted=cell_path_adjusted, electrified=electrified,
year=year, grid_calc=grid_calc,
grid_investment=intensification_investment,
new_investment=new_investment,
grid_capacity=intensification_capacity,
new_capacity=new_capacity)
# Find the unelectrified settlements where grid can be less costly than off-grid
filter_lcoe, filter_investment, filter_capacity = self.get_grid_lcoe(0, 0, 0, year, time_step, end_year,
grid_calc)
filter_lcoe = filter_lcoe[0]
filter_lcoe.loc[electrified == 1] = 99
unelectrified = np.where(filter_lcoe < min_code_lcoes)
unelectrified = unelectrified[0].tolist()
logging.info('Initially {} electrified'.format(sum(electrified)))
# First round of extension from MV network
mv_dist = pd.Series(self.df[SET_MV_DIST_PLANNED])
mv_dist_adjusted = np.nan_to_num(grid_penalty_ratio * mv_dist)
grid_lcoe, grid_investment, grid_capacity = self.get_grid_lcoe(dist_adjusted=mv_dist_adjusted, elecorder=0,
additional_transformer=0, year=year,
time_step=time_step,
end_year=end_year, grid_calc=grid_calc)
grid_capacity_limit, grid_connect_limit, cell_path_real, cell_path_adjusted, elecorder, electrified, \
new_lcoes, new_investment, new_capacity \
= self.update_grid_extension_info(grid_lcoe=grid_lcoe, dist=mv_dist, dist_adjusted=mv_dist_adjusted,
prev_dist=0, elecorder=elecorder, new_elec_order=1, max_dist=max_dist,
new_lcoes=new_lcoes, grid_capacity_limit=grid_capacity_limit,
grid_connect_limit=grid_connect_limit, cell_path_real=cell_path_real,
cell_path_adjusted=cell_path_adjusted, electrified=electrified, year=year,
grid_calc=grid_calc, grid_investment=grid_investment,
new_investment=new_investment, grid_capacity=grid_capacity,
new_capacity=new_capacity)
# Second round of extension from HV lines
hv_dist = np.nan_to_num(self.df[SET_HV_DIST_PLANNED])
hv_dist_adjusted = np.nan_to_num(hv_dist * grid_penalty_ratio)
grid_lcoe, grid_investment, grid_capacity = self.get_grid_lcoe(dist_adjusted=hv_dist_adjusted, elecorder=0,
additional_transformer=1, year=year,
time_step=time_step,
end_year=end_year, grid_calc=grid_calc)
grid_capacity_limit, grid_connect_limit, cell_path_real, cell_path_adjusted, elecorder, electrified, \
new_lcoes, new_investment, new_capacity \
= self.update_grid_extension_info(grid_lcoe=grid_lcoe, dist=hv_dist, dist_adjusted=hv_dist_adjusted,
prev_dist=0, elecorder=elecorder, new_elec_order=1, max_dist=999999,
new_lcoes=new_lcoes, grid_capacity_limit=grid_capacity_limit,
grid_connect_limit=grid_connect_limit, cell_path_real=cell_path_real,
cell_path_adjusted=cell_path_adjusted, electrified=electrified,
year=year, grid_calc=grid_calc, grid_investment=grid_investment,
new_investment=new_investment, grid_capacity=grid_capacity,
new_capacity=new_capacity)
# Third to last round of extension loops from electrified settlements. First considering all
# electrified settlements up until this point, then from the newly electrified settlements in each round
prev_electrified = np.zeros(len(prev_code))
loops = 1
while sum(electrified) > sum(prev_electrified):
new_electrified = electrified - prev_electrified
prev_electrified = electrified
logging.info('Electrification loop {} with {} electrified'.format(loops, int(sum(new_electrified))))
loops += 1
if sum(new_electrified) > 1:
# Calculating the distance and adjusted distance from each unelectrified settelement to the closest
# electrified settlement, as well as the electrification order an total MV distance to that electrified
# settlement
nearest_dist_adjusted, nearest_elec_order, prev_dist, nearest_dist = \
self.closest_electrified_settlement(new_electrified, unelectrified, cell_path_real,
grid_penalty_ratio, elecorder)
grid_lcoe, grid_investment, grid_capacity = self.get_grid_lcoe(dist_adjusted=nearest_dist_adjusted,
elecorder=nearest_elec_order,
additional_transformer=0, year=year,
time_step=time_step,
end_year=end_year, grid_calc=grid_calc)
grid_capacity_limit, grid_connect_limit, cell_path_real, cell_path_adjusted, elecorder, electrified, \
new_lcoes, new_investment, new_capacity = \
self.update_grid_extension_info(grid_lcoe=grid_lcoe, dist=nearest_dist,
dist_adjusted=nearest_dist_adjusted,
prev_dist=prev_dist, elecorder=elecorder,
new_elec_order=nearest_elec_order, max_dist=max_dist,
new_lcoes=new_lcoes, grid_capacity_limit=grid_capacity_limit,
grid_connect_limit=grid_connect_limit,
cell_path_real=cell_path_real,
cell_path_adjusted=cell_path_adjusted, electrified=electrified,
year=year, grid_calc=grid_calc, grid_investment=grid_investment,
new_investment=new_investment, grid_capacity=grid_capacity,
new_capacity=new_capacity)
return new_lcoes, cell_path_adjusted, elecorder, cell_path_real, pd.DataFrame(new_investment), pd.DataFrame(
new_capacity)
[docs] def get_grid_lcoe(self, dist_adjusted, elecorder, additional_transformer, year, time_step, end_year, grid_calc):
grid_lcoe, grid_investment, grid_capacity = \
grid_calc.get_lcoe(energy_per_cell=self.df[SET_ENERGY_PER_CELL + "{}".format(year)],
start_year=year - time_step,
end_year=end_year,
people=self.df[SET_POP + "{}".format(year)],
new_connections=self.df[SET_NEW_CONNECTIONS + "{}".format(year)],
total_energy_per_cell=self.df[SET_TOTAL_ENERGY_PER_CELL],
prev_code=self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)],
num_people_per_hh=self.df[SET_NUM_PEOPLE_PER_HH],
grid_cell_area=self.df[SET_GRID_CELL_AREA],
additional_mv_line_length=dist_adjusted,
elec_loop=elecorder,
additional_transformer=additional_transformer,
capacity_factor=grid_calc.capacity_factor)
return grid_lcoe, grid_investment, grid_capacity
[docs] def closest_electrified_settlement(self, new_electrified, unelectrified, cell_path_real, grid_penalty_ratio,
elecorder):
x = self.df[SET_X_DEG].copy(deep=True)
y = self.df[SET_Y_DEG].copy(deep=True)
# Identifies the electrified settlements from which to extend the network
extension_nodes = np.where(new_electrified == 1)
extension_nodes = extension_nodes[0].tolist()
# Creates an array of x, y coordinates of the electrified settlements
elec_nodes2 = np.transpose(np.array([x.loc[extension_nodes], y.loc[extension_nodes]]))
closest_elec_node = np.zeros(len(x), dtype=int)
nearest_dist = np.zeros(len(x))
nearest_elec_order = np.zeros(len(x), dtype=int)
prev_dist = np.zeros(len(x))
# Create an array of x-, y-coordinates of the (filtered) unelectrified settlements
unelectrified = np.setdiff1d(unelectrified, extension_nodes).tolist()
x_unelec = x.loc[unelectrified]
y_unelec = y.loc[unelectrified]
unelec_nodes = pd.DataFrame(np.transpose(np.array([x_unelec, y_unelec])))
# Calculate for each unelectrified settlement which is the closest electrified settlement
closest_node = self.do_kdtree(elec_nodes2, unelec_nodes)
# For each unelectrified settlement, find the electrifiecation order, distance to closest electrified
# settlement, distance including grid penalty, and total mv length up until the electrrfied settlement
j = 0
for unelec in unelectrified:
closest_elec_node[unelec] = closest_node[j]
j += 1
extension_nodes = np.array(extension_nodes)
x_closest_elec = x.loc[extension_nodes[closest_elec_node[unelectrified]]]
x_closest_elec.index = x_unelec.index
y_closest_elec = y.loc[extension_nodes[closest_elec_node[unelectrified]]]
y_closest_elec.index = y_unelec.index
dist = self.haversine_vector(x_closest_elec, y_closest_elec, x_unelec, y_unelec)
nearest_dist[unelectrified] = dist[unelectrified]
nearest_dist_adjusted = np.nan_to_num(nearest_dist * grid_penalty_ratio)
nearest_elec_order[unelectrified] = elecorder[extension_nodes[
closest_elec_node[unelectrified]]] + 1
prev_dist[unelectrified] = cell_path_real[
extension_nodes[closest_elec_node[unelectrified]]]
return nearest_dist_adjusted, nearest_elec_order, prev_dist, nearest_dist
[docs] def update_grid_extension_info(self, grid_lcoe, dist, dist_adjusted, prev_dist, elecorder, new_elec_order,
max_dist, new_lcoes, grid_capacity_limit, grid_connect_limit, cell_path_real,
cell_path_adjusted, electrified, year, grid_calc, grid_investment, new_investment,
grid_capacity, new_capacity):
min_code_lcoes = self.df[SET_MIN_OFFGRID_LCOE + "{}".format(year)].copy(deep=True)
grid_lcoe = grid_lcoe[0]
grid_investment = grid_investment[0]
grid_capacity = grid_capacity[0]
grid_lcoe.loc[electrified == 1] = 99
grid_lcoe.loc[prev_dist + dist_adjusted > max_dist] = 99
grid_lcoe.loc[grid_lcoe > new_lcoes] = 99
consumption = self.df[SET_ENERGY_PER_CELL + "{}".format(year)] # kWh/year
average_load = consumption / (1 - grid_calc.distribution_losses) / HOURS_PER_YEAR # kW
peak_load = average_load / grid_calc.base_to_peak_load_ratio # kW
peak_load.loc[grid_lcoe >= min_code_lcoes] = 0
peak_load_cum_sum = np.cumsum(peak_load)
grid_lcoe.loc[peak_load_cum_sum > grid_capacity_limit] = 99
new_grid_connections = self.df[SET_NEW_CONNECTIONS + "{}".format(year)] / self.df[SET_NUM_PEOPLE_PER_HH]
new_grid_connections.loc[grid_lcoe >= min_code_lcoes] = 0
new_grid_connections_cum_sum = np.cumsum(new_grid_connections)
grid_lcoe.loc[new_grid_connections_cum_sum > grid_connect_limit] = 99
# Update limiting values
grid_capacity_limit -= peak_load.loc[grid_lcoe < min_code_lcoes].sum()
grid_connect_limit -= new_grid_connections.loc[grid_lcoe < min_code_lcoes].sum()
# Update values for settlements that meet conditions
cell_path_real = np.where(grid_lcoe < min_code_lcoes, prev_dist + dist, cell_path_real)
cell_path_adjusted = np.where(grid_lcoe < min_code_lcoes, dist_adjusted, cell_path_adjusted)
elecorder = np.where(grid_lcoe < min_code_lcoes, new_elec_order, elecorder)
electrified = np.where(grid_lcoe < min_code_lcoes, 1, electrified)
new_lcoes = np.where(grid_lcoe < min_code_lcoes, grid_lcoe, new_lcoes)
new_investment = np.where(grid_lcoe < min_code_lcoes, grid_investment, new_investment)
new_capacity = np.where(grid_lcoe < min_code_lcoes, grid_capacity, new_capacity)
return grid_capacity_limit, grid_connect_limit, cell_path_real, cell_path_adjusted, elecorder, \
electrified, new_lcoes, new_investment, new_capacity
[docs] @staticmethod
def haversine_vector(lon1, lat1, lon2, lat2):
"""Calculate the great circle distance between two points
on the earth (specified in decimal degrees)
"""
# convert decimal degrees to radians
lon1, lat1, lon2, lat2 = map(np.deg2rad, [lon1, lat1, lon2, lat2])
dlon = lon2 - lon1
dlat = lat2 - lat1
a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
c = 2 * np.arcsin(np.sqrt(a))
r = 6371 # Radius of earth in kilometers. Use 3956 for miles
return c * r
[docs] @staticmethod
def do_kdtree(combined_x_y_arrays, points):
mytree = scipy.spatial.cKDTree(combined_x_y_arrays)
dist, indexes = mytree.query(points)
return indexes
[docs] def calculate_new_connections(self, year, time_step, start_year):
"""this method defines new connections for grid related purposes
Arguments
---------
year : int
time_step : int
start_year : int
"""
logging.info('Calculate new connections')
# Calculate new connections for grid related purposes
# TODO - This was changed based on your "newly created" column SET_ELEC_POP.
# Please review and check whether this creates any problem at your distribution_network function
# using people/new connections and energy_per_settlement/total_energy_per_settlement
if year - time_step == start_year:
# Assign new connections to those that are already electrified to a certain percent
self.df.loc[(self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] < 99),
SET_NEW_CONNECTIONS + "{}".format(year)] = \
(self.df[SET_POP + "{}".format(year)] - self.df[SET_ELEC_POP_CALIB])
# Assign new connections to those that are not currently electrified
self.df.loc[self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] == 99,
SET_NEW_CONNECTIONS + "{}".format(year)] = self.df[SET_POP + "{}".format(year)]
# Some conditioning to eliminate negative values if existing by mistake
self.df.loc[self.df[SET_NEW_CONNECTIONS + "{}".format(year)] < 0,
SET_NEW_CONNECTIONS + "{}".format(year)] = 0
else:
# Assign new connections to settlements that are already electrified
self.df.loc[self.df[SET_LIMIT + "{}".format(year - time_step)] == 1,
SET_NEW_CONNECTIONS + "{}".format(year)] = \
(self.df[SET_POP + "{}".format(year)] - self.df[SET_POP + "{}".format(year - time_step)])
# Assign new connections to settlements that were initially electrified,
# but not prioritized during the time_step
self.df.loc[(self.df[SET_LIMIT + "{}".format(year - time_step)] == 0) &
(self.df[SET_ELEC_CURRENT] == 1),
SET_NEW_CONNECTIONS + "{}".format(year)] = self.df[SET_POP + "{}".format(year)] - self.df[
SET_ELEC_POP_CALIB]
# Assing new connections to settlements that have not been electrified
self.df.loc[(self.df[SET_LIMIT + "{}".format(year - time_step)] == 0) & (
self.df[SET_ELEC_CURRENT] == 0),
SET_NEW_CONNECTIONS + "{}".format(year)] = self.df[SET_POP + "{}".format(year)]
# Some conditioning to eliminate negative values if existing by mistake
self.df.loc[
self.df[SET_NEW_CONNECTIONS + "{}".format(year)] < 0, SET_NEW_CONNECTIONS + "{}".format(year)] = 0
[docs] def set_residential_demand(self, rural_tier, urban_tier, num_people_per_hh_rural,
num_people_per_hh_urban, productive_demand):
"""this method defines residential demand per tier level for each target year
Arguments
---------
rural_tier : int
urban_tier : int
num_people_per_hh_rural : float
num_people_per_hh_urban : float
productive_demand : int
"""
logging.info('Setting electrification demand as per target per year')
if max(self.df[SET_CAPITA_DEMAND]) == 0:
# RUN_PARAM: This shall be changed if different urban/rural categorization is decided
wb_tier_rural = int(rural_tier)
wb_tier_urban_clusters = int(rural_tier)
wb_tier_urban_centers = int(urban_tier)
if wb_tier_urban_centers == 6:
wb_tier_urban_centers = 'Custom'
if wb_tier_urban_clusters == 6:
wb_tier_urban_clusters = 'Custom'
if wb_tier_rural == 6:
wb_tier_rural = 'Custom'
self.df[SET_CAPITA_DEMAND] = 0
# RUN_PARAM: This shall be changed if different urban/rural categorization is decided
# Create new columns assigning number of people per household as per Urban/Rural type
self.df.loc[self.df[SET_URBAN] == 0, SET_NUM_PEOPLE_PER_HH] = num_people_per_hh_rural
self.df.loc[self.df[SET_URBAN] == 1, SET_NUM_PEOPLE_PER_HH] = num_people_per_hh_rural
self.df.loc[self.df[SET_URBAN] == 2, SET_NUM_PEOPLE_PER_HH] = num_people_per_hh_urban
# Define per capita residential demand
self.df.loc[self.df[SET_URBAN] == 0, SET_CAPITA_DEMAND] = self.df[
SET_RESIDENTIAL_TIER + str(wb_tier_rural)]
self.df.loc[self.df[SET_URBAN] == 1, SET_CAPITA_DEMAND] = self.df[
SET_RESIDENTIAL_TIER + str(wb_tier_urban_clusters)]
self.df.loc[self.df[SET_URBAN] == 2, SET_CAPITA_DEMAND] = self.df[
SET_RESIDENTIAL_TIER + str(wb_tier_urban_centers)]
# TODO: REVIEW, added Tier column
tier_1 = 38.7 # 38.7 refers to kWh/household/year. It is the mean value between Tier 1 and Tier 2
tier_2 = 219
tier_3 = 803
tier_4 = 2117
self.df[SET_TIER] = 5
self.df.loc[self.df[SET_CAPITA_DEMAND] * self.df[SET_NUM_PEOPLE_PER_HH] < tier_4, SET_TIER] = 4
self.df.loc[self.df[SET_CAPITA_DEMAND] * self.df[SET_NUM_PEOPLE_PER_HH] < tier_3, SET_TIER] = 3
self.df.loc[self.df[SET_CAPITA_DEMAND] * self.df[SET_NUM_PEOPLE_PER_HH] < tier_2, SET_TIER] = 2
self.df.loc[self.df[SET_CAPITA_DEMAND] * self.df[SET_NUM_PEOPLE_PER_HH] < tier_1, SET_TIER] = 1
# Add commercial demand
# agri = True if 'y' in input('Include agricultural demand? <y/n> ') else False
# if agri:
if int(productive_demand) == 1:
self.df[SET_CAPITA_DEMAND] += self.df[SET_AGRI_DEMAND]
# commercial = True if 'y' in input('Include commercial demand? <y/n> ') else False
# if commercial:
if int(productive_demand) == 1:
self.df[SET_CAPITA_DEMAND] += self.df[SET_COMMERCIAL_DEMAND]
# health = True if 'y' in input('Include health demand? <y/n> ') else False
# if health:
if int(productive_demand) == 1:
self.df[SET_CAPITA_DEMAND] += self.df[SET_HEALTH_DEMAND]
# edu = True if 'y' in input('Include educational demand? <y/n> ') else False
# if edu:
if int(productive_demand) == 1:
self.df[SET_CAPITA_DEMAND] += self.df[SET_EDU_DEMAND]
[docs] def calculate_total_demand_per_settlement(self, year):
"""this method calculates total demand for each settlement per year
Arguments
---------
year : int
"""
self.df.loc[self.df[SET_URBAN] == 0, SET_ENERGY_PER_CELL + "{}".format(year)] = \
self.df[SET_CAPITA_DEMAND] * self.df[SET_NEW_CONNECTIONS + "{}".format(year)]
self.df.loc[self.df[SET_URBAN] == 1, SET_ENERGY_PER_CELL + "{}".format(year)] = \
self.df[SET_CAPITA_DEMAND] * self.df[SET_NEW_CONNECTIONS + "{}".format(year)]
self.df.loc[self.df[SET_URBAN] == 2, SET_ENERGY_PER_CELL + "{}".format(year)] = \
self.df[SET_CAPITA_DEMAND] * self.df[SET_NEW_CONNECTIONS + "{}".format(year)]
# if year - time_step == start_year:
self.df.loc[self.df[SET_URBAN] == 0, SET_TOTAL_ENERGY_PER_CELL] = \
self.df[SET_CAPITA_DEMAND] * self.df[SET_POP + "{}".format(year)]
self.df.loc[self.df[SET_URBAN] == 1, SET_TOTAL_ENERGY_PER_CELL] = \
self.df[SET_CAPITA_DEMAND] * self.df[SET_POP + "{}".format(year)]
self.df.loc[self.df[SET_URBAN] == 2, SET_TOTAL_ENERGY_PER_CELL] = \
self.df[SET_CAPITA_DEMAND] * self.df[SET_POP + "{}".format(year)]
[docs] def set_scenario_variables(self, year, num_people_per_hh_rural, num_people_per_hh_urban, time_step, start_year,
urban_tier, rural_tier, end_year_pop, productive_demand):
"""
this method determines some basic parameters required in LCOE calculation
it sets the basic scenario parameters that differ based on urban/rural so that they are in the table and
can be read directly to calculate LCOEs
Arguments
---------
year : int
num_people_per_hh_rural : float
num_people_per_hh_urban : float
time_step : int
start_year: int
urban_tier : int
rural_tier : int
end_year_pop : int
productive_demand : int
"""
self.calculate_new_connections(year, time_step, start_year)
self.set_residential_demand(rural_tier, urban_tier, num_people_per_hh_rural, num_people_per_hh_urban,
productive_demand)
self.calculate_total_demand_per_settlement(year)
[docs] def calculate_off_grid_lcoes(self, mg_hydro_calc, mg_wind_calc, mg_pv_calc, sa_pv_calc, mg_diesel_calc,
sa_diesel_calc, year, end_year, time_step, techs, tech_codes, diesel_techs=0):
"""
Calculate the LCOEs for all off-grid technologies
"""
logging.info('Calculate minigrid hydro LCOE')
self.df[SET_LCOE_MG_HYDRO + "{}".format(year)], mg_hydro_investment, mg_hydro_capacity = \
mg_hydro_calc.get_lcoe(energy_per_cell=self.df[SET_ENERGY_PER_CELL + "{}".format(year)],
start_year=year - time_step,
end_year=end_year,
people=self.df[SET_POP + "{}".format(year)],
new_connections=self.df[SET_NEW_CONNECTIONS + "{}".format(year)],
total_energy_per_cell=self.df[SET_TOTAL_ENERGY_PER_CELL],
prev_code=self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)],
num_people_per_hh=self.df[SET_NUM_PEOPLE_PER_HH],
grid_cell_area=self.df[SET_GRID_CELL_AREA],
additional_mv_line_length=self.df[SET_HYDRO_DIST],
capacity_factor=mg_hydro_calc.capacity_factor)
logging.info('Calculate minigrid PV LCOE')
self.df[SET_LCOE_MG_PV + "{}".format(year)], mg_pv_investment, mg_pv_capacity = \
mg_pv_calc.get_lcoe(energy_per_cell=self.df[SET_ENERGY_PER_CELL + "{}".format(year)],
start_year=year - time_step,
end_year=end_year,
people=self.df[SET_POP + "{}".format(year)],
new_connections=self.df[SET_NEW_CONNECTIONS + "{}".format(year)],
total_energy_per_cell=self.df[SET_TOTAL_ENERGY_PER_CELL],
prev_code=self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)],
num_people_per_hh=self.df[SET_NUM_PEOPLE_PER_HH],
grid_cell_area=self.df[SET_GRID_CELL_AREA],
capacity_factor=self.df[SET_GHI] / HOURS_PER_YEAR)
logging.info('Calculate minigrid wind LCOE')
self.df[SET_LCOE_MG_WIND + "{}".format(year)], mg_wind_investment, mg_wind_capacity = \
mg_wind_calc.get_lcoe(energy_per_cell=self.df[SET_ENERGY_PER_CELL + "{}".format(year)],
start_year=year - time_step,
end_year=end_year,
people=self.df[SET_POP + "{}".format(year)],
new_connections=self.df[SET_NEW_CONNECTIONS + "{}".format(year)],
total_energy_per_cell=self.df[SET_TOTAL_ENERGY_PER_CELL],
prev_code=self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)],
num_people_per_hh=self.df[SET_NUM_PEOPLE_PER_HH],
grid_cell_area=self.df[SET_GRID_CELL_AREA],
capacity_factor=self.df[SET_WINDCF])
if diesel_techs == 0:
self.df[SET_LCOE_MG_DIESEL + "{}".format(year)] = 99
self.df[SET_LCOE_SA_DIESEL + "{}".format(year)] = 99
sa_diesel_investment = mg_pv_investment * 0
mg_diesel_investment = mg_pv_investment * 0
sa_diesel_capacity = mg_pv_capacity * 0
mg_diesel_capacity = mg_pv_capacity * 0
else:
logging.info('Calculate minigrid diesel LCOE')
self.df[SET_LCOE_MG_DIESEL + "{}".format(year)], mg_diesel_investment, mg_diesel_capacity = \
mg_diesel_calc.get_lcoe(energy_per_cell=self.df[SET_ENERGY_PER_CELL + "{}".format(year)],
start_year=year - time_step,
end_year=end_year,
people=self.df[SET_POP + "{}".format(year)],
new_connections=self.df[SET_NEW_CONNECTIONS + "{}".format(year)],
total_energy_per_cell=self.df[SET_TOTAL_ENERGY_PER_CELL],
prev_code=self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)],
num_people_per_hh=self.df[SET_NUM_PEOPLE_PER_HH],
grid_cell_area=self.df[SET_GRID_CELL_AREA],
fuel_cost=self.df[SET_MG_DIESEL_FUEL + "{}".format(year)],
capacity_factor=mg_diesel_calc.capacity_factor)
logging.info('Calculate standalone diesel LCOE')
self.df[SET_LCOE_SA_DIESEL + "{}".format(year)], sa_diesel_investment, sa_diesel_capacity = \
sa_diesel_calc.get_lcoe(energy_per_cell=self.df[SET_ENERGY_PER_CELL + "{}".format(year)],
start_year=year - time_step,
end_year=end_year,
people=self.df[SET_POP + "{}".format(year)],
new_connections=self.df[SET_NEW_CONNECTIONS + "{}".format(year)],
total_energy_per_cell=self.df[SET_TOTAL_ENERGY_PER_CELL],
prev_code=self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)],
num_people_per_hh=self.df[SET_NUM_PEOPLE_PER_HH],
grid_cell_area=self.df[SET_GRID_CELL_AREA],
fuel_cost=self.df[SET_SA_DIESEL_FUEL + "{}".format(year)],
capacity_factor=sa_diesel_calc.capacity_factor)
logging.info('Calculate standalone PV LCOE')
self.df[SET_LCOE_SA_PV + "{}".format(year)], sa_pv_investment, sa_pv_capacity = \
sa_pv_calc.get_lcoe(energy_per_cell=self.df[SET_ENERGY_PER_CELL + "{}".format(year)],
start_year=year - time_step,
end_year=end_year,
people=self.df[SET_POP + "{}".format(year)],
new_connections=self.df[SET_NEW_CONNECTIONS + "{}".format(year)],
total_energy_per_cell=self.df[SET_TOTAL_ENERGY_PER_CELL],
prev_code=self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)],
num_people_per_hh=self.df[SET_NUM_PEOPLE_PER_HH],
grid_cell_area=self.df[SET_GRID_CELL_AREA],
capacity_factor=self.df[SET_GHI] / HOURS_PER_YEAR)
self.choose_minimum_off_grid_tech(year, mg_hydro_calc, techs, tech_codes)
return sa_diesel_investment, sa_diesel_capacity, sa_pv_investment, sa_pv_capacity, mg_diesel_investment, \
mg_diesel_capacity, mg_pv_investment, mg_pv_capacity, mg_wind_investment, mg_wind_capacity, \
mg_hydro_investment, mg_hydro_capacity
[docs] def choose_minimum_off_grid_tech(self, year, mg_hydro_calc, techs, tech_codes):
"""Choose minimum LCOE off-grid technology
First step determines the off-grid technology with minimum LCOE
Second step determines the value (number) of the selected minimum off-grid technology
Arguments
---------
year : int
mg_hydro_calc : dict
"""
off_grid_techs = techs.copy()
del off_grid_techs[0]
off_grid_techs = [x + str(year) for x in off_grid_techs]
off_grid_tech_codes = tech_codes.copy()
del off_grid_tech_codes[0]
logging.info('Determine minimum technology (off-grid)')
self.df[SET_MIN_OFFGRID + "{}".format(year)] = self.df[off_grid_techs].T.idxmin()
logging.info('Ensure hydro-power is not over-utilized')
self.limit_hydro_usage(mg_hydro_calc, year)
self.df[SET_MIN_OFFGRID + "{}".format(year)] = self.df[off_grid_techs].T.idxmin()
logging.info('Determine minimum off-grid tech LCOE')
self.df[SET_MIN_OFFGRID_LCOE + "{}".format(year)] = self.df[off_grid_techs].T.min()
# Add code numbers reflecting minimum off-grid technology code
for i in range(len(off_grid_techs)):
self.df.loc[self.df[SET_MIN_OFFGRID + "{}".format(year)] == off_grid_techs[i],
SET_MIN_OFFGRID_CODE + "{}".format(year)] = off_grid_tech_codes[i]
[docs] def limit_hydro_usage(self, mg_hydro_calc, year):
# A df with all hydro-power sites, to ensure that they aren't assigned more capacity than is available
hydro_used = 'HydropowerUsed' # the amount of the hydro potential that has been assigned
hydro_lcoe = self.df[SET_LCOE_MG_HYDRO + "{}".format(year)].copy()
hydro_df = self.df[[SET_HYDRO_FID, SET_HYDRO]].drop_duplicates(subset=SET_HYDRO_FID)
hydro_df[hydro_used] = 0
hydro_df = hydro_df.set_index(SET_HYDRO_FID)
max_hydro_dist = 5 # the max distance in km to consider hydropower viable
additional_capacity = (
(self.df[SET_ENERGY_PER_CELL + "{}".format(year)]) /
(HOURS_PER_YEAR * mg_hydro_calc.capacity_factor * mg_hydro_calc.base_to_peak_load_ratio *
(1 - mg_hydro_calc.distribution_losses)))
for index, row in hydro_df.iterrows():
hydro_usage = additional_capacity.loc[(self.df[SET_HYDRO_FID] == index) &
(self.df[SET_HYDRO_DIST] < max_hydro_dist)].sum()
if hydro_usage > hydro_df[SET_HYDRO][index]:
hydro_usage_cumsum = additional_capacity.loc[(self.df[SET_HYDRO_FID] == index) &
(self.df[SET_HYDRO_DIST] < max_hydro_dist)].cumsum()
hydro_usage = hydro_usage_cumsum.loc[hydro_usage_cumsum > hydro_df[SET_HYDRO][index]]
hydro_lcoe[hydro_usage.index] = 99
self.df[SET_LCOE_MG_HYDRO + "{}".format(year)] = hydro_lcoe
self.df.loc[self.df[SET_HYDRO_DIST] > max_hydro_dist, SET_LCOE_MG_HYDRO + "{}".format(year)] = 99
[docs] def results_columns(self, techs, tech_codes, year, time_step, prio, auto_intensification):
"""Calculate the capacity and investment requirements for each settlement
Once the grid extension algorithm has been run, determine the minimum overall option,
and calculate the capacity and investment requirements for each settlement
Arguments
---------
year : int
"""
all_techs = [x + str(year) for x in techs]
logging.info('Determine minimum overall tech')
self.df[SET_MIN_OVERALL + "{}".format(year)] = self.df[all_techs].T.idxmin()
# Ensure what is grid-connected in previous time-step remains grid-connected
self.df.loc[self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] == 1,
SET_MIN_OVERALL + "{}".format(year)] = 'Grid' + "{}".format(year)
# Ensure settlements within intensification distance are grid-connected
if (prio == 2) or (prio == 4):
self.df.loc[(self.df[SET_MV_DIST_PLANNED] < auto_intensification) &
(self.df[SET_LCOE_GRID + "{}".format(year)] != 99),
SET_MIN_OVERALL + "{}".format(year)] = 'Grid' + "{}".format(year)
logging.info('Determine minimum overall LCOE')
self.df[SET_MIN_OVERALL_LCOE + "{}".format(year)] = self.df[all_techs].T.min()
for i in range(len(techs)):
self.df.loc[self.df[SET_MIN_OVERALL + "{}".format(year)] == all_techs[i],
SET_MIN_OVERALL_CODE + "{}".format(year)] = tech_codes[i]
[docs] def calculate_investments_and_capacity(self, sa_diesel_investment, sa_diesel_capacity, sa_pv_investment,
sa_pv_capacity, mg_diesel_investment, mg_diesel_capacity, mg_pv_investment,
mg_pv_capacity, mg_wind_investment, mg_wind_capacity, mg_hydro_investment,
mg_hydro_capacity, grid_investment, grid_capacity, year):
grid = pd.DataFrame(np.where(self.df[SET_MIN_OVERALL_CODE + "{}".format(year)] == 1, 1, 0))
sa_diesel = pd.DataFrame(np.where(self.df[SET_MIN_OVERALL_CODE + "{}".format(year)] == 2, 1, 0))
sa_pv = pd.DataFrame(np.where(self.df[SET_MIN_OVERALL_CODE + "{}".format(year)] == 3, 1, 0))
mg_diesel = pd.DataFrame(np.where(self.df[SET_MIN_OVERALL_CODE + "{}".format(year)] == 4, 1, 0))
mg_pv = pd.DataFrame(np.where(self.df[SET_MIN_OVERALL_CODE + "{}".format(year)] == 5, 1, 0))
mg_wind = pd.DataFrame(np.where(self.df[SET_MIN_OVERALL_CODE + "{}".format(year)] == 6, 1, 0))
mg_hydro = pd.DataFrame(np.where(self.df[SET_MIN_OVERALL_CODE + "{}".format(year)] == 7, 1, 0))
logging.info('Calculate investment cost')
self.df[SET_INVESTMENT_COST + "{}".format(year)] = 0
self.df[SET_INVESTMENT_COST + "{}".format(year)] = grid * grid_investment + sa_diesel * sa_diesel_investment + \
sa_pv * sa_pv_investment + mg_diesel * mg_diesel_investment + mg_pv * mg_pv_investment + \
mg_wind * mg_wind_investment + mg_hydro * mg_hydro_investment
logging.info('Calculate new capacity')
self.df[SET_NEW_CAPACITY + "{}".format(year)] = 0
self.df[SET_NEW_CAPACITY + "{}".format(year)] = grid * grid_capacity + sa_diesel * sa_diesel_capacity + \
sa_pv * sa_pv_capacity + mg_diesel * mg_diesel_capacity + mg_pv * mg_pv_capacity + \
mg_wind * mg_wind_capacity + mg_hydro * mg_hydro_capacity
[docs] def apply_limitations(self, eleclimit, year, time_step, prioritization, auto_densification=0):
logging.info('Determine electrification limits')
choice = int(prioritization)
self.df[SET_LIMIT + "{}".format(year)] = 0
# Calculate the total population targeted to be electrified
elec_target_pop = eleclimit * self.df[SET_POP + "{}".format(year)].sum()
# Adding a column with investment/capita
self.df[SET_INVEST_PER_CAPITA + "{}".format(year)] = \
self.df[SET_INVESTMENT_COST + "{}".format(year)] / self.df[SET_NEW_CONNECTIONS + "{}".format(year)]
if choice == 4:
# Choose only already electrified settlements and settlements within intensification target range,
# regardless of target electrification rate
self.df.loc[(self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] < 99),
SET_LIMIT + "{}".format(year)] = 1
self.df.loc[self.df[SET_MV_DIST_PLANNED] < auto_densification, SET_LIMIT + "{}".format(year)] = 1
elif eleclimit == 1:
# If electrification target rate is 100%, set all settlements to electrified
self.df[SET_LIMIT + "{}".format(year)] = 1
elif choice == 2:
# Prioritize already electrified settlements, then intensification, then lowest investment per capita
self.df['Intensification'] = np.where(self.df[SET_MV_DIST_PLANNED] < auto_densification, 1, 0)
self.df.sort_values(by=[SET_ELEC_FINAL_CODE + "{}".format(year - time_step),
'Intensification',
SET_INVEST_PER_CAPITA + "{}".format(year)], inplace=True)
cumulative_pop = self.df[SET_POP + "{}".format(year)].cumsum()
self.df[SET_LIMIT + "{}".format(year)] = np.where(cumulative_pop < elec_target_pop, 1, 0)
del self.df['Intensification']
self.df.sort_index(inplace=True)
# Ensure already electrified settlements remain electrified
self.df.loc[(self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] < 99),
SET_LIMIT + "{}".format(year)] = 1
elif choice == 5:
# Prioritize already electrified settlements first, then lowest investment per capita
self.df.sort_values(by=[SET_ELEC_FINAL_CODE + "{}".format(year - time_step),
SET_INVEST_PER_CAPITA + "{}".format(year)], inplace=True)
cumulative_pop = self.df[SET_POP + "{}".format(year)].cumsum()
self.df[SET_LIMIT + "{}".format(year)] = np.where(cumulative_pop < elec_target_pop, 1, 0)
self.df.sort_index(inplace=True)
# Ensure already electrified settlements remain electrified
self.df.loc[(self.df[SET_ELEC_FINAL_CODE + "{}".format(year - time_step)] < 99),
SET_LIMIT + "{}".format(year)] = 1
elecrate = self.df.loc[self.df[SET_LIMIT + "{}".format(year)] == 1,
SET_POP + "{}".format(year)].sum() / self.df[SET_POP + "{}".format(year)].sum()
logging.info('Determine final electrification decision')
self.df[SET_ELEC_FINAL_CODE + "{}".format(year)] = self.df[SET_MIN_OVERALL_CODE + "{}".format(year)]
self.df.loc[(self.df[SET_LIMIT + "{}".format(year)] == 0), SET_ELEC_FINAL_CODE + "{}".format(year)] = 99
self.df.loc[(self.df[SET_LIMIT + "{}".format(year)] == 0), SET_INVESTMENT_COST + "{}".format(year)] = 0
self.df.loc[(self.df[SET_LIMIT + "{}".format(year)] == 0), SET_NEW_CAPACITY + "{}".format(year)] = 0
print("The electrification rate achieved in {} is {:.1f} %".format(year, elecrate * 100))
[docs] def calc_summaries(self, df_summary, sumtechs, tech_codes, year):
"""The next section calculates the summaries for technology split,
capacity added and total investment cost"""
logging.info('Calculate summaries')
i = 0
summaries = [SET_POP, SET_NEW_CONNECTIONS, SET_NEW_CAPACITY, SET_INVESTMENT_COST]
# Population Summaries
for s in summaries:
for t in tech_codes:
df_summary[year][sumtechs[i]] = sum(
self.df.loc[(self.df[SET_ELEC_FINAL_CODE + "{}".format(year)] == t) &
(self.df[SET_LIMIT + "{}".format(year)] == 1)]
[s + "{}".format(year)])
i += 1