Source code for pywatershed.hydrology.prms_groundwater

import pathlib as pl
from typing import Literal, Union
from warnings import warn

import numpy as np

from ..base.adapter import adaptable, adapter_factory
from ..base.conservative_process import ConservativeProcess
from ..base.control import Control
from ..constants import nan, numba_num_threads
from ..parameters import Parameters


[docs] class PRMSGroundwater(ConservativeProcess): """PRMS groundwater reservoir. Implementation based on PRMS 5.2.1 with theoretical documentation given in the PRMS-IV documentation: `Markstrom, S. L., Regan, R. S., Hay, L. E., Viger, R. J., Webb, R. M., Payn, R. A., & LaFontaine, J. H. (2015). PRMS-IV, the precipitation-runoff modeling system, version 4. US Geological Survey Techniques and Methods, 6, B7. <https://pubs.usgs.gov/tm/6b7/pdf/tm6-b7.pdf>`__ Args: control: a Control object discretization: a discretization of class Parameters parameters: a parameter object of class Parameters soil_to_gw: Portion of excess flow to the capillary reservoir that drains to the associated GWR for each HRU ssr_to_gw: Drainage from the gravity-reservoir to the associated GWR for each HRU dprst_seep_hru: Seepage from surface-depression storage to associated GWR for each HRU imbalance_behavior: one of ["defer", None, "warn", "error"] with "defer" being the default and defering to control.options["imbalance_behavior"] when available. When control.options["imbalance_behavior"] is not avaiable, imbalance_behavior is set to "warn". calc_method: one of ["numba", "numpy"]. None defaults to "numba". verbose: Print extra information or not? restart_read: May be boolean or a Pathlib.Path. If False, control.options will be examined for this key. If True, the working directory is searched for restart files. If a Pathlib.Path, this specifies an alternative directory to search for restart files. Files searched for are of the pattern YYYY-mm-dd-varname.nc where the date is the control.init_time. The timestamp on the file is the valid time of the states in the file with the exception of processes with sub-daily timesteps. For example, the outflow_ts variable of PRMSChannel is instantaneous and valid at the 23rd hour of the timestampped day whereas its variable seg_outflow is the daily averge value over the timestampped day. restart_write: As for restart_read but for writing. The directory in either case will be attempted to be created if it does not exist. restart_write_freq: If False, then control.options is examined for this key. The follwing values set the frequency of restart output with "y" for yearly, "m" for monthly, "d" for daily, or "f" for final. "Final" means that restart files are written with the states at control.end_time to files timestampped with control.end_time. Yearly and monthly restart options write files with timestamps on the last day of each year or month during the run. If daily, restarts are written every day. If restart_write is not False and restart_write_freq is False, the default of "f" is used. """
[docs] def __init__( self, control: Control, discretization: Parameters, parameters: Parameters, soil_to_gw: adaptable, ssr_to_gw: adaptable, dprst_seep_hru: adaptable, dprst_flag: bool = None, imbalance_behavior: Literal["defer", None, "warn", "error"] = "defer", calc_method: Literal["numba", "numpy"] = None, input_aliases: dict = None, verbose: bool = None, restart_read: Union[pl.Path, bool] = False, restart_write: Union[pl.Path, bool] = False, restart_write_freq: Literal["y", "m", "d", "f", False] = False, ) -> None: super().__init__( control=control, discretization=discretization, parameters=parameters, input_aliases=input_aliases, restart_read=restart_read, restart_write=restart_write, restart_write_freq=restart_write_freq, ) self.name = "PRMSGroundwater" self._set_inputs(locals()) self._set_options(locals()) if self._dprst_flag is None: self._dprst_flag = True # This is a hacky dprst_flag == False approach. Improve design to # get rid of these inputs. if not self._dprst_flag: for kk, vv in self._input_variables_dict.items(): if vv is not None: continue self._input_variables_dict[kk] = adapter_factory( np.zeros(self._params.dimensions["nhru"]), variable_name=kk, control=self.control, ) self._set_budget() self._init_calc_method() return
[docs] @staticmethod def get_dimensions() -> tuple: return ("nhru",)
[docs] @staticmethod def get_parameters() -> tuple: return ( "hru_area", "hru_in_to_cf", "gwflow_coef", "gwsink_coef", "gwstor_init", "gwstor_min", )
[docs] @staticmethod def get_inputs() -> tuple: return ( "soil_to_gw", "ssr_to_gw", "dprst_seep_hru", )
[docs] @staticmethod def get_mass_budget_terms(): return { "inputs": [ "soil_to_gw", "ssr_to_gw", "dprst_seep_hru", ], "outputs": [ "gwres_flow", ], "storage_changes": [ "gwres_stor_change", ], }
[docs] @staticmethod def get_init_values() -> dict: return { "gwres_flow": nan, "gwres_flow_vol": nan, "gwres_sink": nan, "gwres_stor": nan, "gwres_stor_old": nan, "gwres_stor_change": nan, }
[docs] @staticmethod def get_restart_variables() -> list: return ["gwres_stor"]
def _set_initial_conditions(self): # initialize groundwater reservoir storage self.gwres_stor[:] = self.gwstor_init.copy() self.gwres_stor_old[:] = self.gwstor_init.copy() return def _init_diagnostic_vars(self) -> None: return def _init_calc_method(self): if self._calc_method is None: self._calc_method = "numba" avail_methods = ["numpy", "numba"] if self._calc_method.lower() not in avail_methods: msg = ( f"Invalid calc_method={self._calc_method} for {self.name}. " f"Setting calc_method to 'numba' for {self.name}" ) warn(msg) self._calc_method = "numba" if self._calc_method.lower() == "numba": import numba as nb numba_msg = f"{self.name} jit compiling with numba " nb_parallel = (numba_num_threads is not None) and ( numba_num_threads > 1 ) if nb_parallel: numba_msg += f"and using {numba_num_threads} threads" print(numba_msg, flush=True) self._calculate_gw = nb.njit( nb.types.UniTuple(nb.float64[:], 5)( nb.types.Array(nb.types.float64, 1, "C", readonly=True), nb.float64[:], nb.float64[:], nb.float64[:], nb.float64[:], nb.types.Array(nb.types.float64, 1, "C", readonly=True), nb.types.Array(nb.types.float64, 1, "C", readonly=True), nb.float64[:], nb.types.Array(nb.types.float64, 1, "C", readonly=True), ), fastmath=True, parallel=False, )(self._calculate_numpy) else: self._calculate_gw = self._calculate_numpy return def _advance_variables(self) -> None: self.gwres_stor_old[:] = self.gwres_stor return def _calculate(self, simulation_time) -> None: self._simulation_time = simulation_time ( self.gwres_stor[:], self.gwres_flow[:], self.gwres_sink[:], self.gwres_stor_change[:], self.gwres_flow_vol[:], ) = self._calculate_gw( self.hru_area, self.soil_to_gw, self.ssr_to_gw, self.dprst_seep_hru, self.gwres_stor, self.gwflow_coef, self.gwsink_coef, self.gwres_stor_old, self.hru_in_to_cf, ) return @staticmethod def _calculate_numpy( gwarea, soil_to_gw, ssr_to_gw, dprst_seep_hru, gwres_stor, gwflow_coef, gwsink_coef, gwres_stor_old, hru_in_to_cf, ): soil_to_gw_vol = soil_to_gw * gwarea ssr_to_gw_vol = ssr_to_gw * gwarea dprst_seep_hru_vol = dprst_seep_hru * gwarea # todo: what about route order _gwres_stor = gwres_stor * gwarea _gwres_stor += soil_to_gw_vol + ssr_to_gw_vol + dprst_seep_hru_vol _gwres_flow = _gwres_stor * gwflow_coef _gwres_stor -= _gwres_flow _gwres_sink = _gwres_stor * gwsink_coef idx = np.where(_gwres_sink > _gwres_stor) _gwres_sink[idx] = _gwres_stor[idx] _gwres_stor -= _gwres_sink # convert most units back to self variables # output variables gwres_stor = _gwres_stor / gwarea # for some stupid reason this is left in acre-inches gwres_flow = _gwres_flow / gwarea gwres_sink = _gwres_sink / gwarea gwres_stor_change = gwres_stor - gwres_stor_old gwres_flow_vol = gwres_flow * hru_in_to_cf return ( gwres_stor, gwres_flow, gwres_sink, gwres_stor_change, gwres_flow_vol, )