Source code for pywatershed.hydrology.starfit_source_sink_flow_node

from typing import Literal

import numpy as np
import pandas as pd

from pywatershed.base.control import Control
from pywatershed.constants import cfs_to_cms, cms_to_cfs, nan1d, one, zero
from pywatershed.hydrology.starfit import (
    Starfit,
    StarfitFlowNode,
    StarfitFlowNodeMaker,
)
from pywatershed.parameters import Parameters


[docs] class StarfitSourceSinkFlowNode(StarfitFlowNode): """A StarfitFlowNode where sinks and sources interact with storage. James McCreight (UCAR/USGS), John Engott (USGS), and Noah Knowles (USGS) """
[docs] def __init__( self, control: Control, grand_id: np.int64, initial_storage: np.float64, start_time: np.datetime64, end_time: np.datetime64, inflow_mean: np.float64, NORhi_min: np.float64, NORhi_max: np.float64, NORhi_alpha: np.float64, NORhi_beta: np.float64, NORhi_mu: np.float64, NORlo_min: np.float64, NORlo_max: np.float64, NORlo_alpha: np.float64, NORlo_beta: np.float64, NORlo_mu: np.float64, Release_min: np.float64, Release_max: np.float64, Release_alpha1: np.float64, Release_alpha2: np.float64, Release_beta1: np.float64, Release_beta2: np.float64, Release_p1: np.float64, Release_p2: np.float64, Release_c: np.float64, GRanD_CAP_MCM: np.float64, Obs_MEANFLOW_CUMECS: np.float64, source_sink_storage_min: np.float64, source_sink_data: pd.Series, missing_data_as_zero: bool = False, calc_method: Literal["numba", "numpy"] = None, io_in_cfs: bool = True, nhrs_substep: int = one, imbalance_behavior: Literal["defer", None, "warn", "error"] = None, ) -> None: """Initialize a StarfitSourceSinkFlowNode. Args: control: A Control object grand_id: the GRanD id, initial_storage: initial storage value, may be NaN to use the middle of the normal operating range. start_time: May be NaN, the optional time at which to start simulating with STARFIT. end_time: As for start_Time. inflow_mean: STARFIT parameter. NORhi_min: STARFIT parameter. NORhi_max: STARFIT parameter. NORhi_alpha: STARFIT parameter. NORhi_beta: STARFIT parameter. NORhi_mu: STARFIT parameter. NORlo_min: STARFIT parameter. NORlo_max: STARFIT parameter. NORlo_alpha: STARFIT parameter. NORlo_beta: STARFIT parameter. NORlo_mu: STARFIT parameter. Release_min: STARFIT parameter. Release_max: STARFIT parameter. Release_alpha1: STARFIT parameter. Release_alpha2: STARFIT parameter. Release_beta1: STARFIT parameter. Release_beta2: STARFIT parameter. Release_p1: STARFIT parameter. Release_p2: STARFIT parameter. Release_c: STARFIT parameter. GRanD_CAP_MCM: STARFIT parameter. Obs_MEANFLOW_CUMECS: STARFIT parameter. source_sink_storage_min: A floating point value for the minium flow. source_sink_data: A pandas Series object of sources/sinks at this location. See SourceSinkFlowNodeMaker for a description of the pd.DataFrame passed to supply this data. Units are cubic feet per second. missing_data_as_zero: Bool option to treat missing times in the timeseries as having zero source/sink. calc_method: One of "numba" or "numpy". io_in_cfs: Are the units in cubic feet per second? False gives units of cubic meters per second. nhrs_substep: Number of hours in the subtimestep. imbalance_behavior: One of "defer", "warn", or "error". """ from datetime import datetime # needs to be above super.__init__ because of budget init in there. self._lake_storage_after_source_sink = nan1d() self._source_sink = nan1d() self._sink_source = nan1d() self._negative_sink_source = nan1d() self._source = nan1d() self._sink = nan1d() super().__init__( control=control, grand_id=grand_id, initial_storage=initial_storage, start_time=start_time, end_time=end_time, inflow_mean=inflow_mean, NORhi_min=NORhi_min, NORhi_max=NORhi_max, NORhi_alpha=NORhi_alpha, NORhi_beta=NORhi_beta, NORhi_mu=NORhi_mu, NORlo_min=NORlo_min, NORlo_max=NORlo_max, NORlo_alpha=NORlo_alpha, NORlo_beta=NORlo_beta, NORlo_mu=NORlo_mu, Release_min=Release_min, Release_max=Release_max, Release_alpha1=Release_alpha1, Release_alpha2=Release_alpha2, Release_beta1=Release_beta1, Release_beta2=Release_beta2, Release_p1=Release_p1, Release_p2=Release_p2, Release_c=Release_c, GRanD_CAP_MCM=GRanD_CAP_MCM, Obs_MEANFLOW_CUMECS=Obs_MEANFLOW_CUMECS, calc_method=calc_method, io_in_cfs=io_in_cfs, compute_daily=False, nhrs_substep=nhrs_substep, imbalance_behavior=imbalance_behavior, ) self.name = "StarfitSourceSinkFlowNode" # remaining code from source_sink_flow_node self._source_sink_storage_min = source_sink_storage_min self._source_sink_data = source_sink_data self._missing_data_as_zero = missing_data_as_zero first_time = source_sink_data.index[0] if not isinstance(first_time, pd.Timestamp): try: _ = datetime.strptime(first_time, "%Y-%m-%d") except ValueError as err_msg: msg = ( f"The source_sink_data index Date column is malformed: " f"{err_msg}" ) raise ValueError(msg) return
[docs] @staticmethod def get_mass_budget_terms(): """Get a dictionary of variable names for mass budget terms.""" return { "inputs": [ "_lake_inflow", ], "outputs": [ "_lake_release", "_lake_spill", ], "storage_changes": [ "_lake_storage_change_flow_units", "_negative_sink_source", ], }
@property def sink_source(self) -> np.float64: return self._sink_source[0] def _prepare_timestep_hourly(self) -> None: super()._prepare_timestep_hourly() self._prepare_timestep_source_sink() return def _prepare_timestep_source_sink(self) -> None: self._sink_source_sum = zero # remaining code from source_sink_flow_node ymd = self.control.current_datetime.strftime("%Y-%m-%d") if ymd not in self._source_sink_data.index: if not self._missing_data_as_zero: msg = ( "Missing data not handled by SourceSinkFlowNode unless " "missing_data_as_zero set to True." ) raise ValueError(msg) # < self._source_sink_requested = zero else: self._source_sink_requested = self._source_sink_data[ymd] if self._io_in_cfs: self._source_sink_requested *= cfs_to_cms return
[docs] def finalize_timestep(self): self._negative_sink_source[:] = -1 * self._sink_source super().finalize_timestep()
def _source_sink_calculations( self, isubstep: int, flow_to_vol_conversion: float, ) -> None: """Calculate flow diversion from storage on the time basis supplied. Note units are flow units Args: flow_to_vol_conversion: This conversion is from cubic meters per second to millions of cubic meteres (MCM) for some period of time. If the calculation is hourly, the conversion would be the pywatershed constant m3ps_to_MCM_hourly. Conversions could be made for other periods of time. """ source_sink_vol = self._source_sink_requested * flow_to_vol_conversion min_storage = self._source_sink_storage_min storage = self._lake_storage_sub.copy() # MCM if source_sink_vol >= zero: # a source is always applied storage += source_sink_vol elif (source_sink_vol < zero) and (storage < min_storage): # sink is not applied when storage < min_storage # storage stays the same source_sink_vol = zero elif (source_sink_vol < zero) and (storage >= min_storage): if (storage + source_sink_vol) < min_storage: # if the sink is too much, reduce it up to min storage # The difference here is to get the negative sign source_sink_vol = min_storage - storage storage = min_storage else: storage += source_sink_vol # < self._lake_storage_after_source_sink[:] = storage # MCM self._source_sink[:] = source_sink_vol * (one / flow_to_vol_conversion) # very confusing, move self._source_sink to self._sink_source_sub self._sink_source_sum += self._source_sink[0] self._sink_source[:] = self._sink_source_sum / (isubstep + 1) # conversion to cfs in self._release_calculations_post_hourly return def _calc_storage_change_sub_hourly(self) -> None: # spill is not included in this calculation self._lake_storage_change_sub[:] = ( self._lake_inflow_sub - self._lake_release_sub + self._source_sink ) * self._m3ps_to_MCM # MCM: million cubic meters return def _calc_subtimestep_hourly( self, isubstep, inflow_upstream, inflow_lateral ) -> None: self._pre_release_calculations_hourly(inflow_upstream, inflow_lateral) # given the existing storage, a storage minimum and a requested # diversion, calculate the diversion and the reesulting storage self._source_sink_calculations( isubstep=isubstep, flow_to_vol_conversion=self._m3ps_to_MCM ) ( self._lake_release_sub[:], self._lake_availability_status_sub[:], ) = Starfit._calc_istarf_release( epiweek=np.minimum(self.control.current_epiweek, 52), GRanD_CAP_MCM=self._GRanD_CAP_MCM, grand_id=self._grand_id, lake_inflow=self._lake_inflow_sub, lake_storage=self._lake_storage_after_source_sink, NORhi_alpha=self._NORhi_alpha, NORhi_beta=self._NORhi_beta, NORhi_max=self._NORhi_max, NORhi_min=self._NORhi_min, NORhi_mu=self._NORhi_mu, NORlo_alpha=self._NORlo_alpha, NORlo_beta=self._NORlo_beta, NORlo_max=self._NORlo_max, NORlo_min=self._NORlo_min, NORlo_mu=self._NORlo_mu, Obs_MEANFLOW_CUMECS=self._Obs_MEANFLOW_CUMECS, Release_alpha1=self._Release_alpha1, Release_alpha2=self._Release_alpha2, Release_beta1=self._Release_beta1, Release_beta2=self._Release_beta2, Release_c=self._Release_c, Release_max=self._Release_max, Release_min=self._Release_min, Release_p1=self._Release_p1, Release_p2=self._Release_p2, ) # output in m^3/d self._post_release_calculations_hourly(isubstep) return def _post_release_calculations_hourly(self, isubstep) -> None: super()._post_release_calculations_hourly(isubstep) if self._io_in_cfs: self._sink_source[:] *= cms_to_cfs return
[docs] class StarfitSourceSinkFlowNodeMaker(StarfitFlowNodeMaker): r"""STARFIT SourceSink FlowNodeMaker: STARFIT with diversions on storage.""" # noqa: E501
[docs] def __init__( self, discretization: Parameters, parameters: Parameters, source_sink_df: pd.DataFrame, missing_data_as_zero: bool = False, calc_method: Literal["numba", "numpy"] = None, io_in_cfs: bool = True, verbose: bool = None, imbalance_behavior: Literal["defer", None, "warn", "error"] = None, nhrs_substep: int = 1, ) -> None: """Instantiate StarfitFlowNodeMaker. Args: discretization: A :class:`Parameters` object. parameters: A :class:`Parmaeters` object with the parameters listed in :class:`StarfitFlowNode` dimensioned by number of reservoirs/nodes to instantiate. source_sink_df: A pandas DataFrame of observations with a time index which can be selected by '%Y-%m-%d' strftime of a datetime64. The column names are not used and may be anything, for example the nhm_seg of the upstream segment. However, the columns order MUST be collated with the input data vectors. The sign convention is: sources are positive and sinks are negative. That is, the sign is from the perspective of the node. Units are cubic feet per second. missing_data_as_zero: Bool option to treat missing times in the timeseries as having zero source/sink. calc_method: One of "numba" or "numpy". io_in_cfs: Are the units in cubic feet per second? False gives units of cubic meters per second. nhrs_substep: Number of hours in the subtimestep. imbalance_behavior: One of "defer", "warn", or "error". verbose: bool = None, """ super().__init__( discretization=discretization, parameters=parameters, calc_method=calc_method, io_in_cfs=io_in_cfs, verbose=verbose, imbalance_behavior=imbalance_behavior, nhrs_substep=nhrs_substep, ) self.name = "StarfitSourceSinkNodeMaker" self._source_sink_df = source_sink_df self._missing_data_as_zero = missing_data_as_zero
[docs] def get_node( self, control: Control, index: int ) -> StarfitSourceSinkFlowNode: stor_min = self._parameters.parameters["source_sink_storage_min"][ index ] col_name = self._source_sink_df.columns[index] data_ts = pd.Series(self._source_sink_df[col_name]) return StarfitSourceSinkFlowNode( control=control, grand_id=self.grand_id[index], initial_storage=self.initial_storage[index], start_time=self.start_time[index], end_time=self.end_time[index], inflow_mean=self.inflow_mean[index], NORhi_min=self.NORhi_min[index], NORhi_max=self.NORhi_max[index], NORhi_alpha=self.NORhi_alpha[index], NORhi_beta=self.NORhi_beta[index], NORhi_mu=self.NORhi_mu[index], NORlo_min=self.NORlo_min[index], NORlo_max=self.NORlo_max[index], NORlo_alpha=self.NORlo_alpha[index], NORlo_beta=self.NORlo_beta[index], NORlo_mu=self.NORlo_mu[index], Release_min=self.Release_min[index], Release_max=self.Release_max[index], Release_alpha1=self.Release_alpha1[index], Release_alpha2=self.Release_alpha2[index], Release_beta1=self.Release_beta1[index], Release_beta2=self.Release_beta2[index], Release_p1=self.Release_p1[index], Release_p2=self.Release_p2[index], Release_c=self.Release_c[index], GRanD_CAP_MCM=self.GRanD_CAP_MCM[index], Obs_MEANFLOW_CUMECS=self.Obs_MEANFLOW_CUMECS[index], source_sink_storage_min=stor_min, source_sink_data=data_ts, missing_data_as_zero=self._missing_data_as_zero, calc_method=self._calc_method, io_in_cfs=self._io_in_cfs, imbalance_behavior=self._imbalance_behavior, nhrs_substep=self._nhrs_substep, )
[docs] @staticmethod def get_dimensions() -> tuple: """Get a tuple of dimension names for this StarfitFlowNodeMaker.""" return ("nreservoirs",)
[docs] @staticmethod def get_parameters() -> tuple: """Get a tuple of parameter names for StarfitFlowNodeMaker.""" return ( "grand_id", "initial_storage", "start_time", "end_time", "inflow_mean", "NORhi_min", "NORhi_max", "NORhi_alpha", "NORhi_beta", "NORhi_mu", "NORlo_min", "NORlo_max", "NORlo_alpha", "NORlo_beta", "NORlo_mu", "Release_min", "Release_max", "Release_alpha1", "Release_alpha2", "Release_beta1", "Release_beta2", "Release_p1", "Release_p2", "Release_c", "GRanD_CAP_MCM", "Obs_MEANFLOW_CUMECS", "source_sink_storage_min", )