from typing import Literal
import numpy as np
from pywatershed.base.adapter import adaptable
from pywatershed.base.budget import Budget
from pywatershed.base.conservative_process import ConservativeProcess
from pywatershed.base.control import Control
from pywatershed.base.flow_graph import FlowNode, FlowNodeMaker
from pywatershed.constants import (
cf_to_cm,
cfs_to_cms,
cm_to_cf,
cms_to_cfs,
nan,
nan1d,
one,
zero,
)
from pywatershed.parameters import Parameters
from pywatershed.utils.time_utils import datetime_epiweek
# Units note:
# All internal Starfit calculations are done in cms and MCM here. This is not
# consistent with the units used in pywatershed. The option io_in_cfs exists
# to bridge these two. Generally the conversions to and from cfs should happen
# in prepare and finalize of subtimesteps, when possible.
# Further, a variety of units is used but regression tests against the original
# code fail when composite conversion factors are used, apparently the order
# matters against the original.
# MCM = million cubic meters
# m3/day = m3pd
# m^3/second = m3ps
# m^3/week = m2pw
# local constant
omega = 1.0 / 52.0
metadata_patches_cfs = {
"lake_inflow": {"units": "cfs"},
"lake_outflow": {"units": "cfs"},
"lake_release": {"units": "cfs"},
"lake_spill": {"units": "cfs"},
"lake_storage": {"units": "cubic feet"},
"lake_storage_change": {"units": "cfs"},
"lake_storage_change_flow_units": {"units": "cfs"},
"lake_storage_old": {"units": "cubic feet"},
}
[docs]
class Starfit(ConservativeProcess):
"""starfit: Storage Targets And Release Function Inference Tool
Sean W.D. Turner, Jennie Clarice Steyaert, Laura Condon, Nathalie Voisin,
Water storage and release policies for all large reservoirs of conterminous
United States, Journal of Hydrology, Volume 603, Part A, 2021, 126843,
ISSN 0022-1694, https://doi.org/10.1016/j.jhydrol.2021.126843.
https://github.com/IMMM-SFA/starfit
Adapted from STARFIT implementation in the MOSART-WM model:
https://github.com/IMMM-SFA/mosartwmpy/blob/main/mosartwmpy/reservoirs/istarf.py
Noah Knowles (USGS) and James McCreight (UCAR/USGS)
Args:
control: a Control object
discretization: a discretization of class Parameters
parameters: a parameter object of class Parameters
lake_inflow: Daily lake inflow
imbalance_behavior: one of ["defer", None, "warn", "error"]
with "defer" being the default and defering to
control.options["imbalance_behavior"] when available. When
control.options["imbalance_behavior"] is not avaiable,
imbalance_behavior is set to "warn".
verbose: Print extra information or not?
load_n_time_batches: not-implemented
"""
[docs]
def __init__(
self,
control: Control,
discretization: Parameters,
parameters: Parameters,
lake_inflow: adaptable,
imbalance_behavior: Literal["defer", None, "warn", "error"] = "defer",
input_aliases: dict = None,
verbose: bool = False,
load_n_time_batches: int = 1,
io_in_cfs: bool = True,
) -> None:
metadata_patches = None
metadata_patch_conflicts = "error"
if io_in_cfs:
metadata_patches = metadata_patches_cfs
metadata_patch_conflicts = "left"
super().__init__(
control=control,
discretization=discretization,
parameters=parameters,
metadata_patches=metadata_patches,
metadata_patch_conflicts=metadata_patch_conflicts,
input_aliases=input_aliases,
)
del metadata_patches, metadata_patch_conflicts
self.name = "Starfit"
self._set_inputs(locals())
self._set_options(locals())
self._set_budget(basis="unit", ignore_nans=True)
self.m3ps_to_MCM_day = 24 * 60 * 60 / 1.0e6
self.MCM_to_m3ps_day = 1.0 / self.m3ps_to_MCM_day
return
[docs]
@staticmethod
def get_dimensions() -> tuple:
"""Get a tuple of dimension names for this Starfit."""
return ("nreservoirs",)
[docs]
@staticmethod
def get_parameters() -> tuple:
"""Get a tuple of parameter names for Starfit"""
return (
"grand_id",
"initial_storage",
"start_time",
"end_time",
"inflow_mean",
"NORhi_min",
"NORhi_max",
"NORhi_alpha",
"NORhi_beta",
"NORhi_mu",
"NORlo_min",
"NORlo_max",
"NORlo_alpha",
"NORlo_beta",
"NORlo_mu",
"Release_min",
"Release_max",
"Release_alpha1",
"Release_alpha2",
"Release_beta1",
"Release_beta2",
"Release_p1",
"Release_p2",
"Release_c",
"GRanD_CAP_MCM",
"Obs_MEANFLOW_CUMECS",
)
[docs]
@staticmethod
def get_mass_budget_terms() -> dict:
"""Get a dictionary of variable names for mass budget terms."""
return {
"inputs": [
"lake_inflow",
],
"outputs": [
"lake_release",
"lake_spill",
],
"storage_changes": [
"lake_storage_change_flow_units",
],
}
[docs]
@staticmethod
def get_init_values() -> dict:
return {
"lake_storage": nan,
"lake_storage_old": nan,
"lake_storage_change": nan,
"lake_storage_change_flow_units": nan,
"lake_inflow": nan,
"lake_release": nan,
"lake_spill": nan,
"lake_outflow": nan,
"lake_availability_status": nan,
}
def _set_initial_conditions(self):
# initialize storage when the appropriate time is reached
# currently each reservoir can start at a different time.
self.lake_storage[:] = nan
self.lake_storage_old[:] = nan
# JLM: this is sketchy seems like it should be a pre-process
self.Obs_MEANFLOW_CUMECS = np.where(
np.isnan(self.Obs_MEANFLOW_CUMECS),
self.inflow_mean,
self.Obs_MEANFLOW_CUMECS,
)
wh_initial_storage_nan = np.isnan(self.initial_storage)
self.start_time = np.where(
np.isnat(self.start_time),
self.control.current_time, # one day prior to start time
self.start_time,
)
start_epiweeks = np.array(
[datetime_epiweek(ss) for ss in self.start_time]
)
if len(wh_initial_storage_nan):
min = min_nor(
self.NORlo_max,
self.NORlo_min,
self.NORlo_alpha,
self.NORlo_beta,
self.NORlo_mu,
omega,
start_epiweeks,
)
max = max_nor(
self.NORhi_max,
self.NORhi_min,
self.NORhi_alpha,
self.NORhi_beta,
self.NORhi_mu,
omega,
start_epiweeks,
)
pct_res_cap = (min + max) / 2
nor_mean_cap = self.GRanD_CAP_MCM * pct_res_cap
self._initial_storage = np.where(
wh_initial_storage_nan, nor_mean_cap, self.initial_storage
)
# set lake_storage from initial_storage when start is not available
self.lake_storage[:] = np.where(
np.isnat(self.start_time), self.initial_storage, nan
)
return
def _advance_variables(self) -> None:
"""Advance the starfit variables
Returns:
None
"""
self.lake_storage_change[:] = self.lake_storage - self.lake_storage_old
self.lake_storage_old[:] = self.lake_storage
return
def _calculate(self, simulation_time) -> None:
"""Calculate starfit for a time step (vectorized)
Args:
simulation_time: current simulation time
Returns:
None
"""
self._simulation_time = simulation_time
# For NaTs, time comparisons (other than !=) are False. If there is no
# end_time (it is NaT), then this will be false.
wh_after_end = np.where(self.control.current_time > self.end_time)
self.lake_storage[wh_after_end] = nan
self.lake_release[wh_after_end] = nan
self.lake_spill[wh_after_end] = nan
self.lake_availability_status[wh_after_end] = nan
# For NaTs, time comparisons (other than !=) are False. If there is no
# start_time (it is NaT), then this will be false. For those locations
# without start_times, lake_storage was already set to initial_storage
# in _set_initial_conditions.
wh_start = np.where(self.control.current_time == self.start_time)
self.lake_storage[wh_start] = self.initial_storage[wh_start]
self.lake_storage_old[wh_start] = self.initial_storage[wh_start]
if self._io_in_cfs:
self.lake_inflow[:] *= cfs_to_cms
self.lake_storage[:] *= cf_to_cm
self.lake_storage_old[:] *= cf_to_cm
# <
(
self.lake_release[:],
self.lake_availability_status[:],
) = self._calc_istarf_release(
epiweek=np.minimum(self.control.current_epiweek, 52),
GRanD_CAP_MCM=self.GRanD_CAP_MCM,
grand_id=self.grand_id,
lake_inflow=self.lake_inflow,
lake_storage=self.lake_storage,
NORhi_alpha=self.NORhi_alpha,
NORhi_beta=self.NORhi_beta,
NORhi_max=self.NORhi_max,
NORhi_min=self.NORhi_min,
NORhi_mu=self.NORhi_mu,
NORlo_alpha=self.NORlo_alpha,
NORlo_beta=self.NORlo_beta,
NORlo_max=self.NORlo_max,
NORlo_min=self.NORlo_min,
NORlo_mu=self.NORlo_mu,
Obs_MEANFLOW_CUMECS=self.Obs_MEANFLOW_CUMECS,
Release_alpha1=self.Release_alpha1,
Release_alpha2=self.Release_alpha2,
Release_beta1=self.Release_beta1,
Release_beta2=self.Release_beta2,
Release_c=self.Release_c,
Release_max=self.Release_max,
Release_min=self.Release_min,
Release_p1=self.Release_p1,
Release_p2=self.Release_p2,
) # output in m^3/d
self.lake_release[:] = (
self.lake_release / 24 / 60 / 60
) # convert m3pd to m^3/s. THE NUMERICS ON THIS LINE ARE VERY
# SENSITIVE TO THE ORDER OF THE DIVISION when reproducing the
# test data. \_(`@`)_/
self.lake_storage_change[:] = (
self.lake_inflow - self.lake_release
) * self.m3ps_to_MCM_day
# Note: no lake_storage calculation here
# can't release more than storage + inflow. This assumes zero
# storage = deadpool which may not be accurate, but this situation
# rarely occurs since STARFIT releases are already designed to keep
# storage within NOR.
wh_neg_storage = np.where(
(self.lake_storage + self.lake_storage_change) < zero
)
if len(wh_neg_storage):
potential_release = self.lake_release[wh_neg_storage] + (
self.lake_storage[wh_neg_storage]
+ self.lake_storage_change[wh_neg_storage]
) * (self.MCM_to_m3ps_day) # both terms in m3ps
self.lake_release[wh_neg_storage] = np.maximum(
potential_release,
zero,
) # m3ps
self.lake_storage_change[wh_neg_storage] = (
self.lake_inflow[wh_neg_storage]
- self.lake_release[wh_neg_storage]
) * self.m3ps_to_MCM_day
self.lake_storage[:] = np.maximum(
self.lake_storage + self.lake_storage_change, zero
) # MCM
self.lake_spill[:] = nan
wh_active = np.where(~np.isnan(self.lake_storage))
self.lake_spill[wh_active] = zero
wh_spill = np.where(self.lake_storage > self.GRanD_CAP_MCM)
self.lake_spill[wh_spill] = (
self.lake_storage[wh_spill] - self.GRanD_CAP_MCM[wh_spill]
) * self.MCM_to_m3ps_day
self.lake_storage[wh_spill] = self.GRanD_CAP_MCM[wh_spill]
self.lake_storage_change[:] = self.lake_storage - self.lake_storage_old
self.lake_storage_change_flow_units[:] = (
self.lake_storage_change * self.MCM_to_m3ps_day
)
self.lake_outflow[:] = self.lake_release + self.lake_spill
if self._io_in_cfs:
self.lake_inflow[:] *= cms_to_cfs
self.lake_release[:] *= cms_to_cfs
self.lake_spill[:] *= cms_to_cfs
self.lake_outflow[:] *= cms_to_cfs
self.lake_storage[:] *= cm_to_cf
self.lake_storage_old[:] *= cm_to_cf
self.lake_storage_change_flow_units[:] *= cms_to_cfs
return
@staticmethod
def _calc_istarf_release(
epiweek,
GRanD_CAP_MCM,
grand_id,
lake_inflow,
lake_storage,
NORhi_alpha,
NORhi_beta,
NORhi_max,
NORhi_min,
NORhi_mu,
NORlo_alpha,
NORlo_beta,
NORlo_max,
NORlo_min,
NORlo_mu,
Obs_MEANFLOW_CUMECS,
Release_alpha1,
Release_alpha2,
Release_beta1,
Release_beta2,
Release_c,
Release_max,
Release_min,
Release_p1,
Release_p2,
):
# MCM to m^3
storage = lake_storage * 1.0e6
capacity = GRanD_CAP_MCM * 1.0e6
if not np.isfinite(grand_id).any():
raise ValueError("Some non-finite grand_ids present")
max_normal = max_nor(
NORhi_max,
NORhi_min,
NORhi_alpha,
NORhi_beta,
NORhi_mu,
omega,
epiweek,
)
min_normal = min_nor(
NORlo_max,
NORlo_min,
NORlo_alpha,
NORlo_beta,
NORlo_mu,
omega,
epiweek,
)
# TODO could make a better forecast?
# why not use cumulative volume for the current epiweek, and only
# extrapolate for the remainder of the week?
# JLM: these lines are sensitive to the order of calculation when
# JLM: comparing to the original code so wont use the constants here.
forecasted_weekly_volume = 7.0 * lake_inflow * 24.0 * 60.0 * 60.0
mean_weekly_volume = 7.0 * Obs_MEANFLOW_CUMECS * 24.0 * 60.0 * 60.0
# forecasted_weekly_volume = lake_inflow * m3ps_to_m3pw
# mean_weekly_volume = Obs_MEANFLOW_CUMECS * m3ps_to_m3pw
standardized_inflow = (
forecasted_weekly_volume / mean_weekly_volume
) - 1.0
standardized_weekly_release = (
Release_alpha1 * np.sin(2.0 * np.pi * omega * epiweek)
+ Release_alpha2 * np.sin(4.0 * np.pi * omega * epiweek)
+ Release_beta1 * np.cos(2.0 * np.pi * omega * epiweek)
+ Release_beta2 * np.cos(4.0 * np.pi * omega * epiweek)
)
# m3/week to m3/day
release_min_vol = mean_weekly_volume * (1 + Release_min) / 7.0
release_max_vol = mean_weekly_volume * (1 + Release_max) / 7.0
availability_status = (100.0 * storage / capacity - min_normal) / (
max_normal - min_normal
)
# m3/week to m3/day
release = (
mean_weekly_volume
* (
1
+ (
standardized_weekly_release
+ Release_c
+ Release_p1 * availability_status
+ Release_p2 * standardized_inflow
)
)
) / 7.0
# m3/week to m3/day
release_above_normal = (
storage
- (capacity * max_normal / 100.0)
+ forecasted_weekly_volume
) / 7.0
# m3/week to m3/day
release_below_normal = (
storage
- (capacity * min_normal / 100.0)
+ forecasted_weekly_volume
) / 7.0 # NK: The first part of this sum will be negative.
release = np.where(
availability_status > one, release_above_normal, release
)
release = np.where(
availability_status < zero, release_below_normal, release
)
release = np.where(release < release_min_vol, release_min_vol, release)
release = np.where(release > release_max_vol, release_max_vol, release)
# storage update and boundaries are enforced during the regulation step
return release, availability_status
def max_nor(
NORhi_max, NORhi_min, NORhi_alpha, NORhi_beta, NORhi_mu, omega, epiweek
):
return np.minimum(
NORhi_max,
np.maximum(
NORhi_min,
NORhi_mu
+ NORhi_alpha * np.sin(2.0 * np.pi * omega * epiweek)
+ NORhi_beta * np.cos(2.0 * np.pi * omega * epiweek),
),
)
def min_nor(
NORlo_max, NORlo_min, NORlo_alpha, NORlo_beta, NORlo_mu, omega, epiweek
):
return np.minimum(
NORlo_max,
np.maximum(
NORlo_min,
NORlo_mu
+ NORlo_alpha * np.sin(2.0 * np.pi * omega * epiweek)
+ NORlo_beta * np.cos(2.0 * np.pi * omega * epiweek),
),
)
[docs]
class StarfitFlowNode(FlowNode):
r"""STARFIT FlowNode: Storage Targets And Release Function Inference Tool
This :class:`FlowNode` implementation allows STARFIT solutions to be
computed in a :class:`FlowGraph`. The solution has the option for
subtimestep or daily computations.
Daily computations have the same outflows on the substeps of a day and
outflows and storages are calculated on the last subtimestep. On the first
subtimestep, we use the inflow of the first subtimestep as representative
of the mean inflow of the previous day in order to calculate an average
outflow for the first timestep.
The STARFIT reference:
Sean W.D. Turner, Jennie Clarice Steyaert, Laura Condon, Nathalie Voisin,
Water storage and release policies for all large reservoirs of conterminous
United States, Journal of Hydrology, Volume 603, Part A, 2021, 126843,
ISSN 0022-1694, https://doi.org/10.1016/j.jhydrol.2021.126843.
https://github.com/IMMM-SFA/starfit
Adapted from STARFIT implementation in the [MOSART-WM model](https://github.com/IMMM-SFA/mosartwmpy/blob/main/mosartwmpy/reservoirs/istarf.py)
Thurber, T., Rexer, E., Vernon, C., Sun, N., Turner, S., Yoon, J.,
Broman, D., & Voisin, N. (2022). mosartwmpy (Version 0.2.7)
[Computer software]. https://github.com/IMMM-SFA/mosartwmpy
See :class:`FlowGraph` for discussion and a worked example. The notebook
`examples/06_flow_graph_starfit.ipynb <https://github.com/DOI-USGS/pywatershed/blob/develop/examples/06_flow_graph_starfit.ipynb>`__
highlights adding a StarfitFlowNode a :class:`FlowGraph` otherwised
comprised of :class:`PRMSChannelFlowNode`\ s using the helper functions
:func:`prms_channel_flow_graph_to_model_dict`
and :func:`prms_channel_flow_graph_postprocess`.
Noah Knowles (USGS) and James McCreight (UCAR/USGS)
""" # noqa: E501
[docs]
def __init__(
self,
control: Control,
grand_id: np.int64,
initial_storage: np.float64,
start_time: np.datetime64,
end_time: np.datetime64,
inflow_mean: np.float64,
NORhi_min: np.float64,
NORhi_max: np.float64,
NORhi_alpha: np.float64,
NORhi_beta: np.float64,
NORhi_mu: np.float64,
NORlo_min: np.float64,
NORlo_max: np.float64,
NORlo_alpha: np.float64,
NORlo_beta: np.float64,
NORlo_mu: np.float64,
Release_min: np.float64,
Release_max: np.float64,
Release_alpha1: np.float64,
Release_alpha2: np.float64,
Release_beta1: np.float64,
Release_beta2: np.float64,
Release_p1: np.float64,
Release_p2: np.float64,
Release_c: np.float64,
GRanD_CAP_MCM: np.float64,
Obs_MEANFLOW_CUMECS: np.float64,
calc_method: Literal["numba", "numpy"] = None,
io_in_cfs: bool = True,
compute_daily: bool = False,
nhrs_substep: int = None,
imbalance_behavior: Literal["defer", None, "warn", "error"] = None,
):
"""Initialize a StarfitFlowNode.
Args:
control: A Control object
grand_id: the GRanD id,
initial_storage: initial storage value, may be NaN to use the
middle of the normal operating range.
start_time: May be NaN, the optional time at which to start
simulating with STARFIT.
end_time: As for start_Time.
inflow_mean: STARFIT parameter.
NORhi_min: STARFIT parameter.
NORhi_max: STARFIT parameter.
NORhi_alpha: STARFIT parameter.
NORhi_beta: STARFIT parameter.
NORhi_mu: STARFIT parameter.
NORlo_min: STARFIT parameter.
NORlo_max: STARFIT parameter.
NORlo_alpha: STARFIT parameter.
NORlo_beta: STARFIT parameter.
NORlo_mu: STARFIT parameter.
Release_min: STARFIT parameter.
Release_max: STARFIT parameter.
Release_alpha1: STARFIT parameter.
Release_alpha2: STARFIT parameter.
Release_beta1: STARFIT parameter.
Release_beta2: STARFIT parameter.
Release_p1: STARFIT parameter.
Release_p2: STARFIT parameter.
Release_c: STARFIT parameter.
GRanD_CAP_MCM: STARFIT parameter.
Obs_MEANFLOW_CUMECS: STARFIT parameter.
calc_method: One of "numba" or "numpy".
io_in_cfs: Are the units in cubic feet per second? False gives
units of cubic meters per second.
compute_daily: Daily or subtimestep calculation?
nhrs_substep: Number of hours in the subtimestep. If not passed,
compute_daily defaults to 24, otherwise 1.
imbalance_behavior: One of "defer", "warn", or "error".
"""
self.name = "StarfitFlowNode"
self.control = control
self._grand_id = grand_id
self._initial_storage = initial_storage
self._start_time = start_time
self._end_time = end_time
self._inflow_mean = inflow_mean
self._NORhi_min = NORhi_min
self._NORhi_max = NORhi_max
self._NORhi_alpha = NORhi_alpha
self._NORhi_beta = NORhi_beta
self._NORhi_mu = NORhi_mu
self._NORlo_min = NORlo_min
self._NORlo_max = NORlo_max
self._NORlo_alpha = NORlo_alpha
self._NORlo_beta = NORlo_beta
self._NORlo_mu = NORlo_mu
self._Release_min = Release_min
self._Release_max = Release_max
self._Release_alpha1 = Release_alpha1
self._Release_alpha2 = Release_alpha2
self._Release_beta1 = Release_beta1
self._Release_beta2 = Release_beta2
self._Release_p1 = Release_p1
self._Release_p2 = Release_p2
self._Release_c = Release_c
self._GRanD_CAP_MCM = GRanD_CAP_MCM
self._Obs_MEANFLOW_CUMECS = Obs_MEANFLOW_CUMECS
# calc_method ignored currently
self._io_in_cfs = io_in_cfs
if nhrs_substep is None:
if compute_daily:
nhrs_substep = 24
else:
nhrs_substep = 1
self._m3ps_to_MCM = nhrs_substep * 60 * 60 / 1.0e6
self._MCM_to_m3ps = 1.0 / self._m3ps_to_MCM
# These need to be numpy (nan1d) pointers for the Budget.
# TODO: make more init of variables conditional
self._lake_inflow = nan1d()
self._lake_inflow_cms = nan1d()
self._lake_inflow_accum = nan1d()
self._lake_inflow_sub = nan1d()
self._lake_inflow_previous = nan1d()
self._lake_outflow = nan1d()
self._lake_outflow_accum = nan1d()
self._lake_outflow_sub = nan1d()
if compute_daily:
self._lake_outflow_sub_next = nan1d()
self._lake_storage = nan1d()
self._lake_storage_old = nan1d()
self._lake_storage_accum = nan1d()
self._lake_storage_sub = nan1d()
self._lake_storage_old_sub = nan1d()
self._lake_storage_change_sub = nan1d()
self._lake_storage_change = nan1d()
self._lake_storage_change_flow_units = nan1d()
self._lake_storage_change_accum = nan1d()
self._lake_release = nan1d()
self._lake_release_sub = nan1d()
self._lake_release_accum = nan1d()
if compute_daily:
self._lake_release_sub_next = nan1d()
self._lake_spill = nan1d()
self._lake_spill_sub = nan1d()
self._lake_spill_accum = nan1d()
if compute_daily:
self._lake_spill_sub_next = nan1d()
self._lake_availability_status = nan1d()
self._lake_availability_status_sub = nan1d()
self._lake_availability_status_accum = nan1d()
if compute_daily:
self._lake_availability_status_next = nan1d()
if np.isnan(self._Obs_MEANFLOW_CUMECS):
self._Obs_MEANFLOW_CUMECS = self._inflow_mean
wh_initial_storage_nan = np.isnan(self._initial_storage)
if self._io_in_cfs:
self._initial_storage = np.where(
wh_initial_storage_nan,
self._initial_storage,
self._initial_storage * cf_to_cm,
)
start_time = np.where(
np.isnat(self._start_time),
self.control.current_time, # one day prior to start time
self._start_time,
)[()]
start_epiweeks = np.array([datetime_epiweek(start_time)])
if wh_initial_storage_nan:
min = min_nor(
self._NORlo_max,
self._NORlo_min,
self._NORlo_alpha,
self._NORlo_beta,
self._NORlo_mu,
omega,
start_epiweeks,
)
max = max_nor(
self._NORhi_max,
self._NORhi_min,
self._NORhi_alpha,
self._NORhi_beta,
self._NORhi_mu,
omega,
start_epiweeks,
)
pct_res_cap = (min + max) / 2 / 100
nor_mean_cap = self._GRanD_CAP_MCM * pct_res_cap
# note the leading dunder (double underscore): "private reserve"
initial_storage = np.where(
wh_initial_storage_nan, nor_mean_cap, self._initial_storage
)[()]
# print(f"{nor_mean_cap=}") # another way to get this info out?
# set lake_storage from initial_storage when start is not available
self._lake_storage_sub[:] = np.where(
np.isnat(self._start_time), initial_storage, nan
)
else:
self._lake_storage_sub[:] = self._initial_storage
self._imbalance_behavior = imbalance_behavior
if self._imbalance_behavior == "defer":
if "imbalance_behavior" in self.control.options.keys():
self._imbalance_behavior = self.control.options[
"imbalance_behavior"
]
else:
self._imbalance_behavior = "warn"
if self._imbalance_behavior is not None:
# this budget is not configured to output files
self.mass_budget = Budget.from_storage_unit(
self,
time_unit="D",
description=self.name,
imbalance_fatal=(self._imbalance_behavior == "error"),
basis="unit",
ignore_nans=False,
verbose=False,
)
else:
self.mass_budget = None
self._compute_daily = compute_daily
if self._compute_daily:
self.calculate_subtimestep = self._calc_subtimestep_daily
self.prepare_timestep = self._prepare_timestep_daily
else:
self.calculate_subtimestep = self._calc_subtimestep_hourly
self.prepare_timestep = self._prepare_timestep_hourly
return
[docs]
@staticmethod
def get_mass_budget_terms():
"""Get a dictionary of variable names for mass budget terms."""
return {
"inputs": [
"_lake_inflow",
],
"outputs": [
"_lake_release",
"_lake_spill",
],
"storage_changes": [
"_lake_storage_change_flow_units",
],
}
def _prepare_timestep_daily(self) -> None:
self._lake_inflow_accum[:] = np.array([zero])
if self._io_in_cfs:
self._lake_storage[:] *= cf_to_cm
self._lake_storage_old[:] *= cf_to_cm
# <
return
def _prepare_timestep_hourly(self) -> None:
# all of the subtimestep output is to be in cfs?
# so putting cfs conversion here is maybe impractical
self._lake_inflow_accum[:] = np.array([zero])
self._lake_outflow_accum[:] = np.array([zero])
self._lake_storage_accum[:] = np.array([zero])
self._lake_storage_change_accum[:] = np.array([zero])
self._lake_release_accum[:] = np.array([zero])
self._lake_spill_accum[:] = np.array([zero])
self._lake_availability_status_accum[:] = np.array([zero])
return
[docs]
def finalize_timestep(self):
if self._io_in_cfs:
# self._lake_outflow_sub[:] converted in subtimestep
self._lake_inflow[:] *= cms_to_cfs
self._lake_release[:] *= cms_to_cfs
self._lake_spill[:] *= cms_to_cfs
self._lake_outflow[:] *= cms_to_cfs
self._lake_storage[:] *= cm_to_cf
self._lake_storage_old[:] *= cm_to_cf # necessary
self._lake_storage_change_flow_units[:] *= cms_to_cfs
if self.mass_budget is not None:
self.mass_budget.advance()
self.mass_budget.calculate()
return
[docs]
def advance(self):
self._lake_storage_change[:] = (
self._lake_storage - self._lake_storage_old
)
self._lake_storage_old[:] = self._lake_storage
return
@property
def outflow(self) -> np.float64:
return self._lake_outflow[0]
@property
def outflow_substep(self) -> np.float64:
return self._lake_outflow_sub[0]
@property
def storage_change(self) -> np.float64:
return self._lake_storage_change_flow_units[0]
@property
def storage(self) -> np.float64:
return self._lake_storage[0]
@property
def release(self) -> np.float64:
"The release component of the STARFIT outflow."
return self._lake_release[0]
@property
def spill(self) -> np.float64:
"The spill component of the STARFIT outflow."
return self._lake_spill[0]
@property
def sink_source(self) -> np.float64:
return zero
def _calc_subtimestep_daily(
self, isubstep, inflow_upstream, inflow_lateral
) -> None:
# Here we only calculate outflows on the last subtimestep.
# Ouflows on substeps are all the same flow rates, and
# storages are only updated at the end of the timestep.
# how to get the number of substeps in a timestep? from control. this
# might not even be a fixed number
nsubsteps = 24
# accumulate inflows
self._lake_inflow_sub[:] = np.array([inflow_upstream + inflow_lateral])
if self._io_in_cfs:
self._lake_inflow_sub[:] *= cfs_to_cms
self._lake_inflow_accum[:] += self._lake_inflow_sub
# Special case for the very first subtimestep, we use the very first
# inflow on the first subtimestep as representative of the mean
# inflow of the previous day so we can calculate an average outflow
# to use for the first timestep
if self.control.itime_step == 0 and isubstep == 0:
# this is the representative for the nonexistent previous day
self._lake_inflow[:] = self._lake_inflow_accum
# two previous, we'll assume the same
self._lake_storage[:] = self._lake_storage_sub
self._lake_storage_old[:] = self._lake_storage_sub
self._lake_storage_change[:] = zero
elif isubstep < (nsubsteps - 1):
if isubstep == 0:
# already in cfs
self._lake_outflow_sub[:] = self._lake_outflow_sub_next
return
else:
if self.control.itime_step == 0:
# the end of the first timestep doesnt pass through advance()
self._lake_storage_old[:] = self._lake_storage
self._lake_inflow[:] = self._lake_inflow_accum / (isubstep + 1)
if self._io_in_cfs:
self._lake_outflow_sub[:] *= cfs_to_cms
self._lake_release_sub[:] *= cfs_to_cms
self._lake_spill_sub[:] *= cfs_to_cms
self._lake_outflow[:] = self._lake_outflow_sub
self._lake_release[:] = self._lake_release_sub
self._lake_spill[:] = self._lake_spill_sub
# calculate storage
self._lake_storage_change_flow_units[:] = (
self._lake_inflow - self._lake_outflow
)
# below, releases are never more than storage for the day, so this
# should never be negative since inflows are never negative
self._lake_storage_change[:] = (
self._lake_storage_change_flow_units * self._m3ps_to_MCM
)
self._lake_storage[:] += self._lake_storage_change
# <
self._lake_spill_sub[:] = np.array([zero])
if self._lake_storage > self._GRanD_CAP_MCM:
self._lake_spill_sub[:] = (
self._lake_storage - self._GRanD_CAP_MCM
) * self._MCM_to_m3ps
# spill dosent affect the storage until the next timestep
# now calculate the (avg) outflows for the next timestep
(
self._lake_release_sub[:],
self._lake_availability_status[:],
) = Starfit._calc_istarf_release(
epiweek=np.minimum(self.control.current_epiweek, 52),
GRanD_CAP_MCM=self._GRanD_CAP_MCM,
grand_id=self._grand_id,
lake_inflow=self._lake_inflow,
lake_storage=self._lake_storage,
NORhi_alpha=self._NORhi_alpha,
NORhi_beta=self._NORhi_beta,
NORhi_max=self._NORhi_max,
NORhi_min=self._NORhi_min,
NORhi_mu=self._NORhi_mu,
NORlo_alpha=self._NORlo_alpha,
NORlo_beta=self._NORlo_beta,
NORlo_max=self._NORlo_max,
NORlo_min=self._NORlo_min,
NORlo_mu=self._NORlo_mu,
Obs_MEANFLOW_CUMECS=self._Obs_MEANFLOW_CUMECS,
Release_alpha1=self._Release_alpha1,
Release_alpha2=self._Release_alpha2,
Release_beta1=self._Release_beta1,
Release_beta2=self._Release_beta2,
Release_c=self._Release_c,
Release_max=self._Release_max,
Release_min=self._Release_min,
Release_p1=self._Release_p1,
Release_p2=self._Release_p2,
) # output in m^3/d
self._lake_release_sub *= (
self._m3ps_to_MCM / 24 / 60 / 60
) # m3pd to MCM
if (self._lake_storage - self._lake_release_sub) < zero:
self._lake_release_sub[:] = self._lake_storage
self._lake_release_sub[:] *= self._MCM_to_m3ps
self._lake_outflow_sub_next[:] = (
self._lake_release_sub + self._lake_spill_sub
)
if self._io_in_cfs:
self._lake_outflow_sub[:] *= cms_to_cfs
self._lake_outflow_sub_next[:] *= cms_to_cfs
self._lake_release_sub[:] *= cms_to_cfs
self._lake_spill_sub[:] *= cms_to_cfs
if self.control.itime_step == 0 and isubstep == 0:
self._lake_outflow_sub[:] = self._lake_outflow_sub_next
return
def _calc_subtimestep_hourly(
self, isubstep, inflow_upstream, inflow_lateral
) -> None:
self._pre_release_calculations_hourly(inflow_upstream, inflow_lateral)
(
self._lake_release_sub[:],
self._lake_availability_status_sub[:],
) = Starfit._calc_istarf_release(
epiweek=np.minimum(self.control.current_epiweek, 52),
GRanD_CAP_MCM=self._GRanD_CAP_MCM,
grand_id=self._grand_id,
lake_inflow=self._lake_inflow_sub,
lake_storage=self._lake_storage_sub,
NORhi_alpha=self._NORhi_alpha,
NORhi_beta=self._NORhi_beta,
NORhi_max=self._NORhi_max,
NORhi_min=self._NORhi_min,
NORhi_mu=self._NORhi_mu,
NORlo_alpha=self._NORlo_alpha,
NORlo_beta=self._NORlo_beta,
NORlo_max=self._NORlo_max,
NORlo_min=self._NORlo_min,
NORlo_mu=self._NORlo_mu,
Obs_MEANFLOW_CUMECS=self._Obs_MEANFLOW_CUMECS,
Release_alpha1=self._Release_alpha1,
Release_alpha2=self._Release_alpha2,
Release_beta1=self._Release_beta1,
Release_beta2=self._Release_beta2,
Release_c=self._Release_c,
Release_max=self._Release_max,
Release_min=self._Release_min,
Release_p1=self._Release_p1,
Release_p2=self._Release_p2,
) # output in m^3/d
self._post_release_calculations_hourly(isubstep)
return
def _calc_storage_change_sub_hourly(self) -> None:
# TODO: should release be changed to outflow?
self._lake_storage_change_sub[:] = (
self._lake_inflow_sub - self._lake_release_sub
) * self._m3ps_to_MCM # MCM: million cubic meters
return
def _pre_release_calculations_hourly(
self, inflow_upstream, inflow_lateral
) -> None:
self._lake_inflow_sub[:] = np.array([inflow_upstream + inflow_lateral])
if self._io_in_cfs:
self._lake_inflow_sub[:] *= cfs_to_cms
# <
self._lake_storage_old_sub[:] = self._lake_storage_sub
return
def _post_release_calculations_hourly(self, isubstep) -> None:
self._lake_release_sub[:] = (
self._lake_release_sub / 24 / 60 / 60
) # m^3/s
self._calc_storage_change_sub_hourly()
# can't release more than storage + inflow. This assumes zero
# storage = deadpool which may not be accurate, but this situation
# rarely occurs since STARFIT releases are already designed to keep
# storage within NOR.
if (self._lake_storage_sub + self._lake_storage_change_sub) < zero:
potential_release = (
self._lake_release_sub
+ (self._lake_storage_sub + self._lake_storage_change_sub)
* self._MCM_to_m3ps
)
self._lake_release_sub[:] = np.maximum(
potential_release,
potential_release * zero,
) # m^3/s
self._calc_storage_change_sub_hourly()
# <
self._lake_storage_sub[:] = np.maximum(
self._lake_storage_sub + self._lake_storage_change_sub,
zero,
) # MCM
self._lake_spill_sub[:] = nan
if ~np.isnan(self._lake_storage_sub):
self._lake_spill_sub[:] = zero
if self._lake_storage_sub > self._GRanD_CAP_MCM:
self._lake_spill_sub[:] = (
self._lake_storage_sub - self._GRanD_CAP_MCM
) * self._MCM_to_m3ps
self._lake_storage_sub[:] = self._GRanD_CAP_MCM
self._lake_storage_change_sub[:] = (
self._lake_storage_sub - self._lake_storage_old_sub
)
# subtimestep to timestep calculations
# m^3/s
self._lake_inflow_accum[:] += self._lake_inflow_sub
self._lake_inflow[:] = self._lake_inflow_accum / (isubstep + 1)
self._lake_outflow_sub[:] = (
self._lake_release_sub + self._lake_spill_sub
)
self._lake_outflow_accum[:] += self._lake_outflow_sub
self._lake_outflow[:] = self._lake_outflow_accum / (isubstep + 1)
# these variables arent currently retrieved and output by FlowGraph
# but probably soon
self._lake_release_accum[:] += self._lake_release_sub
self._lake_release[:] = self._lake_release_accum / (isubstep + 1)
self._lake_spill_accum[:] += self._lake_spill_sub
self._lake_spill[:] = self._lake_spill_accum / (isubstep + 1)
self._lake_availability_status_accum[:] += (
self._lake_availability_status_sub
)
self._lake_availability_status[:] = (
self._lake_availability_status_accum / (isubstep + 1)
)
self._lake_storage_accum[:] += self._lake_storage_sub
self._lake_storage[:] = self._lake_storage_accum / (isubstep + 1)
# million volume units
# self._storage_change_subtimestep
self._lake_storage_change_accum[:] += self._lake_storage_change_sub
self._lake_storage_change[:] = self._lake_storage_change_accum / (
isubstep + 1
)
self._lake_storage_change_flow_units[:] = (
self._lake_storage_change * self._MCM_to_m3ps
)
if self._io_in_cfs:
self._lake_outflow_sub[:] *= cms_to_cfs
return
[docs]
class StarfitFlowNodeMaker(FlowNodeMaker):
r"""STARFIT FlowNodeMaker: Storage Targets And Release Function Inference Tool.
This FlowNodeMaker instantiates :class:`StarfitFlowNode`\ s for
:class:`FlowGraph`.
See :class:`FlowGraph` for discussion and a worked example. The notebook
`examples/06_flow_graph_starfit.ipynb <https://github.com/DOI-USGS/pywatershed/blob/develop/examples/06_flow_graph_starfit.ipynb>`__
highlights adding a StarfitFlowNode a :class:`FlowGraph` otherwised
comprised of :class:`PRMSChannelFlowNode`\ s using the helper functions
:func:`prms_channel_flow_graph_to_model_dict`
and :func:`prms_channel_flow_graph_postprocess`.
""" # noqa: E501
[docs]
def __init__(
self,
discretization: Parameters,
parameters: Parameters,
calc_method: Literal["numba", "numpy"] = None,
io_in_cfs: bool = True,
verbose: bool = None,
compute_daily: bool = False,
imbalance_behavior: Literal["defer", None, "warn", "error"] = None,
nhrs_substep: int = None,
) -> None:
"""Instantiate StarfitFlowNodeMaker.
Args:
discretization: A :class:`Parameters` object.
parameters: A :class:`Parmaeters` object with the parameters listed
in :class:`StarfitFlowNode` dimensioned by number of
reservoirs/nodes to instantiate.
calc_method: One of "numba" or "numpy".
io_in_cfs: Are the units in cubic feet per second? False gives
units of cubic meters per second.
compute_daily: Daily or subtimestep calculation?
nhrs_substep: Number of hours in the subtimestep.
imbalance_behavior: One of "defer", "warn", or "error".
verbose: bool = None,
"""
self.name = "StarfitFlowNodeMaker"
self._calc_method = calc_method
self._io_in_cfs = io_in_cfs
self._compute_daily = compute_daily
self._imbalance_behavior = imbalance_behavior
self._nhrs_substep = nhrs_substep
self._set_data(discretization, parameters)
return
[docs]
def get_node(self, control, index) -> StarfitFlowNode:
return StarfitFlowNode(
control=control,
grand_id=self.grand_id[index],
initial_storage=self.initial_storage[index],
start_time=self.start_time[index],
end_time=self.end_time[index],
inflow_mean=self.inflow_mean[index],
NORhi_min=self.NORhi_min[index],
NORhi_max=self.NORhi_max[index],
NORhi_alpha=self.NORhi_alpha[index],
NORhi_beta=self.NORhi_beta[index],
NORhi_mu=self.NORhi_mu[index],
NORlo_min=self.NORlo_min[index],
NORlo_max=self.NORlo_max[index],
NORlo_alpha=self.NORlo_alpha[index],
NORlo_beta=self.NORlo_beta[index],
NORlo_mu=self.NORlo_mu[index],
Release_min=self.Release_min[index],
Release_max=self.Release_max[index],
Release_alpha1=self.Release_alpha1[index],
Release_alpha2=self.Release_alpha2[index],
Release_beta1=self.Release_beta1[index],
Release_beta2=self.Release_beta2[index],
Release_p1=self.Release_p1[index],
Release_p2=self.Release_p2[index],
Release_c=self.Release_c[index],
GRanD_CAP_MCM=self.GRanD_CAP_MCM[index],
Obs_MEANFLOW_CUMECS=self.Obs_MEANFLOW_CUMECS[index],
calc_method=self._calc_method,
io_in_cfs=self._io_in_cfs,
compute_daily=self._compute_daily,
imbalance_behavior=self._imbalance_behavior,
nhrs_substep=self._nhrs_substep,
)
def _set_data(self, discretization, parameters):
self._parameters = parameters
self._discretization = discretization
for param in self.get_parameters():
if param in self._parameters.parameters.keys():
self[param] = self._parameters.parameters[param]
else:
self[param] = self._discretization.parameters[param]
self.nreservoirs = len(self.grand_id)
return
[docs]
@staticmethod
def get_dimensions() -> tuple:
"""Get a tuple of dimension names for this StarfitFlowNodeMaker."""
return ("nreservoirs",)
[docs]
@staticmethod
def get_parameters() -> tuple:
"""Get a tuple of parameter names for StarfitFlowNodeMaker."""
return (
"grand_id",
"initial_storage",
"start_time",
"end_time",
"inflow_mean",
"NORhi_min",
"NORhi_max",
"NORhi_alpha",
"NORhi_beta",
"NORhi_mu",
"NORlo_min",
"NORlo_max",
"NORlo_alpha",
"NORlo_beta",
"NORlo_mu",
"Release_min",
"Release_max",
"Release_alpha1",
"Release_alpha2",
"Release_beta1",
"Release_beta2",
"Release_p1",
"Release_p2",
"Release_c",
"GRanD_CAP_MCM",
"Obs_MEANFLOW_CUMECS",
)