pywatershed.FlowGraph
- class pywatershed.FlowGraph(control, discretization, parameters, inflows, node_maker_dict, addtl_output_vars=None, params_not_to_netcdf=None, budget_type='defer', allow_disconnected_nodes=False, type_check_nodes=False, verbose=None)
FlowGraph manages and computes FlowNodes given by FlowNodeMakers.
FlowGraph lets users combine FlowNodes of different kinds into a single mathematical graph of flow solutions. FlowNodes provide explicit solutions of flow (currently not involving a head term) on a single spatial unit. The FlowGraph allows these different flow solutions to be combined in arbitrary order on a mathematical graph. There are many applications, but a common one is to add a reservoir representation into an existing graph of flow, such as the one within PRMSChannel, which computes a Muskingum-Mann solution of flow. This example is shown schematically in the following figure.
In the figure above, a node of class B is inserted into the original graph. Class B may have a different flow solution than class A in the original graph, but FlowGraph handles new nodes wherever you want to put them. FlowGraph checks mass balance over the graph.
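As a rough illustration of what a graph-wide mass balance means, here is a minimal sketch; it is not FlowGraph's budget code. It assumes all quantities are volumes over one time step and that flow leaves the graph only at outflow nodes (a to_graph_index of -1, the convention used for the FlowGraph parameters). The helper name graph_mass_balance_residual is hypothetical.

```python
from collections.abc import Sequence


def graph_mass_balance_residual(
    lateral_inflows: Sequence[float],
    node_outflows: Sequence[float],
    to_graph_index: Sequence[int],
    storage_change: Sequence[float],
) -> float:
    """Return lateral inflow minus graph outflow minus storage change.

    Hypothetical helper. Internal node-to-node flows cancel out, so only
    outflows at nodes with to_graph_index == -1 leave the whole graph.
    """
    exiting = sum(
        q for q, down in zip(node_outflows, to_graph_index) if down == -1
    )
    return sum(lateral_inflows) - exiting - sum(storage_change)


# Chain 0 -> 1 -> 2 -> out; node 1 retains 0.5 of the 3.0 it receives.
residual = graph_mass_balance_residual(
    lateral_inflows=[1.0, 2.0, 0.0],
    node_outflows=[1.0, 2.5, 2.5],
    to_graph_index=[1, 2, -1],
    storage_change=[0.0, 0.5, 0.0],
)
```

A residual near zero means the graph conserves mass; a nonzero residual is the kind of imbalance that budget_type decides whether to warn or error on.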
To delve a bit deeper, the relationship between FlowGraph, FlowNode, and FlowNodeMaker is shown in the figure below.
The figure above illustrates how FlowNodeMakers already have a certain kind of FlowNode class composed into them. A user instantiates each FlowNodeMaker by passing all the data required for all of its FlowNodes. FlowGraph receives the instantiated FlowNodeMakers and calls them, in turn, to instantiate the FlowNodes in the FlowGraph.
Note that users generally do not create types of FlowNodes or FlowNodeMakers themselves; this is typically the work of code developers. Users do, however, pass parameters, an inflow Adapter, and instantiated FlowNodeMakers to FlowGraph. The example below shows the nuts and bolts of setting up a FlowGraph similar to the one illustrated above, where a single pass-through node is inserted.
For users specifically interested in adding new nodes into the PRMSChannel Muskingum-Mann routing solutions, there are helper functions available which greatly simplify the code. See the notebook examples/06_flow_graph_starfit.ipynb, which highlights both helper functions, prms_channel_flow_graph_to_model_dict() and prms_channel_flow_graph_postprocess().
For developers looking to add new FlowNodes, please read the FlowNode base class code and also the code for FlowNodeMaker.
Examples:
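The division of labor described above (makers hold the data, nodes hold a flow solution, and the graph asks each maker for its nodes) can be sketched in plain Python. Every name below (ToyPassThroughNode, ToyNodeMaker, get_node, build_nodes) is hypothetical and is not the pywatershed FlowNode or FlowNodeMaker interface; read those base classes for the real methods.

```python
class ToyPassThroughNode:
    """Hypothetical node: its outflow is whatever flows into it."""

    def __init__(self, node_id: int) -> None:
        self.node_id = node_id
        self.outflow = 0.0

    def calculate(self, upstream_inflow: float, lateral_inflow: float) -> None:
        self.outflow = upstream_inflow + lateral_inflow


class ToyNodeMaker:
    """Hypothetical maker with one node class composed into it.

    The maker holds the data for all of its nodes; the graph asks it for
    one node at a time by index.
    """

    node_class = ToyPassThroughNode

    def __init__(self, node_ids: list[int]) -> None:
        self._node_ids = node_ids

    def get_node(self, index: int) -> ToyPassThroughNode:
        return self.node_class(self._node_ids[index])


def build_nodes(node_maker_dict, node_maker_name, node_maker_index):
    """Instantiate one node per graph position from the named makers."""
    return [
        node_maker_dict[name].get_node(idx)
        for name, idx in zip(node_maker_name, node_maker_index)
    ]


makers = {"pass_throughs": ToyNodeMaker([101, 102])}
nodes = build_nodes(makers, ["pass_throughs", "pass_throughs"], [1, 0])
```

The graph only needs a maker name and an index per position to obtain each node, which is why it can mix node types without knowing how each one is built.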
This example shows how to insert a pass-through node into a PRMSChannel simulation. It’s a bit underwhelming because the flows on the PRMSChannel nodes are unaltered, but it shows the full mechanism without helper functions.
>>> import numpy as np
>>> from tqdm.auto import tqdm
>>> import xarray as xr
>>> import pywatershed as pws
>>> from pywatershed.constants import nan, zero
>>> from pywatershed.constants import __pywatershed_root__ as pkg_root_dir
>>> # this example requires the repository with test data previously generated
>>> domain_dir = pkg_root_dir / "../test_data/drb_2yr"
>>> control_file = domain_dir / "nhm.control"
>>> control = pws.Control.load_prms(
...     control_file, warn_unused_options=False
... )
>>> dis_hru_file = domain_dir / "parameters_dis_hru.nc"
>>> dis_seg_file = domain_dir / "parameters_dis_seg.nc"
>>> discretization_prms = pws.Parameters.merge(
...     pws.Parameters.from_netcdf(dis_hru_file, encoding=False),
...     pws.Parameters.from_netcdf(dis_seg_file, encoding=False),
... )
>>> param_file = domain_dir / "parameters_PRMSChannel.nc"
>>> parameters_prms = pws.parameters.PrmsParameters.from_netcdf(param_file)
>>> # Build the parameters for the FlowGraph
>>> nnodes = parameters_prms.dims["nsegment"] + 1
>>> node_maker_name = ["prms_channel"] * nnodes
>>> node_maker_name[-1] = "pass_throughs"
>>> node_maker_index = np.arange(nnodes)
>>> node_maker_index[-1] = 0
>>> to_graph_index = np.zeros(nnodes, dtype=np.int64)
>>> dis_params = discretization_prms.parameters
>>> to_graph_index[0:-1] = dis_params["tosegment"] - 1
>>> nhm_seg_intervene_above = 1829
>>> wh_intervene_above_nhm = np.where(
...     dis_params["nhm_seg"] == nhm_seg_intervene_above
... )
>>> wh_intervene_below_nhm = np.where(
...     (dis_params["tosegment"] - 1) == wh_intervene_above_nhm[0][0]
... )
>>> # have to map to the graph from an index found in prms_channel
>>> wh_intervene_above_graph = np.where(
...     (np.array(node_maker_name) == "prms_channel")
...     & (node_maker_index == wh_intervene_above_nhm[0][0])
... )
>>> wh_intervene_below_graph = np.where(
...     (np.array(node_maker_name) == "prms_channel")
...     & np.isin(node_maker_index, wh_intervene_below_nhm)
... )
>>> to_graph_index[-1] = wh_intervene_above_graph[0][0]
>>> to_graph_index[wh_intervene_below_graph] = nnodes - 1
>>> parameters_flow_graph = pws.Parameters(
...     dims={
...         "nnodes": nnodes,
...     },
...     coords={
...         "node_coord": np.arange(nnodes),
...     },
...     data_vars={
...         "node_maker_name": node_maker_name,
...         "node_maker_index": node_maker_index,
...         "to_graph_index": to_graph_index,
...     },
...     metadata={
...         "node_coord": {"dims": ["nnodes"]},
...         "node_maker_name": {"dims": ["nnodes"]},
...         "node_maker_index": {"dims": ["nnodes"]},
...         "to_graph_index": {"dims": ["nnodes"]},
...     },
...     validate=True,
... )
>>> # Get the FlowNodeMakers instantiated and named
>>> node_maker_dict = {
...     "prms_channel": pws.PRMSChannelFlowNodeMaker(
...         discretization_prms, parameters_prms
...     ),
...     "pass_throughs": pws.PassThroughNodeMaker(),
... }
>>> # Get the inputs to PRMSChannel combined, then add inputs to the
>>> # additional node using a custom Adapter.
>>> input_variables = {}
>>> for key in pws.PRMSChannel.get_inputs():
...     nc_path = domain_dir / f"output/{key}.nc"
...     input_variables[key] = pws.AdapterNetcdf(nc_path, key, control)
...
>>> inflows_prms = pws.HruSegmentFlowAdapter(
...     parameters_prms, **input_variables
... )
>>> class GraphInflowAdapter(pws.Adapter):
...     def __init__(
...         self,
...         prms_inflows: pws.Adapter,
...         variable: str = "inflows",
...     ):
...         self._variable = variable
...         self._prms_inflows = prms_inflows
...
...         self._nnodes = len(self._prms_inflows.current) + 1
...         self._current_value = np.zeros(self._nnodes) * nan
...         return
...
...     def advance(self) -> None:
...         self._prms_inflows.advance()
...         self._current_value[0:-1] = self._prms_inflows.current
...         self._current_value[-1] = zero  # no inflow at the pass through
...         return
...
>>> inflows_graph = GraphInflowAdapter(inflows_prms)
>>> # Instantiate the FlowGraph
>>> flow_graph = pws.FlowGraph(
...     control,
...     discretization=None,
...     parameters=parameters_flow_graph,
...     inflows=inflows_graph,
...     node_maker_dict=node_maker_dict,
...     budget_type="error",
... )
>>> # Save out the full timeseries of flows for all nodes
>>> graph_seg_outflows = np.zeros([control.n_times, nnodes])
>>> # Run the flow graph
>>> for istep in tqdm(range(control.n_times)):
...     control.advance()
...     flow_graph.advance()
...     flow_graph.calculate(1.0)
...     graph_seg_outflows[istep, :] = flow_graph["node_outflows"]
...
>>> flow_graph.finalize()
>>> # Compare to the results of a PRMSChannel run without a pass-through node.
>>> prms_seg_outflows = xr.open_dataarray(
...     domain_dir / "output/seg_outflow.nc"
... )
>>> # The final node is the pass-through node; drop it from comparisons.
>>> assert (
...     abs(graph_seg_outflows[:, 0:-1] - prms_seg_outflows.values) < 1e-10
... ).all()
- __init__(control, discretization, parameters, inflows, node_maker_dict, addtl_output_vars=None, params_not_to_netcdf=None, budget_type='defer', allow_disconnected_nodes=False, type_check_nodes=False, verbose=None)
Initialize a FlowGraph.
- Parameters:
control (Control) – A Control object.
discretization (Parameters) – Currently unused by FlowGraph but required by its superclass, ConservativeProcess.
parameters (Parameters) – A Parameters object with the FlowGraph parameters as described below.
inflows (Union[str, Path, ndarray, Adapter]) – An adaptable of inflows to the graph, often referred to as “lateral” flows (as opposed to flows inside the graph).
node_maker_dict (dict) – A dictionary of FlowNodeMaker instances with keys/names supplied in the parameters, e.g. {key1: flow_node_maker_instance, …}.
params_not_to_netcdf (list[str]) – A list of names of parameters NOT to write to NetCDF output files. By default, all parameters are included in each file written.
addtl_output_vars (list[str]) – A list of names of variables to collect from FlowNodes for NetCDF output. These variables do not have to be available in all FlowNodes but must be present in at least one.
budget_type (Literal['defer', None, 'warn', 'error']) – One of [“defer”, None, “warn”, “error”], with “defer” being the default and deferring to control.options[“budget_type”] when available. When control.options[“budget_type”] is not available, budget_type is set to “warn”.
allow_disconnected_nodes (bool) – If False, an error is raised when disconnected nodes are found in the graph. Disconnected nodes occur often in PRMS, so allowing them is a convenience but bad practice.
type_check_nodes (bool) – Intended for debugging FlowNodes that are not compliant with their required float return values, which can cause many warnings or errors.
verbose (bool) – Print extra diagnostic messages?
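The budget_type fallback described above amounts to a small resolution rule. Here is a sketch, assuming control.options can be treated like a dict; resolve_budget_type is a hypothetical name, not a pywatershed function.

```python
from typing import Literal, Optional

BudgetType = Optional[Literal["warn", "error"]]


def resolve_budget_type(
    budget_type: Optional[str], control_options: dict
) -> BudgetType:
    """Hypothetical resolution of the budget_type argument.

    Any value other than "defer" is used as given. "defer" takes
    control_options["budget_type"] when that key exists and falls back
    to "warn" otherwise.
    """
    if budget_type != "defer":
        return budget_type
    return control_options.get("budget_type", "warn")
```

Under this reading, passing budget_type="error" explicitly, as the example does, overrides whatever the control file specifies.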
The parameters argument is a Parameters object which contains the following data:
node_maker_name: A list or np.array of the FlowNodeMaker name for each node.
node_maker_index: An np.array of the indices to request from the associated (collated) FlowNodeMaker above for each node.
node_maker_id: An np.array of the integer ids used for each node by its node maker. Not used internally, but necessary or helpful to users in post-processing to identify nodes. Ids may repeat across this list but should probably be unique within each node maker.
to_graph_index: An np.array of the index of each node's downstream node in the FlowGraph, with -1 indicating an outflow node. This must specify a directed acyclic graph (DAG).
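These constraints can be checked before constructing a FlowGraph. The sketch below is a hypothetical pre-flight check, not the validation pywatershed performs: it uses a plain dict in place of a Parameters object, requires the per-node arrays to have equal length, and uses the fact that each node has a single downstream index, so the graph is a DAG exactly when every downstream chain reaches -1.

```python
def check_flow_graph_params(params: dict) -> None:
    """Raise ValueError if FlowGraph-style node parameters are inconsistent.

    Hypothetical helper; `params` is a plain dict standing in for a
    pywatershed Parameters object.
    """
    names = params["node_maker_name"]
    indices = params["node_maker_index"]
    to_graph = params["to_graph_index"]
    nnodes = len(names)
    # One entry per node in every array.
    if not (len(indices) == len(to_graph) == nnodes):
        raise ValueError("node parameters must all have length nnodes")
    # Downstream indices must be -1 or another node in the graph.
    for node, down in enumerate(to_graph):
        if not -1 <= down < nnodes or down == node:
            raise ValueError(f"node {node}: invalid to_graph_index {down}")
    # With one downstream link per node, an acyclic chain visits each node
    # at most once, so it reaches -1 within nnodes hops.
    for start in range(nnodes):
        node, hops = start, 0
        while node != -1:
            node = to_graph[node]
            hops += 1
            if hops > nnodes:
                raise ValueError(f"cycle reachable from node {start}")


check_flow_graph_params(
    {
        "node_maker_name": ["prms_channel", "prms_channel", "pass_throughs"],
        "node_maker_index": [0, 1, 0],
        "to_graph_index": [2, -1, 1],
    }
)
```

Walking every chain is quadratic in the worst case, which is fine for a sanity check on small graphs; a topological sort does the same job in linear time.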
The inputs inflows, node_maker_name, node_maker_index, and to_graph_index are collated: each has one entry per node, in the same order. The order of execution of the graph is not the same as the supplied order; the execution order is solved from to_graph_index. Note that initial conditions are set by the node makers via their parameters.
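FlowGraph's own ordering code is not reproduced here, but solving an execution order from to_graph_index is a topological sort. As a sketch, Kahn's algorithm (an assumed technique, not necessarily the one FlowGraph uses) places every node after all nodes that drain into it and detects cycles as a side effect. The function name execution_order is hypothetical.

```python
from collections import deque


def execution_order(to_graph_index: list[int]) -> list[int]:
    """Order nodes so each node comes after every node draining into it.

    Kahn's algorithm: repeatedly take a node whose upstream nodes have all
    been processed. Raises ValueError if the links contain a cycle.
    """
    n = len(to_graph_index)
    n_upstream = [0] * n
    for down in to_graph_index:
        if down != -1:
            n_upstream[down] += 1
    ready = deque(i for i in range(n) if n_upstream[i] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        down = to_graph_index[node]
        if down != -1:
            n_upstream[down] -= 1
            if n_upstream[down] == 0:
                ready.append(down)
    if len(order) != n:
        raise ValueError("to_graph_index does not describe a DAG")
    return order


order = execution_order([4, 4, 3, -1, 2])
```

For the graph [4, 4, 3, -1, 2], the supplied order 0 to 4 would solve node 2 before its upstream node 4; the sorted order [0, 1, 4, 2, 3] does not.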
Methods
__init__(control, discretization, ...[, ...]) – Initialize a FlowGraph.
advance() – Advance the Process in time.
calculate(time_length[, n_substeps]) – Calculate Process terms for a time step.
A dictionary description of this Process.
finalize() – Finalizes the Process, including output methods.
Get a tuple of dimension names for this Process.
FlowNode initial values.
Get a tuple of input variable names for this Process.
Get a dictionary of variable names for mass budget terms.
Get a mask indicating on which nodes flow exits the graph.
Get a tuple of parameter names for this Process.
Get a list of restart variable names.
Get a tuple of (public) variable names for this Process.
initialize_netcdf([output_dir, ...]) – Initialize NetCDF output.
output() – Output data to previously initialized output types.
output_to_csv(pth) – Save each output variable to a separate CSV file in the specified path.
set_input_to_adapter(input_variable_name, ...) – Set input variables to adapter.current and manage the adapter.
Attributes
A tuple of parameter names.
A dictionary of initial values for each public variable.
A tuple of input variable names.
A dictionary of variable names for the mass budget terms.
A mask indicating on which nodes flow exits the graph.
A tuple of parameter names.
A tuple of restart variable names.
A tuple of public variable names.
- advance()
Advance the Process in time.
- Returns:
None
- calculate(time_length, n_substeps=24)
Calculate Process terms for a time step.
- Parameters:
time_length – the length of the time step.
n_substeps – the number of substeps per time step (default 24).
- Return type:
None
- Returns:
None
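The docstring does not spell out how n_substeps is used. One plausible reading, and only an assumption here, is that the time step is split into n_substeps sub-intervals, the nodes are solved in execution order within each sub-interval, and each node's outflow is averaged over them. The sketch below shows that loop structure for pass-through nodes only; it is hypothetical and is not FlowGraph.calculate.

```python
def route_pass_through(
    to_graph_index: list[int],
    order: list[int],
    lateral_inflows: list[float],
    n_substeps: int = 24,
) -> list[float]:
    """Route constant lateral inflow rates through pass-through nodes.

    Hypothetical sketch: in each substep, nodes are visited in execution
    order so upstream outflows are known before a downstream node is
    solved. The returned outflow is the mean over substeps.
    """
    n = len(to_graph_index)
    outflow_sum = [0.0] * n
    for _ in range(n_substeps):
        upstream = [0.0] * n
        for node in order:
            # Pass-through: outflow is upstream inflow plus lateral inflow.
            outflow = upstream[node] + lateral_inflows[node]
            outflow_sum[node] += outflow
            down = to_graph_index[node]
            if down != -1:
                upstream[down] += outflow
    return [total / n_substeps for total in outflow_sum]


outflows = route_pass_through(
    [4, 4, 3, -1, 2], [0, 1, 4, 2, 3], [1.0, 2.0, 0.5, 0.0, 0.0]
)
```

With pass-through nodes the substep average is trivially constant; substeps would only matter for nodes with storage, such as a Muskingum-Mann channel, whose outflow changes within the time step.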
- classmethod description()
A dictionary description of this Process.
- Return type:
dict
- Returns:
All metadata for all variables in inputs, variables, parameters, and mass_budget_terms for this Process.
- classmethod get_variables()
Get a tuple of (public) variable names for this Process.
- Return type:
tuple
- initialize_netcdf(output_dir=None, separate_files=None, budget_args=None, output_vars=None, extra_coords=None)
Initialize NetCDF output.
- Parameters:
output_dir (str or pathlib.Path) – base directory path or NetCDF file path if separate_files is True
separate_files (bool) – boolean indicating if storage component output variables should be written to a separate file for each variable
output_vars (list) – list of variable names to output.
- Return type:
None
- Returns:
None
- property outflow_mask
A mask indicating on which nodes flow exits the graph.
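Assuming outflow_mask follows the to_graph_index convention described for the parameters (-1 marks an outflow node), it can be re-derived for checking in post-processing. The function below is a hypothetical equivalent, not the property's source; with NumPy the same mask is simply np.asarray(to_graph_index) == -1.

```python
def outflow_mask(to_graph_index: list[int]) -> list[bool]:
    """Return True for nodes whose flow exits the graph.

    Hypothetical re-derivation: a node is an outflow node when its
    to_graph_index is -1, i.e. it has no downstream node in the graph.
    """
    return [down == -1 for down in to_graph_index]


# Total flow leaving a toy graph in one time step.
node_outflows = [1.0, 2.0, 3.5, 3.5, 3.0]
mask = outflow_mask([4, 4, 3, -1, 2])
exiting = sum(q for q, is_out in zip(node_outflows, mask) if is_out)
```

Summing node outflows under the mask, rather than over all nodes, avoids counting the same water once per node it passes through.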
- output_to_csv(pth)
Save each output variable to a separate CSV file in the specified path.
- set_input_to_adapter(input_variable_name, adapter)
Set input variables to adapter.current and manage the adapter.
TODO: make this private?
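The method above works through adapter.current; GraphInflowAdapter in the example shows the same advance()/current pattern on top of pws.Adapter. For experimenting without NetCDF files, here is a standalone sketch of that pattern backed by an in-memory list. ListAdapter is hypothetical and does not subclass pws.Adapter.

```python
class ListAdapter:
    """Hypothetical adapter that serves one time slice per advance().

    Follows the advance()/current pattern of the example's
    GraphInflowAdapter, but reads from a list instead of a file.
    """

    def __init__(self, series: list[list[float]]) -> None:
        self._series = series
        self._itime = -1  # nothing is current until the first advance()
        self.current = None

    def advance(self) -> None:
        self._itime += 1
        self.current = self._series[self._itime]


adapter = ListAdapter([[1.0, 2.0], [3.0, 4.0]])
adapter.advance()
```

Starting the time index at -1 means the first advance() exposes the first time slice, matching the example's loop, which calls advance() before calculate() on every step.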