# Source code for pywatershed.hydrology.source_sink_flow_node

import numpy as np
import pandas as pd

from pywatershed.base.control import Control
from pywatershed.base.flow_graph import FlowNode, FlowNodeMaker
from pywatershed.base.parameters import Parameters
from pywatershed.constants import nan, zero


class SourceSinkFlowNode(FlowNode):
    """A FlowNode that adds or removes flow above some minimum flow parameter.

    A non-negative requested value (a source) is always added to the inflow.
    A negative requested value (a sink) is only applied while the inflow is
    at or above ``flow_min``, and it is reduced as needed so that the outflow
    is not drawn below ``flow_min``.

    See :class:`FlowGraph` for examples and discussion.
    """

    def __init__(
        self,
        control: Control,
        flow_min: np.float64,
        source_sink_data: pd.Series,
        missing_data_as_zero: bool = False,
        # JLM TODO: budget
    ):
        """Initialize a SourceSinkFlowNode.

        Args:
            control: a Control object.
            flow_min: A floating point value for the minimum flow.
            source_sink_data: A pandas Series object of sources/sinks at this
                location. See SourceSinkFlowNodeMaker for a description of
                the pd.DataFrame passed to supply this data.
            missing_data_as_zero: Bool option to treat missing times in the
                timeseries as having zero source/sink.

        Raises:
            ValueError: If the first index label of ``source_sink_data`` is
                neither a ``pd.Timestamp`` nor a ``"%Y-%m-%d"`` string.
        """
        from datetime import datetime

        self.control = control
        self._flow_min = flow_min
        self._source_sink_data = source_sink_data
        self._missing_data_as_zero = missing_data_as_zero
        self._sink_source = zero
        # Defined up front so the ``outflow`` properties can be read before
        # the first call to calculate_subtimestep.
        self._seg_outflow = nan

        # Only the first index label is checked. An empty series has nothing
        # to check; missing dates are dealt with in prepare_timestep.
        if len(source_sink_data.index) > 0:
            first_time = source_sink_data.index[0]
            if not isinstance(first_time, pd.Timestamp):
                try:
                    _ = datetime.strptime(first_time, "%Y-%m-%d")
                except (ValueError, TypeError) as err_msg:
                    # strptime raises TypeError for non-string labels and
                    # ValueError for strings in the wrong format.
                    msg = (
                        f"The source_sink_data index Date column is malformed: "
                        f"{err_msg}"
                    )
                    raise ValueError(msg) from err_msg

        return

    # @staticmethod
    # def get_mass_budget_terms():
    #     """Get a dictionary of variable names for mass budget terms."""
    #     return {
    #         "inputs": [
    #             "_lake_inflow",
    #         ],
    #         "outputs": [
    #             "_lake_release",
    #             "_lake_spill",
    #         ],
    #         "storage_changes": [
    #             "_lake_storage_change_flow_units",
    #         ],
    #     }

    def prepare_timestep(self) -> None:
        """Look up the requested source/sink for the current date.

        The current datetime of the control is formatted as ``"%Y-%m-%d"``
        and used to select from the source/sink series. The running sum used
        for the subtimestep average is reset to zero.

        Raises:
            ValueError: If the current date is not in the series index and
                ``missing_data_as_zero`` is False.
        """
        ymd = self.control.current_datetime.strftime("%Y-%m-%d")

        if ymd in self._source_sink_data.index:
            self._source_sink_requested = self._source_sink_data[ymd]
        elif self._missing_data_as_zero:
            self._source_sink_requested = zero
        else:
            msg = (
                "Missing data not handled by SourceSinkFlowNode unless "
                "missing_data_as_zero set to True."
            )
            raise ValueError(msg)

        self._sink_source_sum = zero
        return

    def calculate_subtimestep(
        self,
        isubstep: int,
        inflow_upstream: float,
        inflow_lateral: float,
    ):
        """Apply the requested source/sink to the inflow for one subtimestep.

        Args:
            isubstep: Zero-based index of the subtimestep; used to average
                the applied source/sink over the subtimesteps so far.
            inflow_upstream: Inflow from upstream nodes.
            inflow_lateral: Lateral inflow to this node.

        Raises:
            ValueError: If the requested source/sink is NaN, or if a sink is
                requested while the inflow or ``flow_min`` is NaN, so that
                no branch can determine the outflow.
        """
        inflow = inflow_upstream + inflow_lateral
        source_sink = self._source_sink_requested
        min_flow = self._flow_min

        if source_sink >= zero:
            # a source is always applied
            outflow = inflow + source_sink
        elif (source_sink < zero) and (inflow < min_flow):
            # sink is not applied when inflow < min_flow
            outflow = inflow
            source_sink = zero
        elif (source_sink < zero) and (inflow >= min_flow):
            if (inflow + source_sink) < min_flow:
                # The sink is limited so outflow stops at min_flow.
                # difference order is for negative sign convention for sink
                source_sink = min_flow - inflow
                outflow = min_flow
            else:
                outflow = inflow + source_sink
        else:
            # Every comparison above is False only when a NaN is involved.
            # Fail with a clear message instead of an unbound local below.
            msg = (
                "SourceSinkFlowNode cannot compute outflow: the requested "
                "source/sink, the inflow, or flow_min is NaN."
            )
            raise ValueError(msg)

        self._seg_outflow = outflow
        self._sink_source_sum += source_sink
        # running mean of the applied source/sink over substeps 0..isubstep
        self._sink_source = self._sink_source_sum / (isubstep + 1)
        return

    def finalize_timestep(self) -> None:
        """Do nothing; this node has no end-of-timestep work."""
        return

    def advance(self) -> None:
        """Do nothing; the mass budget hooks below are not yet enabled."""
        # if self.mass_budget is not None:
        #     self.mass_budget.advance()
        #     self.mass_budget.calculate()
        return

    @property
    def outflow(self) -> np.float64:
        """Outflow set by the most recent subtimestep (nan before any)."""
        return self._seg_outflow

    @property
    def outflow_substep(self) -> np.float64:
        """Outflow set by the most recent subtimestep (nan before any)."""
        return self._seg_outflow

    @property
    def storage_change(self) -> np.float64:
        """Storage change, always zero for this node."""
        return np.float64(zero)

    @property
    def storage(self) -> np.float64:
        """Storage, always nan for this node."""
        return np.float64(nan)

    @property
    def sink_source(self) -> np.float64:
        """Average sink and source through the last subtimestep

        Sink is negative, indicating that incoming flow is being discarded
        (if it were being stored, the storage change would be the opposite
        sign). Source is positive, indicating that incoming flow is being
        augmented (if it were being stored, the storage change would be the
        opposite sign).
        """
        return self._sink_source
class SourceSinkFlowNodeMaker(FlowNodeMaker):
    """A FlowNodeMaker for SourceSinkFlowNode.

    See :class:`FlowGraph` for related examples and discussion.
    """

    def __init__(
        self,
        parameters: Parameters,
        source_sink_df: pd.DataFrame,
        missing_data_as_zero: bool = False,
    ) -> None:
        """Initialize a SourceSinkFlowNodeMaker.

        Args:
            parameters: A pywatershed Parameters object. It must supply
                ``flow_min``, indexed by node.
            source_sink_df: A pandas DataFrame of observations with a time
                index which can be selected by '%Y-%m-%d' strftime of a
                datetime64. The column names are not used and may be
                anything, for example the nhm_seg of the upstream segment.
                However, the columns order MUST be collated with the input
                data vectors. The sign convention is: sources are positive
                and sinks are negative. That is, the sign is from the
                perspective of the node. The units are cubic feet per second.
            missing_data_as_zero: Bool option to treat missing times in the
                timeseries as having zero source/sink.
        """
        self.name = "SourceSinkNodeMaker"
        self._parameters = parameters
        self._source_sink_df = source_sink_df
        self._missing_data_as_zero = missing_data_as_zero

    def get_node(self, control: Control, index: int) -> "SourceSinkFlowNode":
        """Build the SourceSinkFlowNode for position ``index``.

        Args:
            control: a Control object passed through to the node.
            index: Position of the node; selects both the ``flow_min`` entry
                and the column of the source/sink DataFrame.

        Returns:
            A SourceSinkFlowNode for that position.
        """
        flow_min = self._parameters.parameters["flow_min"][index]
        # Select the column by position: column names are documented as
        # arbitrary, and a duplicated label would yield a DataFrame here.
        # pd.Series() for typing
        data_ts = pd.Series(self._source_sink_df.iloc[:, index])
        return SourceSinkFlowNode(
            control, flow_min, data_ts, self._missing_data_as_zero
        )

    @staticmethod
    def get_dimensions() -> tuple:
        """Get a tuple of dimension names for this SourceSinkFlowNodeMaker."""
        # NOTE(review): nothing in this class uses "nreservoirs"; confirm
        # this is the intended dimension name for source/sink nodes.
        return ("nreservoirs",)

    @staticmethod
    def get_parameters() -> tuple:
        """Get a tuple of parameter names for SourceSinkFlowNodeMaker."""
        # "flow_min" is the only parameter get_node reads.
        return ("flow_min",)