Source code for pywatershed.atmosphere.prms_atmosphere

import pathlib as pl
from typing import Literal, Union
from warnings import warn

import numpy as np

from pywatershed.base.process import Process
from pywatershed.utils.netcdf_utils import NetCdfWrite

from ..base.adapter import adaptable
from ..base.control import Control
from ..constants import inch2cm, nan, nearzero, one, zero
from ..parameters import Parameters
from ..utils.time_utils import (
    datetime_day_of_month,
    datetime_jsol,
    datetime_month,
)
from .solar_constants import solf


# may not use this if they cant be called with jit
# if it can, put it in a common utility.
def tile_space_to_time(arr: np.ndarray, n_time) -> np.ndarray:
    return np.tile(arr, (n_time, 1))


def tile_time_to_space(arr: np.ndarray, n_space) -> np.ndarray:
    return np.transpose(np.tile(arr, (n_space, 1)))


[docs] class PRMSAtmosphere(Process): """PRMS atmospheric boundary layer model. Implementation based on PRMS 5.2.1 with theoretical documentation given in the PRMS-IV documentation: `Markstrom, S. L., Regan, R. S., Hay, L. E., Viger, R. J., Webb, R. M., Payn, R. A., & LaFontaine, J. H. (2015). PRMS-IV, the precipitation-runoff modeling system, version 4. US Geological Survey Techniques and Methods, 6, B7. <https://pubs.usgs.gov/tm/6b7/pdf/tm6-b7.pdf>`__ This representation uses precipitation and temperature inputs. Relative humidity could be added as well. The boundary layer calculates and manages the following variables (given by PRMSAtmosphere.get_variables()): * tmaxf, tminf, prmx, hru_ppt, hru_rain, hru_snow, swrad, potet, transp_on PRMS adjustments to temperature and precipitation are applied here to the inputs. Shortwave radiation (using degree day method) and potential evapotranspiration (Jensen and Haise ,1963) and a temperature based transpiration flag (transp_on) are also calculated. Note that all variables are calculated for all time upon the first advance and that all calculated variables are written to NetCDF (when netcdf output is requested) the first time output is requested. This is effectively a complete preprocessing of the input CBH files to the fields the model actually uses on initialization. For an example of preprocessing the variables in PRMSAtmosphere, see `this notebook <https://github.com/DOI-USGS/pywatershed/tree/main/examples/04_preprocess_atm.ipynb>`_. The full time version of a variable is given by the "private" version of the variable which is named with a single-leading underscore (eg tmaxf for all time is _tmaxf). This full-time initialization may not be tractable for large domains and/or long periods of time and require changes to batch the processing of the variables. The benefits of full-time initialization are 1) the code is vectorized and fast for such a large calculation, 2) the initialization of this class effectively preprocess all the inputs to the rest of the model and can then be skipped in subsequent model calls (unless the parameters are changing). Args: control: a Control object discretization: a discretization of class Parameters parameters: a parameter object of class Parameters prcp: daily precipitation tmax: daily maximum temperature tmin: daily minimum temperature soltab_potsw: the solar table of potential shortwave radiation soltab_horad_potsw: the solar table of potential shortwave radiation on a horizontal plane verbose: Print extra information or not? restart_read: May be boolean or a Pathlib.Path. If False, control.options will be examined for this key. If True, the working directory is searched for restart files. If a Pathlib.Path, this specifies an alternative directory to search for restart files. Files searched for are of the pattern YYYY-mm-dd-varname.nc where the date is the control.init_time. The timestamp on the file is the valid time of the states in the file with the exception of processes with sub-daily timesteps. For example, the outflow_ts variable of PRMSChannel is instantaneous and valid at the 23rd hour of the timestampped day whereas its variable seg_outflow is the daily averge value over the timestampped day. restart_write: As for restart_read but for writing. The directory in either case will be attempted to be created if it does not exist. restart_write_freq: If False, then control.options is examined for this key. The follwing values set the frequency of restart output with "y" for yearly, "m" for monthly, "d" for daily, or "f" for final. "Final" means that restart files are written with the states at control.end_time to files timestampped with control.end_time. Yearly and monthly restart options write files with timestamps on the last day of each year or month during the run. If daily, restarts are written every day. If restart_write is not False and restart_write_freq is False, the default of "f" is used. """
[docs] def __init__( self, control: Control, discretization: Parameters, parameters: Parameters, prcp: Union[str, pl.Path], tmax: Union[str, pl.Path], tmin: Union[str, pl.Path], soltab_potsw: adaptable, soltab_horad_potsw: adaptable, input_aliases: dict = None, verbose: bool = False, restart_read: Union[pl.Path, bool] = False, restart_write: Union[pl.Path, bool] = False, restart_write_freq: Literal["y", "m", "d", False] = False, ) -> None: # Defering handling batch handling of time chunks but self.n_time_chunk # is a dimension used in the metadata/variables dimensions. # TODO: make time chunking options work (esp with output) # if n_time_chunk <= 0: # self.n_time_chunk = control.n_times # else: # self.n_time_chunk = n_time_chunk # Initialize full time with nans self._time = np.full(control.n_times, nan, dtype="datetime64[s]") metadata_patches = { kk: {"dims": ("ntime", "nhru")} for kk in self.variables } super().__init__( control=control, discretization=discretization, parameters=parameters, input_aliases=input_aliases, metadata_patches=metadata_patches, metadata_patch_conflicts="left", restart_read=restart_read, restart_write=restart_write, restart_write_freq=restart_write_freq, ) self.name = "PRMSAtmosphere" self._set_inputs(locals()) self._set_options(locals()) self.calculate_transp = self.calc_transp_tindex self._calculated = False self._netcdf_initialized = False return
def _calculate_all_time(self): if self._calculated: return # Eventually refactor to work at specific chunks of time for input in ["prcp", "tmax", "tmin"]: # # this is a bit of a mess: ._dataset.dataset input_time = self._input_variables_dict[input]._nc_read.times # dont do this... # self[input][:] = ds.dataset[input][:].data if np.isnan(self._time[0]): self._time[:] = input_time else: assert (input_time == self._time).all() start_time_ind = np.where(self._time == self.control._start_time)[0] if not len(start_time_ind): msg = "Control start_time is not in the input data time" raise ValueError(msg) self._init_time_ind = start_time_ind[0] # Solve all variables for all time self._month_ind_12 = datetime_month(self._time) - 1 # (time) self._month_ind_1 = np.zeros(self._time.shape, dtype=int) # (time) self._month = datetime_month(self._time) # (time) self._dom = datetime_day_of_month(self._time) # (time) self.adjust_temperature() self.adjust_precip() self.calculate_sw_rad_degree_day() self.calculate_potential_et_jh() self.calculate_transp() # JLM todo: delete large variables on self for memory management self._calculated = True return
[docs] @staticmethod def get_dimensions(): return ("nhru", "nmonth", "ndoy", "ntime")
[docs] @staticmethod def get_parameters(): return ( "doy", "radadj_intcp", "radadj_slope", "tmax_index", "dday_slope", "dday_intcp", "radmax", "ppt_rad_adj", "tmax_allsnow", "tmax_allrain_offset", "hru_slope", "radj_sppt", "radj_wppt", "hru_lat", "hru_area", "hru_aspect", "jh_coef", "jh_coef_hru", "tmax_cbh_adj", "tmin_cbh_adj", "tmax_allsnow", "tmax_allrain_offset", "snow_cbh_adj", "rain_cbh_adj", "adjmix_rain", "transp_beg", "transp_end", "transp_tmax", "radadj_intcp", # below are solar params used by Atmosphere "radadj_slope", "tmax_index", "dday_slope", "dday_intcp", "radmax", "ppt_rad_adj", "tmax_allsnow", "tmax_allrain_offset", "hru_slope", "radj_sppt", "radj_wppt", "hru_lat", "hru_area", "temp_units", )
[docs] @staticmethod def get_inputs() -> tuple: return ( "prcp", "tmax", "tmin", "soltab_potsw", "soltab_horad_potsw", )
[docs] @staticmethod def get_init_values() -> dict: return { "tmaxf": nan, "tminf": nan, "prmx": nan, "hru_ppt": nan, "hru_rain": nan, "hru_snow": nan, "swrad": nan, "potet": nan, # "hru_actet": zero, # "available_potet": nan, "transp_on": 0, "tmax_sum": zero, "tmaxc": nan, "tavgc": nan, "tminc": nan, "pptmix": -9999, "orad_hru": nan, }
[docs] @staticmethod def get_restart_variables() -> list: # The restart capability in PRMS does not work, PRMS fails daily # restart test starting on many days, including 1979-12-31 # as documented on # nueva: ~/usgs/pywatershed/autotest/prms_restart_transp_on # so a more in-depth solution is necessary. # raise NotImplementedError( # "Restart capability not implemented for PRMSAtmosphere" # ) return ["tmax_sum", "transp_on"]
def _init_diagnostic_vars(self) -> None: return def _set_initial_conditions(self): return def _advance_variables(self): if not self._calculated: self._calculate_all_time() for vv in self.variables: self[vv].advance() return def _calculate(self, time_length): return
[docs] def adjust_temperature(self): """Input temperature adjustments using calibrated parameters.""" # throw an error if these have different shapes if self["tmax_cbh_adj"].shape != self["tmin_cbh_adj"].shape: msg = ( "Not implemented: tmin/tmax cbh adj parameters " "with different shapes" ) raise NotImplementedError(msg) if self["tmax_cbh_adj"].shape[0] == self.nmonth: month_ind = self._month_ind_12 elif self["tmax_cbh_adj"].shape[0] == 1: month_ind = self._month_ind_1 else: msg = ( "Unexpected month dimension for cbh " "temperature adjustment params" ) raise ValueError(msg) # (time, space) dimensions on these variables ivd = self._input_variables_dict self.tmaxf.data[:] = ivd["tmax"].data + self.tmax_cbh_adj[month_ind] self.tminf.data[:] = ivd["tmin"].data + self.tmin_cbh_adj[month_ind] self.tminc.data[:] = (self["tminf"].data - 32.0) * (5 / 9) self.tmaxc.data[:] = (self["tmaxf"].data - 32.0) * (5 / 9) self.tavgc.data[:] = (self["tmaxc"].data + self["tminc"].data) / 2.0 return
[docs] def adjust_precip(self): """Input precipitation adjustments using calibrated parameters. Snow/rain partitioning of total precip depends on adjusted temperature in addition to depending on additonal parameters. Returns: None """ ivd = self._input_variables_dict # throw an error shapes are inconsistent shape_list = np.array( [ self.tmax_allsnow.shape[0], self.tmax_allrain_offset.shape[0], self.snow_cbh_adj.shape[0], self.rain_cbh_adj.shape[0], self.adjmix_rain.shape[0], ] ) if not (shape_list == self.nmonth).all(): msg = ( "Not implemented: cbh precip adjustment parameters with " " different shapes" ) raise NotImplementedError(msg) tmax_allrain = self.tmax_allsnow + self.tmax_allrain_offset if self.tmax_allsnow.shape[0] == self.nmonth: month_ind = self._month_ind_12 elif self.tmax_allsnow.shape[0] == 1: month_ind = self._month_ind_1 else: msg = "Unexpected month dimension for cbh precip adjustment params" raise ValueError(msg) # Order MATTERS in calculating the prmx mask # The logic in PRMS is if(all_snow),elif(all_rain),else(mixed) # so we set the mask in the reverse order # Calculate the mix everywhere, then set the precip/rain/snow amounts # from the conditions. tdiff = self.tmaxf.data - self.tminf.data tdiff = np.where(tdiff < nearzero, 1.0e-4, tdiff) self.prmx.data[:] = ( (self.tmaxf.data - self.tmax_allsnow[month_ind]) / tdiff ) * self.adjmix_rain[month_ind] self.prmx.data[:] = np.where( self.prmx.data < zero, zero, self.prmx.data ) self.prmx.data[:] = np.where(self.prmx.data > one, one, self.prmx.data) del tdiff wh_all_rain = np.where( np.logical_or( self.tminf.data > self.tmax_allsnow[month_ind], self.tmaxf.data >= tmax_allrain[month_ind], ) ) self.prmx.data[wh_all_rain] = one wh_all_snow = np.where(self.tmaxf.data <= self.tmax_allsnow[month_ind]) self.prmx.data[wh_all_snow] = zero # This is in climate_hru as a condition of calling climateflow # (eye roll) self.prmx.data[:] = np.where( ivd["prcp"].data <= zero, zero, self.prmx.data ) # Recalculate/redefine these now based on prmx instead of the # temperature logic wh_all_snow = np.where(self.prmx.data <= zero) wh_all_rain = np.where(self.prmx.data >= one) # Mixed case (everywhere, to be overwritten by the all-snow/rain-fall # cases) self.hru_ppt.data[:] = ivd["prcp"].data * self.snow_cbh_adj[month_ind] self.hru_rain.data[:] = self.prmx.data * self.hru_ppt.data self.hru_snow.data[:] = self.hru_ppt.data - self.hru_rain.data # All precip is snow case # The condition to be used later: self.hru_ppt.data[wh_all_snow] = ( ivd["prcp"].data * self.snow_cbh_adj[month_ind] )[wh_all_snow] self.hru_snow.data[wh_all_snow] = self.hru_ppt.data[wh_all_snow] self.hru_rain.data[wh_all_snow] = zero # All precip is rain case # The condition to be used later: self.hru_ppt.data[wh_all_rain] = ( ivd["prcp"].data * self.rain_cbh_adj[month_ind] )[wh_all_rain] self.hru_rain.data[wh_all_rain] = self.hru_ppt.data[wh_all_rain] self.hru_snow.data[wh_all_rain] = zero cond = ( (self.hru_ppt.data > zero) & (self.tmaxf.data > self.tmax_allsnow[month_ind]) & ( (self.tminf.data <= self.tmax_allsnow[month_ind]) & (self.tmaxf.data < tmax_allrain[month_ind]) ) & (self.prmx.data < one) ) self.pptmix.data[:] = np.where(cond, 1, 0) return
[docs] def calculate_sw_rad_degree_day(self) -> None: """Calculate shortwave radiation using the degree day method. Returns: None """ solar_params = {} solar_param_names = ( "radadj_intcp", "radadj_slope", "tmax_index", "dday_slope", "dday_intcp", "radmax", "ppt_rad_adj", "tmax_allsnow", "tmax_allrain_offset", "hru_slope", "radj_sppt", "radj_wppt", "hru_lat", "hru_area", ) for name in solar_param_names: solar_params[name] = self[name] ivd = self._input_variables_dict self.swrad.data[:], self.orad_hru.data[:] = self._ddsolrad_run( dates=self._time, tmax_hru=self.tmaxf.data, hru_ppt=self.hru_ppt.data, soltab_potsw=ivd["soltab_potsw"].data, soltab_horad_potsw=ivd["soltab_horad_potsw"].data, **solar_params, nmonth=self.nmonth, ) return
# @jit @staticmethod def _ddsolrad_run( dates: np.ndarray, # [n_time] tmax_hru: np.ndarray, # [n_time, n_hru] hru_ppt: np.ndarray, # [n_time, n_hru] soltab_potsw: np.ndarray, # [n_time, n_hru] ?? soltab_horad_potsw: np.ndarray, # [n_time, n_hru] ?? radadj_intcp, # param [12, n_hru] radadj_slope, # " " tmax_index, dday_slope, dday_intcp, radmax, ppt_rad_adj, tmax_allsnow, tmax_allrain_offset, hru_slope, # pram [n_hru] radj_sppt, radj_wppt, hru_lat, hru_area, nmonth, ) -> (np.ndarray, np.ndarray): # [n_time, n_hru] # https://github.com/nhm-usgs/prms/blob/6.0.0_dev/src/prmslib/physics/sm_solar_radiation_degday.f90 n_time, n_hru = tmax_hru.shape # Transforms of time doy = (dates - dates.astype("datetime64[Y]")).astype( "timedelta64[h]" ).astype(int) / 24 + 1 doy = tile_time_to_space(doy, n_hru).astype(int) month = (dates.astype("datetime64[M]").astype(int) % nmonth) + 1 month = tile_time_to_space(month, n_hru) # is_summer # https://github.com/nhm-usgs/prms/blob/92f3c470bbf10e37ee23f015f60d42f6a028cf48/src/prmslib/misc/sm_prms_time.f90#L114 is_summer = (doy >= 79) & (doy <= 265) northern_hemisphere = hru_lat > zero if not any(northern_hemisphere): northern_hemisphere = tile_space_to_time( northern_hemisphere, n_time ) msg = "Implementation not checked" raise NotImplementedError(msg) is_summer = is_summer & northern_hemisphere hru_cossl = tile_space_to_time(np.cos(np.arctan(hru_slope)), n_time) def monthly_to_daily(arr): return arr[month[:, 0] - 1, :] def doy_to_daily(arr): return arr[doy[:, 0] - 1, :] # This is the start of the loop over hru & time # https://github.com/nhm-usgs/prms/blob/92f3c470bbf10e37ee23f015f60d42f6a028cf48/src/prmslib/physics/sm_solar_radiation_degday.f90#L103 # dday dday_slope_day = monthly_to_daily(dday_slope) dday_intcp_day = monthly_to_daily(dday_intcp) dday = (dday_slope_day * tmax_hru) + dday_intcp_day + one dday[np.where(dday < one)] = one del dday_slope_day, dday_intcp_day # radadj radmax_day = monthly_to_daily(radmax) radadj = monthly_to_daily(radmax) # the else condition wh_dday_lt_26 = np.where(dday < 26.0) if len(wh_dday_lt_26[0]): kp = dday.astype(int)[wh_dday_lt_26] # dddayi = float(kp) radadj[wh_dday_lt_26] = solf[kp - 1] + ( (solf[kp] - solf[kp - 1]) * (dday[wh_dday_lt_26] - kp) ) wh_radadj_gt_max = np.where(radadj > radmax_day) if len(wh_radadj_gt_max[0]): radadj[wh_radadj_gt_max] = radmax_day[wh_radadj_gt_max] del kp, wh_radadj_gt_max del radmax_day, wh_dday_lt_26 # pptadj pptadj = np.ones((n_time, n_hru)) radadj_intcp_day = monthly_to_daily(radadj_intcp) radadj_slope_day = monthly_to_daily(radadj_slope) tmax_index_day = monthly_to_daily(tmax_index) ppt_rad_adj_day = monthly_to_daily(ppt_rad_adj) # * if # This is the outer if. All of the following conditions work on this cond_ppt_gt_rad_adj = hru_ppt > ppt_rad_adj_day cond_if = cond_ppt_gt_rad_adj wh_if = np.where(cond_if) if len(wh_if[0]): # * if.else pptadj[wh_if] = ( radadj_intcp_day + radadj_slope_day * (tmax_hru - tmax_index_day) )[wh_if] # * if.else.if cond_ppt_adj_gt_one = pptadj > one cond_if_else_if = cond_if & cond_ppt_adj_gt_one wh_if_else_if = np.where(cond_if_else_if) if len(wh_if_else_if[0]): pptadj[wh_if_else_if] = one del radadj_intcp_day, radadj_slope_day, tmax_index_day # * if.if tmax_index_day = monthly_to_daily(tmax_index) cond_tmax_lt_index = tmax_hru < tmax_index_day cond_if_if = cond_if & cond_tmax_lt_index wh_if_if = np.where(cond_if_if) if len(wh_if_if[0]): # The logic equiv to but changed from the original radj_sppt_day = tile_space_to_time(radj_sppt, n_time) pptadj[wh_if_if] = radj_sppt_day[wh_if_if] # if.if.else: negate the if.if.if radj_wppt_day = tile_space_to_time(radj_wppt, n_time) tmax_allrain_day = monthly_to_daily( tmax_allrain_offset + tmax_allsnow ) cond_tmax_lt_allrain = tmax_hru < tmax_allrain_day cond_if_if_else = cond_if_if & cond_tmax_lt_allrain wh_if_if_else = np.where(cond_if_if_else) if len(wh_if_if_else[0]): pptadj[wh_if_if_else] = radj_wppt_day[wh_if_if_else] # if.if.if(.if): the cake is taken! cond_tmax_gt_allrain_and_not_summer = ( tmax_hru >= tmax_allrain_day ) & (~is_summer) cond_if_if_if = ( cond_if_if & cond_tmax_gt_allrain_and_not_summer ) wh_if_if_if = np.where(cond_if_if_if) if len(wh_if_if_if[0]): pptadj[wh_if_if_if] = radj_wppt_day[wh_if_if_if] radadj = radadj * pptadj wh_radadj_lt_2_tenths = np.where(radadj < 0.2) if len(wh_radadj_lt_2_tenths[0]): radadj[wh_radadj_lt_2_tenths] = 0.2 swrad = doy_to_daily(soltab_potsw) * radadj / hru_cossl orad_hru = radadj * doy_to_daily(soltab_horad_potsw) return swrad, orad_hru # https://github.com/nhm-usgs/prms/blob/6.0.0_dev/src/prmslib/physics/sm_potet_jh.f90
[docs] def calculate_potential_et_jh(self) -> None: """Calculate potential evapotranspiration following Jensen and Haise Jensen and Haise (1963) Returns: None """ self.potet.data[:] = self._potet_jh_run( dates=self._time, tavgc=self.tavgc.data, swrad=self.swrad.data, jh_coef=self.jh_coef, jh_coef_hru=self.jh_coef_hru, nmonth=self.nmonth, ) return
@staticmethod def _potet_jh_run(dates, tavgc, swrad, jh_coef, jh_coef_hru, nmonth): """This code mixes celcius and fahrenheit units in its calculation""" n_time, n_hru = tavgc.shape # The vector month = (dates.astype("datetime64[M]").astype(int) % nmonth) + 1 month = tile_time_to_space(month, n_hru) tavgf = (tavgc * 9 / 5) + 32 elh = (597.3 - (0.5653 * tavgc)) * inch2cm # JLM: move this out? def monthly_to_daily(arr): return arr[month[:, 0] - 1, :] jh_coef_day = monthly_to_daily(jh_coef) jh_coef_hru_day = tile_space_to_time(jh_coef_hru, n_time) potet = jh_coef_day * (tavgf - jh_coef_hru_day) * swrad / elh cond_potet_lt_zero = potet < zero wh_cond = np.where(cond_potet_lt_zero) if len(wh_cond[0]): potet[wh_cond] = zero return potet # # Track the amount of potential ET used at a given timestep # # JLM: is this strange to track here? I suppose not. # def consume_pot_et(self, requested_et): # et = requested_et # available_et = self.pot_et_current - self.pot_et_consumed # if et > available_et: # et = available_et # self.pot_et_consumed += et # return et
[docs] def calc_transp_tindex(self): # INIT: Process_flag==INIT # transp_on inited to 0 everywhere above # candidate for worst code lines if self._params.parameters["temp_units"] == 0: transp_tmax_f = self.transp_tmax else: transp_tmax_f = (self.transp_tmax * (9.0 / 5.0)) + 32.0 transp_check = self.transp_on.current.copy() # dim nhrus only start_day = self.control.start_doy start_month = self.control.start_month motmp = start_month + self.nmonth # time zero calculations for hh in range(self.nhru): if start_month == self.transp_beg[hh]: # rsr, why 10? if transp_tmax < 300, should be < 10 if start_day > 10: self.transp_on.data[0, hh] = 1 else: transp_check[hh] = 1 elif self.transp_end[hh] > self.transp_beg[hh]: if (start_month > self.transp_beg[hh]) and ( start_month < self.transp_end[hh] ): self.transp_on.data[0, hh] = 1 else: if (start_month > self.transp_beg[hh]) or ( motmp < self.transp_end[hh] + self.nmonth ): self.transp_on.data[0, hh] = 1 # vectorize ntime = self.transp_on.data.shape[0] # _transp_check = self.transp_on.data.copy() # self._transp_beg = tile_space_to_time(self.transp_beg, ntime) # self._transp_end = tile_space_to_time(self.transp_end, ntime) # RUN: Process_flag == RUN # Set switch for active transpiration period for tt in range(ntime): for hh in range(self.nhru): if tt > 0: self.transp_on.data[tt, hh] = self.transp_on.data[ tt - 1, hh ] self.tmax_sum.data[tt, hh] = self.tmax_sum.data[tt - 1, hh] # check for month to turn check switch on or # transpiration switch off if self._dom[tt] == 1: # check for end of period if self._month[tt] == self.transp_end[hh]: self.transp_on.data[tt, hh] = 0 transp_check[hh] = 0 self.tmax_sum.data[tt, hh] = zero # < # check for month to turn transpiration switch on or off if self._month[tt] == self.transp_beg[hh]: transp_check[hh] = 1 self.tmax_sum.data[tt, hh] = zero # << # If in checking period, then for each day # sum maximum temperature until greater than temperature index # parameter, at which time, turn transpiration switch on, check # switch off freezing temperature assumed to be 32 degrees # Fahrenheit if transp_check[hh] == 1: if self.tmaxf.data[tt, hh] > 32.0: self.tmax_sum.data[tt, hh] = ( self.tmax_sum.data[tt, hh] + self.tmaxf.data[tt, hh] ) # < if self.tmax_sum.data[tt, hh] > transp_tmax_f[hh]: self.transp_on.data[tt, hh] = 1 transp_check[hh] = 0 self.tmax_sum.data[tt, hh] = 0.0 # <<< return
def _write_netcdf_timeseries(self) -> None: if not self._netcdf_initialized: return if self._netcdf_separate: for var in self.variables: if var not in self._netcdf_output_vars: continue nc_path = self._netcdf_output_dir / f"{var}.nc" nc = NetCdfWrite( nc_path, self._params.coords, [var], {var: self.meta[var]}, ) nc.add_all_data( var, self[var].data, self._time, ) nc.close() assert nc_path.exists() print(f"Wrote file: {nc_path}") else: nc_path = self._netcdf_output_dir / f"{self.name}.nc" nc = NetCdfWrite( nc_path, self._params.coords, self._netcdf_output_vars, self.meta, ) for var in self.variables: if var not in self._netcdf_output_vars: continue nc.add_all_data( var, self[var].data, self._time, ) nc.close() assert nc_path.exists() print(f"Wrote file: {nc_path}") self._finalize_netcdf() return
[docs] def initialize_netcdf( self, output_dir: [str, pl.Path] = None, separate_files: bool = None, output_vars: list = None, **kwargs, ): if ( self._netcdf_initialized and "verbosity" in self.control.options.keys() and self.control.options["verbosity"] > 5 ): msg = ( f"{self.name} class previously initialized netcdf output " f"in {self._netcdf_output_dir}" ) warn(msg) return if ( "verbosity" in self.control.options.keys() and self.control.options["verbosity"] > 5 ): print(f"initializing netcdf output for: {self.name}") ( output_dir, output_vars, separate_files, ) = self._reconcile_nc_args_w_control_opts( output_dir, output_vars, separate_files ) # apply defaults if necessary if output_dir is None: msg = ( "An output directory is required to be specified for netcdf" "initialization." ) raise ValueError(msg) if separate_files is None: separate_files = True self._netcdf_separate = separate_files self._netcdf_initialized = True self._netcdf_output_dir = pl.Path(output_dir) if output_vars is None: self._netcdf_output_vars = self.variables else: self._netcdf_output_vars = list( set(output_vars).intersection(set(self.variables)) ) if len(self._netcdf_output_vars) == 0: self._netcdf_initialized = False return
def _finalize_netcdf(self) -> None: self._netcdf_initialized = False return
[docs] def output(self): if self._netcdf_initialized: if self._verbose: print( f"Writing FULL timeseries output for: {self.name}", flush=True, ) # < self._write_netcdf_timeseries() # logic for when to write restarts in the function self._output_restart() return
[docs] class PRMSAtmosphereTranspFrost(PRMSAtmosphere): """PRMS atmospheric boundary layer model with a frost transpiration model. In this subclass, the `PRMSAtmosphere` transpiration model using temperature index is replaced by a specified active period between the parameteres of (last) spring_frost and (first, killing) fall frost. This is as implemented in the PRMS module transp_frost.f90. See Also -------- PRMSAtmosphere """
[docs] def __init__( self, control: Control, discretization: Parameters, parameters: Parameters, prcp: Union[str, pl.Path], tmax: Union[str, pl.Path], tmin: Union[str, pl.Path], soltab_potsw: adaptable, soltab_horad_potsw: adaptable, input_aliases: dict = None, verbose: bool = False, restart_read: Union[pl.Path, bool] = False, restart_write: Union[pl.Path, bool] = False, restart_write_freq: Literal["y", "m", "d", False] = False, ) -> None: super().__init__( control=control, discretization=discretization, parameters=parameters, prcp=prcp, tmax=tmax, tmin=tmin, soltab_potsw=soltab_potsw, soltab_horad_potsw=soltab_horad_potsw, input_aliases=input_aliases, verbose=verbose, restart_read=restart_read, restart_write=restart_write, restart_write_freq=restart_write_freq, ) self.calculate_transp = self.calc_transp_frost return
[docs] @staticmethod def get_parameters(): return ( "doy", "radadj_intcp", "radadj_slope", "tmax_index", "dday_slope", "dday_intcp", "radmax", "ppt_rad_adj", "tmax_allsnow", "tmax_allrain_offset", "hru_slope", "radj_sppt", "radj_wppt", "hru_lat", "hru_area", "hru_aspect", "jh_coef", "jh_coef_hru", "tmax_cbh_adj", "tmin_cbh_adj", "tmax_allsnow", "tmax_allrain_offset", "snow_cbh_adj", "rain_cbh_adj", "adjmix_rain", "fall_frost", "spring_frost", "radadj_intcp", # below are solar params used by Atmosphere "radadj_slope", "tmax_index", "dday_slope", "dday_intcp", "radmax", "ppt_rad_adj", "tmax_allsnow", "tmax_allrain_offset", "hru_slope", "radj_sppt", "radj_wppt", "hru_lat", "hru_area", "temp_units", )
def _calculate_all_time(self): super()._calculate_all_time()
[docs] def calc_transp_frost(self) -> None: self._jsol = tile_time_to_space( datetime_jsol(self._time), self.nhru ) # (time) spring_frost = tile_space_to_time(self.spring_frost, self.ntime) fall_frost = tile_space_to_time(self.fall_frost, self.ntime) self.transp_on.data[:, :] = np.where( (self._jsol >= spring_frost) & (self._jsol <= fall_frost), 1, 0 ) return