import datetime
import pathlib as pl
from copy import deepcopy
from typing import Union
from warnings import warn
import numpy as np
import yaml
from ..base import meta
from ..constants import fileish
from ..utils.path import assert_exists, dict_pl_to_str, path_rel_to_yaml
from ..utils.time_utils import (
datetime_dowy,
datetime_doy,
datetime_epiweek,
datetime_month,
datetime_year,
)
from ..utils.utils import diff_dicts
from .accessor import Accessor
# This is the list of control variables currently used by pywatershed
# It is important to maintain this list to issue warnings about what
# variables are unrecognized/ignored in legacy and non-legacy control
# files
# The following are duplicated in the Control docstring below and that
# docstring needs updated whenever any of these change.
pws_control_options_avail = [
"imbalance_behavior",
"calc_method",
"dprst_flag",
"input_dir", #
"input_file",
"intcp_changeover_in_net_rain",
"netcdf_output_dir",
"netcdf_output_var_names",
"netcdf_output_separate_files",
"parameter_file",
"restart_read",
"restart_write",
"restart_write_freq",
"start_time",
"stream_temp_flag",
"stream_temp_shade_flag",
"streamflow_module",
"strmtemp_humidity_flag",
"time_step_units",
"verbosity",
"dyn_ag_frac_flag",
"ag_frac_dynamic",
"AET_cbh_file",
"iter_aet_flag",
"executable_desc",
"init_vars_from_file",
"var_save_file",
]
prms_legacy_options_avail = [
"dprst_flag",
"end_time",
"initial_deltat",
"nhruOutBaseFileName",
"nhruOutVar_names",
"nsegmentOutBaseFileName",
"nsegmentOutVar_names",
"param_file",
"start_time",
"stream_temp_flag",
"stream_temp_shade_flag",
"strmflow_module",
"strmtemp_humidity_flag",
"print_debug",
"dyn_ag_frac_flag",
"ag_frac_dynamic",
"AET_cbh_file",
"iter_aet_flag",
"executable_desc",
"init_vars_from_file",
"var_save_file",
]
prms_to_pws_option_map = {
# "init_vars_from_file": "restart",
"initial_deltat": "time_step",
"nhruOutBaseFileName": "netcdf_output_dir",
"nhruOutVar_names": "netcdf_output_var_names",
"nsegmentOutBaseFileName": "netcdf_output_dir",
"nsegmentOutVar_names": "netcdf_output_var_names",
"param_file": "parameter_file",
"print_debug": "verbosity",
"strmflow_module": "streamflow_module",
}
assert (
len(set(prms_to_pws_option_map.keys()) - set(prms_legacy_options_avail))
== 0
)
[docs]
class Control(Accessor):
"""Control manages global time and options, and provides metadata.
Args:
init_time: The initialization or restart time. Typically one day prior
to start_time (but not necessarily).
start_time: The first time of model integration, NOT the restart or
init time.
end_time: The final integration time of the run.
time_step: Length of the time step (not currently optional).
options: A dictionary of global Process options.
Available pywatershed options:
* imbalance_behavior: one of [None, "warn", "error"]
* calc_method: one of ["numpy", "numba", "fortran"]
* dprst_flag: boolean if depression storage is included (true) or not.
* input_dir: str or pathlib.path directory to search for input data. Use
exactly one of input_dir or input_file.
* input_file: str or pathlib.path file name containing all input data.
Use exactly one of input_dir or input_file.
* netcdf_output_dir: str or pathlib.Path directory for output
* netcdf_output_var_names: a list of variable names to output
* netcdf_output_separate_files: bool if output is grouped by Process or
if each variable is written to an individual file
* parameter_file: the name of a parameter file to use
* streamflow_module: the selected streamflow module in PRMS.
* start_time: np.datetime64
* end_time: np.datetime64
* time_step_units: str containing single character code for
np.timedelta64
* verbosity: 0-10
Available PRMS legacy options:
Either used as-is or mapped to pywatershed options as indicated below.
* start_time
* end_time
* dprst_flag: integer is converted to boolean.
* initial_deltat: translates to "time_step"
* init_vars_from_file: translates to "restart"
* nhruOutBaseFileName: translates to "netcdf_output_dir"
* nhruOutVar_names: translates to a subset of "netcdf_output_var_names"
* nsegmentOutBaseFileName: translates to "netcdf_output_dir"
* nsegmentOutVar_names: translates to a subset of
"netcdf_output_var_names"
* param_file: translates to "parameter_file"
* print_debug: translates to "verbosity"
Examples:
---------
>>> import pathlib as pl
>>>
>>> import numpy as np
>>> import pywatershed as pws
>>>
>>> control = pws.Control(
... start_time=np.datetime64("2023-01-01T00:00:00"),
... end_time=np.datetime64("2023-01-02T00:00:00"),
... time_step=np.timedelta64(24, "h"),
... options={"input_dir": pl.Path("./input")},
... )
>>>
>>> # A more interesting example reads from a PRMS control file
>>> pws_root = pws.constants.__pywatershed_root__
>>> drb_control_file = pws_root / "data/drb_2yr/control.test"
>>> control_drb = pws.Control.load_prms(
... drb_control_file,
... warn_unused_options=False,
... )
>>> control_drb.current_time
numpy.datetime64('1978-12-31T00:00:00')
>>> control_drb.previous_time
>>> control_drb.init_time
numpy.datetime64('1978-12-31T00:00:00')
>>> control_drb.time_step
numpy.timedelta64(24,'h')
>>> control_drb.time_step_seconds
86400.0
>>> control_drb.start_time
numpy.datetime64('1979-01-01T00:00:00')
>>> control_drb.advance()
>>> control_drb.current_time
numpy.datetime64('1979-01-01T00:00:00')
>>> control_drb.current_doy
1
>>> control_drb.current_dowy
93
>>> control_drb.previous_time
numpy.datetime64('1978-12-31T00:00:00')
"""
[docs]
def __init__(
self,
start_time: np.datetime64,
end_time: np.datetime64,
time_step: np.timedelta64,
init_time: np.datetime64 = None,
options: dict = None,
only_warn_invalid: bool = False,
):
super().__init__()
self.name = "Control"
if end_time < start_time:
raise ValueError("end_time < start_time")
n_times_m1 = (end_time - start_time) / time_step
if n_times_m1 != int(n_times_m1):
raise ValueError("time_step does not divide end_time - start_time")
self._start_time = start_time
self._end_time = end_time
self._time_step = time_step
self._n_times = int(n_times_m1) + 1
if init_time:
self._init_time = init_time
else:
self._init_time = self._start_time - time_step
self._current_time = self._init_time
self._previous_time = None
self._itime_step = -1
self._only_warn_invalid = only_warn_invalid
if options is None:
options = OptsDict()
self.options = options
self.meta = meta
# This will have the time dimension name
# This will have the time coordimate name
[docs]
@classmethod
def load(
cls,
control_file: fileish,
warn_unused_options: bool = True,
) -> "Control":
msg = "Control.load will be deprecated for Control.load_prms"
warn(msg, PendingDeprecationWarning)
return Control.load_prms(
control_file, warn_unused_options=warn_unused_options
)
[docs]
@classmethod
def load_prms(
cls,
control_file: fileish,
warn_unused_options: bool = True,
keep_unused_options: bool = False,
) -> "Control":
"""Initialize a control object from a PRMS control file.
Args:
control_file: PRMS control file
warn_unused_options: bool if warnings are to be issued for unused
options from the PRMS control file. Recommended and True by
default. See Control for a list of used/available legacy
options. "Invalid" options will still be warned and can only be
silenced by trapping warnings.
keep_unused_options: Retain all key information in the control
file. "Invalid" options will still be warned and can only be
silenced by trapping warnings.
Returns:
An instance of a Control object.
"""
from ..utils import ControlVariables
control = ControlVariables.load(control_file)
if warn_unused_options:
for vv in control.control.keys():
if vv not in prms_legacy_options_avail:
msg = (
f"Option '{vv}' in supplied control file is not used "
"by pywatershed"
)
warn(msg, RuntimeWarning)
opts = control.control
opt_names = list(opts.keys())
for oo in opt_names:
if oo not in prms_legacy_options_avail:
if not keep_unused_options:
del opts[oo]
if oo in prms_to_pws_option_map.keys():
pws_option_key = prms_to_pws_option_map[oo]
val = opts[oo]
del opts[oo]
if pws_option_key in opts.keys():
# combine to a list with only unique entries
# use value instead of list if only one value in list
opt_val = opts[pws_option_key]
if isinstance(opt_val, np.ndarray):
opt_val = opt_val.tolist()
elif isinstance(opt_val, str):
opt_val = [opt_val]
opts[pws_option_key] = list(set(opt_val + val.tolist()))
if len(opts[pws_option_key]) == 1:
opts[pws_option_key] = opts[pws_option_key][0]
else:
opts[pws_option_key] = val
# some special cases
if (
pws_option_key
in [
"parameter_file",
"netcdf_output_dir",
"streamflow_module",
]
and len(val) == 1
):
opts[pws_option_key] = val[0]
# special cases, unmapped names
if oo == "dprst_flag":
opts[oo] = bool(opts[oo][0])
if oo == "iter_aet_flag":
opts[oo] = bool(opts[oo][0])
start_time = control.control["start_time"]
end_time = control.control["end_time"]
del control.control["start_time"]
del control.control["end_time"]
# sometimes initial_deltat is missing... ?
if "time_step" in control.control.keys():
time_step = control.control["time_step"]
del control.control["time_step"]
else:
time_step = np.timedelta64(24, "h")
return cls(
start_time=start_time,
end_time=end_time,
time_step=time_step,
options=control.control,
only_warn_invalid=keep_unused_options,
)
def _set_options(self, options: dict):
if not isinstance(options, (OptsDict, dict)):
raise ValueError("control.options must be a dictionary")
valid_options = OptsDict(self._only_warn_invalid)
for key, val in options.items():
valid_options[key] = val
return valid_options
def __setitem__(self, key, value) -> None:
if key == "options":
value = self._set_options(value)
super().__setitem__(key, value)
return None
def __setattr__(self, name, value) -> None:
if name == "options":
value = self._set_options(value)
super().__setattr__(name, value)
return None
def __copy__(self):
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
return result
def __deepcopy__(self, memo):
del self.meta
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
setattr(result, k, deepcopy(v, memo))
self.meta = meta
result.meta = meta
return result
@property
def current_time(self) -> np.datetime64:
"""Get the current time."""
return self._current_time
@property
def current_datetime(self) -> datetime.datetime:
"""Get the current time as a datetime.datetime object"""
return self._current_time.astype(datetime.datetime)
@property
def current_year(self) -> int:
"""Get the current year."""
return datetime_year(self._current_time)
@property
def current_month(self) -> int:
"""Get the current month in 1-12 (unless zero based)."""
return datetime_month(self._current_time)
@property
def current_doy(self) -> int:
"""Get the current day of year in 1-366 (unless zero based)."""
return datetime_doy(self._current_time)
@property
def current_jsol(self) -> int:
"""The number of days since the last winter solstice.
Currently ASSUMES Norther Hemisphere.
This is based on GSFLOW 2.4.0:rc/prms/sm_utils_prms.f90:L470
julian_day function with argument "solar".
"""
from ..utils.time_utils import datetime_jsol
return datetime_jsol(self.current_time)
@property
def current_dowy(self) -> int:
"""Get the current day of water year in 1-366 (unless zero-based)."""
return datetime_dowy(self._current_time)
@property
def current_epiweek(self) -> int:
"""Get the current epiweek [1, 53]."""
return datetime_epiweek(self._current_time)
@property
def previous_time(self) -> np.datetime64:
"""The previous time."""
return self._previous_time
@property
def itime_step(self) -> int:
"""The counth of the current time [0, self.n_times-1]"""
return self._itime_step
@property
def time_step(self) -> int:
"""The time step"""
return self._time_step
@property
def init_time(self) -> np.datetime64:
"""Get the simulation initialization time"""
return self._init_time
@property
def start_time(self) -> np.datetime64:
"""The simulation start time"""
return self._start_time
@property
def start_doy(self) -> int:
"""The simulation start day of year."""
return datetime_doy(self._start_time)
@property
def start_month(self) -> int:
"""The simulation start month."""
return datetime_month(self._start_time)
@property
def end_time(self) -> np.datetime64:
"""The simulation end time."""
return self._end_time
@property
def n_times(self) -> int:
"""The number of time steps."""
return self._n_times
@property
def time_step_seconds(self) -> np.float64:
"""The timestep length in units of seconds."""
return self.time_step / np.timedelta64(1, "s")
[docs]
def advance(self) -> None:
"""Advance time."""
if self._current_time == self._end_time:
raise ValueError("End of time reached")
self._itime_step += 1
if self._current_time is None:
self._current_time = self._start_time
return
self._previous_time = self._current_time
self._current_time += self.time_step
return None
[docs]
def edit_init_start_times(
self,
new_init_time: np.datetime64,
new_start_time: np.datetime64 = None,
) -> None:
"""Supply a new init and start times for the simulation.
The initialization time is the time of the restart files. The start
time is generally one timestep later, the time of the end of the first
model advance but not necessarily so. Initialization could happen from
an arbitrarty time/state. If new_start_time is not supplied, it is
calculated as one timestep greater than the required new_init_time.
Args:
new_init_time: The time of the restart files to use.
new_start_time: The new time after the first advance of the model.
"""
if self._itime_step > -1:
warn("Control object can not be edited after first advance")
return
self._init_time = new_init_time
self._current_time = self._init_time
if new_start_time is not None:
self._start_time = new_start_time
else:
self._start_time = self._init_time + self._time_step
msg = (
"You may need to use edit_end_time before using "
"edit_init_start_times."
)
assert self._end_time - self._start_time >= 0, msg
self._n_times = (
int((self._end_time - self._start_time) / self._time_step) + 1
)
return None
[docs]
def edit_end_time(self, new_end_time: np.datetime64) -> None:
"""Supply a new end time for the simulation.
Args:
new_end_time: the new time at which to end the simulation.
"""
if self._itime_step > -1:
warn("Control object can not be edited after first advance")
return
self._end_time = new_end_time
assert self._end_time - self._start_time >= 0
self._n_times = (
int((self._end_time - self._start_time) / self._time_step) + 1
)
return None
[docs]
def edit_n_time_steps(self, new_n_time_steps: int) -> None:
"""Supply a new number of timesteps to change the simulation end time.
Args:
new_n_time_steps: The new number of timesteps.
"""
self._n_times = new_n_time_steps
self._end_time = (
self._start_time + (self._n_times - 1) * self._time_step
)
return None
def __str__(self):
from pprint import pformat
return pformat(self.to_dict())
def __repr__(self):
# TODO: this is not really an object representation
return self.__str__()
[docs]
def to_dict(self, deep_copy=True) -> dict:
"""Export a control object to a dictionary
Args:
deep_copy: If the dictionary should be a deep copy or not.
"""
control_dict = {}
# I suppose this list could grow with time but these are
# the only non .option items in __dict__ required to reconstitute a
# Control instance
control_dict["start_time"] = str(self.start_time)
control_dict["end_time"] = str(self.end_time)
control_dict["time_step"] = str(self.time_step)[0:2]
control_dict["time_step_units"] = str(self.time_step)[3:4]
if deep_copy:
control = deepcopy(self)
else:
control = self
control_dict["options"] = {}
for kk, vv in control.options.items():
control_dict["options"][kk] = control.options[kk]
return control_dict
[docs]
def to_yaml(self, yaml_file: Union[pl.Path, str]) -> None:
"""Export to a yaml file
Args:
yaml_file: The file to write to.
Note: This flattens .options to the top level of the yaml/dict
so that option keys are all at the same level as "start_time",
"end_time", "time_step", and "time_step_units". Using .from_yaml
will restore options to a nested dictionary.
Args:
yaml_file: pl.Path or str to designate the output path/file.
"""
control_dict = dict_pl_to_str(self.to_dict())
opts = control_dict["options"]
for kk, vv in opts.items():
if kk in control_dict.keys():
msg = "Control option keys collide with non-option keys"
raise ValueError(msg)
control_dict[kk] = vv
del control_dict["options"]
yaml_file = pl.Path(yaml_file)
with open(yaml_file, "w") as file:
_ = yaml.dump(control_dict, file)
assert yaml_file.exists()
return None
[docs]
@staticmethod
def from_yaml(yaml_file: Union[str, pl.Path]) -> "Control":
"""Instantate a Control object from a yaml file
Args:
yaml_file: a yaml file to parse.
Required key:value pairs:
* imbalance_behavior: None | "warn" | "error"
* calc_method: None | "numpy" | "numba" | "fortran" (depending on
availability)
* end_time: ISO8601 string for numpy datetime64, e.g.
1980-12-31T00:00:00.
* start_time: ISO8601 string for numpy datetime64, e.g.
1979-01-01T00:00:00.
* time_step: The first argument to get a numpy.timedelta64, e.g. 24
* time_step_units: The second argument to get a numpy.timedelta64,
e.g. 'h'.
* verbosity: integer 0-10
Optional key, value pairs:
* input_dir: path relative to this file. Use exactly one of
input_dir or input_file.
* input_file: path relative to this file. Use exactly one of
input_dir or input_file.
* netcdf_output: boolean
* netcdf_output_var_names: list of variable names to output, e.g.
- albedo
- cap_infil_tot
- contrib_fraction
Returns:
Control object
"""
import yaml
with pl.Path(yaml_file).open("r") as file_stream:
control_dict = yaml.load(file_stream, Loader=yaml.Loader)
start_time = np.datetime64(control_dict["start_time"])
end_time = np.datetime64(control_dict["end_time"])
time_step = np.timedelta64(
control_dict["time_step"], control_dict["time_step_units"]
)
del control_dict["start_time"]
del control_dict["end_time"]
del control_dict["time_step"]
del control_dict["time_step_units"]
paths_to_convert = ["input_dir", "input_file"]
for path_name in paths_to_convert:
if path_name in control_dict.keys():
control_dict[path_name] = path_rel_to_yaml(
control_dict[path_name], yaml_file
)
assert_exists(control_dict[path_name])
control = Control(
start_time,
end_time,
time_step,
options=control_dict,
)
return control
[docs]
def diff(self, other) -> None:
"""Diff self with another Control instance
Args:
other: An other dict against which to compare.
"""
diff_dicts(self.__dict__, other.__dict__, ["options"])
diff_dicts(self.options, other.options)
return
class OptsDict(dict):
def __init__(self, only_warn: bool = False):
super().__init__()
self._only_warn = only_warn
return
def __setitem__(self, key, value):
if key not in pws_control_options_avail:
msg = f"'{key}' is not an available control option"
if not self._only_warn:
raise NameError(msg)
else:
warn(msg)
super().__setitem__(key, value)
return None