Source code for geodata.dataset

# Copyright 2020 Michael Davidson (UCSD), William Honaker.
# Copyright 2020, 2023 Xiqiang Liu.

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""
GEODATA

Geospatial Data Collection and "Pre-Analysis" Tools
"""

import importlib
import logging
import os
import shutil
from calendar import monthrange
from collections.abc import Iterable
from tempfile import mkstemp
from typing import Optional

import numpy as np
import xarray as xr
from shapely.geometry import box

from . import config, datasets  # noqa: F401

[docs] logger = logging.getLogger(__name__)
class Dataset: """Dataset is a class that encapsulates any datasets natively supported by geodata. It provides a streamlined workflow for downloading, preprocessing, and storing of these datasets. Args: module (str): Name of dataset module weather_data_config (str): The type of weather data configuration to use. For more information, please refer to the documentation for the dataset you are using. years (slice): The years to download. For example, `slice(2000, 2010)` will download all data from 2000 to 2010. months (slice, optional): The months to download. For example, `slice(1, 12)` will download all data from January to December. Defaults to `slice(1, 12)`. opendap (bool, optional): Whether to use OpenDAP protocol for downloading. Defaults to `False`. bounds (Optional[Iterable], optional): The bounds to download. For example, `[-180, 90, 180, -90]` will download all data from the entire globe. If omitted, global data will be downloaded. """ def __init__( self, module: str, weather_data_config: str, years: slice, months: Optional[slice] = None, opendap: bool = False, bounds: Optional[Iterable] = None, ): self.module = module self.config = weather_data_config self.dataset_module = importlib.import_module(f"geodata.datasets.{self.module}") self.weatherconfig = self.weather_data_config[self.config] self.datadir: str = self.dataset_module.datadir self.opendap = opendap self.years = years self.months = slice(1, 12) if months is None: logger.info("No months specified, defaulting to 1-12") else: self.months = months self.prepared = False self.toDownload = [] self.downloadedFiles = [] self.totalFiles = [] incomplete_count = 0 xs, ys = None, None if bounds is not None: self.bounds = bounds if isinstance(self.bounds, Iterable) and len(self.bounds) == 4: x1, y1, x2, y2 = bounds xs = slice(x1, x2) ys = slice(y2, y1) else: raise ValueError( "Specified bounds parameter should be list with North, West, South, East coordinates." ) else: logger.info("Bounds was not specified, default to global bounds.") self.bounds = None if os.path.isdir(self.datadir): # Directory for dataset exists logger.info("Directory %s found, checking for completeness.", self.datadir) self.prepared = True check_complete = True else: logger.info("Directory %s not found.", self.datadir) check_complete = False step = years.step if years.step else 1 yrs = range(years.start, years.stop + step, step) step = months.step if months.step else 1 mos = range(months.start, months.stop + step, step) if self.weatherconfig["file_granularity"] in {"daily", "dailymeans"}: # Separate files for each day (eg MERRA) # Check for complete set of files of year, month, day mo_tuples = [(yr, mo, monthrange(yr, mo)[1]) for yr in yrs for mo in mos] for mo_tuple in mo_tuples: # format: (yr, mo, number_days_in_month) yr, mo, nodays = mo_tuple for day in range(1, nodays + 1, 1): filename = self.datasetfn(self.weatherconfig["fn"], yr, mo, day) self.totalFiles.append((self.config, filename)) if not os.path.isfile(filename): self.prepared = False if check_complete: logger.info("File `%s` not found!", filename) incomplete_count += 1 if opendap and "url_opendap" in self.weatherconfig: self.toDownload.append( ( self.config, filename, self.datasetfn_opendap( self.weatherconfig["url_opendap"], self.weatherconfig["variables"], yr, mo, day, ), ) ) else: if opendap: logger.warning( "OpenDAP URL not specified for given dataset. " "Defaulting to standard download." ) self.toDownload.append( ( self.config, filename, self.datasetfn( self.weatherconfig["url"], yr, mo, day ), ) ) else: self.downloadedFiles.append((self.config, filename)) elif self.weatherconfig["file_granularity"] == "daily_multiple": # Separate files for each day (eg MERRA) # Check for complete set of files of year, month, day mo_tuples = [(yr, mo, monthrange(yr, mo)[1]) for yr in yrs for mo in mos] for mo_tuple in mo_tuples: # format: (yr, mo, number_days_in_month) yr, mo, nodays = mo_tuple for day in range(1, nodays + 1, 1): filename = self.datasetfn(self.weatherconfig["fn"], yr, mo, day) self.totalFiles.append((self.config, filename)) if not os.path.isfile(filename): self.prepared = False if check_complete: logger.info("File `%s` not found!", filename) incomplete_count += 1 if opendap and "url_opendap" in self.weatherconfig: self.toDownload.append( ( self.config, filename, self.datasetfn_opendap( self.weatherconfig["url_opendap"][0], self.weatherconfig["variables_list"][0], yr, mo, day, ), self.datasetfn_opendap( self.weatherconfig["url_opendap"][1], self.weatherconfig["variables_list"][1], yr, mo, day, ), ) ) else: if opendap: logger.warning( "OpenDAP URL not specified for given dataset. " "Defaulting to standard download." ) self.toDownload.append( ( self.config, filename, self.datasetfn( self.weatherconfig["url"][0], yr, mo, day ), self.datasetfn( self.weatherconfig["url"][1], yr, mo, day ), ) ) else: self.downloadedFiles.append((self.config, filename)) elif self.weatherconfig["file_granularity"] == "monthly": # Monthly files (eg ERA5) mo_tuples = [(yr, mo) for yr in yrs for mo in mos] for mo_tuple in mo_tuples: yr, mo = mo_tuple filename = self.datasetfn(self.weatherconfig["fn"], yr, mo) self.totalFiles.append((self.config, filename)) if not os.path.isfile(filename): self.prepared = False if check_complete: logger.info("File `%s` not found!", filename) incomplete_count += 1 if self.module == "era5": self.toDownload.append((self.config, filename, yr, mo)) else: self.toDownload.append( ( self.config, filename, self.datasetfn(self.weatherconfig["url"], yr, mo), ) ) else: self.downloadedFiles.append((self.config, filename)) elif self.weatherconfig["file_granularity"] == "monthly_multiple": mo_tuples = [(yr, mo) for yr in yrs for mo in mos] for mo_tuple in mo_tuples: yr, mo = mo_tuple filename = self.datasetfn(self.weatherconfig["fn"], yr, mo) self.totalFiles.append((self.config, filename)) if not os.path.isfile(filename): self.prepared = False if check_complete: logger.info("File `%s` not found!", filename) incomplete_count += 1 self.toDownload.append( ( self.config, filename, self.datasetfn(self.weatherconfig["url"][0], yr, mo), self.datasetfn(self.weatherconfig["url"][1], yr, mo), ) ) else: self.downloadedFiles.append((self.config, filename)) if not self.prepared: if xs is None or ys is None: logger.warning( "Arguments `xs` and `ys` not used in preparing dataset. Defaulting to global." ) logger.info("%s files not completed.", incomplete_count) else: logger.info("Directory complete.") def datasetfn(self, fn, *args): """Construct file name from template function in `weather_data_config` and args (yr, mo, day) Args: fn (str): Template function in `weather_data_config` *args: Year, month, day Returns: str: File name """ if len(args) == 3: dataset = dict(year=args[0], month=args[1], day=args[2]) elif len(args) == 2: dataset = dict(year=args[0], month=args[1]) else: return False if self.dataset_module.spinup_var: spinup = self.dataset_module.spinup_year(dataset["year"], dataset["month"]) dataset.update({"spinup": spinup}) return fn.format_map(dataset) def datasetfn_opendap(self, fn, variables, *args): """Construct url for OpenDap protocol. Depends on datasetfn() Args: fn (str): Template function in `weather_data_config` variables (list): List of variables to download *args: Year, month, day Returns: str: URL """ # MERRA2 requires uppercase variable names if self.module == "merra2": variables = [v.upper() for v in variables] # TODO: Add lat lon subsetting based on self.bounds # opendap URL format requires knowing the indexes in MERRA2 -- not just lat, lon bounds -- # e.g., ~/MERRA2/M2T1NXSLV.5.12.4/2021/01/MERRA2_400.tavg1_2d_slv_Nx.20210101.nc4.nc4?T2M[0:23][180:360][288:575],time,lat[180:360],lon[288:575] # noqa: E501 if self.bounds is not None: logger.info("Bounds not used in OpenDAP call. Defaulting to global.") fn = fn + "?" + ",".join(variables) + ",time,lat,lon" args = list(args) return self.datasetfn(fn, *args) def get_data(self, trim=True, testing=False): """Download data from server. Args: trim (bool, optional): Trim variables in file. Defaults to True. testing (bool, optional): Download only first file. Defaults to False. """ if testing is True: if len(self.downloadedFiles) > 0: logger.info("First file for testing has already been downloaded.") return else: self.toDownload = [self.toDownload[0]] self.totalFiles = [self.totalFiles[0]] self.testDataset = True api_func = self.weatherconfig["api_func"] if self.module == "era5": api_func( self.toDownload, self.bounds, self.weatherconfig["keywords"], self.weatherconfig["product"], self.weatherconfig["product_type"], self.downloadedFiles, ) elif self.module == "merra2": api_func( self.toDownload, self.weatherconfig["file_granularity"], self.downloadedFiles, ) if self.downloadedFiles == self.totalFiles: self.prepared = True if trim is True and self.module not in config.untrimmable_datasets: self.trim_variables() def trim_variables(self): """Reduce size of file by trimming variables in file.""" for f in self.downloadedFiles: file_path = f[1] variables = self.weatherconfig["variables"] with xr.open_dataset(file_path) as ds: var_rename = dict((v, v.lower()) for v in list(ds.data_vars)) ds = ds.rename(var_rename) ds = ds[variables] fd, target = mkstemp(suffix=".nc4") os.close(fd) ds.to_netcdf(target) shutil.move(target, file_path) @property def meta_data_config(self): """Metadata configuration for dataset.""" return dict( tasks_func=self.weatherconfig["tasks_func"], prepare_func=self.weatherconfig["meta_prepare_func"], template=self.weatherconfig["template"], file_granularity=self.weatherconfig["file_granularity"], ) @property def weather_data_config(self) -> dict: """Weather data config for dataset.""" return self.dataset_module.weather_data_config @property def projection(self) -> str: """Projection for dataset.""" return self.dataset_module.projection # TODO: meta property/attribute does not exists here! # @property # def coords(self): # """Coordinates for dataset.""" # return self.meta.coords # pylint: disable=no-member @property def shape(self): """The shape of the Cutout by (y, x).""" return len(self.coords["y"]), len(self.coords["x"]) @property def extent(self) -> Iterable[float]: """The extent of the Cutout by (x_min, x_max, y_min, y_max).""" return list(self.coords["x"].values[[0, -1]]) + list( self.coords["y"].values[[-1, 0]] ) def grid_coordinates(self): """Return grid coordinates of the Cutout.""" xs, ys = np.meshgrid(self.coords["x"], self.coords["y"]) return np.asarray((np.ravel(xs), np.ravel(ys))).T def grid_cells(self): """Return grid cells of the Cutout.""" coords = self.grid_coordinates() span = (coords[self.shape[1] + 1] - coords[0]) / 2 return [box(*c) for c in np.hstack((coords - span, coords + span))] def __repr__(self): return "<Dataset {} years={}-{} months={}-{} datadir={} {}>".format( self.module, self.years.start, self.years.stop, self.months.start, self.months.stop, self.datadir, "prepared" if self.prepared else "unprepared", )