import pathlib as pl
from pathlib import Path
from typing import Literal, Union
from warnings import warn
import networkx as nx
import numba as nb
import numpy as np
from ..base.adapter import adaptable
from ..base.conservative_process import ConservativeProcess
from ..base.control import Control
from ..constants import cfs_to_cms, nearzero, zero
from ..parameters import Parameters
from .prms_stream_shade import (
PRMSStreamShade,
PRMSStreamShadeConstant,
PRMSStreamShadeDynamic,
)
# Constants from PRMS
NEARZERO = nearzero
PI = np.pi
CFS_TO_CMS = cfs_to_cms
NOFLOW_TEMP = -98.9
DAYS_YR = 365.25
MAX_DAYS_PER_YEAR = 366
ZERO_C = 273.15
TOLRN = 1.0e-4
AKZ = 1.65
A = 5.40e-8
MPS_CONVERT = 2.93981481e-07
# Physical constants for energy calculations
WATER_DENSITY = 1000.0 # kg/m³
SPECIFIC_HEAT_WATER = 4182.0 # J/(kg·°C)
LATENT_HEAT_VAPORIZATION = 2495.0e06 # J/m³
[docs]
class PRMSStreamTempHumidityCBH(ConservativeProcess):
"""PRMS stream temperature with time-varying humidity input.
PRMSStreamTemp and PRMSStreamTempHumidityCBH model stream temperature
following PRMS 5.2.1.1. The class :class:`PRMSStreamTemp` covers the
situation where humidity is supplied by the parameter files
(``strmtemp_humidity_flag=1``) and :class:`PRMSStreamTempHumidityCBH`
covers the case of when humidity is supplied by time-varying input files
(NetCDF CBH equivalents, e.g. ``strmtemp_humidity_flag=0``. Note there is
not currently an implementation for the PRMS case
``strmtemp_humidity_flag=2``). This documentation provides general
background for both classes.
The stream temperature classes use:
- :class:`PRMSHydraulicGeometryFull` and
:class:`PRMSHydraulicGeometryWidthOnly` as upstream processes which
provide the hydraulic geometry variables (and are renamed seg_flow_*
compared to the PRMS seg_* variables)
- :class:`PRMSStreamShadeConstant` and :class:`PRMSStreamShadeDynamic`
as models of stream shade. These are to be passed to or "composed" into
the stream temperature on initialization.
See the example notebooks
`examples/01_multi-process_models.ipynb
<https://github.com/DOI-USGS/pywatershed/blob/develop/examples/01_multi-process_models.ipynb>`__
and
`examples/02_prms_legacy_models.ipynb
<https://github.com/DOI-USGS/pywatershed/blob/develop/examples/02_prms_legacy_models.ipynb>`__
for worked examples.
This implementation is based on PRMS 5.2.1.1 with theoretical documentation
given by:
`Markstrom, Steven L. P2S -- Coupled simulation with the
Precipitation-Runoff Modeling System (PRMS) and the Stream Temperature
Network (SNTemp) Models.
No. 2012-1116. US Geological Survey, 2012.
<https://pubs.usgs.gov/publication/ofr20121116>`__
`Sanders, M.J., Markstrom, S.L., Regan, R.S., and Atkinson, R.D., 2017,
Documentation of a daily mean stream temperature module — An enhancement to
the Precipitation-Runoff Modeling System: U.S. Geological Survey Techniques
and Methods, book 6, chap. D4, 18 p. <https://doi.org/10.3133/tm6D4>`__
The stream temperature module computes daily mean water temperature for
each stream segment using an energy balance approach. The module accounts
for:
- Solar radiation (shortwave)
- Longwave radiation (atmospheric and vegetation)
- Convection and evaporation
- Conduction with the streambed
- Temperature of inflows (upstream, lateral, groundwater, subsurface)
- Shading from riparian vegetation and topography
HRU quantities are aggregated to segments internally, computing segment-
level meteorological variables (seg_tave_air, seg_humid, seg_ccov,
seg_melt, seg_rain) from HRU inputs. This follows the Fortran PRMS
stream_temp.f90 logic (lines ~699-850).
Args:
control: a Control object
discretization: a discretization of class Parameters
parameters: a parameter object of class Parameters
seg_outflow: Streamflow leaving each segment (from PRMSChannel)
seg_lateral_inflow: Lateral inflow entering each segment
(from PRMSChannel)
swrad: Solar radiation for each HRU (from PRMSAtmosphere)
potet: Potential ET for each HRU (from PRMSAtmosphere)
sroff: Surface runoff for each HRU (from PRMSRunoff)
ssres_flow: Subsurface flow for each HRU (from PRMSSoilzone)
gwres_flow: Groundwater flow for each HRU (from PRMSGroundwater)
tavgc: Average air temperature for each HRU in Celsius
(from PRMSAtmosphere)
snowmelt: Snowmelt for each HRU (from PRMSSnow)
hru_rain: Rainfall for each HRU (from PRMSAtmosphere)
soltab_potsw: Potential shortwave radiation table for current day
(from PRMSSolarGeometry or Adapter)
humidity_hru: Humidity for each HRU (from CBH via Adapter).
Values are relative humidity in percent (0-100).
seg_flow_width: Flow-dependent width from PRMSHydraulicGeometryFull
seg_flow_depth: Flow-dependent depth from PRMSHydraulicGeometryFull
seg_flow_area: Flow-dependent cross-sectional area from
PRMSHydraulicGeometryFull
seg_flow_velocity: Flow-dependent velocity from
PRMSHydraulicGeometryFull
stream_shade: PRMSStreamShade instance (Dynamic or Constant).
If provided, stream_shade_class and stream_shade_parameters
are ignored.
stream_shade_class: Class to use for stream shade computation
(e.g., PRMSStreamShadeDynamic, PRMSStreamShadeConstant).
Only used if stream_shade is None.
stream_shade_parameters: Parameters for stream shade computation.
Can be a Parameters object or a Path to a parameter file.
Only used if stream_shade is None and stream_shade_class is
provided.
imbalance_behavior: one of ["defer", None, "warn", "error"]
verbose: Print extra information or not?
use_vectorized_shade: Use vectorized shade computation for all
segments at once (default True)
track_energy_fluxes: Track energy flux terms for budget analysis
"""
[docs]
def __init__(
self,
control: Control,
discretization: Parameters,
parameters: Parameters,
seg_outflow: adaptable,
seg_lateral_inflow: adaptable,
swrad: adaptable,
potet: adaptable,
sroff: adaptable,
ssres_flow: adaptable,
gwres_flow: adaptable,
tavgc: adaptable,
snowmelt: adaptable,
hru_rain: adaptable,
soltab_potsw: adaptable,
humidity_hru: adaptable,
seg_flow_width: adaptable = None,
seg_flow_depth: adaptable = None,
seg_flow_area: adaptable = None,
seg_flow_velocity: adaptable = None,
stream_shade: Union[PRMSStreamShade, None] = None,
stream_shade_class: Union[type, None] = None,
stream_shade_parameters: Union[Parameters, Path, None] = None,
calc_method: Literal["numba", "numpy", None] = None,
imbalance_behavior: Literal["defer", None, "warn", "error"] = "defer",
input_aliases: dict = None,
verbose: bool = False,
use_vectorized_shade: bool = True,
track_energy_fluxes: bool = True,
atmos_exchange_factor: float = 1.0,
) -> None:
super().__init__(
control=control,
discretization=discretization,
parameters=parameters,
imbalance_behavior=imbalance_behavior,
input_aliases=input_aliases,
)
self.name = "PRMSStreamTempHumidityCBH"
# Handle stream_shade initialization before _set_inputs
# This resolves the stream_shade from multiple possible input styles
stream_shade = self._init_stream_shade(
stream_shade=stream_shade,
stream_shade_class=stream_shade_class,
stream_shade_parameters=stream_shade_parameters,
parameters=parameters,
discretization=discretization,
)
# Update locals with resolved stream_shade for _set_inputs
# Remove the class/parameters options since they've been resolved
input_locals = locals().copy()
input_locals["stream_shade"] = stream_shade
input_locals.pop("stream_shade_class", None)
input_locals.pop("stream_shade_parameters", None)
self._set_inputs(input_locals)
self._set_options(input_locals)
# Store atmospheric exchange factor for sensitivity testing
self._atmos_exchange_factor = atmos_exchange_factor
# Just the names without the catergorization, in one list.
self._energy_flux_vars = [
item
for vals in self.get_energy_budget_terms().values()
for item in vals
]
self._set_budget(basis="unit", quantity="energy")
self._initialize_stream_temp()
self._init_calc_method()
# Consistency checks for energy flux tracking
if not self._track_energy_fluxes:
# Check 1: imbalance_behavior must be None if not tracking
# energy fluxes
if imbalance_behavior is not None:
msg = (
"Inconsistent options: track_energy_fluxes=False "
f"requires imbalance_behavior=None, but "
f"imbalance_behavior={imbalance_behavior!r}"
)
raise ValueError(msg)
# Check 2: Set energy flux variables to None if not tracking
for var in self._energy_flux_vars:
if hasattr(self, var):
setattr(self, var, None)
else:
# Initialize arrays for storing energy flux terms during
# calculation
self._hs_terms = np.zeros(self.nsegment)
self._ha_terms = np.zeros(self.nsegment)
self._hf_terms = np.zeros(self.nsegment)
self._hv_terms = np.zeros(self.nsegment)
self._evap_terms = np.zeros(self.nsegment)
self._t_abs4_terms = np.zeros(self.nsegment)
self._upstream_flows = np.zeros(self.nsegment)
self._lateral_flows = np.zeros(self.nsegment)
return
[docs]
@staticmethod
def get_dimensions() -> tuple:
return ("nhru", "nsegment", "nmonth", "ndoy")
[docs]
@staticmethod
def get_parameters() -> tuple:
return (
"doy",
"hru_segment",
"hru_area",
"hru_slope",
"tosegment",
"albedo",
"lat_temp_adj",
"seg_length",
"seg_slope",
"seg_lat",
"seg_elev",
"seg_close",
"ss_tau",
"gw_tau",
"melt_temp",
"maxiter_sntemp",
"stream_tave_init",
)
[docs]
@staticmethod
def get_init_values() -> dict:
return {
"seg_tave_water": -99.9,
"seg_tave_upstream": 0.0,
"seg_tave_gw": 0.0,
"seg_tave_ss": 0.0,
"seg_tave_lat": 0.0,
"seg_shade": 0.0,
"seg_potet": 0.0,
"seginc_sroff": 0.0,
"seginc_ssflow": 0.0,
"seginc_gwflow": 0.0,
"seginc_swrad": 0.0,
# Segment-level meteorological variables (computed from HRU inputs)
"seg_tave_air": 0.0,
"seg_humid": 0.0,
"seg_ccov": 0.0,
"seg_melt": 0.0,
"seg_rain": 0.0,
# Energy flux variables (W)
"heat_upstream": 0.0,
"heat_lateral": 0.0,
"solar_radiation": 0.0,
"atmospheric_longwave": 0.0,
"friction_heat": 0.0,
"groundwater_conduction": 0.0,
"heat_outflow": 0.0,
"longwave_emission": 0.0,
"longwave_vegetation": 0.0,
"evaporative_cooling": 0.0,
"convective_exchange": 0.0,
}
[docs]
@staticmethod
def get_mass_budget_terms() -> dict:
# Temperature is not a mass, so no budget terms
return {
"inputs": [],
"outputs": [],
"storage_changes": [],
}
[docs]
@staticmethod
def get_energy_budget_terms() -> dict:
"""Get energy budget terms for stream temperature.
Returns:
Dictionary with inputs, outputs, exchanges, and storage_changes
for energy budget.
Notes:
Energy fluxes are computed in Watts (J/s). The budget tracks:
- Advective heat transport (upstream, lateral, outflow)
- Surface energy exchange (solar, longwave, evaporation)
- Internal sources (friction, groundwater conduction)
- Bi-directional exchanges that can be gains or losses
Storage changes are empty because the kinematic wave assumption
means water storage is constant - only temperature (and thus
heat content) changes, which is captured by the balance of
inputs and outputs.
Exchanges are net fluxes that can be positive (heat gain) or
negative (heat loss) depending on temperature gradients:
- convective_exchange: depends on air vs water temperature
While it seems that the following could be exchanges, their current
formulation restricts their sign and they have been categorized
to match that pre-determined sign:
- longwave_vegetation: net longwave exchange with vegetation
- groundwater_conduction: depends on groundwater vs water temp
"""
return {
"inputs": [
"heat_upstream", # Advective heat from upstream (W)
"heat_lateral", # Advective heat from lateral (W)
"solar_radiation", # Net shortwave radiation (W)
"atmospheric_longwave", # Atmospheric LW radiation (W)
"friction_heat", # Friction heating (W)
"groundwater_conduction", # Heat from groundwater (W)
],
"exchanges": [
"convective_exchange", # Sensible heat exchange (W, ±)
],
"outputs": [
"heat_outflow", # Advective heat leaving (W)
"longwave_emission", # LW radiation emitted (W)
"longwave_vegetation", # LW from vegetation (W)
"evaporative_cooling", # Latent heat loss (W)
],
"storage_changes": [
# Empty - kinematic wave assumption (no storage change)
],
}
def _init_stream_shade(
self,
stream_shade: Union[PRMSStreamShade, None],
stream_shade_class: Union[type, None],
stream_shade_parameters: Union[Parameters, Path, None],
parameters: Parameters,
discretization: Parameters,
) -> PRMSStreamShade:
"""Initialize stream shade from various input options.
Three cases are handled:
1. stream_shade is provided directly - use it as-is
2. stream_shade_class and optionally stream_shade_parameters are
provided - instantiate the class with the parameters
3. All are None - default to PRMSStreamShadeConstant using
parameters from self._params
Args:
stream_shade: Pre-instantiated PRMSStreamShade object
stream_shade_class: Class to instantiate for shade computation
stream_shade_parameters: Parameters for shade class (Parameters
object or Path)
parameters: The main parameters for PRMSStreamTemp
discretization: Discretization parameters
Returns:
PRMSStreamShade instance
"""
# Case 1: stream_shade provided directly
if stream_shade is not None:
if not isinstance(stream_shade, PRMSStreamShade):
raise TypeError(
f"stream_shade must be a PRMSStreamShade instance, "
f"got {type(stream_shade)}"
)
return stream_shade
# Case 2: stream_shade_class provided
if stream_shade_class is not None:
if not issubclass(stream_shade_class, PRMSStreamShade):
raise TypeError(
f"stream_shade_class must be a subclass of "
f"PRMSStreamShade, got {stream_shade_class}"
)
# Load parameters if provided as path
if stream_shade_parameters is not None:
if isinstance(stream_shade_parameters, (str, Path)):
shade_params = Parameters.from_netcdf(
stream_shade_parameters
)
else:
shade_params = stream_shade_parameters
else:
# Use the main parameters if no separate shade params provided
shade_params = parameters
return stream_shade_class(shade_params, discretization)
# zero is the PRMS default value for this option
stream_temp_shade_flag = self.control.options.get(
"stream_temp_shade_flag", 0
)
if stream_temp_shade_flag == 1:
stream_shade_class = PRMSStreamShadeConstant
elif stream_temp_shade_flag != 0:
raise ValueError(
f"Invalid value for {stream_temp_shade_flag=} option"
)
else:
stream_shade_class = PRMSStreamShadeDynamic
# Case 3: All None - default to PRMSStreamShadeConstant
# Try to instantiate using the main parameters
try:
return stream_shade_class(parameters, discretization)
except (KeyError, AttributeError) as e:
raise ValueError(
"stream_shade not provided and could not initialize "
f"{stream_shade_class} from parameters. Either provide "
"stream_shade, or stream_shade_class with "
"stream_shade_parameters, or ensure parameters contain "
f"the required shade parameters. Original error: {e}"
) from e
[docs]
def initialize_netcdf(
self,
output_dir: Union[str, pl.Path] = None,
separate_files: bool = None,
budget_args: dict = None,
output_vars: list = None,
extra_coords: dict = None,
addtl_output_vars: list = None,
) -> None:
"""Initialize NetCDF output with energy flux tracking checks.
This method overrides the parent class to add consistency checks
for energy flux tracking.
Args:
output_dir: base directory path or NetCDF file path if
separate_files is True
separate_files: boolean indicating if storage component output
variables should be written to a separate file for each
variable
budget_args: arguments to pass to budget initialization
output_vars: list of variable names to output
extra_coords: extra coordinates to add to the output
addtl_output_vars: additional output variables
Returns:
None
"""
(
output_dir,
output_vars,
separate_files,
) = self._reconcile_nc_args_w_control_opts(
output_dir, output_vars, separate_files
)
# Set output_vars appropriately based on tracking
if output_vars is None:
if self._track_energy_fluxes:
# Include all variables
output_vars = self.get_variables()
else:
# Exclude energy flux variables
output_vars = [
vv
for vv in self.get_variables()
if vv not in self._energy_flux_vars
]
else:
# Check if energy flux variables are requested when not tracking
output_vars = list(set(self.get_variables()) & set(output_vars))
if not self._track_energy_fluxes:
conflicting_vars = set(self._energy_flux_vars) & set(
output_vars
)
if conflicting_vars:
# Warn and filter out the conflicting variables
msg = (
"Variables omitted from NetCDF output because "
"PRMSStreamTemp energy fluxes are not tracked: "
f"{sorted(conflicting_vars)}"
)
warn(msg)
# Remove conflicting variables from output_vars
output_vars = [
v for v in output_vars if v not in conflicting_vars
]
# Call parent class initialize_netcdf
super().initialize_netcdf(
output_dir=output_dir,
separate_files=separate_files,
budget_args=budget_args,
output_vars=output_vars,
extra_coords=extra_coords,
addtl_output_vars=addtl_output_vars,
)
return
def _set_initial_conditions(self) -> None:
# Initialize state variables
# seg_tave_water starts as -99.9 from get_init_values
# Segments with no upstream HRUs will be set to NaN ("never has flow")
# in _initialize_stream_temp
# Don't override seg_tave_water here - it's already -99.9
self.seg_tave_upstream[:] = np.nan
self.seg_tave_gw[:] = zero
self.seg_tave_ss[:] = zero
self.seg_tave_lat[:] = zero
self.seg_shade[:] = zero
self._seg_inflow = np.zeros(self.nsegment, dtype=np.float64)
# Initialize circular buffers for temperature averaging if not
# already done
self.gw_silo = np.zeros(
(self.nsegment, MAX_DAYS_PER_YEAR), dtype=np.float64
)
self.ss_silo = np.zeros(
(self.nsegment, MAX_DAYS_PER_YEAR), dtype=np.float64
)
self.gw_index = np.zeros(self.nsegment, dtype=np.int32)
self.ss_index = np.zeros(self.nsegment, dtype=np.int32)
# Initialize circular buffers with -99.9 (invalid marker) to
# match Fortran PRMS
# Values will be filled as air temps are added over time
self.gw_silo[:, :] = -99.9
self.ss_silo[:, :] = -99.9
self.gw_index[:] = 0
self.ss_index[:] = 0
# Initialize running sum arrays for temperature averaging
self.gw_sum = np.zeros(self.nsegment, dtype=np.float64)
self.ss_sum = np.zeros(self.nsegment, dtype=np.float64)
self.gw_sum[:] = 0.0
self.ss_sum[:] = 0.0
return
def _initialize_stream_temp(self) -> None:
"""Initialize stream temperature data structures."""
self.nsegment = self._params.dims["nsegment"]
self.nhru = self._params.dims["nhru"]
# Extract scalar parameters (dimension "one")
# These are stored as arrays but should be scalars
self.maxiter_sntemp = float(self.maxiter_sntemp[0])
self.albedo = float(self.albedo[0])
self.melt_temp = float(self.melt_temp[0])
# Validate and correct seg_length (Fortran lines 428-435)
# Check before conversion to km
errors = []
for i in range(self.nsegment):
if self.seg_length[i] < NEARZERO:
errors.append(
f"seg_length too small for segment {i}: "
f"{self.seg_length[i]}"
)
elif self.seg_length[i] < 1.0 and self._verbose:
warn(
f"seg_length < 1.0 meters for segment {i}: "
f"{self.seg_length[i]}"
)
if errors:
raise ValueError("\n".join(errors))
# Convert seg_length from meters to kilometers (Fortran line 441)
self.seg_length_km = self.seg_length / 1000.0
# Validate and correct seg_slope (Fortran lines 436-440)
# New minimum is 0.0000001 (was 0.0001 in old version)
for i in range(self.nsegment):
if self.seg_slope[i] < 0.0000001:
if self._verbose:
warn(
f"seg_slope < 0.0000001 for segment {i}, "
f"setting to 0.0000001 (was {self.seg_slope[i]})"
)
self.seg_slope[i] = 0.0000001
# Note: lat_temp_adj units are degrees Celsius (not decimal fraction)
# This matches Fortran stream_temp.f90 line 202
# Compute hru_cossl from hru_slope (matches Fortran/PRMSSolarGeometry)
self._hru_cossl = np.cos(np.arctan(self.hru_slope))
# Get segment ordering for upstream-to-downstream calculations
self._compute_segment_order()
# Note: Circular buffers (gw_silo, ss_silo, gw_index, ss_index) are
# initialized in _set_initial_conditions to ensure proper timing
# Initialize segment aggregate arrays
self.seg_potet = np.zeros(self.nsegment, dtype=np.float64)
self.seginc_sroff = np.zeros(self.nsegment, dtype=np.float64)
self.seginc_ssflow = np.zeros(self.nsegment, dtype=np.float64)
self.seginc_gwflow = np.zeros(self.nsegment, dtype=np.float64)
self.seginc_swrad = np.zeros(self.nsegment, dtype=np.float64)
# Compute segment HRU areas (sum of HRU areas contributing to
# each segment)
self.segment_hruarea = np.zeros(self.nsegment, dtype=np.float64)
for j in range(self.nhru):
seg_idx = self.hru_segment[j]
if seg_idx > 0:
i = seg_idx - 1
self.segment_hruarea[i] += self.hru_area[j]
# Compute upstream segment information
self._compute_upstream_info()
# Compute segment_up - the single immediate upstream segment
# (matches Fortran Segment_up)
# Fortran's Segment_up is computed by iterating segments and
# assigning Segment_up(toseg) = j
# This means when multiple segments flow into one, it keeps the
# LAST one (highest j)
# So we need to find the last upstream in segment numbering order
segment_up = np.zeros(self.nsegment, dtype=np.int32)
for j in range(self.nsegment):
toseg = self.tosegment[j]
if toseg > 0:
# toseg is 1-based, convert to 0-based
segment_up[toseg - 1] = j
# Note: segment_up[i] = 0 means no upstream (default from zeros
# initialization)
# Save segment_up as instance variable (needed for routing
# aggregation logic)
self.segment_up = segment_up
# Handle seg_close parameter (new in PRMS 5.2.1.1)
# Matches Fortran stream_temp.f90 lines 540-576
# seg_close is now a parameter that can be:
# -1: auto-set to segment_up (with warning)
# >0: user-specified closest segment for segments without HRUs
# Check if seg_close is specified or should default
# (Fortran line 542-545)
if self.seg_close[0] == -1:
# Not specified - use segment_up with warning
self.seg_close = np.copy(segment_up)
self.seg_close_flag = 0
if self._verbose:
warn(
"seg_close parameter not specified (value=-1), "
"setting to segment_up or other segment if no segment_up"
)
else:
# User specified - validate that segments without HRUs have
# valid seg_close
self.seg_close_flag = 1
errors = []
for i in range(self.nsegment):
if self.segment_hruarea[i] <= NEARZERO:
if self.seg_close[i] == 0:
errors.append(
f"segment {i} does not have associated HRUs "
f"but seg_close is 0"
)
if errors:
raise ValueError("\n".join(errors))
# For auto-assigned seg_close (seg_close_flag=0), handle special
# cases (Fortran lines 563-576)
if self.seg_close_flag == 0:
for jj in range(self.nsegment):
i = self.segment_order[jj]
if self.segment_hruarea[i] <= NEARZERO:
# If segment_up is 0, need to find alternative
if self.segment_up[i] == 0:
if jj > 0:
# Use previous segment in route order
self.seg_close[i] = self.segment_order[jj - 1]
if self._verbose:
warn(
f"segment_up = 0 without associated HRU "
f"for segment {i}, will use previous "
f"segment in route order"
)
else:
raise ValueError(
f"Cannot set associated segment for segment "
f"without associated HRU for segment {i}. "
f"Must specify seg_close for this case."
)
# Mark segments with no upstream HRUs as never having flow
# (matches Fortran initialization around line 648)
for i in range(self.nsegment):
if self.segment_hruarea[i] <= NEARZERO:
# Check if any upstream segments have HRUs
has_upstream_hrus = False
this_seg = i
visited = set()
while this_seg not in visited:
visited.add(this_seg)
# Check upstream segments
found_upstream = False
for j in range(self.nsegment):
if (
self.tosegment[j] > 0
and self.tosegment[j] == this_seg + 1
):
if self.segment_hruarea[j] > NEARZERO:
has_upstream_hrus = True
break
this_seg = j
found_upstream = True
break
if has_upstream_hrus or not found_upstream:
break
# If no upstream HRUs, mark as never having flow (NaN)
# Valid segments are already -99.9 from get_init_values
if not has_upstream_hrus:
self.seg_tave_water[i] = np.nan
# Note: seg_length conversion to km is done earlier as seg_length_km
# Precompute solar geometry for each day of year
self._precompute_solar_geometry()
return
def _precompute_solar_geometry(self) -> None:
"""Precompute solar declination for each day of year."""
# Solar declination for each day of year (vectorized)
jdays = np.arange(MAX_DAYS_PER_YEAR)
k = jdays + 1 # Convert to 1-based day of year
self.declination = 0.40928 * np.cos(
((2.0 * PI) / DAYS_YR) * (172.0 - k)
) # radians
return
def _compute_segment_order(self) -> None:
"""Compute topologically sorted segment order for calculation.
Uses NetworkX for topological sorting, matching the approach in
PRMSChannel.
"""
# Build connectivity list (tosegment is 1-based, convert to 0-based)
connectivity = []
self._outflow_mask = np.full((len(self.tosegment)), False)
for iseg in range(self.nsegment):
toseg = self.tosegment[iseg] - 1 # Convert to 0-based
if toseg >= 0: # -1 means outlet in 0-based indexing
connectivity.append((iseg, toseg))
else:
self._outflow_mask[iseg] = True
# Use NetworkX for topological sort
if self.nsegment > 1 and len(connectivity) > 0:
graph = nx.DiGraph()
graph.add_edges_from(connectivity)
segment_order = list(nx.topological_sort(graph))
else:
segment_order = list(range(self.nsegment))
# if the domain contains links with no upstream or
# downstream reaches, we just throw these back at the
# top of the order since networkx wont handle such nonsense
wh_mask_set = set(np.where(self._outflow_mask)[0])
seg_ord_set = set(segment_order)
mask_not_seg_ord = list(wh_mask_set - seg_ord_set)
if len(mask_not_seg_ord):
segment_order = mask_not_seg_ord + segment_order
for pp in mask_not_seg_ord:
assert ((self.tosegment[pp] - 1) == -1) and (
pp not in (self.tosegment - 1)
)
self.segment_order = np.array(segment_order, dtype=np.int32)
return
def _compute_upstream_info(self) -> None:
"""Compute upstream segment information for each segment."""
# Count upstream segments
self.upstream_count = np.zeros(self.nsegment, dtype=np.int32)
max_upstream = 10 # Assume max 10 upstream segments
self.upstream_idx = np.zeros(
(self.nsegment, max_upstream), dtype=np.int32
)
for jj in range(self.nsegment):
count = 0
for ii in range(self.nsegment):
toseg = self.tosegment[ii]
if (
toseg > 0 and toseg == jj + 1
): # 1-based, with 0 meaning outlet
self.upstream_idx[jj, count] = ii
count += 1
self.upstream_count[jj] = count
return
def _init_calc_method(self):
"""Initialize calculation method (numpy or numba)."""
if self._calc_method is None:
self._calc_method = "numba"
avail_methods = ["numpy", "numba"]
if self._calc_method.lower() not in avail_methods:
msg = (
f"Invalid calc_method={self._calc_method} for {self.name}. "
f"Setting calc_method to 'numba' for {self.name}"
)
warn(msg)
self._calc_method = "numba"
if self._calc_method.lower() == "numba":
numba_msg = f"{self.name} jit compiling with numba"
print(numba_msg, flush=True)
# Wrap methods with numba jit compilation
self._update_running_avg_temp = nb.njit(
self._update_running_avg_temp
)
self._compute_lateral_temp = nb.njit(self._compute_lateral_temp)
self._compute_water_temp = nb.njit(self._compute_water_temp)
# else: methods are already available without wrapping
return
def _advance_variables(self) -> None:
"""Advance variables from current to previous timestep."""
# No explicit time advancement needed for this module
return
def _calculate(self, time_length) -> None:
"""Calculate stream temperature for all segments."""
# Get current month (1-based) and day of year
doy = self.control.current_doy - 1
# Get declination for current day
declination = self.declination[doy]
# Determine summer flag for vegetation density (1=summer, 0=winter)
summer_flag = 1 if 121 <= (doy + 1) <= 273 else 0
# Compute segment aggregate variables from HRU inputs
self._compute_segment_aggregates()
# Compute seg_potet using stream_temp.f90 logic (after aggregates)
self._compute_seg_potet()
# Note: Hydraulic geometry (seg_flow_*) is now provided by upstream
# PRMSHydraulicGeometryFull process, not computed here
# Compute running average temperatures for groundwater and subsurface
# Skip segments marked as never having flow (matches Fortran checks)
self._update_running_avg_temp(
self.segment_order,
self.seg_tave_water,
self.seginc_swrad,
self.seg_tave_air,
self.gw_tau,
self.gw_index,
self.gw_silo,
self.gw_sum,
self.seg_tave_gw,
self.ss_tau,
self.ss_index,
self.ss_silo,
self.ss_sum,
self.seg_tave_ss,
)
# Compute lateral flow temperatures
# Skip segments marked as never having flow or data
self._compute_lateral_temp(
self.segment_order,
self.seg_tave_water,
self.seginc_swrad,
self.seg_lateral_inflow,
self.seginc_sroff,
self.seginc_ssflow,
self.seginc_gwflow,
self.seg_melt,
self.seg_rain,
self.seg_tave_gw,
self.seg_tave_air,
self.seg_tave_ss,
self.melt_temp,
self.lat_temp_adj,
self.control.current_month,
self.seg_tave_lat,
)
# Initialize seg_tave_upstream to 0.0 each timestep
# (matches Fortran line 463)
# Segments that are skipped will keep this 0.0 value
self.seg_tave_upstream[:] = 0.0
# Compute shade - vectorized or loop-based depending on flag
if self._use_vectorized_shade:
# VECTORIZED: Compute shade for all segments at once
self.seg_shade[:], seg_svi_all = self._stream_shade.compute_all(
declination, summer_flag, self.seg_flow_width
)
else:
# LOOP-BASED: Compute shade one segment at a time (original method)
seg_svi_all = np.zeros(self.nsegment, dtype=np.float64)
# Compute water temperature for each segment (must be done in
# segment_order)
# Don't reset segments marked as NaN (never have flow)
# Valid segments get reset to -99.9 before calculation
for jj in self.segment_order:
if not np.isnan(self.seg_tave_water[jj]):
self.seg_tave_water[jj] = -99.9
# Note: Assumes vectorized shade is being used
self._compute_water_temp(
self.segment_order,
self.seg_tave_water,
self.seginc_swrad,
self.seg_outflow,
self.upstream_count,
self.upstream_idx,
self.seg_lateral_inflow,
self.seg_tave_upstream,
seg_svi_all,
self._seg_inflow,
self.seg_tave_lat,
self.seginc_swrad,
self.seg_humid,
self.seg_elev,
self.seg_potet,
self.seg_shade,
self.seg_ccov,
self.seg_flow_width,
self.seg_slope,
self.seg_tave_gw,
self.seg_length_km,
self.albedo,
int(self.maxiter_sntemp),
self._track_energy_fluxes,
self._hs_terms if self._track_energy_fluxes else np.zeros(1),
self._ha_terms if self._track_energy_fluxes else np.zeros(1),
self._hf_terms if self._track_energy_fluxes else np.zeros(1),
self._hv_terms if self._track_energy_fluxes else np.zeros(1),
self._evap_terms if self._track_energy_fluxes else np.zeros(1),
self._t_abs4_terms if self._track_energy_fluxes else np.zeros(1),
self._upstream_flows if self._track_energy_fluxes else np.zeros(1),
self._lateral_flows if self._track_energy_fluxes else np.zeros(1),
self._atmos_exchange_factor,
)
# Compute energy fluxes for all segments (vectorized)
if self._track_energy_fluxes:
self._compute_energy_fluxes_vectorized()
return
def _compute_segment_aggregates(self) -> None:
"""Compute segment aggregate variables from HRU inputs.
This implements the aggregation calculations from PRMS stream_temp.f90
around line 699-850. Computes:
- seginc_sroff, seginc_ssflow, seginc_gwflow, seginc_swrad (flow/rad)
- seg_tave_air, seg_melt, seg_rain, seg_ccov (met vars, non-humidity)
- seg_humid via _compute_seg_humid_cbh_numba (CBH humidity path)
"""
# Compute all segment aggregates except humidity
_compute_segment_aggregates_numba(
self.nhru,
self.nsegment,
self.hru_segment,
self.hru_area,
self.sroff,
self.ssres_flow,
self.gwres_flow,
self.swrad,
self.segment_hruarea,
self.segment_up,
self.tosegment,
self.seginc_sroff,
self.seginc_ssflow,
self.seginc_gwflow,
self.seginc_swrad,
self.tavgc,
self.snowmelt,
self.hru_rain,
self.soltab_potsw,
self._hru_cossl,
self.segment_order,
self.seg_close,
self.seg_tave_air,
self.seg_melt,
self.seg_rain,
self.seg_ccov,
)
# Compute seg_humid from CBH humidity_hru input (flag==0 path)
_compute_seg_humid_cbh_numba(
self.nhru,
self.nsegment,
self.hru_segment,
self.hru_area,
self.humidity_hru,
self.segment_hruarea,
self.segment_order,
self.seg_close,
self.seg_humid,
)
return
def _compute_seg_potet(self) -> None:
"""Compute seg_potet using stream_temp.f90 logic.
This matches stream_temp.f90 lines 807-848, using segment_order
and seg_close.
"""
# Use numba-optimized function for performance
_compute_seg_potet_numba(
self.nhru,
self.nsegment,
self.hru_segment,
self.hru_area,
self.potet,
self.segment_order,
self.segment_hruarea,
self.seg_close,
self.seg_potet,
)
return
# Note: _compute_hydraulic_geometry removed - now handled by
# PRMSHydraulicGeometryFull process
def _update_running_avg_temp_single(
self, seg_idx: int, comp_type: str
) -> None:
"""Update running average temperature for groundwater or
subsurface (single segment).
This matches the Fortran PRMS implementation which uses a running sum
divided by tau, with a circular buffer.
Args:
seg_idx: Segment index
comp_type: "gw" for groundwater or "ss" for subsurface
"""
if comp_type == "gw":
tau = self.gw_tau[seg_idx]
index = self.gw_index[seg_idx]
silo = self.gw_silo
sum_array = self.gw_sum
elif comp_type == "ss":
tau = self.ss_tau[seg_idx]
index = self.ss_index[seg_idx]
silo = self.ss_silo
sum_array = self.ss_sum
else:
raise ValueError(f"Invalid component type: {comp_type}")
# Remove old value from sum (circular buffer behavior)
sum_array[seg_idx] -= silo[seg_idx, index]
# Add new air temperature to silo
silo[seg_idx, index] = self.seg_tave_air[seg_idx]
# Add new value to sum
sum_array[seg_idx] += silo[seg_idx, index]
# Compute average as sum / tau (matches Fortran)
avg_temp = sum_array[seg_idx] / tau
if comp_type == "gw":
self.seg_tave_gw[seg_idx] = avg_temp
self.gw_index[seg_idx] = (index + 1) % int(tau)
elif comp_type == "ss":
self.seg_tave_ss[seg_idx] = avg_temp
self.ss_index[seg_idx] = (index + 1) % int(tau)
else:
raise ValueError(f"Invalid component type: {comp_type}")
return
def _compute_lateral_temp_single(
self, seg_idx: int, nowmonth: int
) -> None:
"""Compute lateral flow temperature for a segment (single segment).
Args:
seg_idx: Segment index
nowmonth: Current month (1-based)
"""
sroff = self.seginc_sroff[seg_idx]
ssflow = self.seginc_ssflow[seg_idx]
gwflow = self.seginc_gwflow[seg_idx]
melt = self.seg_melt[seg_idx]
rain = self.seg_rain[seg_idx]
tave_gw = self.seg_tave_gw[seg_idx]
tave_air = self.seg_tave_air[seg_idx]
tave_ss = self.seg_tave_ss[seg_idx]
melt_temp = self.melt_temp
# Use lat_inflow function for detailed lateral temperature calculation
tl_avg, qlat = self._lat_inflow(
self.seg_lateral_inflow[seg_idx],
sroff,
ssflow,
gwflow,
melt_temp,
tave_gw,
tave_air,
tave_ss,
melt,
rain,
)
# Apply monthly adjustment
if not np.isnan(tl_avg):
tl_avg += self.lat_temp_adj[nowmonth - 1, seg_idx]
# Ensure non-negative (also converts NaN to 0.0 to match Fortran)
if np.isnan(tl_avg) or tl_avg < 0.0:
tl_avg = 0.0
self.seg_tave_lat[seg_idx] = tl_avg
return
def _lat_inflow(
self,
seg_lateral_inflow,
seginc_sroff,
seginc_ssflow,
seginc_gwflow,
melt_temp,
tave_gw,
tave_air,
tave_ss,
melt,
rain,
):
"""Compute lateral inflow temperature from components.
This is the lat_inflow function from PRMS.
Args:
seg_lateral_inflow: Total lateral inflow to segment (cfs)
seginc_sroff: Surface runoff component (cfs)
seginc_ssflow: Subsurface flow component (cfs)
seginc_gwflow: Groundwater flow component (cfs)
melt_temp: Snowmelt temperature (degC)
tave_gw: Groundwater temperature (degC)
tave_air: Air temperature (degC)
tave_ss: Subsurface temperature (degC)
melt: Snowmelt (inches)
rain: Rainfall (inches)
Returns:
tl_avg: Weighted average lateral inflow temperature (degC)
qlat: Lateral inflow (cms)
"""
return _lat_inflow(
seg_lateral_inflow,
seginc_sroff,
seginc_ssflow,
seginc_gwflow,
melt_temp,
tave_gw,
tave_air,
tave_ss,
melt,
rain,
)
def _compute_upstream_temp(self, seg_idx: int) -> None:
"""Compute temperature from upstream segments.
Args:
seg_idx: Segment index
"""
flow_sum = 0.0
temp_sum = 0.0
for kk in range(self.upstream_count[seg_idx]):
up_idx = self.upstream_idx[seg_idx, kk]
if (
not np.isnan(self.seg_tave_water[up_idx])
and self.seg_tave_water[up_idx] > NOFLOW_TEMP
):
flow = self.seg_outflow[up_idx]
temp_sum += self.seg_tave_water[up_idx] * flow
flow_sum += flow
if flow_sum > 0.0:
self.seg_tave_upstream[seg_idx] = temp_sum / flow_sum
else:
self.seg_tave_upstream[seg_idx] = NOFLOW_TEMP
return
def _compute_shade(
self, seg_idx: int, declination: float, summer_flag: int
) -> float:
"""Compute shade using the composed stream_shade object.
NOTE: This method is used when self._use_vectorized_shade=False.
For vectorized computation, see compute_all in stream_shade.
Args:
seg_idx: Segment index
declination: Solar declination (radians)
summer_flag: 1 for summer, 0 for winter
Returns:
svi: Vegetation shade index
"""
# Delegate to composed shade computer
shade, svi = self._stream_shade.compute(
seg_idx,
declination,
summer_flag,
self.seg_flow_width[seg_idx],
)
self.seg_shade[seg_idx] = shade
return svi
def _compute_inflow(self, seg_idx: int) -> float:
"""Compute total inflow to a segment.
Args:
seg_idx: Segment index
Returns:
Total inflow (cfs)
"""
# Sum upstream flows
upstream_flow = 0.0
for kk in range(self.upstream_count[seg_idx]):
up_idx = self.upstream_idx[seg_idx, kk]
if not np.isnan(self.seg_tave_water[up_idx]):
upstream_flow += self.seg_outflow[up_idx]
# Add lateral inflow
total_inflow = upstream_flow + self.seg_lateral_inflow[seg_idx]
return total_inflow
def _compute_water_temp_single(self, seg_idx: int, svi: float) -> None:
"""Compute water temperature for a segment (single segment).
Args:
seg_idx: Segment index
svi: Vegetation shade index
"""
# Skip segments marked as permanently invalid (NaN = never has flow)
# (matches Fortran line 887, 894)
if np.isnan(self.seg_tave_water[seg_idx]):
# Never has flow - skip all calculations
return
if self.seginc_swrad[seg_idx] < -99.0:
# Never has data - mark and skip
self.seg_tave_water[seg_idx] = np.nan
return
# Check for no-flow conditions (matches Fortran check for
# seg_outflow <= 0)
if self.seg_outflow[seg_idx] <= 0.0:
self.seg_tave_water[seg_idx] = NOFLOW_TEMP
return
# Get upstream flow
upstream_flow = 0.0
for kk in range(self.upstream_count[seg_idx]):
up_idx = self.upstream_idx[seg_idx, kk]
if not np.isnan(self.seg_tave_water[up_idx]):
upstream_flow += self.seg_outflow[up_idx]
lateral_flow = self.seg_lateral_inflow[seg_idx]
# Check if we have any inflow
if upstream_flow < NEARZERO and lateral_flow < NEARZERO:
self.seg_tave_water[seg_idx] = NOFLOW_TEMP
return
# Compute mixed inlet temperature
t_in = self._compute_mixed_inlet_temp(
seg_idx, upstream_flow, lateral_flow
)
if np.isnan(t_in):
self.seg_tave_water[seg_idx] = NOFLOW_TEMP
return
# Compute equilibrium temperature using full energy balance
result = self._equilb(seg_idx, t_in, svi)
te, ak1, ak2 = result[0], result[1], result[2]
# Store energy flux terms if tracking (for vectorized computation)
if self._track_energy_fluxes:
self._hs_terms[seg_idx] = result[3]
self._ha_terms[seg_idx] = result[4]
self._hf_terms[seg_idx] = result[5]
self._hv_terms[seg_idx] = result[6]
self._evap_terms[seg_idx] = result[7]
self._t_abs4_terms[seg_idx] = result[8]
self._upstream_flows[seg_idx] = upstream_flow
self._lateral_flows[seg_idx] = lateral_flow
# Compute final temperature using twavg function
qlat = lateral_flow * CFS_TO_CMS
qup = upstream_flow # Upstream flow only (CFS)
tl_avg = self.seg_tave_lat[seg_idx]
self.seg_tave_water[seg_idx] = self._twavg(
qup, t_in, qlat, tl_avg, te, ak1, ak2, seg_idx
)
return
def _compute_mixed_inlet_temp(
self, seg_idx: int, upstream_flow: float, lateral_flow: float
) -> float:
"""Compute mixed inlet temperature from upstream and lateral sources.
Args:
seg_idx: Segment index
upstream_flow: Flow from upstream segments (cfs)
lateral_flow: Lateral inflow (cfs)
Returns:
Mixed inlet temperature (degC)
"""
return _compute_mixed_inlet_temp(
upstream_flow,
lateral_flow,
self.seg_tave_upstream[seg_idx],
self.seg_tave_lat[seg_idx],
)
def _compute_energy_fluxes_vectorized(self) -> None:
"""Compute energy fluxes for all segments after temperature
calculations.
This method uses intermediate terms stored during the temperature
calculation loop to avoid redundant computation. All terms are
computed vectorially for efficiency.
"""
# Surface areas (vectorized)
surface_area = (
self.seg_flow_width * self.seg_length_km * 1000.0
) # m² (convert km to m)
# Convert flows to m³/s (vectorized)
q_up_cms = self._upstream_flows * CFS_TO_CMS
q_lat_cms = self._lateral_flows * CFS_TO_CMS
q_out_cms = self.seg_outflow * CFS_TO_CMS
# === INPUTS (Heat gains) ===
# 1. Advective heat from upstream (W) - vectorized
self.heat_upstream[:] = np.where(
q_up_cms > NEARZERO,
q_up_cms
* self.seg_tave_upstream
* SPECIFIC_HEAT_WATER
* WATER_DENSITY,
0.0,
)
# 2. Advective heat from lateral inflow (W) - vectorized
self.heat_lateral[:] = np.where(
q_lat_cms > NEARZERO,
q_lat_cms
* self.seg_tave_lat
* SPECIFIC_HEAT_WATER
* WATER_DENSITY,
0.0,
)
# 3. Net shortwave solar radiation (W) - use stored terms
self.solar_radiation[:] = self._hs_terms * surface_area
# 4. Atmospheric longwave radiation (W) - use stored terms
self.atmospheric_longwave[:] = self._ha_terms * surface_area
# 5. Friction heating (W) - use stored terms
self.friction_heat[:] = self._hf_terms * surface_area
# 6. Groundwater conduction (W) - vectorized
self.groundwater_conduction[:] = self.seg_tave_gw * AKZ * surface_area
# === OUTPUTS (Heat losses) ===
# 7. Advective heat leaving segment (W) - vectorized
self.heat_outflow[:] = np.where(
q_out_cms > NEARZERO,
q_out_cms
* self.seg_tave_water
* SPECIFIC_HEAT_WATER
* WATER_DENSITY,
0.0,
)
# 8. Longwave radiation emitted by water surface (W)
# Use stored T^4 terms
self.longwave_emission[:] = A * self._t_abs4_terms * surface_area
# 9. Longwave radiation from riparian vegetation (W)
# Use stored terms
self.longwave_vegetation[:] = self._hv_terms * surface_area
# 10. Evaporative cooling (W) - use stored evap terms
self.evaporative_cooling[:] = (
self._evap_terms * LATENT_HEAT_VAPORIZATION * surface_area
)
# 11. Convective/sensible heat exchange (W) - vectorized residual
# Total inputs
total_inputs = (
self.heat_upstream
+ self.heat_lateral
+ self.solar_radiation
+ self.atmospheric_longwave
+ self.friction_heat
+ self.groundwater_conduction
)
# Total outputs (excluding convection)
total_outputs = (
self.heat_outflow
+ self.longwave_emission
+ self.longwave_vegetation
+ self.evaporative_cooling
)
# If the other potential exchanges could change sign, we'd need
# something like thisand the code below to calculate convective
# exchange as the residual
# total_exchanges = (
# self.groundwater_conduction + self.longwave_vegetation
# )
# Convection closes the balance (can be positive or negative)
self.convective_exchange[:] = total_outputs - total_inputs
# Sign flipped to be an exchange: positive = gain, negative = loss
# self.convective_exchange[:] = (
# total_outputs - total_inputs - total_exchanges
# )
return
def _equilb(self, seg_idx: int, t_o: float, svi: float):
"""Compute equilibrium temperature using full energy balance.
This is the equilb function from PRMS.
Args:
seg_idx: Segment index
t_o: Initial temperature (degC)
svi: Vegetation shade index
Returns:
te: Equilibrium temperature (degC)
ak1: First-order thermal exchange coefficient
ak2: Second-order thermal exchange coefficient
hs: Net shortwave solar radiation (W/m²)
ha: Atmospheric longwave radiation (W/m²)
hf: Friction heating (W/m²)
hv: Vegetation longwave radiation (W/m²)
evap: Evaporation rate (m/s)
t_abs4: Temperature to 4th power (K^4)
"""
return _equilb(
t_o,
svi,
self._seg_inflow[seg_idx],
self.seginc_swrad[seg_idx],
self.seg_humid[seg_idx],
self.seg_elev[seg_idx],
self.seg_potet[seg_idx],
self.seg_shade[seg_idx],
self.seg_ccov[seg_idx],
self.seg_flow_width[seg_idx],
self.seg_slope[seg_idx],
self.seg_tave_gw[seg_idx],
self.albedo,
int(self.maxiter_sntemp),
)
def _teak1(self, a_coef, b_coef, c_coef, d_coef, teq):
"""Solve for equilibrium temperature using Newton-Raphson iteration.
This is the teak1 function from PRMS.
Args:
a_coef, b_coef, c_coef, d_coef: Coefficients
teq: Initial guess for equilibrium temperature
Returns:
teq: Equilibrium temperature (degC)
ak1c: First-order thermal exchange coefficient
"""
return _teak1(
a_coef, b_coef, c_coef, d_coef, teq, int(self.maxiter_sntemp)
)
def _twavg(
self,
qup,
t0,
qlat,
tl_avg,
te,
ak1,
ak2,
seg_idx,
atmos_exchange_factor=None,
):
"""Compute average water temperature with lateral inflows.
This is the twavg function from PRMS.
Args:
qup: Upstream flow (cfs)
t0: Inlet temperature (degC)
qlat: Lateral flow (cms)
tl_avg: Lateral flow temperature (degC)
te: Equilibrium temperature (degC)
ak1: First-order thermal exchange coefficient
ak2: Second-order thermal exchange coefficient
seg_idx: Segment index
Returns:
tw: Average water temperature (degC)
"""
if atmos_exchange_factor is None:
atmos_exchange_factor = self._atmos_exchange_factor
return _twavg(
qup,
t0,
qlat,
tl_avg,
te,
ak1,
ak2,
self.seg_flow_width[seg_idx],
self.seg_length[seg_idx],
atmos_exchange_factor,
)
@staticmethod
def _update_running_avg_temp(
segment_order,
seg_tave_water,
seginc_swrad,
seg_tave_air,
gw_tau,
gw_index,
gw_silo,
gw_sum,
seg_tave_gw,
ss_tau,
ss_index,
ss_silo,
ss_sum,
seg_tave_ss,
):
"""Update running average temperatures for groundwater and subsurface.
This function can be optionally JIT-compiled with numba for
performance.
Args:
segment_order: Order to process segments (immutable)
seg_tave_water: Water temperature array (immutable, for skip
checks)
seginc_swrad: Solar radiation array (immutable, for skip checks)
seg_tave_air: Air temperature array (immutable)
gw_tau: Groundwater tau values (immutable)
gw_index: Groundwater circular buffer indices (MUTATED)
gw_silo: Groundwater circular buffer (MUTATED)
gw_sum: Groundwater running sums (MUTATED)
seg_tave_gw: Groundwater temperatures (MUTATED - output)
ss_tau: Subsurface tau values (immutable)
ss_index: Subsurface circular buffer indices (MUTATED)
ss_silo: Subsurface circular buffer (MUTATED)
ss_sum: Subsurface running sums (MUTATED)
seg_tave_ss: Subsurface temperatures (MUTATED - output)
"""
for jj in segment_order:
# Skip if marked as permanently invalid (no HRUs
# upstream/downstream)
if seginc_swrad[jj] < -99.0:
continue
# Update groundwater running average
idx_gw = gw_index[jj]
tau_gw = int(gw_tau[jj])
# Add new air temperature to silo
gw_silo[jj, idx_gw] = seg_tave_air[jj]
# Recompute sum and count valid entries (matches Fortran)
# This handles spin-up period correctly
at_sum = 0.0
at_cnt = 0
for j in range(tau_gw):
if gw_silo[jj, j] > -98.0:
at_sum += gw_silo[jj, j]
at_cnt += 1
# Compute average as sum / count
if at_cnt > 0:
seg_tave_gw[jj] = at_sum / at_cnt
else:
seg_tave_gw[jj] = 0.0
# Update index
if idx_gw < tau_gw - 1:
gw_index[jj] = idx_gw + 1
else:
gw_index[jj] = 0
# Update subsurface running average
idx_ss = ss_index[jj]
tau_ss = int(ss_tau[jj])
# Add new air temperature to silo
ss_silo[jj, idx_ss] = seg_tave_air[jj]
# Recompute sum and count valid entries (matches Fortran)
# This handles spin-up period correctly
at_sum = 0.0
at_cnt = 0
for j in range(tau_ss):
if ss_silo[jj, j] > -98.0:
at_sum += ss_silo[jj, j]
at_cnt += 1
# Compute average as sum / count
if at_cnt > 0:
seg_tave_ss[jj] = at_sum / at_cnt
else:
seg_tave_ss[jj] = 0.0
# Update index
if idx_ss < tau_ss - 1:
ss_index[jj] = idx_ss + 1
else:
ss_index[jj] = 0
@staticmethod
def _compute_lateral_temp(
segment_order,
seg_tave_water,
seginc_swrad,
seg_lateral_inflow,
seginc_sroff,
seginc_ssflow,
seginc_gwflow,
seg_melt,
seg_rain,
seg_tave_gw,
seg_tave_air,
seg_tave_ss,
melt_temp,
lat_temp_adj,
nowmonth,
seg_tave_lat,
):
"""Compute lateral flow temperatures for all segments.
This function can be optionally JIT-compiled with numba for
performance.
Args:
segment_order: Order to process segments (immutable)
seg_tave_water: Water temperature array (immutable, for skip
checks)
seginc_swrad: Solar radiation array (immutable, for skip checks)
seg_lateral_inflow: Lateral inflow array (immutable)
seginc_sroff: Surface runoff array (immutable)
seginc_ssflow: Subsurface flow array (immutable)
seginc_gwflow: Groundwater flow array (immutable)
seg_melt: Snowmelt array (immutable)
seg_rain: Rainfall array (immutable)
seg_tave_gw: Groundwater temperature array (immutable)
seg_tave_air: Air temperature array (immutable)
seg_tave_ss: Subsurface temperature array (immutable)
melt_temp: Melt temperature constant (immutable)
lat_temp_adj: Monthly lateral temperature adjustment array
(immutable)
nowmonth: Current month (immutable, 1-based)
seg_tave_lat: Lateral temperature array (MUTATED - output)
"""
for jj in segment_order:
# Skip if marked as never having flow (NaN = never has flow)
if np.isnan(seg_tave_water[jj]):
continue
# Skip if marked as permanently invalid
if seginc_swrad[jj] < -99.0:
continue
# Get segment values
sroff = seginc_sroff[jj]
ssflow = seginc_ssflow[jj]
gwflow = seginc_gwflow[jj]
melt = seg_melt[jj]
rain = seg_rain[jj]
tave_gw = seg_tave_gw[jj]
tave_air = seg_tave_air[jj]
tave_ss = seg_tave_ss[jj]
# Use lat_inflow function for detailed lateral temperature
# calculation
tl_avg, qlat = _lat_inflow(
seg_lateral_inflow[jj],
sroff,
ssflow,
gwflow,
melt_temp,
tave_gw,
tave_air,
tave_ss,
melt,
rain,
)
# Apply monthly adjustment
if not np.isnan(tl_avg):
tl_avg += lat_temp_adj[nowmonth - 1, jj]
# Ensure non-negative (also converts NaN to 0.0 to match Fortran)
if np.isnan(tl_avg) or tl_avg < 0.0:
tl_avg = 0.0
seg_tave_lat[jj] = tl_avg
@staticmethod
def _compute_water_temp(
segment_order,
seg_tave_water,
seginc_swrad,
seg_outflow,
upstream_count,
upstream_idx,
seg_lateral_inflow,
seg_tave_upstream,
seg_svi_all,
seg_inflow,
seg_tave_lat,
seginc_swrad_data,
seg_humid,
seg_elev,
seg_potet,
seg_shade,
seg_ccov,
seg_flow_width,
seg_slope,
seg_tave_gw,
seg_length,
albedo,
maxiter_sntemp,
track_energy_fluxes,
hs_terms,
ha_terms,
hf_terms,
hv_terms,
evap_terms,
t_abs4_terms,
upstream_flows,
lateral_flows,
atmos_exchange_factor=1.0,
):
"""Compute water temperature for all segments.
This function can be optionally JIT-compiled with numba for
performance.
Args:
segment_order: Order to process segments (immutable)
seg_tave_water: Water temperature array (MUTATED - input/output)
seginc_swrad: Solar radiation array (immutable, for skip checks)
seg_outflow: Segment outflow array (immutable)
upstream_count: Number of upstream segments for each segment
(immutable)
upstream_idx: Indices of upstream segments (immutable)
seg_lateral_inflow: Lateral inflow array (immutable)
seg_tave_upstream: Upstream temperature array (MUTATED - output)
seg_svi_all: Pre-computed vegetation shade index array (immutable)
seg_inflow: Segment inflow array (MUTATED - output)
seg_tave_lat: Lateral temperature array (immutable)
seginc_swrad_data: Solar radiation data for energy balance
(immutable)
seg_humid: Humidity array (immutable)
seg_elev: Elevation array (immutable)
seg_potet: Potential ET array (immutable)
seg_shade: Shade fraction array (immutable)
seg_ccov: Cloud cover array (immutable)
seg_flow_width: Flow width array (immutable)
seg_slope: Slope array (immutable)
seg_tave_gw: Groundwater temperature array (immutable)
seg_length: Segment length array (immutable)
albedo: Albedo value (immutable)
maxiter_sntemp: Maximum iterations for temperature solver
(immutable)
track_energy_fluxes: Whether to track energy flux components
(immutable)
hs_terms: Solar radiation terms (MUTATED if tracking - output)
ha_terms: Atmospheric longwave terms (MUTATED if tracking - output)
hf_terms: Friction heat terms (MUTATED if tracking - output)
hv_terms: Vegetation longwave terms (MUTATED if tracking - output)
evap_terms: Evaporation terms (MUTATED if tracking - output)
t_abs4_terms: Temperature^4 terms (MUTATED if tracking - output)
upstream_flows: Upstream flow values (MUTATED if tracking - output)
lateral_flows: Lateral flow values (MUTATED if tracking - output)
atmos_exchange_factor: Factor to amplify atmospheric exchange
(immutable, default=1.0)
"""
for jj in segment_order:
# Skip segments marked as never having flow (NaN = never has flow)
if np.isnan(seg_tave_water[jj]):
continue
# Compute upstream temperature and flow in one pass
# (consolidates _compute_upstream_temp and _compute_inflow)
flow_sum = 0.0
temp_sum = 0.0
upstream_flow = 0.0
for kk in range(upstream_count[jj]):
up_idx = upstream_idx[jj, kk]
if (
not np.isnan(seg_tave_water[up_idx])
and seg_tave_water[up_idx] > NOFLOW_TEMP
):
flow = seg_outflow[up_idx]
temp_sum += seg_tave_water[up_idx] * flow
flow_sum += flow
upstream_flow += flow
if flow_sum > 0.0:
seg_tave_upstream[jj] = temp_sum / flow_sum
else:
seg_tave_upstream[jj] = NOFLOW_TEMP
# Get svi from pre-computed array
svi = seg_svi_all[jj]
# Compute total inflow
seg_inflow[jj] = upstream_flow + seg_lateral_inflow[jj]
# Now compute water temperature (logic from _compute_water_temp)
# Skip if marked as permanently invalid
if seginc_swrad[jj] < -99.0:
seg_tave_water[jj] = np.nan
continue
# Check for no-flow conditions
if seg_outflow[jj] <= 0.0:
seg_tave_water[jj] = NOFLOW_TEMP
continue
lateral_flow = seg_lateral_inflow[jj]
qlat = lateral_flow * CFS_TO_CMS
# Match Fortran: check qlat in CMS (not lateral_flow in CFS)
if upstream_flow * CFS_TO_CMS <= NEARZERO and qlat <= NEARZERO:
seg_tave_water[jj] = NOFLOW_TEMP
continue
# Compute mixed inlet temperature
t_in = _compute_mixed_inlet_temp(
upstream_flow,
lateral_flow,
seg_tave_upstream[jj],
seg_tave_lat[jj],
)
if np.isnan(t_in):
seg_tave_water[jj] = NOFLOW_TEMP
continue
# Compute equilibrium temperature using full energy balance
result = _equilb(
t_in,
svi,
seg_inflow[jj],
seginc_swrad_data[jj],
seg_humid[jj],
seg_elev[jj],
seg_potet[jj],
seg_shade[jj],
seg_ccov[jj],
seg_flow_width[jj],
seg_slope[jj],
seg_tave_gw[jj],
albedo,
maxiter_sntemp,
)
te = result[0]
ak1 = result[1]
ak2 = result[2]
# Store energy flux terms if tracking
if track_energy_fluxes:
hs_terms[jj] = result[3]
ha_terms[jj] = result[4]
hf_terms[jj] = result[5]
hv_terms[jj] = result[6]
evap_terms[jj] = result[7]
t_abs4_terms[jj] = result[8]
upstream_flows[jj] = upstream_flow
lateral_flows[jj] = lateral_flow
# Compute final temperature using twavg function
qup = upstream_flow
tl_avg = seg_tave_lat[jj]
seg_tave_water[jj] = _twavg(
qup,
t_in,
qlat,
tl_avg,
te,
ak1,
ak2,
seg_flow_width[jj],
seg_length[jj],
atmos_exchange_factor,
)
[docs]
class PRMSStreamTemp(PRMSStreamTempHumidityCBH):
"""PRMS stream temperature with monthly segment humidity parameter.
Corresponds to ``strmtemp_humidity_flag=1`` where humidity is supplied via
the ``seg_humidity`` parameter (12 monthly values per segment) rather than
a time-varying CBH input. All other behaviour is identical to
:class:`PRMSStreamTempHumidityCBH`.
Args:
control: a Control object
discretization: a discretization of class Parameters
parameters: a parameter object of class Parameters — must include
``seg_humidity`` with shape ``(nmonth, nsegment)``
seg_outflow: Streamflow leaving each segment (from PRMSChannel)
seg_lateral_inflow: Lateral inflow entering each segment
swrad: Solar radiation for each HRU
potet: Potential ET for each HRU
sroff: Surface runoff for each HRU
ssres_flow: Subsurface flow for each HRU
gwres_flow: Groundwater flow for each HRU
tavgc: Average air temperature for each HRU in Celsius
snowmelt: Snowmelt for each HRU
hru_rain: Rainfall for each HRU
soltab_potsw: Potential shortwave radiation table for current day
seg_flow_width: Flow-dependent width from PRMSHydraulicGeometryFull
seg_flow_depth: Flow-dependent depth from PRMSHydraulicGeometryFull
seg_flow_area: Flow-dependent cross-sectional area
seg_flow_velocity: Flow-dependent velocity
stream_shade: PRMSStreamShade instance
stream_shade_class: Class to use for stream shade computation
stream_shade_parameters: Parameters for stream shade computation
imbalance_behavior: one of ["defer", None, "warn", "error"]
verbose: Print extra information or not?
use_vectorized_shade: Use vectorized shade computation
track_energy_fluxes: Track energy flux terms for budget analysis
"""
[docs]
def __init__(
self,
control: Control,
discretization: Parameters,
parameters: Parameters,
seg_outflow: adaptable,
seg_lateral_inflow: adaptable,
swrad: adaptable,
potet: adaptable,
sroff: adaptable,
ssres_flow: adaptable,
gwres_flow: adaptable,
tavgc: adaptable,
snowmelt: adaptable,
hru_rain: adaptable,
soltab_potsw: adaptable,
seg_flow_width: adaptable = None,
seg_flow_depth: adaptable = None,
seg_flow_area: adaptable = None,
seg_flow_velocity: adaptable = None,
stream_shade: Union[PRMSStreamShade, None] = None,
stream_shade_class: Union[type, None] = None,
stream_shade_parameters: Union[Parameters, Path, None] = None,
calc_method: Literal["numba", "numpy", None] = None,
imbalance_behavior: Literal["defer", None, "warn", "error"] = "defer",
input_aliases: dict = None,
verbose: bool = False,
use_vectorized_shade: bool = True,
track_energy_fluxes: bool = True,
atmos_exchange_factor: float = 1.0,
) -> None:
super().__init__(
control=control,
discretization=discretization,
parameters=parameters,
seg_outflow=seg_outflow,
seg_lateral_inflow=seg_lateral_inflow,
swrad=swrad,
potet=potet,
sroff=sroff,
ssres_flow=ssres_flow,
gwres_flow=gwres_flow,
tavgc=tavgc,
snowmelt=snowmelt,
hru_rain=hru_rain,
soltab_potsw=soltab_potsw,
humidity_hru=None,
seg_flow_width=seg_flow_width,
seg_flow_depth=seg_flow_depth,
seg_flow_area=seg_flow_area,
seg_flow_velocity=seg_flow_velocity,
stream_shade=stream_shade,
stream_shade_class=stream_shade_class,
stream_shade_parameters=stream_shade_parameters,
calc_method=calc_method,
imbalance_behavior=imbalance_behavior,
input_aliases=input_aliases,
verbose=verbose,
use_vectorized_shade=use_vectorized_shade,
track_energy_fluxes=track_energy_fluxes,
atmos_exchange_factor=atmos_exchange_factor,
)
self.name = "PRMSStreamTemp"
[docs]
@staticmethod
def get_dimensions() -> tuple:
return ("nhru", "nsegment", "nmonth", "ndoy")
[docs]
@staticmethod
def get_parameters() -> tuple:
return (
"doy",
"hru_segment",
"hru_area",
"hru_slope",
"tosegment",
"albedo",
"lat_temp_adj",
"seg_length",
"seg_slope",
"seg_lat",
"seg_elev",
"seg_close",
"ss_tau",
"gw_tau",
"melt_temp",
"maxiter_sntemp",
"stream_tave_init",
"seg_humidity",
)
def _compute_segment_aggregates(self) -> None:
"""Compute segment aggregates using monthly seg_humidity parameter.
Calls _compute_segment_aggregates_numba for all non-humidity variables,
then sets seg_humid directly from the seg_humidity parameter for the
current month (strmtemp_humidity_flag=1 path).
"""
_compute_segment_aggregates_numba(
self.nhru,
self.nsegment,
self.hru_segment,
self.hru_area,
self.sroff,
self.ssres_flow,
self.gwres_flow,
self.swrad,
self.segment_hruarea,
self.segment_up,
self.tosegment,
self.seginc_sroff,
self.seginc_ssflow,
self.seginc_gwflow,
self.seginc_swrad,
self.tavgc,
self.snowmelt,
self.hru_rain,
self.soltab_potsw,
self._hru_cossl,
self.segment_order,
self.seg_close,
self.seg_tave_air,
self.seg_melt,
self.seg_rain,
self.seg_ccov,
)
# Set seg_humid from monthly parameter.
# seg_humidity has dims (nmonth, nsegment); Nowmonth is 1-based.
# Matches PRMS stream_temp.f90 flag==1 path:
# Seg_humid(i) = Seg_humidity(i, Nowmonth)
nowmonth = self.control.current_month
self.seg_humid[:] = self.seg_humidity[nowmonth - 1, :]
@nb.jit(nopython=True)
def _compute_seg_humid_cbh_numba(
nhru,
nsegment,
hru_segment,
hru_area,
humidity_hru,
segment_hruarea,
segment_order,
seg_close,
seg_humid,
):
"""Compute seg_humid from CBH humidity_hru input.
Implements the flag==0 path from PRMS 5.2.1.1 stream_temp.f90:
- Reset seg_humid to 0 (ELSE branch, reinstated fix)
- Accumulate area-weighted humidity from HRUs, converting percent->fraction
- Normalise by segment HRU area in a single segment_order pass, copying
from seg_close for segments with no HRUs (matching PRMS single-pass).
Args:
nhru: Number of HRUs (immutable)
nsegment: Number of segments (immutable)
hru_segment: HRU to segment mapping (immutable, 1-based)
hru_area: HRU areas in acres (immutable)
humidity_hru: HRU relative humidity in percent 0-100 (immutable)
segment_hruarea: Total HRU area per segment (immutable)
segment_order: Order to process segments (immutable)
seg_close: Closest segment with HRUs for each segment (immutable)
seg_humid: Segment humidity in decimal fraction (MUTATED - output)
"""
# Reset before accumulation — matches corrected PRMS 5.2.1.1 ELSE branch.
# See humidity_bug_prms_fix.md "Secondary Bug" for full rationale.
seg_humid[:] = 0.0
# Accumulate area-weighted humidity from HRUs.
# *0.01 applied during accumulation (percent -> decimal fraction) so units
# are consistent throughout (stream_temp.f90 line 807):
# Seg_humid(i) = Seg_humid(i) + Humidity_hru(j) * 0.01 * harea
for j in range(nhru):
seg_idx = hru_segment[j]
if seg_idx > 0:
i = seg_idx - 1
seg_humid[i] += humidity_hru[j] * 0.01 * hru_area[j]
# Normalise in a single segment_order pass (matches PRMS single-pass).
# For no-HRU segments, copy from seg_close in the same pass so that the
# ordering matches PRMS exactly (stream_temp.f90 lines 829-844).
for jj in range(nsegment):
i = segment_order[jj]
if segment_hruarea[i] > NEARZERO:
seg_humid[i] /= segment_hruarea[i]
else:
seg_humid[i] = seg_humid[seg_close[i]]
@nb.jit(nopython=True)
def _compute_segment_aggregates_numba(
nhru,
nsegment,
hru_segment,
hru_area,
sroff,
ssres_flow,
gwres_flow,
swrad,
segment_hruarea,
segment_up,
tosegment,
seginc_sroff,
seginc_ssflow,
seginc_gwflow,
seginc_swrad,
# Meteorological aggregation inputs (excluding humidity)
tavgc,
snowmelt,
hru_rain,
soltab_potsw,
hru_cossl,
segment_order,
seg_close,
# Output arrays for meteorological variables (excluding seg_humid)
seg_tave_air,
seg_melt,
seg_rain,
seg_ccov,
):
"""Compute segment aggregate variables from HRU inputs.
Humidity (seg_humid) is handled separately by _compute_seg_humid_cbh_numba
(flag==0) or set directly from a parameter (flag==1).
Args:
nhru: Number of HRUs (immutable)
nsegment: Number of segments (immutable)
hru_segment: HRU to segment mapping (immutable, 1-based)
hru_area: HRU areas (immutable)
sroff: Surface runoff from HRUs (immutable)
ssres_flow: Subsurface flow from HRUs (immutable)
gwres_flow: Groundwater flow from HRUs (immutable)
swrad: Solar radiation from HRUs (immutable)
segment_hruarea: Total HRU area per segment (immutable)
segment_up: Upstream segment indices (immutable, 0-based)
tosegment: Downstream segment indices (immutable, 1-based)
seginc_sroff: Segment surface runoff (MUTATED - output)
seginc_ssflow: Segment subsurface flow (MUTATED - output)
seginc_gwflow: Segment groundwater flow (MUTATED - output)
seginc_swrad: Segment solar radiation (MUTATED - output)
tavgc: HRU average temperature in Celsius (immutable)
snowmelt: HRU snowmelt (immutable)
hru_rain: HRU rainfall (immutable)
soltab_potsw: Potential shortwave radiation for current day (immutable)
hru_cossl: Cosine of HRU slope (immutable)
segment_order: Order to process segments (immutable)
seg_close: Closest segment with HRUs for each segment (immutable)
seg_tave_air: Segment air temperature (MUTATED - output)
seg_melt: Segment snowmelt (MUTATED - output)
seg_rain: Segment rainfall (MUTATED - output)
seg_ccov: Segment cloud cover (MUTATED - output)
"""
# Initialize segment aggregate variables
seginc_sroff[:] = 0.0
seginc_ssflow[:] = 0.0
seginc_gwflow[:] = 0.0
seginc_swrad[:] = 0.0
# Initialize meteorological segment variables
seg_tave_air[:] = 0.0
seg_melt[:] = 0.0
seg_rain[:] = 0.0
seg_ccov[:] = 0.0
# Constants (from PRMS_SET_TIME)
# Cfs_conv converts acre-inches/day to cfs
# FT2_PER_ACRE / INCHES_PER_FOOT / SECS_PER_DAY
# = 43560 / 12 / 86400
cfs_conv = 43560.0 / 12.0 / 86400.0
# Aggregate HRU values to segments
for j in range(nhru):
seg_idx = hru_segment[j]
# Check if HRU contributes to a segment (seg_idx > 0)
# hru_segment is 1-based, so valid segments are > 0
if seg_idx > 0:
# Convert to 0-based index
i = seg_idx - 1
harea = hru_area[j]
# Convert from inches to cfs (area * inches/day * cfs_conv)
tocfs = harea * cfs_conv
# Accumulate flow components (converted to cfs)
seginc_sroff[i] += sroff[j] * tocfs
seginc_ssflow[i] += ssres_flow[j] * tocfs
seginc_gwflow[i] += gwres_flow[j] * tocfs
# Accumulate area-weighted radiation
seginc_swrad[i] += swrad[j] * harea
# Compute cloud cover for this HRU (stream_temp.f90 line 760-778)
# ccov = 1.0 - (swrad / soltab_potsw * hru_cossl)
potsw = soltab_potsw[j]
if potsw <= 10.0:
ccov = 1.0 - (swrad[j] / 10.0 * hru_cossl[j])
else:
ccov = 1.0 - (swrad[j] / potsw * hru_cossl[j])
# Clamp ccov to [0, 1]
if ccov < NEARZERO:
ccov = 0.0
elif ccov > 1.0:
ccov = 1.0
# Accumulate area-weighted meteorological variables
seg_tave_air[i] += tavgc[j] * harea
seg_ccov[i] += ccov * harea
seg_melt[i] += snowmelt[j] * harea
seg_rain[i] += hru_rain[j] * harea
# Process seginc_swrad in numerical order first (matches original logic)
# Divide radiation by segment HRU area to get averages
# Process in numerical order to match routing.f90 (line 741-810)
for i in range(nsegment):
if segment_hruarea[i] > NEARZERO:
seginc_swrad[i] /= segment_hruarea[i]
else:
# Segment has no HRUs - search upstream then downstream
# (matches routing.f90 line 746-805)
# Search upstream first (routing.f90 line 749-772)
this_seg = i
found = False
max_iter = nsegment # Prevent infinite loops
iter_count = 0
while not found and iter_count < max_iter:
iter_count += 1
# segment_up contains 0-based indices where 0 means
# "no upstream". However, if segment 0 is upstream, we can't
# distinguish it from "no upstream". The original code assumes
# segment 0 is never upstream of anything
upstream_seg = segment_up[this_seg]
# Check if headwater (no upstream)
if upstream_seg == 0 and this_seg != 0:
# No upstream segment exists
break
elif upstream_seg == 0 and this_seg == 0:
# this_seg is segment 0, which has no upstream
break
# Move to upstream segment (already 0-based)
this_seg = upstream_seg
if segment_hruarea[this_seg] > NEARZERO:
# Found segment with HRUs - compute average from
# accumulated value
seginc_swrad[i] = (
seginc_swrad[this_seg] / segment_hruarea[this_seg]
)
found = True
break
# If not found upstream, search downstream
# (routing.f90 line 776-800)
if not found:
this_seg = i
iter_count = 0
while not found and iter_count < max_iter:
iter_count += 1
# tosegment is 1-based, 0 means no downstream
downstream_seg = tosegment[this_seg]
# Check if terminal segment (no downstream)
if downstream_seg == 0:
break
# Move to downstream segment (convert 1-based to 0-based)
this_seg = downstream_seg - 1
if segment_hruarea[this_seg] > NEARZERO:
# Found segment with HRUs - compute average from
# accumulated value
seginc_swrad[i] = (
seginc_swrad[this_seg] / segment_hruarea[this_seg]
)
found = True
break
# If still not found, set to invalid marker
# (routing.f90 line 803-805)
if not found:
seginc_swrad[i] = -99.9
# Process meteorological variables in single pass (matches Fortran).
# Process in segment_order - for segments without HRUs, copy from
# seg_close. Note: seg_close may point to upstream/downstream segments
# not yet processed in some edge cases, but this matches the Fortran
# single-pass behavior (stream_temp.f90 lines 820-844).
for jj in range(nsegment):
i = segment_order[jj]
if segment_hruarea[i] > NEARZERO:
# Segment has HRUs - compute area-weighted averages
seg_tave_air[i] /= segment_hruarea[i]
seg_ccov[i] /= segment_hruarea[i]
seg_melt[i] /= segment_hruarea[i]
seg_rain[i] /= segment_hruarea[i]
else:
# Segment has no HRUs - use values from seg_close
# (stream_temp.f90 lines 832-844)
close_seg = seg_close[i]
seg_tave_air[i] = seg_tave_air[close_seg]
seg_ccov[i] = seg_ccov[close_seg]
seg_melt[i] = seg_melt[close_seg]
seg_rain[i] = seg_rain[close_seg]
@nb.jit(nopython=True)
def _compute_seg_potet_numba(
nhru,
nsegment,
hru_segment,
hru_area,
potet,
segment_order,
segment_hruarea,
seg_close,
seg_potet,
):
"""Compute seg_potet using stream_temp.f90 logic.
This numba-optimized function replaces _compute_seg_potet.
Args:
nhru: Number of HRUs (immutable)
nsegment: Number of segments (immutable)
hru_segment: HRU to segment mapping (immutable, 1-based)
hru_area: HRU areas (immutable)
potet: Potential ET from HRUs (immutable)
segment_order: Order to process segments (immutable)
segment_hruarea: Total HRU area per segment (immutable)
seg_close: Closest segment with HRUs for each segment (immutable)
seg_potet: Segment potential ET (MUTATED - output)
"""
# Initialize
seg_potet[:] = 0.0
# Accumulate from HRUs
for j in range(nhru):
seg_idx = hru_segment[j]
if seg_idx > 0:
i = seg_idx - 1
seg_potet[i] += potet[j] * hru_area[j]
# Process in segment_order
for jj in range(nsegment):
i = segment_order[jj]
if segment_hruarea[i] > NEARZERO:
seg_potet[i] /= segment_hruarea[i]
else:
# Segment has no HRUs - use seg_close
close_seg = seg_close[i]
seg_potet[i] = seg_potet[close_seg]
@nb.jit(nopython=True)
def _teak1(a_coef, b_coef, c_coef, d_coef, teq, maxiter_sntemp):
"""Solve for equilibrium temperature using Newton-Raphson iteration.
This is the teak1 function from PRMS.
Args:
a_coef: Coefficient (immutable)
b_coef: Coefficient (immutable)
c_coef: Coefficient (immutable)
d_coef: Coefficient (immutable)
teq: Initial guess for equilibrium temperature (immutable)
maxiter_sntemp: Maximum iterations (immutable)
Returns:
teq: Equilibrium temperature (degC)
ak1c: First-order thermal exchange coefficient
"""
# Local variables
fte = 99999.0
delte = 99999.0
kount = 0
# Begin Newton iteration solution for TE
while kount < maxiter_sntemp:
if np.abs(fte) < TOLRN:
break
if abs(delte) < TOLRN:
break
teabs = teq + ZERO_C
fte = (
(a_coef * (teabs**4.0))
+ (b_coef * teq)
- (c_coef * (teq**2.0))
- d_coef
)
fpte = (4.0 * a_coef * (teabs**3.0)) + b_coef - (2.0 * c_coef * teq)
delte = fte / fpte
teq = teq - delte
kount += 1
# Determine 1st thermal exchange coefficient
ak1c = (
(4.0 * a_coef * ((teq + ZERO_C) ** 3.0))
+ b_coef
- (2.0 * c_coef * teq)
)
return teq, ak1c
@nb.jit(nopython=True)
def _equilb(
t_o,
svi,
seg_inflow,
seginc_swrad,
seg_humid,
seg_elev,
seg_potet,
seg_shade,
seg_ccov,
seg_flow_width,
seg_slope,
seg_tave_gw,
albedo,
maxiter_sntemp,
):
"""Compute equilibrium temperature using full energy balance.
This is the equilb function from PRMS.
Args:
t_o: Initial temperature (immutable, degC)
svi: Vegetation shade index (immutable)
seg_inflow: Segment inflow (immutable, CFS)
seginc_swrad: Incident shortwave radiation (immutable)
seg_humid: Segment humidity (immutable)
seg_elev: Segment elevation (immutable)
seg_potet: Segment potential ET (immutable)
seg_shade: Segment shade fraction (immutable)
seg_ccov: Cloud cover fraction (immutable)
seg_flow_width: Flow width (immutable)
seg_slope: Segment slope (immutable)
seg_tave_gw: Groundwater temperature (immutable)
albedo: Albedo (immutable)
maxiter_sntemp: Maximum iterations (immutable)
Returns:
te: Equilibrium temperature (degC)
ak1: First-order thermal exchange coefficient
ak2: Second-order thermal exchange coefficient
hs: Net shortwave solar radiation (W/m²)
ha: Atmospheric longwave radiation (W/m²)
hf: Friction heating (W/m²)
hv: Vegetation longwave radiation (W/m²)
evap: Evaporation rate (m/s)
t_abs4: Temperature to 4th power (K^4)
"""
# Local Variables
taabs = float(t_o + ZERO_C)
vp_sat = 6.108 * np.exp(17.26939 * t_o / (t_o + 237.3))
# Convert units and set up parameters
q_init = max(seg_inflow * CFS_TO_CMS, NEARZERO)
sw_power = 11.63 / 24.0 * float(seginc_swrad)
# If humidity is 1.0, there is a divide by zero below
foo = min(seg_humid, 0.99)
# Compute atmospheric pressure based on segment elevation
press = 1013.0 - (0.1055 * seg_elev)
bow_coeff = (0.00061 * press) / (vp_sat * (1.0 - foo))
evap = float(seg_potet * MPS_CONVERT)
# Heat flux components
# Ha: atmospheric-emitted longwave radiation
# Note: Fortran uses Seg_humid directly, not foo (clamped for bow_coeff)
ha = (
(3.354939e-8 + 2.74995e-9 * np.sqrt(seg_humid * vp_sat))
* (1.0 - seg_shade)
* (1.0 + (0.17 * (seg_ccov**2)))
) * (taabs**4)
# Hf: heat dissipated from potential energy by friction
hf = 9805.0 * (q_init / seg_flow_width) * seg_slope
# Hs: net flux from shortwave solar radiation
hs = (1.0 - seg_shade) * sw_power * (1.0 - albedo)
# Hv: longwave radiation emitted by riparian vegetation
hv = 5.24e-8 * svi * (taabs**4)
# Determine equilibrium coefficients
del_ht = 2.36e06
ltnt_ht = 2495.0e06
b = bow_coeff * evap * (ltnt_ht + (del_ht * t_o)) + AKZ - (del_ht * evap)
c = bow_coeff * del_ht * evap
d = (ha + hv + hf + hs) + (
ltnt_ht * evap * ((bow_coeff * t_o) - 1.0) + (seg_tave_gw * AKZ)
)
# Determine equilibrium temperature & 1st order thermal exchange coef
ted = t_o
ted, ak1d = _teak1(A, b, c, d, ted, maxiter_sntemp)
# Determine 2nd order thermal exchange coefficient
hnet = (A * ((t_o + ZERO_C) ** 4)) + (b * t_o) - (c * (t_o**2.0)) - d
delt = t_o - ted
if abs(delt) < NEARZERO:
ak2d = 0.0
else:
ak2d = ((delt * ak1d) - hnet) / (delt**2)
return ted, ak1d, ak2d, hs, ha, hf, hv, evap, taabs**4
@nb.jit(nopython=True)
def _twavg(
qup,
t0,
qlat,
tl_avg,
te,
ak1,
ak2,
seg_flow_width,
seg_length,
atmos_exchange_factor=1.0,
):
"""Compute average water temperature with lateral inflows.
This is the twavg function from PRMS.
Args:
qup: Upstream flow (immutable, cfs)
t0: Inlet temperature (immutable, degC)
qlat: Lateral flow (immutable, cms)
tl_avg: Lateral flow temperature (immutable, degC)
te: Equilibrium temperature (immutable, degC)
ak1: First-order thermal exchange coefficient (immutable)
ak2: Second-order thermal exchange coefficient (immutable)
seg_flow_width: Flow width (immutable)
seg_length: Segment length (immutable)
atmos_exchange_factor: Factor to amplify atmospheric exchange effects
(immutable, default=1.0)
Returns:
tw: Average water temperature (degC)
"""
# Determine equation parameters
q_init = float(qup * CFS_TO_CMS)
ql = float(qlat)
width = seg_flow_width
length = seg_length
# Local Variables
tep = 0.0
b = 0.0
r = 0.0
rexp = 0.0
tw = 0.0
delt = 0.0
denom = 0.0
if ql <= NEARZERO:
# Zero lateral flow
tep = te
b = (ak1 * atmos_exchange_factor * width) / 4182.0e03
rexp = -1.0 * (b * length) / q_init
r = np.exp(rexp)
elif ql < 0.0:
# Losing stream (should not happen in PRMS)
tep = te
b = (ql / length) + ((ak1 * atmos_exchange_factor * width) / 4182.0e03)
rexp = (ql - (b * length)) / ql
r = 1.0 + (ql / q_init)
r = r**rexp
elif ql > NEARZERO and q_init <= NEARZERO:
tep = te
b = (ak1 * atmos_exchange_factor * width) / 4182.0e03
rexp = -1.0 * (b * length) / ql
r = np.exp(rexp)
else:
b = (ql / length) + ((ak1 * atmos_exchange_factor * width) / 4182.0e03)
tep = (
((ql / length) * tl_avg)
+ (((ak1 * atmos_exchange_factor * width) / (4182.0e03)) * te)
) / b
if ql > 0.0:
rexp = -b / (ql / length)
else:
rexp = 0.0
if q_init < NEARZERO:
r = 2.0
else:
r = 1.0 + (ql / q_init)
r = r**rexp
# Determine water temperature
delt = tep - t0
denom = 1.0 + (ak2 / ak1) * delt * (1.0 - r)
if np.abs(denom) < NEARZERO:
denom = np.sign(denom) * NEARZERO if denom != 0.0 else NEARZERO
tw = tep - (delt * r / denom)
if tw < 0.0:
tw = 0.0
return tw
@nb.jit(nopython=True)
def _lat_inflow(
seg_lateral_inflow,
seginc_sroff,
seginc_ssflow,
seginc_gwflow,
melt_temp,
tave_gw,
tave_air,
tave_ss,
melt,
rain,
):
"""Compute lateral inflow temperature from components.
This is the lat_inflow function from PRMS.
Args:
seg_lateral_inflow: Total lateral inflow to segment (immutable, cfs)
seginc_sroff: Surface runoff component (immutable, cfs)
seginc_ssflow: Subsurface flow component (immutable, cfs)
seginc_gwflow: Groundwater flow component (immutable, cfs)
melt_temp: Snowmelt temperature (immutable, degC)
tave_gw: Groundwater temperature (immutable, degC)
tave_air: Air temperature (immutable, degC)
tave_ss: Subsurface temperature (immutable, degC)
melt: Snowmelt (immutable, inches)
rain: Rainfall (immutable, inches)
Returns:
tl_avg: Weighted average lateral inflow temperature (degC)
qlat: Lateral inflow (cms)
"""
weight_roff = 0.0
weight_ss = 0.0
weight_gw = 0.0
melt_wt = 0.0
rain_wt = 0.0
troff = 0.0
tss = 0.0
qlat = seg_lateral_inflow * CFS_TO_CMS
tl_avg = 0.0
if qlat > 0.0:
weight_roff = float((seginc_sroff * CFS_TO_CMS) / qlat)
weight_ss = float((seginc_ssflow * CFS_TO_CMS) / qlat)
weight_gw = float((seginc_gwflow * CFS_TO_CMS) / qlat)
else:
weight_roff = 0.0
weight_ss = 0.0
weight_gw = 0.0
if melt > 0.0:
melt_wt = melt / (melt + rain)
if melt_wt < 0.0:
melt_wt = 0.0
if melt_wt > 1.0:
melt_wt = 1.0
rain_wt = 1.0 - melt_wt
if rain == 0.0:
troff = melt_temp
tss = melt_temp
else:
troff = melt_temp * melt_wt + tave_air * rain_wt
tss = melt_temp * melt_wt + tave_ss * rain_wt
else:
troff = tave_air
tss = tave_ss
if weight_roff == 0.0 and weight_ss == 0.0 and weight_gw == 0.0:
tl_avg = np.nan
qlat = np.nan
else:
tl_avg = weight_roff * troff + weight_ss * tss + weight_gw * tave_gw
return tl_avg, qlat
@nb.jit(nopython=True)
def _compute_mixed_inlet_temp(
upstream_flow, lateral_flow, seg_tave_upstream, seg_tave_lat
):
"""Compute mixed inlet temperature from upstream and lateral sources.
Args:
upstream_flow: Flow from upstream segments (immutable, cfs)
lateral_flow: Lateral inflow (immutable, cfs)
seg_tave_upstream: Upstream temperature (immutable, degC)
seg_tave_lat: Lateral flow temperature (immutable, degC)
Returns:
Mixed inlet temperature (degC)
"""
upstream_ready = upstream_flow > 0.0 and not np.isnan(seg_tave_upstream)
lateral_ready = lateral_flow > 0.0 and not np.isnan(seg_tave_lat)
if not upstream_ready and not lateral_ready:
return np.nan
elif upstream_ready and not lateral_ready:
return seg_tave_upstream
elif lateral_ready and not upstream_ready:
return seg_tave_lat
else:
# Both sources present - compute weighted average
return (
seg_tave_upstream * upstream_flow + seg_tave_lat * lateral_flow
) / (upstream_flow + lateral_flow)