Source code for pywatershed.atmosphere.prms_solar_geometry

import pathlib as pl
import warnings
from typing import Literal, Tuple, Union

import numpy as np

from pywatershed.base.process import Process
from pywatershed.utils.netcdf_utils import NetCdfWrite

from ..base.control import Control
from ..constants import dnearzero, nan, one, zero
from ..parameters import Parameters
from ..utils.prms5util import load_soltab_debug
from .solar_constants import ndoy, pi, pi_12, r1, solar_declination, two_pi

doy = np.arange(ndoy) + 1

# Dimensions
# The cyclic time dimension is ndoy (known apriori)
# The spatial dimension n_hru (only known on init from parameters)


def tile_space_to_time(arr: np.ndarray) -> np.ndarray:
    return np.tile(arr, (ndoy, 1))


# def tile_time_to_space(arr: np.ndarray, n_hru) -> np.ndarray:
#    return np.transpose(np.tile(arr, (n_hru, 1)))


[docs] class PRMSSolarGeometry(Process): """PRMS solar geometry. Implementation based on PRMS 5.2.1 with theoretical documentation given in the PRMS-IV documentation: `Markstrom, S. L., Regan, R. S., Hay, L. E., Viger, R. J., Webb, R. M., Payn, R. A., & LaFontaine, J. H. (2015). PRMS-IV, the precipitation-runoff modeling system, version 4. US Geological Survey Techniques and Methods, 6, B7. <https://pubs.usgs.gov/tm/6b7/pdf/tm6-b7.pdf>`__ Implements Swift's daily potential solar radiation and number of hours of duration on a sloping surface in [cal/cm2/day]. Swift, 1976, equation 6. Primary reference: Appendix E of Dingman, S. L., 1994, Physical Hydrology. Englewood Cliffs, NJ: Prentice Hall, 575 p. Args: control: a Control object discretization: a discretization of class Parameters parameters: a parameter object of class Parameters verbose: Print extra information or not? from_prms_file: Load from a PRMS output file? from_nc_files_dir: [str, pl.Path] = None, restart_read: If not False will issue warning that PRMSSolarGeometry has no restart variables to read or write. restart_write: As for restart_read. restart_write_freq: As for restart_read. """
[docs] def __init__( self, control: Control, discretization: Parameters, parameters: Parameters, input_aliases: dict = None, verbose: bool = False, from_prms_file: [str, pl.Path] = None, from_nc_files_dir: [str, pl.Path] = None, restart_read: Union[pl.Path, bool] = False, restart_write: Union[pl.Path, bool] = False, restart_write_freq: Literal["y", "m", "d", False] = False, ): if (restart_read is not False) or (restart_write is not False): warnings.warn( "PRMSSolarGeometry has no restart variables to read or write." ) # self._time is needed by Process for timeseries arrays # TODO: this is redundant because the parameter doy is set # on load of prms file. Could pass the name to use for # self._time to super or come up with some other work around. self._time = doy super().__init__( control=control, discretization=discretization, parameters=parameters, input_aliases=input_aliases, ) self._set_inputs(locals()) self._set_options(locals()) self.name = "PRMSSolarGeometry" if from_prms_file: ( self.soltab_potsw.data[:], self.soltab_horad_potsw.data[:], self.soltab_sunhrs.data[:], ) = load_soltab_debug(from_prms_file) elif from_nc_files_dir: raise NotImplementedError() self._netcdf_initialized = False self._calculated = False return
[docs] @staticmethod def get_inputs() -> tuple: return ()
[docs] @staticmethod def get_variables(): return ( "soltab_horad_potsw", "soltab_potsw", "soltab_sunhrs", )
[docs] @staticmethod def get_init_values() -> dict: return { "soltab_horad_potsw": nan, "soltab_potsw": nan, "soltab_sunhrs": nan, }
[docs] @staticmethod def get_dimensions() -> tuple: return ("nhru", "ndoy")
[docs] @staticmethod def get_parameters() -> tuple: return ( "doy", "hru_slope", "radj_sppt", "radj_wppt", "hru_lat", "hru_area", "hru_aspect", )
[docs] @staticmethod def get_restart_variables() -> list: return []
def _init_diagnostic_vars(self) -> None: return def _set_initial_conditions(self): return def _calculate_all_time(self): self._hru_cossl = np.cos(np.arctan(self["hru_slope"])) # The potential radiation on horizontal surfce self.soltab_horad_potsw.data[:], _ = self.compute_soltab( np.zeros(self["nhru"]), np.zeros(self["nhru"]), self["hru_lat"], self.compute_t, self.func3, ) # The potential radiaton given slope and aspect ( self.soltab_potsw.data[:], self.soltab_sunhrs.data[:], ) = self.compute_soltab( self["hru_slope"], self["hru_aspect"], self["hru_lat"], self.compute_t, self.func3, ) self._calculated = True return def _advance_variables(self): if not self._calculated: self._calculate_all_time() for vv in self.variables: self[vv].advance() return def _calculate(self, time_length): return # @jit
[docs] @staticmethod def compute_soltab( slopes: np.ndarray, aspects: np.ndarray, lats: np.ndarray, compute_t: callable, func3: callable, ) -> Tuple[np.ndarray, np.ndarray]: """Calculate the solar table Args: cossl: cos(atan(hru_slope)) [nhru] ? slope: slope [nhru] aspect: aspect [nhru] latitude: latitude [nhru] Returns: (solt, sunh) solt: Swift's potential solar radiation on a sloping surface in [cal/cm2/day]. Swift, 1976, equation 6. Dimensions: [ndoy, nhru] sunh: The number of hours of direct sunlight Dimensions: [ndoy, nhru] """ nhru = len(slopes) # Slope derived quantities sl = np.arctan(slopes) sl_sin = np.sin(sl) sl_cos = np.cos(sl) # Aspect derived quantities aspects_rad = np.radians(aspects) aspects_cos = np.cos(aspects_rad) # Latitude derived quantities x0 = np.radians(lats) # errors in PRMS ingesting latitudes can be seen here (or earlier) x0_cos = np.cos(x0) # x1 latitude of equivalent slope # This is equation 13 from Lee, 1963 x1 = np.arcsin(sl_cos * np.sin(x0) + sl_sin * x0_cos * aspects_cos) # d1 is the denominator of equation 12, Lee, 1963 d1 = sl_cos * x0_cos - sl_sin * np.sin(x0) * aspects_cos d1 = np.where(np.abs(d1) < dnearzero, dnearzero, d1) # x2 is the difference in longitude between the location of # the HRU and the equivalent horizontal surface expressed in angle hour # This is equation 12 from Lee, 1963 x2 = np.arctan(sl_sin * np.sin(aspects_rad) / d1) wh_d1_lt_zero = np.where(d1 < zero) if len(wh_d1_lt_zero[0]) > 0: x2[wh_d1_lt_zero] = x2[wh_d1_lt_zero] + pi # The hour angle from the local meridian (local solar noon) to the # sunrise (negative) or sunset (positive) # t6: is the hour angle of sunrise on the equivalent slope # t7: is the hour angle of sunset on the equivalent slope tt = compute_t(x1, solar_declination) t6 = (-1 * tt) - x2 t7 = tt - x2 # Hours of sunrise and sunset on a horizontal surface at lat # t0: is the hour angle of sunrise on a hroizontal surface at the HRU # t1: is the hour angle of sunset on a hroizontal surface at the HRU tt = compute_t(x0, solar_declination) t0 = -1 * tt t1 = tt # For HRUs that have an east or west direction component to their # aspect, the longitude adjustment (moving the effective slope east # or west) will cause either: # (1) sunrise to be earlier than at the horizontal plane at the HRU # (2) sunset to be later than at the horizontal plane at the HRU # This is not possible. The if statements below check for this and # adjust the sunrise/sunset angle hours on the equivalent slopes as # necessary. # t2: is the hour angle of sunset on the slope at the HRU # t3: is the hour angle of sunrise on the slope at the HRU t3 = t7 wh_t7_gt_t1 = np.where(t7 > t1) if len(wh_t7_gt_t1[0]) > 0: t3[wh_t7_gt_t1] = t1[wh_t7_gt_t1] t2 = t6 wh_t6_lt_t0 = np.where(t6 < t0) if len(wh_t6_lt_t0[0]) > 0: t2[wh_t6_lt_t0] = t0[wh_t6_lt_t0] # JLM: vectorizing requires are reverse looped order t6 = t6 + two_pi t7 = t7 - two_pi wh_t3_lt_t2 = np.where(t3 < t2) if len(wh_t3_lt_t2[0]): t2[wh_t3_lt_t2] = zero t3[wh_t3_lt_t2] = zero # This is if no other conditions are met solt = func3(x2, x1, t3, t2) sunh = (t3 - t2) * pi_12 # t7 > t0 wh_t7_gt_t0 = np.where(t7 > t0) if len(wh_t7_gt_t0[0]): solt[wh_t7_gt_t0] = ( func3(x2, x1, t3, t2)[wh_t7_gt_t0] + func3(x2, x1, t7, t0)[wh_t7_gt_t0] ) sunh[wh_t7_gt_t0] = (t3 - t2 + t7 - t0)[wh_t7_gt_t0] * pi_12 # t6 < t1 wh_t6_lt_t1 = np.where(t6 < t1) if len(wh_t6_lt_t1[0]): solt[wh_t6_lt_t1] = ( func3(x2, x1, t3, t2)[wh_t6_lt_t1] + func3(x2, x1, t1, t6)[wh_t6_lt_t1] ) sunh[wh_t6_lt_t1] = (t3 - t2 + t1 - t6)[wh_t6_lt_t1] * pi_12 # The first condition checked mask_sl_lt_dnearzero = tile_space_to_time(np.abs(sl)) < dnearzero solt = np.where( mask_sl_lt_dnearzero, func3(np.zeros(nhru), x0, t1, t0), solt ) sunh = np.where(mask_sl_lt_dnearzero, (t1 - t0) * pi_12, sunh) mask_sunh_lt_dnearzero = sunh < dnearzero sunh = np.where(mask_sunh_lt_dnearzero, zero, sunh) wh_solt_lt_zero = np.where(solt < zero) if len(wh_solt_lt_zero[0]): solt[wh_solt_lt_zero] = zero warnings.warn( f"{len(wh_solt_lt_zero[0])}/{np.product(solt.shape)} " f"locations-times with negative " f"potential solar radiation." ) return solt, sunh
[docs] @staticmethod def compute_t( lats: np.ndarray, solar_declination: np.ndarray ) -> np.ndarray: # JLM: why is division by earth's angular velocity not done here? """The "sunrise" equation Args: lats - latitudes of the hrus solar_declination: the declination of the sun on all julian days Returns: The angle hour from the local meridian (local solar noon) to the sunrise (negative) or sunset (positive). The Earth rotates at the angular speed of 15 degrees/hour (2 pi / 24 hour in radians) and, therefore, result*(24/(2pi) radians gives the time of sunrise as the number of hours before the local noon, or the time of sunset as the number of hours after the local noon. Here the term local noon indicates the local time when the sun is exactly to the south or north or exactly overhead. See reference: https://github.com/nhm-usgs/prms/blob/6.0.0_dev/src/prmslib/physics/sm_solar_radiation.f90 """ nhru = len(lats) lats_mat = np.tile(-1 * np.tan(lats), (ndoy, 1)) sol_dec_mat = np.transpose( np.tile(np.tan(solar_declination), (nhru, 1)) ) tx = lats_mat * sol_dec_mat # result = np.copy(tx) # result[np.where((tx >= (-1 * one)) & (tx <= one))] = np.arccos( # tx[np.where((tx >= (-1 * one)) & (tx <= one))] # ) with warnings.catch_warnings(): warnings.filterwarnings( "ignore", r"invalid value encountered in arccos" ) result = np.arccos(np.copy(tx)) result[np.where(tx < (-1 * one))] = pi result[np.where(tx > one)] = zero return result
[docs] @staticmethod def func3( v: np.ndarray, w: np.ndarray, x: np.ndarray, y: np.ndarray, ) -> np.ndarray: """Potential solar radiation on the surface cal/cm2/day Radian angle version of FUNC3 (eqn 6) from Swift, 1976 or Lee, 1963 equation 5. (Names in parens below from Swift?) Args: v: (L2) latitude angle hour offset between actual and equivalent slope [nhru] w: (L1) latitude of the equivalent slope [nhru] x: (T3) hour angle of sunset on equivalent slope [ndoy, nhru] y: (T2) hour angle of sunrise on equivalent slope [ndoy, nhru] Returns: (R4) is potential solar radiation on the surface cal/cm2/day [ndoy, nhru] Constants r1: solar constant for 60 minutes [ndoy] solar_declination: declination of sun [ndoy] See also: https://github.com/nhm-usgs/prms/blob/6.0.0_dev/src/prmslib/physics/sm_solar_radiation.f90 """ # Must alter the dimensions of the inputs to get the outputs. nhru = len(v) vv = np.tile(v, (ndoy, 1)) ww = np.tile(w, (ndoy, 1)) # These are known at init time, not sure they are worth saving in self # and passing rr = np.transpose(np.tile(r1, (nhru, 1))) dd = np.transpose(np.tile(solar_declination, (nhru, 1))) f3 = ( rr * pi_12 * ( np.sin(dd) * np.sin(ww) * (x - y) + np.cos(dd) * np.cos(ww) * (np.sin(x + vv) - np.sin(y + vv)) ) ) return f3
def _write_netcdf_timeseries(self) -> None: if not self._netcdf_initialized: return if self._netcdf_separate: for var in self.variables: if var not in self._netcdf_output_vars: continue nc_path = self._netcdf_output_dir / f"{var}.nc" nc = NetCdfWrite( nc_path, self._params.coords, [var], {var: self.meta[var]}, ) nc.add_all_data( var, self[var].data, self.doy, time_coord="doy", ) nc.close() assert nc_path.exists() print(f"Wrote file: {nc_path}") else: nc_path = self._netcdf_output_dir / f"{self.name}.nc" nc = NetCdfWrite( nc_path, self._params.coords, self._netcdf_output_vars, self.meta, ) for var in self.variables: if var not in self._netcdf_output_vars: continue nc.add_all_data( var, self[var].data, self.doy, time_coord="doy", ) nc.close() assert nc_path.exists() print(f"Wrote file: {nc_path}") self._finalize_netcdf() return def _finalize_netcdf(self) -> None: self._netcdf_initialized = False return
[docs] def initialize_netcdf( self, output_dir: [str, pl.Path] = None, separate_files: bool = None, output_vars: list = None, **kwargs, ): if ( self._netcdf_initialized and "verbosity" in self.control.options.keys() and self.control.options["verbosity"] > 5 ): msg = ( f"{self.name} class previously initialized netcdf output " f"in {self._netcdf_output_dir}" ) warnings.warn(msg) return if ( "verbosity" in self.control.options.keys() and self.control.options["verbosity"] > 5 ): print(f"initializing netcdf output for: {self.name}") ( output_dir, output_vars, separate_files, ) = self._reconcile_nc_args_w_control_opts( output_dir, output_vars, separate_files ) # apply defaults if necessary if output_dir is None: msg = ( "An output directory is required to be specified for netcdf" "initialization." ) raise ValueError(msg) if separate_files is None: separate_files = True self._netcdf_separate = separate_files self._netcdf_initialized = True self._netcdf_output_dir = pl.Path(output_dir) if output_vars is None: self._netcdf_output_vars = self.variables else: self._netcdf_output_vars = list( set(output_vars).intersection(set(self.variables)) ) if len(self._netcdf_output_vars) == 0: self._netcdf_initialized = False return
[docs] def output(self): if self._netcdf_initialized: if self._verbose: print(f"writing FULL timeseries output for: {self.name}") self._write_netcdf_timeseries() return