Source code for pywatershed.utils.cbh_utils

import math
import pathlib as pl
from datetime import datetime
from typing import Union

import netCDF4 as nc4
import numpy as np
import pandas as pd

from ..base import meta
from ..parameters import PrmsParameters

zero = np.zeros((1))[0]
one = np.ones((1))[0]

# Compound types

nc4_to_np_types = {
    "i4": "int32",
    "i8": "int64",
    "f4": "float32",
    "f8": "float64",
}
np_to_nc4_types = {val: key for key, val in nc4_to_np_types.items()}

created_line = "Created "
written_line = "Written "
hash_line = "##############"
hash_line_official = "########################################"


def _cbh_file_to_df(
    the_file: Union[str, pl.Path], params: PrmsParameters = None
) -> pd.DataFrame:
    # Only take cbh files that contain single variables
    # JLM: this can be substantially simplified as
    # we are only reading NHM CBH files now.
    # Revisit with hru1.yaml

    meta_lines = []
    with open(the_file, "r") as file_open:
        wh_hash_line = -1
        the_line = ""
        while hash_line not in the_line:
            wh_hash_line += 1
            the_line = file_open.readline()
            if (
                ("//" not in the_line)
                and (created_line not in the_line)
                and (written_line not in the_line)
                and (hash_line not in the_line)
            ):
                meta_lines += [the_line.strip()]

    col_names = []
    var_count_dict = {}
    line = meta_lines[-1]
    line_split = line.split(" ")
    key = line_split[0]
    count = int(line_split[-1])
    zs = math.ceil(math.log(count, 10))
    if (params is None) or ("nhm_id" not in params.parameters):
        col_names += [f"{key}{str(ii).zfill(zs)}" for ii in range(count)]
    else:
        col_names = np.char.add(
            np.array([key]), params.parameters["nhm_id"].astype(str)
        ).tolist()
    var_count_dict[key] = count

    if len(var_count_dict) > 1:
        msg = (
            "cbh input files should contain only one variable each: "
            f"{the_file}"
        )
        raise ValueError(msg)

    dtypes = (["str"] * 6) + (["float64"] * len(col_names))
    date_cols = ["Y", "m", "d", "H", "M", "S"]
    col_names = date_cols + col_names
    assert len(dtypes) == len(col_names)
    dtype_dict = dict(zip(col_names, dtypes))
    assert len(dtype_dict) == len(col_names)

    # Some files erroneously specify the number of columns
    # catch that here
    data = pd.read_csv(
        the_file,
        nrows=1,
        # names=col_names,  # dont specify names
        header=None,  # dont use default header from first line
        index_col=False,
        skiprows=wh_hash_line + 1,
        sep=r"\s+",
        dtype=dtype_dict,
    )
    msg = (
        "Number of actual data columns does not match metadata info: "
        f"{meta_lines}"
    )
    assert len(data.columns) == len(col_names), msg
    # JLM: is the above sufficient?

    data = pd.read_csv(
        the_file,
        names=col_names,
        index_col=False,
        skiprows=wh_hash_line + 1,
        sep=r"\s+",
        dtype=dtype_dict,
    )

    data["date"] = pd.to_datetime(
        data.Y + "-" + data.m.str.zfill(2) + "-" + data.d.str.zfill(2)
    )
    # JLM TODO: Set datetime resolution to hours? or mins. Could do days but
    # might look forward a bit.
    data = data.drop(columns=set(date_cols))
    data = data.set_index("date")

    return data


def _cbh_files_to_df(
    file_dict: dict, params: PrmsParameters = None
) -> pd.DataFrame:
    if isinstance(file_dict, dict):
        dfs = [_cbh_file_to_df(val, params) for val in file_dict.values()]
    else:
        dfs = [_cbh_file_to_df(val, params) for val in file_dict]
    return pd.concat(dfs, axis=1)


def cbh_files_to_df(
    files: Union[str, pl.Path], params: PrmsParameters = None
) -> pd.DataFrame:
    if isinstance(files, (str, pl.Path)):
        df = _cbh_file_to_df(files, params)
    elif isinstance(files, (dict, list)):
        df = _cbh_files_to_df(files, params)
    else:
        raise ValueError(
            f'"files" argument of type {type(files)} not accepted.'
        )
    return df


def _col_name_split(string: str) -> tuple:
    char_list = list(string)
    wh_digit = [ii for ii in range(len(char_list)) if char_list[ii].isdigit()]
    return string[: wh_digit[0]], string[wh_digit[0] :]


def cbh_df_to_np_dict(df: pd.DataFrame) -> dict:
    # Convert to a multi-index? For getting to a dict of np arrays
    # This might go elsewhere
    col_name_tuples = [_col_name_split(kk) for kk, vv in df.items()]
    df.columns = pd.MultiIndex.from_tuples(col_name_tuples)
    var_names = df.columns.unique(level=0)
    np_dict = {}
    np_dict["time"] = df.index.to_numpy(copy=True).astype("datetime64[s]")
    spatial_ids = (
        df.loc[:, var_names[0]].columns.values.astype(float).astype(int)
    )
    if spatial_ids[0] == "000":
        np_dict["hru_ind"] = spatial_ids
    else:
        np_dict["nhm_id"] = spatial_ids
    for vv in var_names:
        np_dict[vv] = df[vv].to_numpy(copy=True)
    return np_dict


def cbh_n_hru(np_dict: dict) -> int:
    odd_shapes = ["time", "hru_ind", "nhm_id"]
    shapes = [
        var.shape for key, var in np_dict.items() if key not in odd_shapes
    ]
    for ss in shapes:
        assert shapes[0] == ss
    return shapes[0][1]


def cbh_n_time(np_dict: dict) -> int:
    return np_dict["time"].shape[0]


def cbh_files_to_np_dict(
    files: Union[str, pl.Path], params: PrmsParameters
) -> dict:
    np_dict = cbh_df_to_np_dict(cbh_files_to_df(files, params))
    return np_dict


[docs] def cbh_file_to_netcdf( input_file: Union[str, pl.Path], parameters: PrmsParameters, nc_file: Union[str, pl.Path], clobber: bool = True, output_vars: list = None, zlib: bool = True, complevel: int = 4, global_atts: dict = None, rename_vars: dict = None, chunk_sizes: dict = None, time_units="days since 1979-01-01 00:00:00", time_calendar="standard", ) -> None: """Convert PRMS native CBH files to NetCDF format for pywatershed Args: input_file: the CBH file to read parameters: the Parameters object of PRMS parameters for this domain nc_file: the NetCDF output file clobber: Overwrite an existing NetCDF file? output_vars: Subset of variables in the CBH to write? zlib: use zlib compression? complevel: The level of compression. global_atts: A dictionary of meta data for the variables. rename_vars: a dictionary for mapping CBH variable names chunk_sizes: along each dimension, default {"time": 30, "hru": 0}, time_units: default "days since 1979-01-01 00:00:00", time_calendar: default"standard", """ if rename_vars is None: rename_vars = {} if global_atts is None: global_atts = {} if chunk_sizes is None: chunk_sizes = {"time": 30, "nhm_id": 0} np_dict = cbh_files_to_np_dict(input_file, parameters) # Default time chunk is for a read pattern of ~monthly at a time. ds = nc4.Dataset(nc_file, "w", clobber=clobber) ds.setncattr("Description", "Climate by HRU") for key, val in global_atts.items(): ds.setncattr(key, val) # Dimensions # None for the len argument gives an unlimited dim ds.createDimension("time", None) # cbh_n_time(np_dict)) ds.createDimension("nhm_id", cbh_n_hru(np_dict)) # Dim Variables time = ds.createVariable("time", "f4", ("time",)) time[:] = nc4.date2num(np_dict["time"].astype(datetime), time_units) time.units = time_units # time.calendar = time_calendar hru_name = "hru_ind" if "hru_ind" in np_dict.keys() else "nhm_id" hruid = ds.createVariable( hru_name, meta.get_types(hru_name)[hru_name], ("nhm_id") ) hru_meta_dict = { "hru_ind": { "type": "i4", "desc": "Hydrologic Response Unit (HRU) index", "cf_role": "timeseries_id", }, "nhm_id": { "type": "i4", "desc": "NHM Hydrologic Response Unit (HRU) ID", "cf_role": "timeseries_id", }, } for att, val in hru_meta_dict[hru_name].items(): hruid.setncattr(att, val) hruid[:] = np_dict[hru_name] # Variables var_list = set(np_dict.keys()).difference({"time", "hru_ind", "nhm_id"}) if output_vars is not None: var_list = [var for var in var_list if var in output_vars] for vv in var_list: vv_meta = meta.get_vars(vv)[vv] vv_type = vv_meta["type"] var_name_out = vv if vv in rename_vars.keys(): var_name_out = rename_vars[vv] var = ds.createVariable( var_name_out, vv_type, ("time", "nhm_id"), fill_value=nc4.default_fillvals[np_to_nc4_types[vv_type]], zlib=zlib, complevel=complevel, chunksizes=tuple(chunk_sizes.values()), ) for att, val in vv_meta.items(): if att in ["_FillValue", "type", "dimensions"]: continue var.setncattr(att, val) ds.variables[var_name_out][:, :] = np_dict[vv] ds.close() print(f"Wrote netcdf file: {nc_file}") return