import pathlib as pl
from copy import deepcopy
from typing import Literal, Union
from warnings import warn
import numpy as np
import pyPRMS as pp
import xarray as xr
import pywatershed as pws
from ..base import meta as pws_meta
from ..constants import pyprms_meta as _pyprms_meta
from .segment_from_tracing import (
get_from_segment_params,
get_nhm_segs_ids_above_seg,
)
cbh_var_ascii_format = {
"tmax": "%0.2f",
"tmin": "%0.2f",
"prcp": "%0.2f",
"rhavg": "%0.2f",
"actet": "%0.4f",
"potet": "%0.4f",
}
cbh_vars_rename = {"actet": "aet_observed", "potet": "pet_observed"}
cbh_vars_rename_inv = {vv: kk for kk, vv in cbh_vars_rename.items()}
# TODO: subset restarts
# TODO: maybe something todo, there are no real checks on time or subsetting
# in time.
# Only some of these are in the GSFLOW 2.4.0 input instruction word doc.
additional_cbh_meta = {
"AET_cbh_file": {
"datatype": "string",
"description": (
"Pathname of the CBH file of pre-processed actual "
"evapotransipration input data for each HRU to specify "
"variable actet."
),
"context": "scalar",
},
"AET_module": {
"datatype": "string",
"description": (
"Module to read actual evapotranspiration CBH File; specify "
"climate_hru to activate."
),
"context": "scalar",
},
"ag_frac_dynamic": {
"datatype": "string",
"description": "Pathname to the dynamic ag fraction parameter file.",
"context": "scalar",
},
"dyn_ag_frac_flag": {
"datatype": "int32",
"description": "Flag governing the use of ag_frac_dynamic fie data.",
"context": "scalar",
},
"forcing_check_flag": {
"datatype": "int32",
"description": (
"Flag to indicate performance of precipitation and temperature "
"checks for hru_ppt < 0.0, hru_rain < 0.0, hru_snow < 0.0, "
"tmax < tmin, tminf < -150.0 and tmaxf > 200.0 (0=no; 1=yes)."
),
"context": "scalar",
},
"PET_ag_module": {
"datatype": "string",
"description": (
"Module to read potential evapotranspiration CBH File; specify "
"climate_hru to activate."
),
"context": "scalar",
},
"PET_cbh_file": {
"datatype": "string",
"description": (
"Pathname of the CBH file of pre-processed potential "
"evapotransipration input data for each HRU to specify "
"variable potet."
),
"context": "scalar",
},
"iter_aet_flag": {
"datatype": "int32",
"description": (
"Flag to indicate to estimate irrigation water based on the "
"difference between actual evapotranspiration and specified "
"actual evapotranspiration; computed in soilzone_ag (0=no; 1=yes)"
),
"context": "scalar",
},
"springfrost_dynamic": {
"datatype": "string",
"description": (
"Pathname to the dynamic spring frost parameter file."
),
"context": "scalar",
"force_default": False,
},
"dyn_springfrost_flag": {
"datatype": "int32",
"description": (
"Flag governing the use of springfrost_dynamic file data "
"(0=off; 1=on)."
),
"context": "scalar",
"force_default": False,
},
"fallfrost_dynamic": {
"datatype": "string",
"description": ("Pathname to the dynamic fall frost parameter file."),
"context": "scalar",
"force_default": False,
},
"dyn_fallfrost_flag": {
"datatype": "int32",
"description": (
"Flag governing the use of fallfrost_dynamic file data "
"(0=off; 1=on)."
),
"context": "scalar",
"force_default": False,
},
}
# TODO move this code in to a global function to be called. like init_module.
# Make a local deep copy to avoid modifying the shared constants.pyprms_meta
pyprms_meta = deepcopy(_pyprms_meta)
pyprms_meta["control"] = pyprms_meta["control"] | additional_cbh_meta
ppmp = pyprms_meta["parameters"]
dt_conv = {"I": "int32", "F": "float32"}
class id_dict(dict):
@staticmethod
def __missing__(key):
return key
dim_remap = id_dict(scalar="one", ndoy="ndays")
for kk in pws_meta.parameters.keys():
if kk not in ppmp.keys():
kk_pws_meta = pws_meta.parameters[kk].copy()
kk_pws_meta["dimensions"] = list(kk_pws_meta.pop("dims"))
kk_pws_meta["datatype"] = dt_conv[kk_pws_meta.pop("type")]
kk_pws_meta["dimensions"] = [
dim_remap[ii] for ii in kk_pws_meta["dimensions"]
]
ppmp[kk] = kk_pws_meta
"""Notes:
At the moment the assumption is that the full domain is provided PRMS input
files with only the option for NetCDF files.
"""
cbh_ctl_var_map = pws.constants.cbh_ctl_var_map
cbh_control_names = list(cbh_ctl_var_map.keys())
cbh_var_names = list(cbh_ctl_var_map.values())
[docs]
class DomainSubset:
"""Subset a domain.
Args:
full_control_file: A pathlib.Path to the full domain control file.
Currently, the "param_file" field must a be a scalar (multiple
parameter files specified will throw an error).
sub_nhm_ids: Either an np.ndarray of the nhm_ids or None if subsetting
above a single nhm_seg.
sub_nhm_segs: Either an np.ndarray of the nhm_segs or a np.int64 if
subsetting above a single nhm_seg.
full_cbh_nc_files_dict: Only used when NetCDF CBH files are being
used, which should not be specified in a PRMS control file. If the
CBH files in the control file are to be used, this argument is None.
The argument is to be a dict of name:pathlib.Path where the names are
in the list:
["albedo_day", "cloud_cover_day", "humidity_day", "potet_day",
"precip_day", "swrad_day", "tmax_day", "tmin_day", "transp_day",
"windspeed_day", "AET_cbh_file", "PET_cbh_file"]
output_format: Either "pywatershed" (default) or "PRMS".
"""
[docs]
def __init__(
self,
full_control_file: pl.Path,
sub_nhm_ids: Union[np.ndarray, None],
sub_nhm_segs: Union[np.ndarray, np.int64],
full_cbh_nc_files_dict: Union[dict[str, pl.Path], None],
start_one_below_nhm_seg: bool = False,
output_format: Literal["pywatershed", "PRMS", None] = None,
from_seg_calc_parallel: bool = False,
from_seg_calc_check: bool = False,
start_time: Union[np.datetime64, None] = None,
end_time: Union[np.datetime64, None] = None,
) -> None:
# Bring in arguments with vetting.
self._full_control_file = full_control_file
if not isinstance(sub_nhm_ids, (np.ndarray, type(None))):
msg = f"The type of sub_nhm_ids, '{type(sub_nhm_ids)}' is invald."
raise ValueError(msg)
# <
if not isinstance(sub_nhm_segs, (np.ndarray, np.int64)):
msg = f"The type of sub_nhm_segs, '{type(sub_nhm_segs)}' is invald"
raise ValueError(msg)
self._sub_nhm_ids = sub_nhm_ids
if sub_nhm_ids is None:
self._start_seg = sub_nhm_segs
self._sub_nhm_segs = None
else:
self._start_seg = None
self._sub_nhm_segs = sub_nhm_segs
self._start_one_below_nhm_seg = start_one_below_nhm_seg
self._output_format = output_format
self._from_seg_calc_parallel = from_seg_calc_parallel
self._from_seg_calc_check = from_seg_calc_check
self._start_time = start_time
self._end_time = end_time
# Get the full domain parameters
# TODO: probably rename this one _full_control_pws and also track
# a _full_control_pp as well?
self._full_control = pws.Control.load_prms(
self._full_control_file, warn_unused_options=False
)
control = pp.ControlFile(
filename=self._full_control_file,
metadata=pyprms_meta,
verbose=False,
)
self._full_control_var_names = control.control_variables.keys()
del control
self._cbh_control_names = list(
set(cbh_control_names).intersection(self._full_control_var_names)
)
param_file = self._full_control.options["parameter_file"]
if not isinstance(param_file, str) and len(param_file) > 1:
raise NotImplementedError(
"DomainSubset only works with single parameter files at the "
"moment. Please consolidate your parameter files or extend "
"pyPRMS to handle multiple parameter files."
)
if full_cbh_nc_files_dict is None:
raise NotImplementedError(
"Currently only Netcdf CBH files accepted."
)
self._full_cbh_nc_files_dict = {}
for kk in self._cbh_control_names:
if kk in self._full_control.control_variables.keys():
self._full_cbh_nc_files_dict[kk] = (
self._full_control.control_variables[kk]
)
else:
self._full_cbh_nc_files_dict = full_cbh_nc_files_dict
self._get_full_params()
# Solve upstream segs and hrus if only a single segment is specified.
if self._sub_nhm_ids is None:
self._sub_domain_from_nhm_seg()
# <
self._set_subset_masks_order()
# TODO: make these individual methods available externally?
# IE the case of Noah's parameter files.
print("Subsetting dynamic parameter files.")
self._subset_dynamic_params()
print("Subsetting parameters.")
self._subset_params()
print("Subsetting CBH files.")
self._subset_cbh()
# make this conditional/lazy on output format?
if (
self._output_format is not None
and self._output_format.lower == "prms"
):
print("Subsetting data file.")
self._subset_data_file()
# Make this conditional on restart being active in the control file
# print("Subsetting restarts.")
# self._subset_restarts()
return None
@property
def sub_nhm_ids(self) -> np.ndarray:
return self._sub_nhm_ids
@property
def sub_nhm_segs(self) -> np.ndarray:
return self._sub_nhm_segs
def _sub_domain_from_nhm_seg(self) -> None:
# This is the case where a single user-selected nhm_seg
# is used to identify subset nhm_ids and nhm_segs
# The user can pass a len 1 array or a scalar integer for the "outlet."
if isinstance(self._start_seg, np.ndarray):
assert len(self._start_seg) == 1
self._start_seg = self._start_seg[0]
# <
# In most cases, when the passed seg id is a segment of interest, it is
# likely better to take an additional segment below it before
# subsetting
if self._start_one_below_nhm_seg:
wh_in_seg = np.where(self._full_params.nhm_seg == self._start_seg)[
0
]
if len(wh_in_seg):
assert len(wh_in_seg) == 1
self._start_seg = self._full_params.tosegment_nhm[
wh_in_seg[0]
].values
else:
raise ValueError(
"There appears to be no downstream segment of "
f"nhm_seg={self._start_seg}. You can disable the "
"start_one_below_nhm_seg option."
)
# Need upstream tracing parameters
self._get_from_segment_params(
self_attr_key="_full_params",
parallel=self._from_seg_calc_parallel,
check=self._from_seg_calc_check,
)
# Trace upstream
nhm_ids_segs_above = get_nhm_segs_ids_above_seg(
start_seg=self._start_seg,
nhm_segs=self._full_params.nhm_seg.values,
nhm_ids=self._full_params.nhm_id.values,
hru_segment_nhm=self._full_params.hru_segment_nhm.values,
from_segment_starts=self._full_params.from_segment_starts.values,
from_segment_ends=self._full_params.from_segment_ends.values,
from_segment=self._full_params.from_segment.values,
)
self._sub_nhm_ids = nhm_ids_segs_above["nhm_ids_above"]
self._sub_nhm_segs = nhm_ids_segs_above["nhm_segs_above"]
return None
def _get_from_segment_params(
self, self_attr_key: str, parallel=False, check=True
) -> None:
params_edit = getattr(self, self_attr_key)
print("starting get_from_segment_params")
from_dict = get_from_segment_params(
tosegment=params_edit.tosegment.values,
parallel=parallel,
check=check,
)
new_vars_kv = {
"from_segment_starts": from_dict["from_segment_starts"],
"from_segment_ends": from_dict["from_segment_ends"],
}
for kk, vv in new_vars_kv.items():
params_edit[kk] = params_edit.tosegment.copy()
params_edit[kk][:] = vv
params_edit[kk].attrs = pws.meta.find_variables(kk)[kk]
# there is no useful coordinate because "from" (down stream to
# upstream) is not unique
froms = from_dict["from_segment"]
from_seg_da = xr.DataArray(
froms,
coords={"from_nhm_seg_unstruct": np.arange(len(froms))},
dims=["from_nhm_seg_unstruct"],
name="from_segment",
)
kk = "from_segment"
params_edit[kk] = from_seg_da
params_edit[kk].attrs = pws.meta.find_variables(kk)[kk]
return None
def _get_full_params(self) -> None:
full_param_files = (
self._full_control_file.parent
/ self._full_control.options["parameter_file"]
)
self._full_params = pws.parameters.PrmsParameters.load(
full_param_files
).to_xr_ds()
return None
def _set_subset_masks_order(self) -> None:
"""Only masks are solved here. Recalculating indexed variables happens
later in _subset_params, etc...
"""
self._sub_nhm_ids_mask = self._full_params.nhm_id.isin(
self._sub_nhm_ids
)
self._sub_nhm_segs_mask = self._full_params.nhm_seg.isin(
self._sub_nhm_segs
)
nhm_id_df = self._full_params["nhm_id"].to_pandas().reset_index()
sub_nhm_id_df = nhm_id_df.iloc[self._sub_nhm_ids_mask.values]
assert (np.diff(sub_nhm_id_df.nhru.values) > 0).all()
self._sub_nhm_ids_order = sub_nhm_id_df.nhm_id.values
nhm_seg_df = self._full_params["nhm_seg"].to_pandas().reset_index()
sub_nhm_seg_df = nhm_seg_df.iloc[self._sub_nhm_segs_mask.values]
assert (np.diff(sub_nhm_seg_df.nsegment.values) > 0).all()
self._sub_nhm_segs_order = sub_nhm_seg_df.nhm_seg.values
# poi
# associate poi with nhm_seg first, since that's actually useful. then
# solve indices.
poi_gage_segment_nhm = []
for ii in self._full_params.npoigages:
poi_gage_segment_nhm += [
self._full_params.nhm_seg[
self._full_params.poi_gage_segment[ii]
].values.tolist()
]
self._full_params["poi_gage_segment_nhm"] = (
self._full_params["poi_gage_segment"] * 0 - 1
)
self._full_params["poi_gage_segment_nhm"].values = np.array(
poi_gage_segment_nhm, dtype=np.int32
)
# now only keep the pois with nhm_seg in nhm_seg_df
self._sub_pois_mask = self._full_params["poi_gage_segment_nhm"].isin(
sub_nhm_seg_df.nhm_seg
)
nhm_poi_df = self._full_params["poi_gage_id"].to_pandas().reset_index()
sub_poi_df = nhm_poi_df.iloc[self._sub_pois_mask.values]
assert (np.diff(sub_poi_df.npoigages.values) > 0).all()
self._sub_pois_order = sub_poi_df.poi_gage_id.values
return None
def _subset_cbh(self) -> None:
if self._full_cbh_nc_files_dict is not None:
self._subset_cbh_netcdf_files()
else:
self._subset_cbh_files()
def _subset_cbh_files(self) -> None:
raise NotImplementedError
def _subset_cbh_netcdf_files(self):
keys_drop = []
for kk, vv in self._full_cbh_nc_files_dict.items():
if kk not in self._cbh_control_names:
warn(
f"'{kk}' CBH file not present in: "
f"{self._cbh_control_names}. Skipping"
)
keys_drop.append(kk)
if not vv.exists():
raise ValueError("Supplied CBH file path, '{vv}', not found.")
for kk in keys_drop:
del self._full_cbh_nc_files_dict[kk]
# bit of a silly pipeline, dont see another way to rename dim
# independently of the coordinate with a dataarray
self._sub_cbh_files_dict = {
kk: xr.load_dataset(vv).rename_dims(nhm_id="nhru")
for kk, vv in self._full_cbh_nc_files_dict.items()
}
for kk in self._sub_cbh_files_dict.keys():
self._sub_cbh_files_dict[kk] = self._sub_cbh_files_dict[kk].isel(
nhru=self._sub_nhm_ids_mask
)
assert (
self._sub_cbh_files_dict[kk].nhm_id.values
== self._sub_nhm_ids_order
).all()
if self._start_time is not None and self._end_time is not None:
if self._start_time is None:
start_time = self._sub_cbh_files_dict[kk].time[0]
else:
start_time = self._start_time
if self._end_time is None:
end_time = self._sub_cbh_files_dict[kk].time[-1]
else:
end_time = self._end_time
# <
self._sub_cbh_files_dict = {
kk: vv.sel(time=slice(start_time, end_time))
for kk, vv in self._sub_cbh_files_dict.items()
}
return None
def _subset_params(self) -> None:
indexed_vars = [
"from_segment",
"from_segment_starts",
"from_segment_ends",
"tosegment",
"from_nhm_seg_unstruct",
"poi_gage_segment",
]
# drop the indexed vars in advance of re-solving them
self._sub_params = self._full_params.copy()
indexed_vars_in_ds = set(indexed_vars).intersection(
set(self._sub_params.variables)
)
self._sub_params = self._sub_params.drop_vars(indexed_vars_in_ds)
# actually subset on npoigages=self._sub_poi_gages_mask
# need to recalculate poi_gage_segment upon subsetting
self._sub_params = (
self._sub_params.isel(nhru=self._sub_nhm_ids_mask)
.isel(nsegment=self._sub_nhm_segs_mask)
.isel(npoigages=self._sub_pois_mask)
)
assert (
self._sub_params.nhm_id.values == self._sub_nhm_ids_order
).all()
assert (
self._sub_params.nhm_seg.values == self._sub_nhm_segs_order
).all()
assert (
self._sub_params.poi_gage_id.values == self._sub_pois_order
).all()
# re-solve: tosegment, hru_segment, poi_gage_segment
tosegment_sub = []
for ii in self._sub_params.nsegment.values:
result = np.where(
self._sub_params.nhm_seg.values
== self._sub_params.tosegment_nhm.values[ii]
)[0]
len_result = len(result)
if len_result == 1:
tosegment_sub += result.tolist()
elif len_result == 0:
tosegment_sub += [-1]
else:
raise ValueError
self._sub_params["tosegment"] = self._sub_params["nhm_seg"] * 0
self._sub_params["tosegment"].values = np.array(tosegment_sub) + 1
hru_segment_sub = []
for ii in self._sub_params.nhru.values:
result = np.where(
self._sub_params.nhm_seg.values
== self._sub_params.hru_segment_nhm.values[ii]
)[0]
len_result = len(result)
if len_result == 1:
hru_segment_sub += result.tolist()
elif len_result == 0:
hru_segment_sub += [-1]
else:
raise ValueError
self._sub_params["hru_segment"] = self._sub_params["hru_segment"] * 0
self._sub_params["hru_segment"].values = np.array(hru_segment_sub) + 1
# Add upstream tracing parameters
self._get_from_segment_params(
self_attr_key="_sub_params",
parallel=self._from_seg_calc_parallel,
check=self._from_seg_calc_check,
)
poi_gage_segment = []
for ii in self._sub_params.npoigages.values:
result = np.where(
self._sub_params.poi_gage_segment_nhm.values[ii]
== self._sub_params.nhm_seg.values
)[0]
len_result = len(result)
if len_result == 1:
poi_gage_segment += result.tolist()
elif len_result == 0:
poi_gage_segment += [-1]
else:
raise ValueError
self._sub_params["poi_gage_segment"] = (
self._sub_params["poi_gage_segment_nhm"] * 0
)
self._sub_params["poi_gage_segment"].values = (
np.array(poi_gage_segment) + 1
)
return None
def _subset_dynamic_params(self) -> None:
"""Subset dynamic parameter files if they are active in the control.
This method checks for dynamic parameter files (ag_frac_dynamic,
springfrost_dynamic, fallfrost_dynamic) in the control file,
verifies their associated flags are enabled, and subsets them
based on the HRU mask.
"""
from .prms_dyn_param import (
DYNAMIC_PARAM_CONFIG,
PrmsDynamicParameter,
)
self._sub_dynamic_param_files = {}
# Get the control directory for resolving relative paths
control_dir = self._full_control_file.parent
# Use pyPRMS control to access control variables
control = pp.ControlFile(
filename=self._full_control_file,
metadata=pyprms_meta,
verbose=False,
)
control_vars = control.control_variables
for param_name, config in DYNAMIC_PARAM_CONFIG.items():
flag_name = config["flag_name"]
dtype = config["dtype"]
# Check if the flag variable exists and is enabled
if flag_name not in control_vars:
continue
flag_value = control_vars[flag_name].values
if isinstance(flag_value, (list, np.ndarray)):
flag_value = flag_value[0]
flag_value = int(flag_value)
if flag_value != 1:
continue
# Check if the file path variable exists
if param_name not in control_vars:
warn(
f"Dynamic parameter flag {flag_name} is set but "
f"{param_name} is not specified in control file."
)
continue
file_path = control_vars[param_name].values
if isinstance(file_path, (list, np.ndarray)):
file_path = file_path[0]
# Resolve the path relative to control directory
file_path = pl.Path(file_path)
if not file_path.is_absolute():
file_path = control_dir / file_path
if not file_path.exists():
warn(
f"Dynamic parameter file not found: {file_path}. "
f"Skipping subsetting for {param_name}."
)
continue
print(f" Subsetting {param_name}: {file_path}")
# Load and subset the dynamic parameter file
try:
dyn_param = PrmsDynamicParameter.load(file_path, dtype=dtype)
subset_param = dyn_param.subset(self._sub_nhm_ids_mask)
# Store the subsetted data and original filename for writing
self._sub_dynamic_param_files[param_name] = {
"data": subset_param,
"original_path": file_path,
"control_var_name": param_name,
}
except Exception as e:
warn(
f"Error subsetting dynamic parameter file {file_path}: "
f"{e}. Skipping."
)
continue
return None
def _subset_data_file(self):
if hasattr(self, "_sub_data_file"):
return None
# _full_control is pws control, sub_control is pp
data_file_name = pl.Path(self._sub_control.get("data_file").values)
# since this path is likely relative to the control_file
if not data_file_name.exists():
data_file_name = self._full_control_file.parent / str(
data_file_name
)
self._data_file_name = data_file_name
# <
data_file = pp.DataFile(data_file_name, metadata=pyprms_meta)
# for now only handle runoff, throw an error if there are other
# variables, mostly because I'm unsure how that will work for certain.
runoff_mask = data_file.data.columns.str.contains("runoff")
if not runoff_mask.all():
raise NotImplementedError("")
# <
file_poi_ids = data_file.data.columns.str.slice(7).values
file_poi_keep_mask = np.isin(
file_poi_ids, self._sub_params.poi_gage_id.values
)
cols_to_drop = data_file.data.columns[~file_poi_keep_mask].tolist()
self._sub_data_file = data_file.data.drop(columns=cols_to_drop)
return None
[docs]
def write(
self,
write_dir: pl.Path,
output_format: Literal["pywatershed", "PRMS", None],
) -> None:
"""Write the subset domain to file.
Args:
write_dir: An NON-EXISTENT directory into which to write domain
files. The reason is so that existing domain files are not
overwritten.
output_format: Optional choice of format. If not supplied, this
argument supplied at __init__ is consulted. An error is
raised if None is found.
"""
write_dir = write_dir.resolve()
if write_dir.exists():
raise ValueError(
f"The write_dir can not exist beforehand: {write_dir}"
)
if not write_dir.parent.exists():
raise ValueError(
"The parent of write_dir must exist beforehand: "
f"{write_dir.parent}"
)
write_dir.mkdir()
if output_format is None:
output_format = self._output_format
else:
self._output_format = output_format
if self._output_format is None:
raise ValueError(
"output_format not specified on initialization or write."
)
self._sub_control = pp.ControlFile(
filename=self._full_control_file,
metadata=pyprms_meta,
verbose=False,
)
self._sub_control_file_name = (
f"{self._full_control_file.stem}_subset.control"
)
if self._start_time is not None:
self._sub_control["start_time"].values = self._start_time
if self._end_time is not None:
self._sub_control["end_time"].values = self._end_time
if self._output_format.lower() == "pywatershed":
self._write_pws(write_dir=write_dir)
elif self._output_format.lower() == "prms":
self._write_prms(write_dir=write_dir)
return None
def _write_prms(self, write_dir: pl.Path) -> None:
# the following methods set the control, reusing the filename in it
# but making it relative to the write_dir.
self._cbh_dataset_to_ascii(write_dir=write_dir)
# not an approved PRMS parameter, remove before writing
del self._sub_params["poi_gage_segment_nhm"]
self._parameters_to_ascii(write_dir=write_dir)
self._data_file_to_ascii(write_dir=write_dir)
# Write dynamic parameter files
self._write_dynamic_params(write_dir=write_dir)
# TODO: rename the data file in to the write_dir
self._sub_control.write(write_dir / self._sub_control_file_name)
return None
def _write_pws(self, write_dir: pl.Path) -> None:
# Edit the control
# dont really need to do anything with the cbh or param files for
# pywatershed, but since the control file really shouldnt be used
# by PRMS, we'll edit it to point to the netcdf files.
for kk in self._sub_cbh_files_dict.keys():
if kk in self._sub_control.control_variables.keys():
self._sub_control.control_variables[kk].values = f"{kk}.nc"
else:
vv = pp.ControlVariable(
name=kk, strict=False, meta=additional_cbh_meta[kk]
)
vv.__dict__["_ControlVariable__values"] = f"{kk}.nc"
self._sub_control.control_variables[kk] = vv
self._sub_control.control_variables[
"param_file"
].values = "parameters.nc"
# write netcdf cbh files
for kk, vv in self._sub_cbh_files_dict.items():
# check the order before output
assert (vv.nhm_id.values == self._sub_nhm_ids_order).all()
# The naming of cbh files, their control parameters, and their
# internal variables is INSANE and a mess in PRMS.
var_name = cbh_ctl_var_map[kk]
file_name = f"{var_name}.nc"
if var_name not in vv.data_vars:
current_name = cbh_vars_rename_inv[var_name]
vv = vv.rename(
{
current_name: var_name,
}
)
vv.to_netcdf(write_dir / file_name)
# <
# write netcdf parameter file
# check order before output
assert (
self._sub_params.nhm_id.values == self._sub_nhm_ids_order
).all()
assert (
self._sub_params.nhm_seg.values == self._sub_nhm_segs_order
).all()
param_file_name = self._sub_control.control_variables[
"param_file"
].values
self._sub_params.to_netcdf(write_dir / param_file_name)
# Write dynamic parameter files
self._write_dynamic_params(write_dir=write_dir)
# write control file
self._sub_control.write(write_dir / self._sub_control_file_name)
return None
def _write_dynamic_params(self, write_dir: pl.Path) -> None:
"""Write subsetted dynamic parameter files and update control.
Args:
write_dir: Directory to write the files to
"""
if not hasattr(self, "_sub_dynamic_param_files"):
return None
if not self._sub_dynamic_param_files:
return None
for param_name, file_info in self._sub_dynamic_param_files.items():
subset_param = file_info["data"]
original_path = file_info["original_path"]
# Use original filename, write directly to write_dir
output_filename = original_path.name
output_path = write_dir / output_filename
subset_param.write(output_path)
# Update control file to point to new filename (no path prefix)
if param_name in self._sub_control.control_variables:
self._sub_control.control_variables[
param_name
].values = output_filename
else:
# Add the control variable if it doesn't exist
meta = additional_cbh_meta.get(param_name, {})
vv = pp.ControlVariable(
name=param_name, strict=False, meta=meta
)
vv.__dict__["_ControlVariable__values"] = output_filename
self._sub_control.control_variables[param_name] = vv
print(f" Wrote dynamic parameter file: {output_path}")
return None
def _cbh_dataset_to_ascii(self, write_dir):
import uuid
data_dir = pl.Path(pws.constants.__pywatershed_root__ / "data")
# simply build and load a dummy CBH netcdf file from the pywatershed
# repo and replace its dataset with ours.
random_filename = str(uuid.uuid4()) + ".nc"
dum_file_path = data_dir / random_filename
make_dummy_netcdf_cbh_file(dum_file_path)
pp_cbh = pp.Cbh(
src_path=dum_file_path,
metadata=pyprms_meta,
engine="netcdf",
)
# the following two operation are only required on windows before
# deleting the file, else it is busy.
pp_cbh._Cbh__dataset.load()
pp_cbh._Cbh__dataset.close()
dum_file_path.unlink()
cbh_ds = xr.merge(self._sub_cbh_files_dict.values())
pp_cbh._Cbh__dataset = cbh_ds
for kk, vv in self._sub_cbh_files_dict.items():
var_name = list(vv.data_vars)[0]
file_name = pl.Path(self._sub_control[kk].values).name
self._sub_control[kk].values = file_name
pp_cbh.write_ascii(
filename=write_dir / file_name,
variable=var_name,
float_format=cbh_var_ascii_format[var_name],
)
def _parameters_to_ascii(self, write_dir):
# This is basically implemeting pyPRMS/parameters/ParameterNetCDF.py
# but just skipping reading from netcdf file
verbose = False
# prep the parameters to be acceptable
sub_params_ds = self._sub_params.rename(
ndoy="ndays", scalar="one", nmonth="nmonths"
)
drop_vars = [
"doy",
"from_segment_starts",
"from_segment_ends",
"from_nhm_seg_unstruct",
"from_segment",
"hru_in_to_cf",
]
sub_params_ds = sub_params_ds.drop_vars(drop_vars)
pp_params = pp.Parameters(metadata=pyprms_meta, verbose=verbose)
pp_params.__verbose = verbose
# leaving this here for the mask_and_scale and decode_timedelta
# implications
# xr_df = xr.open_dataset(
# self.__filename, mask_and_scale=False, decode_timedelta=False
# )
# Populate the dimensions first
pp_params.dimensions.add(name="one", size=1)
# self.dimensions.add(name='ndays')
for dn, ds in dict(sub_params_ds.sizes).items():
pp_params.dimensions.add(name=str(dn), size=ds)
# Add ndepl using ndeplval
pp_params.dimensions.add(
name="ndepl", size=int(sub_params_ds.sizes["ndeplval"] / 11)
)
# Add nobs if needed
if not pp_params.dimensions.exists("nobs"):
if pp_params.dimensions.exists("npoigages"):
pp_params.dimensions.add(
name="nobs",
size=pp_params.dimensions.get("npoigages").size,
)
else:
pp_params.dimensions.add(name="nobs", size=0)
if not pp_params.dimensions.exists("ngw"):
pp_params.dimensions.add(
name="ngw", size=pp_params.dimensions.get("nhru").size
)
if not pp_params.dimensions.exists("nssr"):
pp_params.dimensions.add(
name="nssr", size=pp_params.dimensions.get("nhru").size
)
# Now add the parameters
for var in sub_params_ds.variables.keys():
if pp_params.__verbose:
print(str(var))
cparam = sub_params_ds[var].T
# Add the parameter
pp_params.add(name=str(var))
# Add the data
pp_params.get(str(var)).data = cparam.values
pp_params.adjust_bounded_parameters()
file_name = pl.Path(self._sub_control["param_file"].values).name
self._sub_control["param_file"].values = file_name
pp_params.write_parameter_file(filename=write_dir / file_name)
return None
def _data_file_to_ascii(self, write_dir: pl.Path):
# _subset_data_file is lazy and wont do anything if
# hasattr(self, "_sub_data_file")
self._subset_data_file()
# there is no method for writing out the Datafile in pyprms.
output_data_file = write_dir / self._data_file_name.name
pws.utils.utils.write_data_file(self._sub_data_file, output_data_file)
# edit the path to the data_file in the output control
self._sub_control["data_file"].values = output_data_file.name
return None
def make_dummy_netcdf_cbh_file(file_path: pl.Path):
# it appears that any basic netcdf file works, no other requirements
# imposed by pp.CBH()
ds = xr.Dataset(
data_vars=dict(
foo=(
("time", "hru"),
np.zeros([7, 3]),
{"units": "parsec", "long_name": "foooo"},
),
),
attrs=dict(
title="Dummy",
),
)
ds.to_netcdf(file_path)
return None