Source code for geodata.preparation

# Copyright 2016-2017 Gorm Andresen (Aarhus University), Jonas Hoersch (FIAS), Tom Brown (FIAS)

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""Various Preparation Functions for Geodata's modules"""

import calendar
import logging
import os
import shutil
import subprocess
import tempfile
from glob import glob
from multiprocessing import Pool

import dask
import numpy as np
import pandas as pd
import xarray as xr

[docs] logger = logging.getLogger(__name__)
def cutout_do_task(task, write_to_file=True): task = task.copy() prepare_func = task.pop("prepare_func") if write_to_file: datasetfns = task.pop("datasetfns") # Force dask to use just one thread (to save memory) with dask.config.set(scheduler="single-threaded"): try: data = prepare_func(**task) if data is None: data = [] if write_to_file: for yearmonth, ds in data: if ds is None: continue fn = datasetfns[yearmonth] # type: ignore logger.debug("Writing to %s", os.path.basename(fn)) ds.to_netcdf(fn) logger.debug( "Write variable(s) %s to %s generated by %s", ", ".join(ds.data_vars), os.path.basename(fn), prepare_func.__name__, ) else: return data except Exception as e: logger.exception( "Exception occured in the task with prepare_func `%s`: %s", prepare_func.__name__, e.args[0], ) raise e def cutout_prepare(cutout, overwrite=False, nprocesses=None, gebco_height=False): """ Main preparation function """ if cutout.prepared and not overwrite: logger.info( "The cutout is already prepared. If you want to recalculate it, supply an `overwrite=True` argument." ) return True if cutout.empty is True: logger.warning( "Cutout dimensions are empty. One or more of xs, ys, or time are not available in Dataset." ) return False logger.info("Starting preparation of cutout '%s'", cutout.name) cutout_dir = cutout.cutout_dir yearmonths = cutout.coords["year-month"].to_index() xs = cutout.meta.indexes["x"] ys = cutout.meta.indexes["y"] # 1. (here) uncomment and change yearmonths to only new ones. delete yearmonths line above # 2. (elsewhere) uncomment meta_append / append lines # if cutout.meta_append == 1: # # appended meta. prepare only new year-months # yearmonths = cutout.coords['year-month'].to_index() # else: # yearmonths = cutout.coords['year-month'].to_index() if gebco_height: logger.info("Interpolating gebco to the dataset grid") cutout.meta["height"] = _prepare_gebco_height(xs, ys) # Check if cutout_dir exists if os.path.isdir(cutout_dir): # Delete all files except meta.nc logger.debug("Deleting cutout files in '%s'", cutout_dir) for delete_file in glob(os.path.join(cutout_dir, "*.*")): if not delete_file.endswith("meta.nc"): logger.debug(delete_file) os.remove(delete_file) # shutil.rmtree(cutout_dir) else: os.mkdir(cutout_dir) # # Moved to cutout __init__: Write meta file # (cutout.meta_clean # .unstack('year-month') # .to_netcdf(cutout.datasetfn())) # Compute data and fill files tasks = [] # for series in itervalues(cutout.weather_data_config[cutout.config]): # dict of tasks w/structure (tasks_func, prepare_func) # .. could be one task and prepare (eg prepare_month_era5) series = cutout.weather_data_config[cutout.config] series["meta_attrs"] = cutout.meta.attrs tasks_func = series["tasks_func"] # form call to task_func (eg tasks_monthly_merra2) # .. **series contains prepare_func # returns: dict(prepare_func=prepare_func, xs=xs, ys=ys, year=year, month=month) # .. or: dict(prepare_func=prepare_func, xs=xs, ys=ys, fn=next(glob...), engine=engine, yearmonth=ym) tasks += tasks_func(xs=xs, ys=ys, yearmonths=yearmonths, **series) for i, t in enumerate(tasks): def datasetfn_with_id(ym): # returns a filename with incrementing id at end eg `201101-01.nc` base, ext = os.path.splitext(cutout.datasetfn(ym)) return f"{base}-{i}{ext}" t["datasetfns"] = {ym: datasetfn_with_id(ym) for ym in yearmonths.tolist()} # TODO: Using multiple processes will cause issues with geodata currently # because geodata write the metadata dataframe to the cutout directory # in every instantiation of a new Cutout object in an unprepared state. # By running multiple processes, the metadata file will be overwritten # and there are no locking mechanism currently in place. # As the result, we will run the tasks in a single process for now. # In the future, we can consider doing it in a multiprocessing scheme as long as # we can ensure that the metadata file is being processed in places other than # the constructor. # logger.info( # "%d tasks have been collected. Starting running them on %s.", # len(tasks), # f"{nprocesses} processes" if nprocesses is not None else "all processors", # ) pool = Pool(processes=1) try: pool.map(cutout_do_task, tasks) except Exception as e: pool.terminate() logger.info( "Preparation of cutout '%s' has been interrupted by an exception. " "Purging the incomplete cutout_dir.", cutout.name, ) shutil.rmtree(cutout_dir) raise e pool.close() logger.info("Merging variables into monthly compound files") for fn in map(cutout.datasetfn, yearmonths.tolist()): # Find all files with yearmonth prefix eg `201101-XX.nc` base, ext = os.path.splitext(fn) fns = glob(base + "-*" + ext) if len(fns) == 1 and not gebco_height: # Just a single file. Simply rename os.rename(fns[0], fn) else: # Multiple files for yearmonth # open_mfdataset: auto-magically determines appropriate concat and merge of datasets with xr.open_mfdataset(fns, combine="by_coords") as ds: if gebco_height: ds["height"] = cutout.meta["height"] ds.to_netcdf(fn) for tfn in fns: os.unlink(tfn) logger.debug("Completed files %s", os.path.basename(fn)) logger.info("Cutout '%s' has been successfully prepared", cutout.name) cutout.prepared = True def cutout_produce_specific_dataseries(cutout, yearmonth, series_name): xs = cutout.coords["x"] ys = cutout.coords["y"] series = cutout.weather_data_config[series_name].copy() series["meta_attrs"] = cutout.meta.attrs tasks_func = series["tasks_func"] tasks = tasks_func(xs=xs, ys=ys, yearmonths=[yearmonth], **series) assert len(tasks) == 1 data = cutout_do_task(tasks[0], write_to_file=False) assert len(data) == 1 and data[0][0] == yearmonth # type: ignore return data[0][1] # type: ignore def cutout_get_meta(cutout, xs, ys, years, months=None, **dataset_params): # called in cutout.py as `get_meta()` # Loads various metadata (coordinates, dims...) from dataset via dataset_module.prepare_func (eg prepare_meta_merra2) if months is None: months = slice(1, 12) ys = _prepare_lat_direction(cutout.dataset_module.lat_direction, ys) meta_kwds = cutout.meta_data_config.copy() meta_kwds.update(dataset_params) # Assign task function here? tasks_func = meta_kwds["tasks_func"] # noqa: F841 # test before removing # Get metadata prepare_func = meta_kwds.pop("prepare_func") ds = prepare_func(xs=xs, ys=ys, year=years.stop, month=months.stop, **meta_kwds) ds.attrs.update(dataset_params) # Check if cutout dimenions are empty dims = [len(ds.indexes[i]) for i in ds.indexes] if np.prod(dims) == 0: logger.warning( "Cutout dimensions are empty. One or more of xs, ys, or time are not available in Dataset." ) cutout.empty = True # with metadata, load various parameters meta_file_granularity = meta_kwds["file_granularity"] month_start = pd.Timestamp(f"{years.stop}-{months.stop}") ds.coords["year"] = range(years.start, years.stop + 1) ds.coords["month"] = range(months.start, months.stop + 1) if meta_file_granularity == "daily": start, second, end = map(pd.Timestamp, ds.coords["time"].values[[0, 1, -1]]) offset_start = start - month_start offset_end = end - (month_start + pd.offsets.MonthBegin()) step = (second - start).components.hours ds.coords["time"] = pd.date_range( start=pd.Timestamp(f"{years.start}-{months.start}") + offset_start, end=(month_start + pd.offsets.MonthBegin() + offset_end), freq="h" if step == 1 else f"{step}h", ) elif meta_file_granularity == "dailymeans": ds.coords["time"] = pd.date_range( start=pd.Timestamp(f"{years.start}-{months.start}-1"), end=pd.Timestamp( f"{years.stop}-{months.stop}-{calendar.monthrange(years.stop, months.stop)[1]}" ), freq="d", ) elif meta_file_granularity == "monthly": ds.coords["time"] = pd.date_range( start=pd.Timestamp(f"{years.start}-{months.start}"), end=pd.Timestamp(f"{years.stop}-{months.stop}"), freq="MS", ) ds = ds.stack(**{"year-month": ("year", "month")}) # if cutout.meta_append == 1: # # Append to existing meta # (ds # .unstack('year-month') # .to_netcdf(cutout.datasetfn('meta','2')) ) # # with xr.open_mfdataset([cutout.datasetfn(),cutout.datasetfn('meta','2')], combine='by_coords') as ds_comb: # ds = ds_comb # ds = ds.stack(**{'year-month': ('year', 'month')}) return ds def cutout_get_meta_view( cutout, xs=None, ys=None, years=slice(None), months=slice(None), **dataset_params ): # called in cutout as `get_meta_view()` # Create subset of metadata based on xs, ys, years, months # Returns None if any of the dimensions of the subset are empty meta = cutout.meta meta.attrs["view"] = {} if xs is not None: meta.attrs.setdefault("view", {})["x"] = xs if ys is not None: meta.attrs.setdefault("view", {})["y"] = _prepare_lat_direction( cutout.dataset_module.lat_direction, ys ) meta = ( meta.unstack("year-month") .sel(year=years, month=months, **meta.attrs.get("view", {})) .stack(**{"year-month": ("year", "month")}) ) meta = meta.sel( time=slice( *( f"{ym[0]:04d}-{ym[1]:02d}" for ym in meta["year-month"][[0, -1]].to_index() ) ) ) # Check if returned non-zero subset # Future work: can check if whole subset is available dim_len = [len(meta.indexes[i]) for i in meta.indexes] if all(d > 0 for d in dim_len): return meta else: logger.info(dim_len) return None def _prepare_gebco_height(xs, ys, gebco_fn=None): # gebco bathymetry heights for underwater if gebco_fn is None: from .config import gebco_path gebco_fn = gebco_path tmpdir = tempfile.mkdtemp() cornersc = np.array(((xs[0], ys[0]), (xs[-1], ys[-1]))) minc = np.minimum(*cornersc) maxc = np.maximum(*cornersc) span = (maxc - minc) / (np.asarray((len(xs), len(ys))) - 1) minx, miny = minc - span / 2.0 maxx, maxy = maxc + span / 2.0 tmpfn = os.path.join(tmpdir, "resampled.nc") try: ret = subprocess.call( [ "gdalwarp", "-of", "NETCDF", "-ts", str(len(xs)), str(len(ys)), "-te", str(minx), str(miny), str(maxx), str(maxy), "-r", "average", gebco_fn, tmpfn, ] ) assert ret == 0, "gdalwarp was not able to resample gebco" except OSError: logger.warning( "gdalwarp was not found for resampling gebco. " "Next-neighbour interpolation will be used instead!" ) tmpfn = gebco_path # type: ignore with xr.open_dataset(tmpfn) as ds_gebco: height = ( ds_gebco.rename({"lon": "x", "lat": "y", "Band1": "height"}) .reindex(x=xs, y=ys, method="nearest") .load()["height"] ) shutil.rmtree(tmpdir) return height def _prepare_lat_direction(lat_direction, ys): # Check direction of latitudes encoded in dataset, flip if necessary if not lat_direction and ys.stop > ys.start: ys = slice(ys.stop, ys.start, -ys.step if ys.step is not None else None) if lat_direction and ys.stop < ys.start: ys = slice(ys.stop, ys.start, ys.step if ys.step is not None else None) return ys