Source code for pywatershed.hydrology.prms_canopy

import pathlib as pl
from typing import Literal, Union
from warnings import warn

import numpy as np
from numba import prange

from ..base.adapter import adaptable
from ..base.conservative_process import ConservativeProcess
from ..base.control import Control
from ..constants import (
    CovType,
    HruType,
    dnearzero,
    nearzero,
    numba_num_threads,
    zero,
)
from ..parameters import Parameters

# set constants (may need .value for enum to be used in > comparisons)
BARESOIL = CovType.BARESOIL.value
GRASSES = CovType.GRASSES.value
LAND = HruType.LAND.value
LAKE = HruType.LAKE.value
RAIN = 0
SNOW = 1
OFF = 0
ACTIVE = 1


[docs] class PRMSCanopy(ConservativeProcess): """PRMS canopy class. A canopy or vegetation representation from PRMS. Implementation based on PRMS 5.2.1 with theoretical documentation given in the PRMS-IV documentation: `Markstrom, S. L., Regan, R. S., Hay, L. E., Viger, R. J., Webb, R. M., Payn, R. A., & LaFontaine, J. H. (2015). PRMS-IV, the precipitation-runoff modeling system, version 4. US Geological Survey Techniques and Methods, 6, B7. <https://pubs.usgs.gov/tm/6b7/pdf/tm6-b7.pdf>`__ Args: control: a Control object discretization: a discretization of class Parameters parameters: a parameter object of class Parameters pk_ice_prev: Previous snowpack ice on each HRU freeh2o_prev: Previous snowpack free water on each HRU transp_on: Flag indicating whether transpiration is occurring (0=no;1=yes) hru_ppt: Precipitation on each HRU hru_rain: Rain on each HRU hru_snow: Snow on each HRU imbalance_behavior: one of ["defer", None, "warn", "error"] with "defer" being the default and defering to control.options["imbalance_behavior"] when available. When control.options["imbalance_behavior"] is not avaiable, imbalance_behavior is set to "warn". calc_method: one of ["numba", "numpy"]. None defaults to "numba". verbose: Print extra information or not? load_n_time_batches: not-implemented restart_read: May be boolean or a Pathlib.Path. If False, control.options will be examined for this key. If True, the working directory is searched for restart files. If a Pathlib.Path, this specifies an alternative directory to search for restart files. Files searched for are of the pattern YYYY-mm-dd-varname.nc where the date is the control.init_time. The timestamp on the file is the valid time of the states in the file with the exception of processes with sub-daily timesteps. For example, the outflow_ts variable of PRMSChannel is instantaneous and valid at the 23rd hour of the timestampped day whereas its variable seg_outflow is the daily averge value over the timestampped day. restart_write: As for restart_read but for writing. The directory in either case will be attempted to be created if it does not exist. restart_write_freq: If False, then control.options is examined for this key. The follwing values set the frequency of restart output with "y" for yearly, "m" for monthly, "d" for daily, or "f" for final. "Final" means that restart files are written with the states at control.end_time to files timestampped with control.end_time. Yearly and monthly restart options write files with timestamps on the last day of each year or month during the run. If daily, restarts are written every day. If restart_write is not False and restart_write_freq is False, the default of "f" is used. """
[docs] def __init__( self, control: Control, discretization: Parameters, parameters: Parameters, pk_ice_prev: adaptable, freeh2o_prev: adaptable, transp_on: adaptable, hru_ppt: adaptable, hru_rain: adaptable, hru_snow: adaptable, potet: adaptable, pptmix: adaptable, imbalance_behavior: Literal["defer", None, "warn", "error"] = "defer", calc_method: Literal["numba", "numpy"] = None, input_aliases: dict = None, verbose: bool = None, restart_read: Union[pl.Path, bool] = False, restart_write: Union[pl.Path, bool] = False, restart_write_freq: Literal["y", "m", "d", "f", False] = False, ) -> None: super().__init__( control=control, discretization=discretization, parameters=parameters, input_aliases=input_aliases, restart_read=restart_read, restart_write=restart_write, restart_write_freq=restart_write_freq, ) self.name = "PRMSCanopy" # set hrutype to LAND as this is only type supported in NHM self._hru_type = np.array(self.nhru * [LAND]) self._set_inputs(locals()) self._set_options(locals()) self._set_budget() self._init_calc_method() return
[docs] @staticmethod def get_dimensions() -> tuple: return ("nhru",)
[docs] @staticmethod def get_parameters() -> tuple: return ( "cov_type", "covden_sum", "covden_win", "srain_intcp", "wrain_intcp", "snow_intcp", "potet_sublim", )
[docs] @staticmethod def get_inputs() -> tuple: return ( "pk_ice_prev", "freeh2o_prev", "transp_on", "hru_ppt", "hru_rain", "hru_snow", "potet", "pptmix", )
[docs] @staticmethod def get_init_values() -> dict: return { "net_ppt": zero, "net_rain": zero, "net_snow": zero, "intcp_changeover": zero, "intcp_evap": zero, "intcp_form": 0, # = RAIN bool would have to match RAIN/SNOW "intcp_stor": zero, "intcp_transp_on": 0, # could make boolean "hru_intcpevap": zero, "hru_intcpstor": zero, "hru_intcpstor_change": zero, "hru_intcpstor_old": zero, }
[docs] @staticmethod def get_restart_variables() -> list: return [ "hru_intcpstor", "intcp_stor", "intcp_transp_on", ]
[docs] @staticmethod def get_mass_budget_terms(): return { "inputs": ["hru_rain", "hru_snow"], "outputs": [ "net_rain", "net_snow", "hru_intcpevap", "intcp_changeover", # ?? always zero ], "storage_changes": ["hru_intcpstor_change"], }
def _set_initial_conditions(self): """Set initial conditions for variables not in get_init_values""" return def _init_diagnostic_vars(self) -> None: return def _init_calc_method(self): if self._calc_method is None: self._calc_method = "numba" avail_methods = ["numpy", "numba"] if self._calc_method.lower() not in avail_methods: msg = ( f"Invalid calc_method={self._calc_method} for {self.name}. " f"Setting calc_method to 'numba' for {self.name}" ) warn(msg) self._calc_method = "numba" if self._calc_method.lower() in ["numba"]: import numba as nb numba_msg = f"{self.name} jit compiling with numba " nb_parallel = (numba_num_threads is not None) and ( numba_num_threads > 1 ) if nb_parallel: numba_msg += f"and using {numba_num_threads} threads" print(numba_msg, flush=True) # JLM: note. I gave up on specifying signatures because it # appears impossible/undocumented how to specify the type # of a passed function. The work is not in vain as then # types will be required for f90. The signatures remain # here commented for that reason and in case we can get it # to work in the future. # self._intercept_numba = nb.njit( # nb.types.Tuple( # ( # nb.float64[:], # intcp_stor # nb.float64[:], # net_precip # ) # )( # nb.float64[:], # precip # nb.float64[:], # stor_max # nb.float64[:], # cov # nb.float64[:], # intcp_stor # nb.float64[:], # net_precip # ), # fastmath=True, # )(self._intercept) self._intercept = nb.njit(fastmath=True)(self._intercept) # self._calculate_numba = nb.njit( # nb.types.Tuple( # ( # nb.float64[:], # intcp_evap # nb.float64[:], # intcp_stor # nb.float64[:], # net_rain # nb.float64[:], # net_snow # nb.float64[:], # net_ppt # nb.float64[:], # hru_intcpstor # nb.float64[:], # hru_intcpevap # nb.float64[:], # intcp_changeover # nb.int32[:], # intcp_transp_on # ) # )( # nb.int32, # np.int32(self.nhru) # nb.int64[:], # cov_type # nb.float64[:], # covden_sum # nb.float64[:], # covden_win # nb.float64[:], # hru_intcpstor # nb.float64[:], # hru_intcpevap # nb.float64[:], # hru_ppt # nb.float64[:], # hru_rain # nb.float64[:], # hru_snow # nb.float64[:], # intcp_changeover # nb.float64[:], # intcp_evap # nb.int32[:], # intcp_form # nb.float64[:], # intcp_stor # nb.int32[:], # intcp_transp_on # nb.float64[:], # net_ppt # nb.float64[:], # net_rain # nb.float64[:], # net_snow # nb.float64[:], # self.pk_ice_prev # nb.float64[:], # self.freeh2o_prev # nb.float64[:], # potet # nb.float64[:], # potet_sublim # nb.float64[:], # snow_intcp # nb.float64[:], # srain_intcp # nb.float64[:], # transp_on # nb.float64[:], # wrain_intcp # nb.float64, # np.float64(time_length), # nb.int64[:], # self._hru_type # nb.float64, # nearzero # nb.float64, # dnearzero, # nb.int32, # BARESOIL, # nb.int32, # GRASSES, # nb.int32, # LAND, # nb.int32, # LAKE, # nb.int32, # RAIN, # nb.int32, # SNOW, # nb.int32, # OFF, # nb.int32, # ACTIVE, # nb.typeof(self._intercept_numba), # function. # ), # fastmath=True, # )(self._calculate_procedural) self._calculate_canopy = nb.njit( fastmath=True, parallel=nb_parallel )(self._calculate_numpy) else: self._calculate_canopy = self._calculate_numpy return def _advance_variables(self) -> None: """Advance canopy variables""" self.hru_intcpstor_old[:] = self.hru_intcpstor return def _calculate(self, time_length) -> None: """Calculate canopy terms for a time step Args: simulation_time: current simulation time Returns: None """ ( self.intcp_evap[:], self.intcp_form[:], self.intcp_stor[:], self.net_rain[:], self.net_snow[:], self.pptmix[:], self.net_ppt[:], self.hru_intcpstor[:], self.hru_intcpevap[:], self.intcp_changeover[:], self.intcp_transp_on[:], ) = self._calculate_canopy( nhru=np.int32(self.nhru), cov_type=self.cov_type, covden_sum=self.covden_sum, covden_win=self.covden_win, hru_intcpstor=self.hru_intcpstor, hru_intcpevap=self.hru_intcpevap, hru_ppt=self.hru_ppt, hru_rain=self.hru_rain, hru_snow=self.hru_snow, intcp_changeover=self.intcp_changeover, intcp_evap=self.intcp_evap, intcp_stor=self.intcp_stor, intcp_transp_on=self.intcp_transp_on, net_ppt=self.net_ppt, net_rain=self.net_rain, net_snow=self.net_snow, pptmix=self.pptmix, pk_ice_prev=self.pk_ice_prev, freeh2o_prev=self.freeh2o_prev, potet=self.potet, potet_sublim=self.potet_sublim, snow_intcp=self.snow_intcp, srain_intcp=self.srain_intcp, transp_on=self.transp_on, wrain_intcp=self.wrain_intcp, time_length=time_length, hru_type=self._hru_type, nearzero=nearzero, dnearzero=dnearzero, baresoil=np.int32(BARESOIL), grasses=np.int32(GRASSES), land=np.int32(LAND), lake=np.int32(LAKE), rain=np.int32(RAIN), snow=np.int32(SNOW), off=np.int32(OFF), active=np.int32(ACTIVE), intercept=self._intercept, ) self.hru_intcpstor_change[:] = ( self.hru_intcpstor - self.hru_intcpstor_old ) return @staticmethod def _calculate_numpy( nhru, cov_type, covden_sum, covden_win, hru_intcpstor, hru_intcpevap, hru_ppt, hru_rain, hru_snow, intcp_changeover, intcp_evap, intcp_stor, intcp_transp_on, net_ppt, net_rain, net_snow, pptmix, pk_ice_prev, freeh2o_prev, potet, potet_sublim, snow_intcp, srain_intcp, transp_on, wrain_intcp, time_length, hru_type, nearzero, dnearzero, baresoil, grasses, land, lake, rain, snow, off, active, intercept, ): # TODO: would be nice to alphabetize the arguments # probably while keeping constants at the end. # intcp_form and intcp_transp_on also do not appear to be # actual inputs # Keep the f90 call signature consistent with the args in # python/numba. intcp_form = np.full_like(hru_rain, -9999, dtype="int32") for i in prange(nhru): netrain = hru_rain[i] netsnow = hru_snow[i] if transp_on[i] == ACTIVE: cov = covden_sum[i] stor_max_rain = srain_intcp[i] else: cov = covden_win[i] stor_max_rain = wrain_intcp[i] intcp_form[i] = RAIN if hru_snow[i] > 0.0: intcp_form[i] = SNOW intcpstor = intcp_stor[i] intcpevap = 0.0 changeover = 0.0 extra_water = 0.0 # lake or bare ground hrus if hru_type[i] == LAKE or cov_type[i] == BARESOIL: if cov_type[i] == BARESOIL and intcpstor > 0.0: extra_water = intcp_stor[i] intcpstor = 0.0 # ***** go from summer to winter cover density if transp_on[i] == OFF and intcp_transp_on[i] == ACTIVE: intcp_transp_on[i] = OFF if intcpstor > 0.0: diff = covden_sum[i] - cov changeover = intcpstor * diff if cov > 0.0: if changeover < 0.0: intcpstor = intcpstor * covden_sum[i] / cov changeover = 0.0 else: intcpstor = 0.0 # **** go from winter to summer cover density, excess = throughfall elif transp_on[i] == ACTIVE and intcp_transp_on[i] == OFF: intcp_transp_on[i] = ACTIVE if intcpstor > 0.0: diff = covden_win[i] - cov changeover = intcpstor * diff if cov > 0.0: if changeover < 0.0: intcpstor = intcpstor * covden_win[i] / cov changeover = 0.0 else: intcpstor = 0.0 # *****Determine the amount of interception from rain # IF ( Hru_type(i)/=LAKE .AND. Cov_type(i)/=BARESOIL ) THEN if hru_type[i] != LAKE and cov_type[i] != BARESOIL: if hru_rain[i] > 0.0: if cov > 0.0: # IF ( Cov_type(i)>GRASSES ) THEN if cov_type[i] > GRASSES: # intercept( # Hru_rain(i), stor_max_rain, cov, intcpstor, # netrain) intcpstor, netrain = intercept( hru_rain[i], stor_max_rain, cov, intcpstor, netrain, ) elif cov_type[i] == GRASSES: # if there is no snowpack and no snowfall, then # apparently, grasses can intercept rain. # IF ( # pkwater_ante(i)<dnearzero # .AND. netsnow<nearzero ) THEN if ( pk_ice_prev[i] + freeh2o_prev[i] ) < dnearzero and netsnow < nearzero: intcpstor, netrain = intercept( hru_rain[i], stor_max_rain, cov, intcpstor, netrain, ) # ******Determine amount of interception from snow if hru_snow[i] > 0.0: if cov > 0.0: if cov_type[i] > GRASSES: intcpstor, netsnow = intercept( hru_snow[i], snow_intcp[i], cov, intcpstor, netsnow, ) if netsnow < nearzero: netrain = netrain + netsnow netsnow = 0.0 # todo: deal with newsnow and pptmix? # Newsnow(i) = OFF pptmix[i] = 0 # todo: canopy application of irrigation water based on irr_type # ******compute evaporation or sublimation of interception # if precipitation assume no evaporation or sublimation if intcpstor > 0.0: if hru_ppt[i] < nearzero: epan_coef = 1.0 evrn = potet[i] / epan_coef evsn = potet[i] * potet_sublim[i] # todo: pan et # IF ( Use_pandata==ACTIVE ) THEN # evrn = Pan_evap(Hru_pansta(i)) # IF ( evrn<0.0 ) evrn = 0.0 # ENDIF if intcp_form[i] == SNOW: z = intcpstor - evsn if z > 0: intcpstor = z intcpevap = evsn else: intcpevap = intcpstor intcpstor = 0.0 else: d = intcpstor - evrn if d > 0.0: intcpstor = d intcpevap = evrn else: intcpevap = intcpstor intcpstor = 0.0 if intcpevap * cov > potet[i]: last = intcpevap if cov > 0.0: intcpevap = potet[i] / cov else: intcpevap = 0.0 intcpstor = intcpstor + last - intcpevap # Store calculated values in output variables intcp_evap[i] = intcpevap intcp_stor[i] = intcpstor net_rain[i] = netrain net_snow[i] = netsnow net_ppt[i] = netrain + netsnow hru_intcpstor[i] = intcpstor * cov hru_intcpevap[i] = intcpevap * cov intcp_changeover[i] = changeover + extra_water return ( intcp_evap, intcp_form, intcp_stor, net_rain, net_snow, pptmix, net_ppt, hru_intcpstor, hru_intcpevap, intcp_changeover, intcp_transp_on, ) @staticmethod def _intercept(precip, stor_max, cov, intcp_stor, net_precip): net_precip = precip * (1.0 - cov) intcp_stor = intcp_stor + precip if intcp_stor > stor_max: net_precip = net_precip + (intcp_stor - stor_max) * cov intcp_stor = stor_max return intcp_stor, net_precip
[docs] @staticmethod def update_net_precip( precip, stor_max, covden, intcp_stor, net_precip, idx ): net_precip[idx] = precip[idx] * (1.0 - covden[idx]) intcp_stor[idx] += precip[idx] for i in idx[0]: if intcp_stor[i] > stor_max[i]: net_precip[i] += (intcp_stor[i] - stor_max[i]) * covden[i] intcp_stor[i] = stor_max[i] return