pywatershed.FlowGraph

class pywatershed.FlowGraph(control, discretization, parameters, inflows, node_maker_dict, addtl_output_vars=None, params_not_to_netcdf=None, budget_type='defer', allow_disconnected_nodes=False, type_check_nodes=False, verbose=None)

FlowGraph manages and computes FlowNodes given by FlowNodeMakers.

FlowGraph lets users combine FlowNodes of different kinds into a single mathematical graph of flow solutions. Each FlowNode provides an explicit solution of flow (currently not involving a head term) on a single spatial unit, and FlowGraph allows these different flow solutions to be combined in arbitrary order on the graph. There are many applications; a common one is adding a reservoir representation to an existing graph of flow, such as the one in PRMSChannel, which computes a Muskingum-Mann solution of flow. This example is shown schematically in the following figure.

[Figure fg1: a node of class B inserted into an existing graph of class A nodes]

Above, a node of class B is inserted into the original graph. Class B may use a different flow solution than class A in the original graph; FlowGraph accommodates new nodes wherever they are placed and checks mass balance over the whole graph.
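To make the idea concrete, here is a minimal pure-Python sketch with two hypothetical node kinds (StorageNode and PassThroughNode are illustrative stand-ins, not pywatershed's FlowNode classes). Nodes are visited upstream to downstream using a to_graph_index-style array (-1 means flow leaves the graph), and mass balance is checked as change in storage = lateral inflows - flow leaving the graph.

```python
class StorageNode:
    """Hypothetical class A node: releases a fraction k of its storage per step."""

    def __init__(self, storage: float, k: float):
        self.storage = storage
        self.k = k
        self.outflow = 0.0

    def calculate(self, inflow: float) -> None:
        self.storage += inflow
        self.outflow = self.k * self.storage
        self.storage -= self.outflow


class PassThroughNode:
    """Hypothetical class B node: no storage, outflow equals inflow."""

    def __init__(self):
        self.storage = 0.0
        self.outflow = 0.0

    def calculate(self, inflow: float) -> None:
        self.outflow = inflow


def solve_step(nodes, to_graph_index, order, lateral):
    """Solve one step in upstream-to-downstream `order`; return flow leaving the graph."""
    upstream = [0.0] * len(nodes)
    for i in order:
        nodes[i].calculate(lateral[i] + upstream[i])
        downstream = to_graph_index[i]
        if downstream >= 0:
            upstream[downstream] += nodes[i].outflow
    return sum(
        node.outflow
        for node, downstream in zip(nodes, to_graph_index)
        if downstream == -1
    )


# A class B node (index 1) inserted between two class A nodes.
nodes = [StorageNode(10.0, 0.5), PassThroughNode(), StorageNode(0.0, 0.5)]
to_graph_index = [1, 2, -1]
lateral = [2.0, 0.0, 1.0]

storage_before = sum(node.storage for node in nodes)
graph_outflow = solve_step(nodes, to_graph_index, [0, 1, 2], lateral)
storage_after = sum(node.storage for node in nodes)
# Mass balance: change in storage == lateral inflows - flow leaving the graph.
balance_error = (storage_after - storage_before) - (sum(lateral) - graph_outflow)
```

The inserted pass-through node changes nothing about the storage nodes' solutions; it simply forwards flow, which is why the budget still closes.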

To delve a bit deeper, the relationship between FlowGraph, FlowNode, and FlowNodeMaker is shown in the figure below.

[Figure fg2: relationship between FlowGraph, FlowNode, and FlowNodeMaker]

The figure above illustrates how each FlowNodeMaker has a particular kind of FlowNode class composed into it. A user instantiates each FlowNodeMaker by passing all the data required for all of its FlowNodes. FlowGraph receives the instantiated FlowNodeMakers and calls them, in turn, to instantiate the FlowNodes in the graph.
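The maker pattern can be sketched in plain Python. Everything here is hypothetical (ToyNode, ToyNodeMaker, its get_node method, and build_nodes are illustrative names, not pywatershed's FlowNodeMaker interface): a maker holds the arrays for all nodes of its kind, including initial conditions, and the graph asks each node's maker, by name and index, for that node.

```python
from dataclasses import dataclass

import numpy as np


@dataclass
class ToyNode:
    """Hypothetical node: a linear reservoir with coefficient k."""

    k: float
    storage: float


class ToyNodeMaker:
    """Hypothetical maker: holds the data for ALL nodes of one kind."""

    def __init__(self, k: np.ndarray, storage_init: np.ndarray):
        self.k = k
        # Initial conditions live in the maker's data.
        self.storage_init = storage_init

    def get_node(self, index: int) -> ToyNode:
        # Build the node at `index` from this maker's arrays.
        return ToyNode(k=float(self.k[index]), storage=float(self.storage_init[index]))


def build_nodes(node_maker_dict, node_maker_name, node_maker_index):
    """Ask each node's maker for the node at its index, in graph order."""
    return [
        node_maker_dict[name].get_node(int(index))
        for name, index in zip(node_maker_name, node_maker_index)
    ]


node_maker_dict = {
    "slow": ToyNodeMaker(np.array([0.5, 0.25]), np.array([10.0, 0.0])),
    "fast": ToyNodeMaker(np.array([1.0]), np.array([0.0])),
}
nodes = build_nodes(
    node_maker_dict,
    node_maker_name=["slow", "fast", "slow"],
    node_maker_index=[0, 0, 1],
)
```

Keeping per-kind data in one maker means users hand over arrays once, and the graph decides which node gets built where.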

Note that users generally do not create new types of FlowNodes or FlowNodeMakers themselves; this is typically the work of code developers. Users instead pass parameters, an inflow Adapter, and instantiated FlowNodeMakers to FlowGraph. The example below shows the nuts and bolts of setting up a FlowGraph similar to the one illustrated above, where a single pass-through node is inserted.

For users specifically interested in adding new nodes into the PRMSChannel Muskingum-Mann routing solution, helper functions are available that greatly simplify the code. See the notebook examples/06_flow_graph_starfit.ipynb, which highlights both helper functions, prms_channel_flow_graph_to_model_dict() and prms_channel_flow_graph_postprocess().

For developers looking to add new FlowNodes, please read the FlowNode base class code and also the code for FlowNodeMaker.

Examples:

This example shows how to insert a pass-through node into a PRMSChannel simulation. It is a bit underwhelming because the flows on the PRMSChannel nodes are unaltered, but it shows the full mechanism without helper functions.

>>> import numpy as np
>>> from tqdm.auto import tqdm
>>> import xarray as xr
>>> import pywatershed as pws
>>> from pywatershed.constants import nan, zero
>>> from pywatershed.constants import __pywatershed_root__ as pkg_root_dir
>>> # this example requires the repository with previously generated test data
>>> domain_dir = pkg_root_dir / "../test_data/drb_2yr"
>>> control_file = domain_dir / "nhm.control"
>>> control = pws.Control.load_prms(
...     control_file, warn_unused_options=False
... )
>>> dis_hru_file = domain_dir / "parameters_dis_hru.nc"
>>> dis_seg_file = domain_dir / "parameters_dis_seg.nc"
>>> discretization_prms = pws.Parameters.merge(
...     pws.Parameters.from_netcdf(dis_hru_file, encoding=False),
...     pws.Parameters.from_netcdf(dis_seg_file, encoding=False),
... )
>>> param_file = domain_dir / "parameters_PRMSChannel.nc"
>>> parameters_prms = pws.parameters.PrmsParameters.from_netcdf(param_file)
>>> # Build the parameters for the FlowGraph
>>> nnodes = parameters_prms.dims["nsegment"] + 1
>>> node_maker_name = ["prms_channel"] * nnodes
>>> node_maker_name[-1] = "pass_throughs"
>>> node_maker_index = np.arange(nnodes)
>>> node_maker_index[-1] = 0
>>> to_graph_index = np.zeros(nnodes, dtype=np.int64)
>>> dis_params = discretization_prms.parameters
>>> to_graph_index[0:-1] = dis_params["tosegment"] - 1
>>> nhm_seg_intervene_above = 1829
>>> wh_intervene_above_nhm = np.where(
...     dis_params["nhm_seg"] == nhm_seg_intervene_above
... )
>>> wh_intervene_below_nhm = np.where(
...     (dis_params["tosegment"] - 1) == wh_intervene_above_nhm[0][0]
... )
>>> # Map to the graph from an index found in prms_channel
>>> wh_intervene_above_graph = np.where(
...     (np.array(node_maker_name) == "prms_channel")
...     & (node_maker_index == wh_intervene_above_nhm[0][0])
... )
>>> wh_intervene_below_graph = np.where(
...     (np.array(node_maker_name) == "prms_channel")
...     & np.isin(node_maker_index, wh_intervene_below_nhm)
... )
>>> to_graph_index[-1] = wh_intervene_above_graph[0][0]
>>> to_graph_index[wh_intervene_below_graph] = nnodes - 1
>>> parameters_flow_graph = pws.Parameters(
...     dims={
...         "nnodes": nnodes,
...     },
...     coords={
...         "node_coord": np.arange(nnodes),
...     },
...     data_vars={
...         "node_maker_name": node_maker_name,
...         "node_maker_index": node_maker_index,
...         "to_graph_index": to_graph_index,
...     },
...     metadata={
...         "node_coord": {"dims": ["nnodes"]},
...         "node_maker_name": {"dims": ["nnodes"]},
...         "node_maker_index": {"dims": ["nnodes"]},
...         "to_graph_index": {"dims": ["nnodes"]},
...     },
...     validate=True,
... )
>>> # Get the FlowNodeMakers instantiated and named
>>> node_maker_dict = {
...     "prms_channel": pws.PRMSChannelFlowNodeMaker(
...         discretization_prms, parameters_prms
...     ),
...     "pass_throughs": pws.PassThroughNodeMaker(),
... }
>>> # Get the inputs to PRMSChannel combined, then add inputs to the
... # additional node using a custom Adapter.
>>> input_variables = {}
>>> for key in pws.PRMSChannel.get_inputs():
...     nc_path = domain_dir / f"output/{key}.nc"
...     input_variables[key] = pws.AdapterNetcdf(nc_path, key, control)
...
>>> inflows_prms = pws.HruSegmentFlowAdapter(
...     parameters_prms, **input_variables
... )
>>> class GraphInflowAdapter(pws.Adapter):
...     def __init__(
...         self,
...         prms_inflows: pws.Adapter,
...         variable: str = "inflows",
...     ):
...         self._variable = variable
...         self._prms_inflows = prms_inflows
...
...         self._nnodes = len(self._prms_inflows.current) + 1
...         self._current_value = np.full(self._nnodes, nan)
...         return
...
...     def advance(self) -> None:
...         self._prms_inflows.advance()
...         self._current_value[0:-1] = self._prms_inflows.current
...         self._current_value[-1] = zero  # no inflow at the pass through
...         return
...
>>> inflows_graph = GraphInflowAdapter(inflows_prms)
>>> # Instantiate the FlowGraph
>>> flow_graph = pws.FlowGraph(
...     control,
...     discretization=None,
...     parameters=parameters_flow_graph,
...     inflows=inflows_graph,
...     node_maker_dict=node_maker_dict,
...     budget_type="error",
... )
>>> # Save out the full timeseries of flows for all nodes
>>> graph_seg_outflows = np.zeros([control.n_times, nnodes])
>>> # Run the flow graph
>>> for istep in tqdm(range(control.n_times)):
...     control.advance()
...     flow_graph.advance()
...     flow_graph.calculate(1.0)
...     graph_seg_outflows[istep, :] = flow_graph["node_outflows"]
...
>>> flow_graph.finalize()
>>> # Compare to the results of PRMSChannel run without a pass-through
... # node.
>>> prms_seg_outflows = xr.open_dataarray(
...     domain_dir / "output/seg_outflow.nc"
... )
>>> # The final node is the pass-through node; drop it from comparisons.
>>> assert (
...     abs(graph_seg_outflows[:, 0:-1] - prms_seg_outflows.values) < 1e-10
... ).all()
__init__(control, discretization, parameters, inflows, node_maker_dict, addtl_output_vars=None, params_not_to_netcdf=None, budget_type='defer', allow_disconnected_nodes=False, type_check_nodes=False, verbose=None)

Initialize a FlowGraph.

Parameters:
  • control (Control) – A Control object

  • discretization (Parameters) – Currently unused by FlowGraph but required by its superclass, ConservativeProcess.

  • parameters (Parameters) – A Parameters object with the FlowGraph parameters described below.

  • inflows (Union[str, Path, ndarray, Adapter]) – An adaptable of inflows to the graph from outside it, often referred to as “lateral” flows, as opposed to flows between nodes inside the graph.

  • node_maker_dict (dict) – A dictionary of FlowNodeMaker instances with keys/names supplied in the parameters, e.g. {key1: flow_node_maker_instance, …}.

  • params_not_to_netcdf (list[str]) – A list of names of parameters NOT to write to NetCDF output files. By default, all parameters are included in each file written.

  • addtl_output_vars (list[str]) – A list of string names for variables to collect for NetCDF output from FlowNodes. These variables do not have to be available in all FlowNodes but must be present in at least one.

  • budget_type (Literal['defer', None, 'warn', 'error']) – One of [“defer”, None, “warn”, “error”]. The default, “defer”, defers to control.options[“budget_type”] when it is available; otherwise budget_type is set to “warn”.

  • allow_disconnected_nodes (bool) – If False, an error is raised when disconnected nodes are found in the graph. Disconnected nodes are common in PRMS, so allowing them is a convenience, but it is bad practice.

  • type_check_nodes (bool) – Intended for debugging FlowNodes that do not comply with their required float return values, which can cause many warnings or errors.

  • verbose (bool) – Print extra diagnostic messages?
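The “defer” rule for budget_type can be sketched as a small resolution function. This is a hypothetical helper, not part of pywatershed: a plain dict stands in for control.options, and treating a present “budget_type” key as “available” is an assumption.

```python
from typing import Optional

VALID_BUDGET_TYPES = ("defer", None, "warn", "error")


def resolve_budget_type(budget_type: Optional[str], control_options: dict) -> Optional[str]:
    """Hypothetical helper: return the effective budget_type."""
    if budget_type not in VALID_BUDGET_TYPES:
        raise ValueError(f"invalid budget_type: {budget_type!r}")
    if budget_type != "defer":
        # An explicit choice (None, "warn", or "error") is used as given.
        return budget_type
    # "defer": use control_options["budget_type"] if present, else "warn".
    return control_options.get("budget_type", "warn")
```

For example, resolve_budget_type("defer", {}) gives "warn", while resolve_budget_type("defer", {"budget_type": "error"}) gives "error".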

The parameters argument is a Parameters object which contains the following data:

  • node_maker_name: A list or np.array of the FlowNodeMaker name for each node.

  • node_maker_index: An np.array of the index to request from the associated (collated) FlowNodeMaker above for each node.

  • node_maker_id: An np.array of the integer ids used for each node by its node maker. Not used internally, but necessary or helpful to users in post-processing to identify nodes. Ids need not be unique across this list but should probably be unique within each node maker.

  • to_graph_index: An np.array giving, for each node, the index of its downstream node in the FlowGraph, with -1 indicating an outflow node. These indices must describe a directed acyclic graph (DAG).

The inputs inflows, node_maker_name, node_maker_index, and to_graph_index are collated, i.e., the i-th entry of each refers to the same node. The order of execution of the graph is not the same as the supplied order; the execution order is solved from to_graph_index. Note that initial conditions are set by the node makers via their parameters.
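An execution order can be recovered from to_graph_index with a topological sort. The sketch below uses Kahn's algorithm and assumes, as described above, that -1 marks an outflow node; it is an illustration of the idea, not FlowGraph's internal implementation. A cycle leaves some nodes never "ready", which is how the DAG requirement is checked.

```python
from collections import deque

import numpy as np


def execution_order(to_graph_index) -> list[int]:
    """Topologically sort nodes so each runs after all nodes flowing into it."""
    to_graph_index = np.asarray(to_graph_index)
    n_nodes = len(to_graph_index)
    # Count how many nodes drain directly into each node.
    n_upstream = np.zeros(n_nodes, dtype=int)
    for downstream in to_graph_index:
        if downstream >= 0:
            n_upstream[downstream] += 1
    # Start from headwater nodes (nothing flows into them).
    ready = deque(i for i in range(n_nodes) if n_upstream[i] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        downstream = int(to_graph_index[node])
        if downstream >= 0:
            n_upstream[downstream] -= 1
            if n_upstream[downstream] == 0:
                ready.append(downstream)
    if len(order) != n_nodes:
        raise ValueError("to_graph_index does not describe a DAG (cycle detected)")
    return order


# Nodes 0 and 1 used to drain to node 2; a pass-through node (4) now sits
# between them, mirroring the insertion in the example above.
order = execution_order([4, 4, 3, -1, 2])
```

Here the appended pass-through node runs in the middle of the order (after 0 and 1, before 2), even though it is last in the supplied arrays.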

Methods

  • __init__(control, discretization, ...[, ...]) – Initialize a FlowGraph.

  • advance() – Advance the Process in time.

  • calculate(time_length[, n_substeps]) – Calculate Process terms for a time step.

  • description() – A dictionary description of this Process.

  • finalize() – Finalizes the Process, including output methods.

  • get_dimensions() – Get a tuple of dimension names for this Process.

  • get_init_values() – FlowNode initial values.

  • get_inputs() – Get a tuple of input variable names for this Process.

  • get_mass_budget_terms() – Get a dictionary of variable names for mass budget terms.

  • get_outflow_mask() – Get a mask indicating on which nodes flow exits the graph.

  • get_parameters() – Get a tuple of parameter names for this Process.

  • get_restart_variables() – Get a list of restart variable names.

  • get_variables() – Get a tuple of (public) variable names for this Process.

  • initialize_netcdf([output_dir, ...]) – Initialize NetCDF output.

  • output() – Output data to previously initialized output types.

  • output_to_csv(pth) – Save each output variable to a separate CSV file in the specified path.

  • set_input_to_adapter(input_variable_name, ...) – Set input variables to adapter.current and manage the adapter.

Attributes

  • dimensions – A tuple of dimension names.

  • init_values – A dictionary of initial values for each public variable.

  • inputs – A tuple of input variable names.

  • mass_budget_terms – A dictionary of variable names for the mass budget terms.

  • outflow_mask – A mask indicating on which nodes flow exits the graph.

  • parameters – A tuple of parameter names.

  • restart_variables – A tuple of restart variable names.

  • variables – A tuple of public variable names.

advance()

Advance the Process in time.

Returns:

None

calculate(time_length, n_substeps=24)

Calculate Process terms for a time step

Parameters:

  • time_length – the length of the time step.

  • n_substeps – the number of substeps into which each time step is divided (default 24).

Return type:

None

Returns:

None
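The n_substeps argument indicates that each time step is solved in smaller substeps. As a hedged illustration only (a hypothetical linear-reservoir node, not a pywatershed FlowNode, and not necessarily how FlowGraph distributes work across substeps), an explicit solution over n_substeps substeps might look like this; with time_length=1.0 and the default 24 substeps, each substep is 1/24 of the step.

```python
def linear_reservoir_substeps(storage, inflow, k, time_length=1.0, n_substeps=24):
    """Hypothetical explicit node: dS/dt = inflow - k*S, solved with
    n_substeps forward-Euler substeps. Returns the end storage and the
    mean outflow rate over the time step."""
    dt = time_length / n_substeps
    outflow_volume = 0.0
    for _ in range(n_substeps):
        outflow_rate = k * storage
        storage += (inflow - outflow_rate) * dt
        outflow_volume += outflow_rate * dt
    return storage, outflow_volume / time_length


s0, inflow = 10.0, 2.0
s1, q_mean = linear_reservoir_substeps(s0, inflow, k=0.5)
# Per-step mass balance: storage change == inflow volume - outflow volume.
balance_error = (s1 - s0) - (inflow * 1.0 - q_mean * 1.0)
```

Because storage and outflow are updated consistently in every substep, the step's mass balance closes to round-off regardless of n_substeps; more substeps mainly improve accuracy and stability of the explicit scheme.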

classmethod description()

A dictionary description of this Process.

Return type:

dict

Returns:

All metadata for all variables in inputs, variables, parameters, and mass_budget_terms for this Process.

property dimensions: tuple

A tuple of dimension names.

finalize()

Finalizes the Process, including output methods.

Return type:

None

Returns:

None

static get_dimensions()

Get a tuple of dimension names for this Process.

Return type:

tuple

static get_init_values()

FlowNode initial values.

Return type:

dict

static get_inputs()

Get a tuple of input variable names for this Process.

Return type:

tuple

static get_mass_budget_terms()

Get a dictionary of variable names for mass budget terms.

get_outflow_mask()

Get a mask indicating on which nodes flow exits the graph.

static get_parameters()

Get a tuple of parameter names for this Process.

Return type:

tuple

static get_restart_variables()

Get a list of restart variable names.

Return type:

list

classmethod get_variables()

Get a tuple of (public) variable names for this Process.

Return type:

tuple

property init_values: dict

A dictionary of initial values for each public variable.

initialize_netcdf(output_dir=None, separate_files=None, budget_args=None, output_vars=None, extra_coords=None)

Initialize NetCDF output.

Parameters:
  • output_dir (Union[str, Path]) – base directory path or NetCDF file path if separate_files is True

  • separate_files (bool) – boolean indicating if storage component output variables should be written to a separate file for each variable

  • output_vars (list) – list of variable names to output.

Return type:

None

Returns:

None

property inputs: tuple

A tuple of input variable names.

property mass_budget_terms: dict

A dictionary of variable names for the mass budget terms.

property outflow_mask

A mask indicating on which nodes flow exits the graph.
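Assuming the mask is equivalent to testing to_graph_index == -1 (the outflow convention given in the parameters description), it can be sketched with NumPy and used to total the flow leaving the graph. The helper name and the flow values below are made up for illustration.

```python
import numpy as np


def outflow_mask(to_graph_index) -> np.ndarray:
    """Hypothetical helper: True where a node's flow leaves the graph."""
    return np.asarray(to_graph_index) == -1


to_graph_index = np.array([4, 4, 3, -1, 2])
node_outflows = np.array([1.0, 2.0, 3.5, 3.5, 3.0])  # made-up values
mask = outflow_mask(to_graph_index)
# Only outflow nodes contribute to flow leaving the graph; summing all
# node outflows would count the same water several times.
total_leaving_graph = node_outflows[mask].sum()
```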

output()

Output data to previously initialized output types.

Return type:

None

Returns:

None

output_to_csv(pth)

Save each output variable to a separate CSV file in the specified path.

property parameters: tuple

A tuple of parameter names.

property restart_variables: tuple

A tuple of restart variable names.

set_input_to_adapter(input_variable_name, adapter)

Set input variables to adapter.current and manage the adapter.

TODO: make this private?

Parameters:
  • input_variable_name (str) – key of input variable

  • adapter (Adapter) – the Adapter for the input.

property variables: tuple

A tuple of public variable names.