Source code for pywatershed.hydrology.source_sink_flow_node
import numpy as np
import pandas as pd
from pywatershed.base.control import Control
from pywatershed.base.flow_graph import FlowNode, FlowNodeMaker
from pywatershed.base.parameters import Parameters
from pywatershed.constants import nan, zero
[docs]
class SourceSinkFlowNode(FlowNode):
    """A FlowNode that adds or removes flow above some minimum flow parameter.

    Sources (positive values) are always added to the inflow. Sinks (negative
    values) are only applied while the inflow is at or above the minimum
    flow, and are limited so that the outflow is not drawn below that
    minimum.

    See :class:`FlowGraph` for examples and discussion.
    """

    def __init__(
        self,
        control: Control,
        flow_min: np.float64,
        source_sink_data: pd.Series,
        missing_data_as_zero: bool = False,
        # JLM TODO: budget
    ) -> None:
        """Initialize a SourceSinkFlowNode.

        Args:
            control: a Control object.
            flow_min: A floating point value for the minimum flow.
            source_sink_data: A pandas Series object of sources/sinks at this
                location. See SourceSinkFlowNodeMaker for a description of
                the pd.DataFrame passed to supply this data.
            missing_data_as_zero: Bool option to treat missing times in the
                timeseries as having zero source/sink.

        Raises:
            ValueError: If ``source_sink_data`` is empty, or if its first
                index label is neither a ``pd.Timestamp`` nor a string in
                ``%Y-%m-%d`` format.
        """
        from datetime import datetime

        self.control = control
        self._flow_min = flow_min
        self._source_sink_data = source_sink_data
        self._missing_data_as_zero = missing_data_as_zero

        # Create all per-timestep state here so that the properties are
        # readable before the first call to calculate_subtimestep.
        self._sink_source = zero
        self._sink_source_sum = zero
        self._source_sink_requested = zero
        self._seg_outflow = nan

        if len(source_sink_data.index) == 0:
            raise ValueError(
                "The source_sink_data series is empty; at least one time "
                "is required."
            )

        # Only the first label is inspected as a sanity check of the index.
        first_time = source_sink_data.index[0]
        if not isinstance(first_time, pd.Timestamp):
            try:
                datetime.strptime(first_time, "%Y-%m-%d")
            except (TypeError, ValueError) as err:
                # strptime raises TypeError for non-string labels and
                # ValueError for strings in the wrong format; report both
                # the same way and keep the original cause attached.
                msg = (
                    f"The source_sink_data index Date column is malformed: "
                    f"{err}"
                )
                raise ValueError(msg) from err

    # TODO(JLM): mass budget terms are not implemented for this node.

    def prepare_timestep(self) -> None:
        """Look up the source/sink requested for the current time.

        The current datetime of the control is formatted as ``%Y-%m-%d`` and
        used to select from the source/sink series. A time that is absent
        from the index, or present with a NaN value, is treated as missing
        data. The sub-timestep accumulator is reset.

        Raises:
            ValueError: If the data are missing for the current time and
                ``missing_data_as_zero`` is False.
        """
        ymd = self.control.current_datetime.strftime("%Y-%m-%d")

        requested = None
        if ymd in self._source_sink_data.index:
            requested = self._source_sink_data[ymd]

        # A NaN entry is how a missing observation shows up when several
        # nodes share one DataFrame index, so handle it like a missing time.
        if requested is None or pd.isna(requested):
            if not self._missing_data_as_zero:
                msg = (
                    "Missing data not handled by SourceSinkFlowNode unless "
                    "missing_data_as_zero set to True."
                )
                raise ValueError(msg)
            requested = zero

        self._source_sink_requested = requested
        self._sink_source_sum = zero

    def calculate_subtimestep(
        self,
        isubstep: int,
        inflow_upstream: float,
        inflow_lateral: float,
    ) -> None:
        """Apply the requested source/sink to the inflow for one substep.

        Args:
            isubstep: Zero-based index of the current sub-timestep, used to
                average the applied source/sink over the substeps so far.
            inflow_upstream: Inflow from upstream nodes.
            inflow_lateral: Lateral inflow to this node.
        """
        inflow = inflow_upstream + inflow_lateral
        source_sink = self._source_sink_requested
        min_flow = self._flow_min

        if source_sink < zero:
            if inflow >= min_flow:
                if (inflow + source_sink) < min_flow:
                    # Limit the sink so the outflow stops at min_flow. The
                    # difference order keeps the negative sign convention
                    # for a sink.
                    source_sink = min_flow - inflow
                    outflow = min_flow
                else:
                    outflow = inflow + source_sink
            else:
                # Sink is not applied when inflow < min_flow (a NaN inflow
                # also lands here and is passed through unchanged).
                outflow = inflow
                source_sink = zero
        else:
            # A source is always applied. A NaN request also lands here and
            # propagates to the outflow instead of leaving it undefined.
            outflow = inflow + source_sink

        self._seg_outflow = outflow
        self._sink_source_sum += source_sink
        self._sink_source = self._sink_source_sum / (isubstep + 1)

    def finalize_timestep(self) -> None:
        """Do nothing; this node has no end-of-timestep work."""
        return

    def advance(self) -> None:
        """Do nothing; this node carries no state between timesteps."""
        # TODO(JLM): advance and calculate a mass budget once implemented.
        return

    @property
    def outflow(self) -> np.float64:
        """Outflow computed by the most recent sub-timestep."""
        return self._seg_outflow

    @property
    def outflow_substep(self) -> np.float64:
        """Outflow of the most recent sub-timestep (same as ``outflow``)."""
        return self._seg_outflow

    @property
    def storage_change(self) -> np.float64:
        """Storage change, always zero for this node."""
        return np.float64(zero)

    @property
    def storage(self) -> np.float64:
        """Storage, always NaN for this node."""
        return np.float64(nan)

    @property
    def sink_source(self) -> np.float64:
        """Average sink and source through the last subtimestep

        Sink is negative, indicating that incoming flow is being discarded
        (if it were being stored, the storage change would be the opposite
        sign). Source is positive, indicating that incoming flow is being
        augmented (if it were being stored, the storage change would be the
        opposite sign).
        """
        return self._sink_source
[docs]
class SourceSinkFlowNodeMaker(FlowNodeMaker):
    """A FlowNodeMaker for SourceSinkFlowNode.

    See :class:`FlowGraph` for related examples and discussion.
    """

    def __init__(
        self,
        parameters: Parameters,
        source_sink_df: pd.DataFrame,
        missing_data_as_zero: bool = False,
    ) -> None:
        """Initialize a SourceSinkFlowNodeMaker.

        Args:
            parameters: A pywatershed Parameters object. Its ``flow_min``
                parameter is read by :meth:`get_node`.
            source_sink_df: A pandas DataFrame of observations with a time
                index which can be selected by '%Y-%m-%d' strftime of a
                datetime64. The column names are not used and may be
                anything, for example the nhm_seg of the upstream segment.
                However, the columns order MUST be collated with the input
                data vectors. The sign convention is: sources are positive
                and sinks are negative. That is, the sign is from the
                perspective of the node. The units are cubic feet per
                second.
            missing_data_as_zero: Bool option to treat missing times in the
                timeseries as having zero source/sink.
        """
        self.name = "SourceSinkNodeMaker"
        self._parameters = parameters
        self._source_sink_df = source_sink_df
        self._missing_data_as_zero = missing_data_as_zero

    def get_node(self, control: Control, index: int) -> SourceSinkFlowNode:
        """Build the SourceSinkFlowNode for position ``index``.

        Args:
            control: a Control object passed through to the node.
            index: Position of the node, used to select both the ``flow_min``
                value and the column of the source/sink DataFrame.

        Returns:
            A SourceSinkFlowNode using the selected minimum flow and
            source/sink timeseries.
        """
        flow_min = self._parameters.parameters["flow_min"][index]
        # Select the column by position: the column names are documented as
        # arbitrary, and a name-based lookup returns a DataFrame rather than
        # a Series when names are duplicated.
        data_ts = self._source_sink_df.iloc[:, index]
        return SourceSinkFlowNode(
            control, flow_min, data_ts, self._missing_data_as_zero
        )

    @staticmethod
    def get_dimensions() -> tuple:
        """Get a tuple of dimension names for this SourceSinkFlowNodeMaker."""
        # NOTE(review): "nreservoirs" looks inherited from a reservoir node
        # maker; confirm the intended dimension name for this maker.
        return ("nreservoirs",)

    @staticmethod
    def get_parameters() -> tuple:
        """Get a tuple of parameter names for SourceSinkFlowNodeMaker."""
        # "flow_min" is the only parameter get_node reads.
        return ("flow_min",)