from typing import Literal
from warnings import warn
import numpy as np
from numba import prange
from ..base.adapter import adaptable
from ..base.conservative_process import ConservativeProcess
from ..base.control import Control
from ..constants import (
CovType,
HruType,
dnearzero,
nearzero,
numba_num_threads,
zero,
)
from ..parameters import Parameters
# set constants (may need .value for enum to be used in > comparisons)
BARESOIL = CovType.BARESOIL.value
GRASSES = CovType.GRASSES.value
LAND = HruType.LAND.value
LAKE = HruType.LAKE.value
RAIN = 0
SNOW = 1
OFF = 0
ACTIVE = 1
[docs]
class PRMSCanopy(ConservativeProcess):
"""PRMS canopy class.
A canopy or vegetation representation from PRMS.
Implementation based on PRMS 5.2.1 with theoretical documentation given in
the PRMS-IV documentation:
`Markstrom, S. L., Regan, R. S., Hay, L. E., Viger, R. J., Webb, R. M.,
Payn, R. A., & LaFontaine, J. H. (2015). PRMS-IV, the
precipitation-runoff modeling system, version 4. US Geological Survey
Techniques and Methods, 6, B7.
<https://pubs.usgs.gov/tm/6b7/pdf/tm6-b7.pdf>`__
Args:
control: a Control object
discretization: a discretization of class Parameters
parameters: a parameter object of class Parameters
pk_ice_prev: Previous snowpack ice on each HRU
freeh2o_prev: Previous snowpack free water on each HRU
transp_on: Flag indicating whether transpiration is occurring
(0=no;1=yes)
hru_ppt: Precipitation on each HRU
hru_rain: Rain on each HRU
hru_snow: Snow on each HRU
budget_type: one of ["defer", None, "warn", "error"] with "defer" being
the default and defering to control.options["budget_type"] when
available. When control.options["budget_type"] is not avaiable,
budget_type is set to "warn".
calc_method: one of ["numba", "numpy"]. None defaults to
"numba".
verbose: Print extra information or not?
load_n_time_batches: not-implemented
"""
[docs]
def __init__(
self,
control: Control,
discretization: Parameters,
parameters: Parameters,
pk_ice_prev: adaptable,
freeh2o_prev: adaptable,
transp_on: adaptable,
hru_ppt: adaptable,
hru_rain: adaptable,
hru_snow: adaptable,
potet: adaptable,
pptmix: adaptable,
budget_type: Literal["defer", None, "warn", "error"] = "defer",
calc_method: Literal["numba", "numpy"] = None,
verbose: bool = None,
):
super().__init__(
control=control,
discretization=discretization,
parameters=parameters,
)
self.name = "PRMSCanopy"
# set hrutype to LAND as this is only type supported in NHM
self._hru_type = np.array(self.nhru * [LAND])
self._set_inputs(locals())
self._set_options(locals())
self._set_budget()
self._init_calc_method()
return
[docs]
@staticmethod
def get_dimensions() -> tuple:
return ("nhru",)
[docs]
@staticmethod
def get_parameters() -> tuple:
return (
"cov_type",
"covden_sum",
"covden_win",
"srain_intcp",
"wrain_intcp",
"snow_intcp",
"potet_sublim",
)
[docs]
@staticmethod
def get_init_values() -> dict:
return {
"net_ppt": zero,
"net_rain": zero,
"net_snow": zero,
"intcp_changeover": zero,
"intcp_evap": zero,
"intcp_form": 0, # = RAIN bool would have to match RAIN/SNOW
"intcp_stor": zero,
"intcp_transp_on": 0, # could make boolean
"hru_intcpevap": zero,
"hru_intcpstor": zero,
"hru_intcpstor_change": zero,
"hru_intcpstor_old": zero,
}
[docs]
@staticmethod
def get_mass_budget_terms():
return {
"inputs": ["hru_rain", "hru_snow"],
"outputs": [
"net_rain",
"net_snow",
"hru_intcpevap",
"intcp_changeover", # ?? always zero
],
"storage_changes": ["hru_intcpstor_change"],
}
def _set_initial_conditions(self):
return
def _init_calc_method(self):
if self._calc_method is None:
self._calc_method = "numba"
avail_methods = ["numpy", "numba"]
if self._calc_method.lower() not in avail_methods:
msg = (
f"Invalid calc_method={self._calc_method} for {self.name}. "
f"Setting calc_method to 'numba' for {self.name}"
)
warn(msg)
self._calc_method = "numba"
if self._calc_method.lower() in ["numba"]:
import numba as nb
numba_msg = f"{self.name} jit compiling with numba "
nb_parallel = (numba_num_threads is not None) and (
numba_num_threads > 1
)
if nb_parallel:
numba_msg += f"and using {numba_num_threads} threads"
print(numba_msg, flush=True)
# JLM: note. I gave up on specifying signatures because it
# appears impossible/undocumented how to specify the type
# of a passed function. The work is not in vain as then
# types will be required for f90. The signatures remain
# here commented for that reason and in case we can get it
# to work in the future.
# self._intercept_numba = nb.njit(
# nb.types.Tuple(
# (
# nb.float64[:], # intcp_stor
# nb.float64[:], # net_precip
# )
# )(
# nb.float64[:], # precip
# nb.float64[:], # stor_max
# nb.float64[:], # cov
# nb.float64[:], # intcp_stor
# nb.float64[:], # net_precip
# ),
# fastmath=True,
# )(self._intercept)
self._intercept = nb.njit(fastmath=True)(self._intercept)
# self._calculate_numba = nb.njit(
# nb.types.Tuple(
# (
# nb.float64[:], # intcp_evap
# nb.float64[:], # intcp_stor
# nb.float64[:], # net_rain
# nb.float64[:], # net_snow
# nb.float64[:], # net_ppt
# nb.float64[:], # hru_intcpstor
# nb.float64[:], # hru_intcpevap
# nb.float64[:], # intcp_changeover
# nb.int32[:], # intcp_transp_on
# )
# )(
# nb.int32, # np.int32(self.nhru)
# nb.int64[:], # cov_type
# nb.float64[:], # covden_sum
# nb.float64[:], # covden_win
# nb.float64[:], # hru_intcpstor
# nb.float64[:], # hru_intcpevap
# nb.float64[:], # hru_ppt
# nb.float64[:], # hru_rain
# nb.float64[:], # hru_snow
# nb.float64[:], # intcp_changeover
# nb.float64[:], # intcp_evap
# nb.int32[:], # intcp_form
# nb.float64[:], # intcp_stor
# nb.int32[:], # intcp_transp_on
# nb.float64[:], # net_ppt
# nb.float64[:], # net_rain
# nb.float64[:], # net_snow
# nb.float64[:], # self.pk_ice_prev
# nb.float64[:], # self.freeh2o_prev
# nb.float64[:], # potet
# nb.float64[:], # potet_sublim
# nb.float64[:], # snow_intcp
# nb.float64[:], # srain_intcp
# nb.float64[:], # transp_on
# nb.float64[:], # wrain_intcp
# nb.float64, # np.float64(time_length),
# nb.int64[:], # self._hru_type
# nb.float64, # nearzero
# nb.float64, # dnearzero,
# nb.int32, # BARESOIL,
# nb.int32, # GRASSES,
# nb.int32, # LAND,
# nb.int32, # LAKE,
# nb.int32, # RAIN,
# nb.int32, # SNOW,
# nb.int32, # OFF,
# nb.int32, # ACTIVE,
# nb.typeof(self._intercept_numba), # function.
# ),
# fastmath=True,
# )(self._calculate_procedural)
self._calculate_canopy = nb.njit(
fastmath=True, parallel=nb_parallel
)(self._calculate_numpy)
else:
self._calculate_canopy = self._calculate_numpy
return
def _advance_variables(self):
"""Advance canopy
Returns:
None
"""
self.hru_intcpstor_old[:] = self.hru_intcpstor
return
def _calculate(self, time_length):
"""Calculate canopy terms for a time step
Args:
simulation_time: current simulation time
Returns:
None
"""
(
self.intcp_evap[:],
self.intcp_form[:],
self.intcp_stor[:],
self.net_rain[:],
self.net_snow[:],
self.pptmix[:],
self.net_ppt[:],
self.hru_intcpstor[:],
self.hru_intcpevap[:],
self.intcp_changeover[:],
self.intcp_transp_on[:],
) = self._calculate_canopy(
nhru=np.int32(self.nhru),
cov_type=self.cov_type,
covden_sum=self.covden_sum,
covden_win=self.covden_win,
hru_intcpstor=self.hru_intcpstor,
hru_intcpevap=self.hru_intcpevap,
hru_ppt=self.hru_ppt,
hru_rain=self.hru_rain,
hru_snow=self.hru_snow,
intcp_changeover=self.intcp_changeover,
intcp_evap=self.intcp_evap,
intcp_stor=self.intcp_stor,
intcp_transp_on=self.intcp_transp_on,
net_ppt=self.net_ppt,
net_rain=self.net_rain,
net_snow=self.net_snow,
pptmix=self.pptmix,
pk_ice_prev=self.pk_ice_prev,
freeh2o_prev=self.freeh2o_prev,
potet=self.potet,
potet_sublim=self.potet_sublim,
snow_intcp=self.snow_intcp,
srain_intcp=self.srain_intcp,
transp_on=self.transp_on,
wrain_intcp=self.wrain_intcp,
time_length=time_length,
hru_type=self._hru_type,
nearzero=nearzero,
dnearzero=dnearzero,
baresoil=np.int32(BARESOIL),
grasses=np.int32(GRASSES),
land=np.int32(LAND),
lake=np.int32(LAKE),
rain=np.int32(RAIN),
snow=np.int32(SNOW),
off=np.int32(OFF),
active=np.int32(ACTIVE),
intercept=self._intercept,
)
self.hru_intcpstor_change[:] = (
self.hru_intcpstor - self.hru_intcpstor_old
)
return
@staticmethod
def _calculate_numpy(
nhru,
cov_type,
covden_sum,
covden_win,
hru_intcpstor,
hru_intcpevap,
hru_ppt,
hru_rain,
hru_snow,
intcp_changeover,
intcp_evap,
intcp_stor,
intcp_transp_on,
net_ppt,
net_rain,
net_snow,
pptmix,
pk_ice_prev,
freeh2o_prev,
potet,
potet_sublim,
snow_intcp,
srain_intcp,
transp_on,
wrain_intcp,
time_length,
hru_type,
nearzero,
dnearzero,
baresoil,
grasses,
land,
lake,
rain,
snow,
off,
active,
intercept,
):
# TODO: would be nice to alphabetize the arguments
# probably while keeping constants at the end.
# intcp_form and intcp_transp_on also do not appear to be
# actual inputs
# Keep the f90 call signature consistent with the args in
# python/numba.
intcp_form = np.full_like(hru_rain, -9999, dtype="int32")
for i in prange(nhru):
netrain = hru_rain[i]
netsnow = hru_snow[i]
if transp_on[i] == ACTIVE:
cov = covden_sum[i]
stor_max_rain = srain_intcp[i]
else:
cov = covden_win[i]
stor_max_rain = wrain_intcp[i]
intcp_form[i] = RAIN
if hru_snow[i] > 0.0:
intcp_form[i] = SNOW
intcpstor = intcp_stor[i]
intcpevap = 0.0
changeover = 0.0
extra_water = 0.0
# lake or bare ground hrus
if hru_type[i] == LAKE or cov_type[i] == BARESOIL:
if cov_type[i] == BARESOIL and intcpstor > 0.0:
extra_water = intcp_stor[i]
intcpstor = 0.0
# ***** go from summer to winter cover density
if transp_on[i] == OFF and intcp_transp_on[i] == ACTIVE:
intcp_transp_on[i] = OFF
if intcpstor > 0.0:
diff = covden_sum[i] - cov
changeover = intcpstor * diff
if cov > 0.0:
if changeover < 0.0:
intcpstor = intcpstor * covden_sum[i] / cov
changeover = 0.0
else:
intcpstor = 0.0
# **** go from winter to summer cover density, excess = throughfall
elif transp_on[i] == ACTIVE and intcp_transp_on[i] == OFF:
intcp_transp_on[i] = ACTIVE
if intcpstor > 0.0:
diff = covden_win[i] - cov
changeover = intcpstor * diff
if cov > 0.0:
if changeover < 0.0:
intcpstor = intcpstor * covden_win[i] / cov
changeover = 0.0
else:
intcpstor = 0.0
# *****Determine the amount of interception from rain
# IF ( Hru_type(i)/=LAKE .AND. Cov_type(i)/=BARESOIL ) THEN
if hru_type[i] != LAKE and cov_type[i] != BARESOIL:
if hru_rain[i] > 0.0:
if cov > 0.0:
# IF ( Cov_type(i)>GRASSES ) THEN
if cov_type[i] > GRASSES:
# intercept(
# Hru_rain(i), stor_max_rain, cov, intcpstor,
# netrain)
intcpstor, netrain = intercept(
hru_rain[i],
stor_max_rain,
cov,
intcpstor,
netrain,
)
elif cov_type[i] == GRASSES:
# if there is no snowpack and no snowfall, then
# apparently, grasses can intercept rain.
# IF (
# pkwater_ante(i)<dnearzero
# .AND. netsnow<nearzero ) THEN
if (
pk_ice_prev[i] + freeh2o_prev[i]
) < dnearzero and netsnow < nearzero:
intcpstor, netrain = intercept(
hru_rain[i],
stor_max_rain,
cov,
intcpstor,
netrain,
)
# ******Determine amount of interception from snow
if hru_snow[i] > 0.0:
if cov > 0.0:
if cov_type[i] > GRASSES:
intcpstor, netsnow = intercept(
hru_snow[i],
snow_intcp[i],
cov,
intcpstor,
netsnow,
)
if netsnow < nearzero:
netrain = netrain + netsnow
netsnow = 0.0
# todo: deal with newsnow and pptmix?
# Newsnow(i) = OFF
pptmix[i] = 0
# todo: canopy application of irrigation water based on irr_type
# ******compute evaporation or sublimation of interception
# if precipitation assume no evaporation or sublimation
if intcpstor > 0.0:
if hru_ppt[i] < nearzero:
epan_coef = 1.0
evrn = potet[i] / epan_coef
evsn = potet[i] * potet_sublim[i]
# todo: pan et
# IF ( Use_pandata==ACTIVE ) THEN
# evrn = Pan_evap(Hru_pansta(i))
# IF ( evrn<0.0 ) evrn = 0.0
# ENDIF
if intcp_form[i] == SNOW:
z = intcpstor - evsn
if z > 0:
intcpstor = z
intcpevap = evsn
else:
intcpevap = intcpstor
intcpstor = 0.0
else:
d = intcpstor - evrn
if d > 0.0:
intcpstor = d
intcpevap = evrn
else:
intcpevap = intcpstor
intcpstor = 0.0
if intcpevap * cov > potet[i]:
last = intcpevap
if cov > 0.0:
intcpevap = potet[i] / cov
else:
intcpevap = 0.0
intcpstor = intcpstor + last - intcpevap
# Store calculated values in output variables
intcp_evap[i] = intcpevap
intcp_stor[i] = intcpstor
net_rain[i] = netrain
net_snow[i] = netsnow
net_ppt[i] = netrain + netsnow
hru_intcpstor[i] = intcpstor * cov
hru_intcpevap[i] = intcpevap * cov
intcp_changeover[i] = changeover + extra_water
return (
intcp_evap,
intcp_form,
intcp_stor,
net_rain,
net_snow,
pptmix,
net_ppt,
hru_intcpstor,
hru_intcpevap,
intcp_changeover,
intcp_transp_on,
)
@staticmethod
def _intercept(precip, stor_max, cov, intcp_stor, net_precip):
net_precip = precip * (1.0 - cov)
intcp_stor = intcp_stor + precip
if intcp_stor > stor_max:
net_precip = net_precip + (intcp_stor - stor_max) * cov
intcp_stor = stor_max
return intcp_stor, net_precip
[docs]
@staticmethod
def update_net_precip(
precip, stor_max, covden, intcp_stor, net_precip, idx
):
net_precip[idx] = precip[idx] * (1.0 - covden[idx])
intcp_stor[idx] += precip[idx]
for i in idx[0]:
if intcp_stor[i] > stor_max[i]:
net_precip[i] += (intcp_stor[i] - stor_max[i]) * covden[i]
intcp_stor[i] = stor_max[i]
return