# Copyright 2016-2017 Gorm Andresen (Aarhus University), Jonas Hoersch (FIAS), Tom Brown (FIAS)
# Copyright 2020 Michael Davidson (UCSD), William Honaker, Jiahe Feng (UCSD), Yuanbo Shi
# Copyright 2023-2024 Xiqiang Liu
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Cutout class to handle a subset of a Dataset.
"""
import datetime as dt
import logging
import os
import sys
from collections.abc import Iterable
from functools import partial
from operator import itemgetter
from pathlib import Path
from typing import Literal, Optional, Union
import numpy as np
import pyproj
import shapely
import xarray as xr
from shapely.geometry import box
from tqdm.auto import tqdm
from . import config
from .convert import (
convert_pm25,
convert_pv,
convert_solar_thermal,
convert_wind,
convert_windspd,
convert_windwpd,
get_orientation,
)
from .resource import (
get_solarpanelconfig,
get_windturbineconfig,
windturbine_smooth,
)
from .mask import Mask
from .preparation import (
cutout_get_meta,
cutout_get_meta_view,
cutout_prepare,
cutout_produce_specific_dataseries,
)
[docs]
logger = logging.getLogger(__name__)
class Cutout:
"""Cutout class to handle a subset of a Dataset.
Args:
module (Literal["era5", "merra2"]): name of the dataset module to use.
weather_data_config (str): name of the weather data config to use.
the name will be automatically generated.
years (slice): years of the cutout.
name (Optional[str]): name of the cutout. Optional. If not specified,
cutout_dir (str): path to the cutout directory. Defaults to config.cutout_dir.
bounds (Optional[Iterable]): bounds of the cutout. Optional. If not specified,
the bounds will be automatically generated.
months (Optional[slice]): months of the cutout. Optional. If not specified,
the months will be automatically generated.
xs (Optional[slice]): longitude coordinates of the cutout. Optional. If not specified,
the x coordinates will be automatically generated.
ys (Optional[slice]): latitude coordinates of the cutout. Optional. If not specified,
the y coordinates will be automatically generated.
"""
def __init__(
self,
module: Literal["era5", "merra2"],
weather_data_config: str,
years: slice,
name: Optional[str] = None,
cutout_dir: Union[str, Path] = config.cutout_dir,
bounds: Optional[Iterable] = None,
months: Optional[slice] = None,
xs: Optional[slice] = None,
ys: Optional[slice] = None,
):
self.name = name
self.cutout_dir = os.path.join(cutout_dir, name)
self.prepared = False
self.empty = False
self.meta_append = 0
self.config = weather_data_config
self.meta = None
self.merged_mask = None
self.shape_mask = None
self.area = None
params_dict = {
"module": module,
"years": years,
"months": months,
"xs": xs,
"ys": ys,
}
if bounds is not None and (xs is not None or ys is not None):
raise TypeError("Cannot specify both bounds and xs/ys arguments.")
if bounds is not None:
# if passed bounds array instead of xs, ys slices
x1, y1, x2, y2 = bounds
params_dict.update(xs=slice(x1, x2), ys=slice(y1, y2))
if months is None:
logger.info("No months specified, defaulting to 1-12")
params_dict.update(months=slice(1, 12))
if os.path.isdir(self.cutout_dir):
# If cutout dir exists, check completness of files
if os.path.isfile(self.datasetfn()): # open existing meta file
self.meta = xr.open_dataset(self.datasetfn()).stack(
**{"year-month": ("year", "month")}
)
if (
self.meta is not None
and "years" in params_dict
and "months" in params_dict
and all(
os.path.isfile(self.datasetfn([y, m]))
for y in range(
params_dict["years"].start, params_dict["years"].stop + 1
)
for m in range(
params_dict["months"].start, params_dict["months"].stop + 1
)
)
):
# All files are accounted for. Checking basic data and coverage
if "module" not in self.meta.attrs:
raise TypeError("No module given in meta file of cutout.")
# load dataset module based on file metadata
from geodata import Dataset
self.dataset_module: Dataset = sys.modules[
"geodata.datasets." + self.meta.attrs["module"]
]
params_dict["module"] = self.meta.attrs["module"]
logger.info("All cutout (%s, %s) files available.", name, cutout_dir)
# At least one of xs, ys is in params_dict
if {"xs", "ys"}.intersection(params_dict):
# Passed some subsetting bounds
self.meta = self.get_meta_view(**params_dict)
if self.meta is not None:
# Subset is available
self.prepared = True
logger.info("Cutout subset prepared: %s", self)
else:
logger.info("Cutout subset not available: %s", self)
else:
# No subsetting of bounds. Keep full cutout
self.prepared = True
logger.info("Cutout prepared: %s", self)
else:
# Not all files accounted for
self.prepared = False
logger.info("Cutout (%s, %s) not complete.", name, cutout_dir)
if not self.prepared:
# Still need to prepare cutout
if "module" not in params_dict:
raise TypeError("Module is required to create cutout.")
# load module from geodata library
self.dataset_module = sys.modules[
"geodata.datasets." + params_dict["module"]
]
logger.info("Cutout (%s, %s) not found or incomplete.", name, cutout_dir)
if {"xs", "ys", "years"}.difference(params_dict):
raise TypeError(
"Arguments `xs`, `ys`, and `years` need to be specified for a cutout."
)
if self.meta is not None:
# if meta.nc exists, close and delete it
self.meta.close()
os.remove(self.datasetfn())
## Main preparation call for metadata
# preparation.cutout_get_meta
# cutout.meta_data_config
# dataset_module.meta_data_config (e.g. prepare_meta_era5)
self.meta = self.get_meta(**params_dict)
# Ensure cutout directory exists
if not os.path.isdir(self.cutout_dir):
os.mkdir(self.cutout_dir)
# Write meta file
self.meta_clean.unstack("year-month").to_netcdf(self.datasetfn())
def datasetfn(self, *args):
"""Return path to dataset xarray files related to this Cutout.
Args:
*args: optional arguments to append to the filename. If not specified,
the meta file will be returned. If specified, the dataset file will be returned, depending
on the number of arguments. One argument will return the dataset file for the given
year-month string, two arguments will return the dataset file for the given year and month.
Returns:
str: path to dataset xarray files related to this Cutout.
"""
dataset = None
if len(args) == 2:
dataset = args
elif len(args) == 1:
dataset = args[0]
else:
dataset = None
return os.path.join(
# pylint: disable=consider-using-f-string
self.cutout_dir,
("meta.nc" if dataset is None else "{}{:0>2}.nc".format(*dataset)),
# pylint: enable=consider-using-f-string
)
@property
def meta_data_config(self):
"""Metadata configuration for the Cutout"""
return dict(
tasks_func=self.dataset_module.weather_data_config[self.config][
"tasks_func"
],
prepare_func=self.dataset_module.weather_data_config[self.config][
"meta_prepare_func"
],
template=self.dataset_module.weather_data_config[self.config]["template"],
file_granularity=self.dataset_module.weather_data_config[self.config][
"file_granularity"
],
)
@property
def weather_data_config(self):
"""The weather data configuration for the Cutout."""
return self.dataset_module.weather_data_config
@property
def variables(self):
"""The variables contained in the Cutout."""
return self.dataset_module.weather_data_config[self.config]["variables"]
@property
def info(self):
"""Summary information about the Cutout."""
return dict(
name=self.name,
config=self.config,
prepared=self.prepared,
projection=self.dataset_module.projection,
shape=[len(self.coords["y"]), len(self.coords["x"])],
extent=(
list(self.coords["x"].values[[0, -1]])
+ list(self.coords["y"].values[[-1, 0]])
),
dimensions=self.meta.dims,
coordinates=self.meta.coords,
variables=self.dataset_module.weather_data_config[self.config]["variables"],
dataset_module=self.dataset_module,
cutout_dir=self.cutout_dir,
)
@property
def projection(self):
"""The projection of the Cutout."""
return self.dataset_module.projection
@property
def coords(self):
"""The coordinates covered by the Cutout."""
return self.meta.coords
@property
def meta_clean(self):
# produce clean version of meta file for export to NetCDF (cannot export slices)
meta = self.meta
if meta.attrs.get("view", {}):
view = {}
for name, value in meta.attrs.get("view", {}).items():
view.update({name: [value.start, value.stop]})
meta.attrs["view"] = str(view)
return meta
@property
def shape(self):
"""The shape of the Cutout by (y, x)."""
return len(self.coords["y"]), len(self.coords["x"])
@property
def extent(self):
"""The extent of the Cutout by (x_min, x_max, y_min, y_max)."""
return list(self.coords["x"].values[[0, -1]]) + list(
self.coords["y"].values[[-1, 0]]
)
@property
def years(self):
"""Cutout's covered years as slice object."""
return slice(*self.coords["year"].values)
@property
def months(self):
"""Cutout's covered months as slice object."""
return slice(*self.coords["month"].values)
def grid_coordinates(self):
"""Return grid coordinates of the Cutout."""
xs, ys = np.meshgrid(self.coords["x"], self.coords["y"])
return np.asarray((np.ravel(xs), np.ravel(ys))).T
def grid_cells(self):
"""Return grid cells of the Cutout."""
coords = self.grid_coordinates()
span = (coords[self.shape[1] + 1] - coords[0]) / 2
return [box(*c) for c in np.hstack((coords - span, coords + span))]
def __repr__(self):
yearmonths = self.coords["year-month"].to_index()
# pylint: disable=consider-using-f-string
return "<Cutout {} x={:.2f}-{:.2f} y={:.2f}-{:.2f} time={}/{}-{}/{} {}prepared>".format(
self.name,
self.coords["x"].values[0],
self.coords["x"].values[-1],
self.coords["y"].values[0],
self.coords["y"].values[-1],
yearmonths[0][0],
yearmonths[0][1],
yearmonths[-1][0],
yearmonths[-1][1],
"" if self.prepared else "UN",
)
# pylint: enable=consider-using-f-string
def add_mask(self, name: str, merged_mask: bool = True, shape_mask: bool = True):
"""Add mask attribute to the cutout, from a previously saved mask objects.
The masks will be coarsened to the same dimension with the cutout metadata in xarray.
Args:
name (str): The name of the previously saved mask. The mask object should be saved in
mask_dir in config.py.
merged_mask (bool): If true, the program will try to include the merged_mask from the mask object.
Defaults to True.
shape_mask (bool): If true, the program will try to include the extracted dictionary of
shape_mask from the mask object. Defaults to True.
"""
# make sure data is in correct format for coarsening
xr_ds = ds_reformat_index(self.meta)
mask = Mask.from_name(name, mask_dir=config.MASK_DIR)
if not mask.merged_mask and not mask.shape_mask:
raise ValueError(
f"No mask found in {mask.name}. Please create a proper mask object first."
)
if mask.merged_mask and merged_mask:
self.merged_mask = coarsen(mask.load_merged_xr(), xr_ds)
logger.info("Cutout.merged_mask added.")
if mask.shape_mask and shape_mask:
self.shape_mask = {
k: coarsen(v, xr_ds) for k, v in mask.load_shape_xr().items()
}
logger.info("Cutout.shape_mask added.")
def add_grid_area(
self, axis: tuple[str] = ("lat", "lon"), adjust_coord: bool = True
):
"""Add attribute 'area' to the cutout containing area for each grid cell
in the cutout metedata xarray.
Args:
axis (tuple[str]): The name of the axes to include in the result xarray dataset.
Defaults to ("lat", "lon").
adjust_coord (bool): Whether to sort the data by latitude and longitude values if true.
Defaults to True.
"""
xr_ds = ds_reformat_index(self.meta)
area_arr = np.zeros((xr_ds.lat.shape[0], xr_ds.lon.shape[0]))
lat_diff = np.abs((xr_ds.lat[1].values - xr_ds.lat[0].values))
for i, lat in enumerate(xr_ds.lat.values):
lat_bottom = lat - lat_diff / 2
lat_top = lat + lat_diff / 2
# calculate the area for grid cells with same latitude
area_arr[i] = np.round(
calc_grid_area(
[
(xr_ds.lon.values[0], lat_top),
(xr_ds.lon.values[0], lat_bottom),
(xr_ds.lon.values[1], lat_bottom),
(xr_ds.lon.values[1], lat_top),
]
),
2,
)
if axis == ("lat", "lon"):
xr_ds = xr_ds.assign({"area": (axis, area_arr)})
elif axis == ("time", "lat", "lon"):
xr_ds = xr_ds.assign({"area": (axis, [area_arr])})
if adjust_coord:
if (
("lon" not in xr_ds.coords)
and ("lat" not in xr_ds.coords)
and ("x" in xr_ds.coords and "y" in xr_ds.coords)
):
xr_ds = xr_ds.rename({"x": "lon", "y": "lat"})
xr_ds = xr_ds.sortby(["lat", "lon"])
self.area = xr_ds
def mask(
self,
dataset: xr.Dataset,
true_area: bool = True,
merged_mask: bool = True,
shape_mask: bool = True,
):
"""Mask a converted `xarray.Dataset` from cutout with previously added mask attribute
with optional area inclusion, and return a dictionary of xarray Dataset.
The program will search for 'merged_mask' and 'shape_mask' attributes in the
cutout object, these Xarray data can be generate through 'add_mask', unless the user
specify 'merged_mask = False' or 'shape_mask = False', the masks in shape_mask
will have the same key in the dictionary returned, and the mask for merged_mask will
have the key name "merged_mask".
Args:
dataset (xr.Dataset): The dataset to be masked.
true_area (bool): Whether the returned masks will have the area variable. Defaults to True.
merged_mask (bool): If true, the program will try to
include the merged_mask from the cutout object. Defaults to True.
shape_mask (bool): If true, the program will try to
include the extracted dictionary of shape_mask from the cutout object. Defaults to True.
Returns:
dict[xr.Dataset]: A dictionary of xarray Dataset with masks combined with the dataset.
"""
axis = ("lat", "lon")
dataset = ds_reformat_index(dataset)
dataset = dataset.transpose("time", "lat", "lon")
if self.merged_mask is None and self.shape_mask is None:
raise ValueError(
"No mask found in cutout. Please add masks with self.add_mask()"
)
if self.area is None and true_area:
raise ValueError(
"No area data found. Please call self.add_grid_area() or set true_area to False."
)
res = {}
if self.merged_mask is not None and merged_mask:
ds = dataset.assign({"mask": (axis, self.merged_mask.data[0])})
if true_area:
ds = ds.assign({"area": (axis, self.area["area"].data)})
res["merged_mask"] = ds
logger.info("merged_mask combined with dataset. ")
if self.shape_mask is not None and shape_mask:
for key, val in self.shape_mask.items():
ds = dataset.assign({"mask": (axis, val.data[0])})
if true_area:
ds = ds.assign({"area": (axis, self.area["area"].data)})
res[key] = ds
logger.info("shape_mask combined with dataset. ")
return res
# Preparation functions
get_meta = cutout_get_meta # preparation.cutout_get_meta
get_meta_view = cutout_get_meta_view # preparation.cutout_get_meta_view
prepare = cutout_prepare # preparation.cutout_prepare
produce_specific_dataseries = cutout_produce_specific_dataseries
# Conversion and aggregation functions
def _convert_cutout(
self, convert_func: callable, show_progress: bool = False, **convert_kwds
) -> xr.DataArray:
"""Convert and aggregate a weather-based renewable generation time-series.
NOTE: This is a function designed to be used internally instead of being used by
the user themself. Instead, this is a gateway function that is called by all
the individual time-series generation functions like pv and wind. Thus, all its parameters are also
available from these.
Args:
convert_func (callable): Function to convert the cutout
show_progress (bool): Show progress bar. Set to False by default.
**convert_kwds: Keyword arguments passed to convert_func
Returns:
xr.DataArray: Converted cutout
"""
if not self.prepared:
raise RuntimeError("The cutout has to be prepared first.")
results = []
yearmonths = self.coords["year-month"].to_index()
if isinstance(show_progress, str):
prefix = show_progress
else:
func_name = (
convert_func.__name__[len("convert_") :]
if convert_func.__name__.startswith("convert_")
else convert_func.__name__
)
prefix = f"Convert `{func_name}`: "
for ym in tqdm(
yearmonths, desc=prefix, disable=not show_progress, dynamic_ncols=True
):
with xr.open_dataset(self.datasetfn(ym)) as ds:
if "view" in self.meta.attrs:
if isinstance(self.meta.attrs["view"], str):
self.meta.attrs["view"] = {}
self.meta.attrs.setdefault("view", {})["x"] = slice(
min(self.meta.coords["x"]).values.tolist(),
max(self.meta.coords["x"]).values.tolist(),
)
self.meta.attrs.setdefault("view", {})["y"] = slice(
min(self.meta.coords["y"]).values.tolist(),
max(self.meta.coords["y"]).values.tolist(),
)
ds = ds.sel(**self.meta.attrs["view"])
da = convert_func(ds, **convert_kwds)
results.append(da.load())
return xr.concat(results, dim="time")
def heat_demand(
self,
threshold: float = 15.0,
a: float = 1.0,
constant: float = 0.0,
hour_shift: float = 0.0,
**params,
):
"""Convert outside temperature into daily heat demand using the
degree-day approximation.
Since "daily average temperature" means different things in
different time zones and since xarray coordinates do not handle
time zones gracefully like pd.DateTimeIndex, you can provide an
hour_shift to redefine when the day starts.
E.g. for Moscow in winter, hour_shift = 4, for New York in winter,
hour_shift = -5
This time shift applies across the entire spatial scope of ds for
all times. More fine-grained control will be built in a some
point, i.e. space- and time-dependent time zones.
WARNING: Because the original data is provided every month, at the
month boundaries there is untidiness if you use a time shift. The
resulting xarray will have duplicates in the index for the parts
of the day in each month at the boundary. You will have to
re-average these based on the number of hours in each month for
the duplicated day.
Args:
threshold (float): Outside temperature in degrees Celsius above which there is no heat demand.
a (float): Linear factor relating heat demand to outside temperature.
constant (float): Constant part of heat demand that does not depend on outside
temperature (e.g. due to water heating).
hour_shift (float): Time shift relative to UTC for taking daily average
Returns:
xr.DataArray: Heat demand
Note:
You can also specify all of the general conversion arguments
documented in the `convert_cutout` function.
"""
def convert_heat_demand(
ds: xr.Dataset,
threshold: float,
a: float,
constant: float,
hour_shift: float,
):
# Temperature is in Kelvin; take daily average
T = ds["temperature"]
T.coords["time"].values += np.timedelta64(dt.timedelta(hours=hour_shift))
T = ds["temperature"].resample(time="1D").mean(dim="time")
threshold += 273.15
heat_demand_value = a * (threshold - T)
heat_demand_value.values[heat_demand_value.values < 0.0] = 0.0
return constant + heat_demand_value
return self._convert_cutout(
convert_func=convert_heat_demand,
threshold=threshold,
a=a,
constant=constant,
hour_shift=hour_shift,
**params,
)
def temperature(self, **convert_params):
"""Convert temperature in Cutout to outside temperature.
Args:
convert_params: Keyword arguments passed to `convert_cutout` function
Returns:
xr.DataArray: Data of the Cutout with temperature converted to outside temperatures.
"""
return self._convert_cutout(
convert_func=lambda ds: ds["temperature"] - 273.15, **convert_params
)
def soil_temperature(self, **convert_params):
"""Return soil temperature (useful for e.g. heat pump T-dependent
coefficient of performance).
Args:
convert_params: Keyword arguments passed to `convert_cutout` function
Returns:
xr.DataArray: Data of the Cutout with temperature converted to soil temperatures.
"""
return self._convert_cutout(
convert_func=lambda ds: (ds["soil temperature"] - 273.15).fillna(0.0),
**convert_params,
)
def solar_thermal(
self,
orientation: Optional[Union[dict, str, callable]] = None,
trigon_model: str = "simple",
clearsky_model: Literal["simple", "enhanced"] = "simple",
c0: float = 0.8,
c1: float = 3.0,
t_store: float = 80.0,
**params,
):
"""Convert downward short-wave radiation flux and outside temperature
into time series for solar thermal collectors.
Mathematical model and defaults for c0, c1 based on model in [1].
Args:
orientation (Union[dict, str, callable]): Panel orientation with slope and azimuth
(units of degrees), or 'latitude_optimal'.
trigon_model (str): Type of trigonometry model
clearsky_model (str): Type of clearsky model for diffuse irradiation. Either
`simple` or `enhanced`.
c0 (float): Parameter for model in [1] This defaults to 0.8.
c1 (float): Parameter for model in [1] This defaults to 3.0.
t_store (float): Store temperature in degree Celsius
Note:
You can also specify all of the general conversion arguments
documented in the `convert_cutout` function.
References:
[1] Henning and Palzer, Renewable and Sustainable Energy Reviews 30
(2014) 1003-1018
"""
if orientation is None:
orientation = {"slope": 45.0, "azimuth": 180.0}
if not callable(orientation):
orientation = get_orientation(orientation)
return self._convert_cutout(
convert_func=convert_solar_thermal,
orientation=orientation,
trigon_model=trigon_model,
clearsky_model=clearsky_model,
c0=c0,
c1=c1,
t_store=t_store,
**params,
)
# NOTE: The following wind-related functions will be deprecated in the future
# in favor of the wind modeling module.
def wind(
self, turbine: Union[str, dict], smooth: Union[bool, dict] = False, **params
):
"""
Generate wind generation time-series
- loads turbine dict based on passed parameters (resource.get_windturbineconfig)
- optionally, smooths turbine power curve (resource.windturbine_smooth)
- calls convert_wind (convert.convert_cutout)
Args:
turbine (Union[str, dict]): Name of a turbine known by the reatlas client or a
turbineconfig dictionary with the keys 'hub_height' for the
hub height and 'V', 'POW' defining the power curve.
smooth (Union[bool, dict]): If True smooth power curve with a gaussian kernel as
determined for the Danish wind fleet to Delta_v = 1.27 and
sigma = 2.s29. A dict allows to tune these values.
Note:
You can also specify all of the general conversion arguments
documented in the `convert_cutout` function.
References:
[1] Andresen G B, Søndergaard A A and Greiner M 2015 Energy 93, Part 1
1074 - 1088. doi:10.1016/j.energy.2015.09.071
"""
if isinstance(turbine, str):
turbine = get_windturbineconfig(turbine)
if smooth:
turbine = windturbine_smooth(turbine, params=smooth)
return self._convert_cutout(
convert_func=convert_wind, turbine=turbine, **params
)
def windspd(self, **params):
"""
Generate wind speed time-series
convert.convert_cutout → convert.convert_windspd
Parameters
----------
**params
Must have 1 of:
turbine : str or dict
Name of a turbine
hub_height : num
Extrapolation height
Can also specify all of the general conversion arguments
documented in the `convert_cutout` function.
e.g. var_height='lml'
"""
if "turbine" in params:
turbine = params.pop("turbine")
if isinstance(turbine, str):
turbine = get_windturbineconfig(turbine)
else:
raise ValueError(f"Turbine ({turbine}) not found.")
hub_height = itemgetter("hub_height")(turbine)
elif "hub_height" in params:
hub_height = params.pop("hub_height")
elif "to_height" in params:
hub_height = params.pop("to_height")
else:
raise ValueError("Either a turbine or hub_height must be specified.")
params["hub_height"] = hub_height
return self._convert_cutout(convert_func=convert_windspd, **params)
def windwpd(self, **params):
"""
Generate wind power density time-series
convert.convert_cutout → convert.convert_windwpd
Parameters
----------
**params
Must have 1 of:
turbine : str or dict
Name of a turbine
hub_height : num
Extrapolation height
Can also specify all of the general conversion arguments
documented in the `convert_cutout` function.
e.g. var_height='lml'
"""
if "turbine" in params:
turbine = params.pop("turbine")
if isinstance(turbine, str):
turbine = get_windturbineconfig(turbine)
else:
raise ValueError(f"Turbine ({turbine}) not found.")
hub_height = itemgetter("hub_height")(turbine)
elif "hub_height" in params:
hub_height = params.pop("hub_height")
elif "to_height" in params:
hub_height = params.pop("to_height")
else:
raise ValueError("Either a turbine or hub_height must be specified.")
params["hub_height"] = hub_height
return self._convert_cutout(convert_func=convert_windwpd, **params)
def pv(
self,
panel: Union[str, dict],
orientation: Union[str, dict, callable],
clearsky_model: Optional[str] = None,
**params,
):
"""Convert downward-shortwave, upward-shortwave radiation flux and
ambient temperature into a pv generation time-series.
Args:
panel (Union[str, dict]): Panel name known to the reatlas client or a panel config
dictionary with the parameters for the electrical model in [3].
orientation (Union[str, dict, callback]): Panel orientation can be chosen from either
'latitude_optimal', a constant orientation {'slope': 0.0,
'azimuth': 0.0} or a callback function with the same signature
as the callbacks generated by the
`geodata.pv.orientation.make_*` functions.
clearsky_model (Optional[str]): Either the 'simple' or the 'enhanced' Reindl clearsky
model. The default choice of None will choose dependending on
data availability, since the 'enhanced' model also
incorporates ambient air temperature and relative humidity.
Returns:
xr.DataArray: Time-series or capacity factors based on additional general
conversion arguments.
Note:
You can also specify all of the general conversion arguments
documented in the `convert_cutout` function.
References:
[1] Soteris A. Kalogirou. Solar Energy Engineering: Processes and Systems,
pages 49-117,469-516. Academic Press, 2009. ISBN 0123745012.
[2] D.T. Reindl, W.A. Beckman, and J.A. Duffie. Diffuse fraction correla-
tions. Solar Energy, 45(1):1 - 7, 1990.
[3] Hans Georg Beyer, Gerd Heilscher and Stefan Bofinger. A Robust Model
for the MPP Performance of Different Types of PV-Modules Applied for
the Performance Check of Grid Connected Systems, Freiburg, June 2004.
Eurosun (ISES Europe Solar Congress).
"""
if isinstance(panel, str):
panel = get_solarpanelconfig(panel)
if not callable(orientation):
orientation = get_orientation(orientation)
return self._convert_cutout(
convert_func=convert_pv,
panel=panel,
orientation=orientation,
clearsky_model=clearsky_model,
**params,
)
def pm25(self, **params):
"""
Generate PM2.5 time series [ug / m3]
(see convert_pm25 for details)
Parameters
----------
**params : None needed currently.
Returns
-------
pm25 : xr.DataArray
"""
return self._convert_cutout(convert_func=convert_pm25, **params)
def ds_reformat_index(ds: xr.DataArray) -> xr.DataArray:
"""Format the dataArray generated from the convert function.
Args:
ds (xr.DataArray): dataArray generated from the convert function.
Returns:
xr.DataArray: DataArray with lat and lon as dimensions.
"""
if "lat" in ds.dims and "lon" in ds.dims:
return ds.sortby(["lat", "lon"])
elif "lat" in ds.coords and "lon" in ds.coords:
return (
ds.reset_coords(["lon", "lat"], drop=True)
.rename({"x": "lon", "y": "lat"})
.sortby(["lat", "lon"])
)
return ds.rename({"x": "lon", "y": "lat"}).sortby(["lat", "lon"])
def _find_intercept(list1, list2, start, threshold=0):
"""Find_intercept is a helper function to find the best start point for doing coarsening
in order to make the coordinates of the coarsen as close to the target as possible.
"""
min_res = 0
init = 0
for i in range(len(list1) - start):
resid = ((list1[start + i] - list2[0]) % (list2[1] - list2[0])).values.tolist()
if i == 0:
init = resid
if resid <= threshold:
return i
if resid > min_res:
min_res = resid
else:
min_res = resid
break
if min_res == init:
return 0
else:
return i # type: ignore
def coarsen(ori: xr.Dataset, tar: xr.Dataset, func: Literal["sum", "mean"] = "mean"):
"""This function will reindex the original xarray dataset according to the coordiantes of the target.
There might be a bias for lattitudes and longitudes. The bias are normally within 0.01 degrees.
In order to not lose too much data, a threshold for bias in degree could be given.
When threshold = 0, it means that the function is going to find the best place with smallest bias.
Args:
ori (xr.Dataset): The original xarray dataset.
tar (xr.Dataset): The target xarray dataset.
func (Literal['sum', 'mean']): The function to be used for reduction. Defaults to "mean".
Returns:
xr.Dataset: The reindexed xarray dataset.
Raises:
ValueError: reduction method can only be 'mean' or 'sum'.
"""
lat_multiple = round(
((tar.lat[1] - tar.lat[0]) / (ori.lat[1] - ori.lat[0])).values.tolist()
)
lon_multiple = round(
((tar.lon[1] - tar.lon[0]) / (ori.lon[1] - ori.lon[0])).values.tolist()
)
lat_start = _find_intercept(ori.lat, tar.lat, (lat_multiple - 1) // 2)
lon_start = _find_intercept(ori.lon, tar.lon, (lon_multiple - 1) // 2)
if func == "mean":
_coarsen = (
ori.isel(lat=slice(lat_start, None), lon=slice(lon_start, None))
.coarsen(
dim={"lat": lat_multiple, "lon": lon_multiple},
side={"lat": "left", "lon": "left"},
boundary="pad",
)
.mean()
)
elif func == "sum":
_coarsen = (
ori.isel(lat=slice(lat_start, None), lon=slice(lon_start, None))
.coarsen(
dim={"lat": lat_multiple, "lon": lon_multiple},
side={"lat": "left", "lon": "left"},
boundary="pad",
)
.sum()
)
else:
raise ValueError("func can only be 'mean' or 'sum'")
return coarsen.reindex_like(tar, method="nearest")
def calc_grid_area(lis_lats_lons):
"""Calculate area in km^2 for a grid cell given lats and lon border, with help from:
https://stackoverflow.com/questions/4681737/how-to-calculate-the-area-of-a-polygon-on-the-earths-surface-using-python
"""
lons, lats = zip(*lis_lats_lons)
ll = list(set(lats))[::-1]
var = []
for i in range(len(ll)):
var.append("lat_" + str(i + 1))
st = ""
for v, l in zip(var, ll): # noqa: E741
st = st + str(v) + "=" + str(l) + " " + "+"
st = (
st
+ "lat_0="
+ str(np.mean(ll))
+ " "
+ "+"
+ "lon_0"
+ "="
+ str(np.mean(lons))
)
tx = "+proj=aea +" + st
pa = pyproj.Proj(tx)
x, y = pa(lons, lats)
cop = {"type": "Polygon", "coordinates": [zip(x, y)]}
return shapely.geometry.shape(cop).area / 1000000
def calc_shp_area(shp, shp_projection="+proj=latlon"):
"""calculate area in km^2 of the shapes for each shp object"""
temp_shape = shapely.ops.transform(
partial(
pyproj.transform,
pyproj.Proj(shp_projection),
pyproj.Proj(proj="aea", lat_1=shp.bounds[1], lat_2=shp.bounds[3]),
),
shp,
)
return temp_shape.area / 1000000