Source code for hydrobm.utils

import numpy as np
import pandas as pd
from scipy.optimize import Bounds, minimize, minimize_scalar  # differential_evolution,

from .metrics import kge, mse

# --- Wrapper functions for adjusted precipitation benchmark
# and adjusted smoothed precipitation benchmark optimization


# Wrapper for adjusted precipitation benchmark optimization
[docs] def optimize_apb(scaled_precip, streamflow, method, max_lag=30): """Wrapper function around adjusted precipitation benchmark model optimization functions. Parameters ---------- scaled_precip : pandas Series Scaled precipitation data. streamflow : pandas Series Streamflow data. method : str Optimization method to use. Currently supports "brute_force" and "minimize". max_lag : int, optional Maximum lag to consider. Default is 30. Returns ------- best_lag : int Best lag value. best_mse : float Best mean squared error value. """ # Check that the method is valid if method not in ["brute_force", "minimize"]: # , "differential_evolution"]: raise ValueError(f"Invalid optimization method specified for optimize_lag: {method}.") # Brute force optimization if method == "brute_force": best_lag, best_mse = brute_force_apb(scaled_precip, streamflow, max_lag) # Minimize scalar optimization elif method == "minimize": best_lag, best_mse = minimize_scalar_apb(scaled_precip, streamflow, max_lag) # # Differential evolution optimization # elif method == "differential_evolution": # best_lag, best_mse = differential_evolution_apb(scaled_precip, streamflow, max_lag) return best_lag, best_mse
# Wrapper for adjusted smoothed precipitation benchmark optimization
[docs] def optimize_aspb(scaled_precip, streamflow, method, max_lag=30, max_window=90): """Wrapper function around adjusted smoothed precipitation benchmark model optimization functions. Parameters ---------- scaled_precip : pandas Series Scaled precipitation data. streamflow : pandas Series Streamflow data. method : str Optimization method to use. Currently supports "brute_force" and "minimize". max_lag : int, optional Maximum lag to consider. Default is 30. max_window: int, optional Maximum smoothing window length to consider. Default is 90. Returns ------- best_lag : int Best lag value. best_window: int Best window value. best_mse : float Best mean squared error value. """ # Check that the method is valid if method not in ["brute_force", "minimize"]: raise ValueError(f"Invalid optimization method specified for optimize_lag: {method}.") # Brute force optimization if method == "brute_force": best_lag, best_window, best_mse = brute_force_aspb(scaled_precip, streamflow, max_lag, max_window) # Minimize scalar optimization elif method == "minimize": best_lag, best_window, best_mse = minimize_aspb(scaled_precip, streamflow, max_lag, max_window) return best_lag, best_window, best_mse
# --- Optimization functions for adjusted precipitation benchmark (APB) # Basic brute force optimization routine for adjusted precipitation benchmark
[docs] def brute_force_apb(scaled_precip, streamflow, max_lag=30): """Optimize the lag for the adjusted precipitation benchmark model using brute force. Parameters ---------- scaled_precip : pandas Series Scaled precipitation data. streamflow : pandas Series Streamflow data. max_lag : int, optional Maximum lag to consider. Default is 30. Returns ------- best_lag : int Best lag value. best_mse : float Best mean squared error value. """ # Initialize the best lag and MSE best_lag = 0 best_mse = np.inf # Loop over all possible lags for lag in range(0, max_lag): # Shift the precipitation data by the lag shifted_precip = scaled_precip.shift(lag) # Calculate the MSE for the shifted precipitation mse_val = mse(shifted_precip, streamflow) # Update the best lag and MSE if mse_val < best_mse: best_lag = lag best_mse = mse_val return best_lag, best_mse
# minimize_scalar optimization routine for adjusted precipitation benchmark
[docs] def minimize_scalar_apb(scaled_precip, streamflow, max_lag=30): """Optimize the lag for the adjusted precipitation benchmark model using scipy.optimize.minimize_scalar. Parameters ---------- scaled_precip : pandas Series Scaled precipitation data. streamflow : pandas Series Streamflow data. max_lag : int, optional Maximum lag to consider. Default is 30. Returns ------- best_lag : int Best lag value. best_mse : float Best mean squared error value. Notes ----- scipy.optimize.minimize_scalar is not designed for use with integer-only solutions. Here we use the round function to enforce integer solutions. This seems to work for simple test cases, but results for real data may vary. User caution is advised. Use brute force optimization if 100% accurate solutions are required. """ # Define the optimization function def mse_apb(lag): lag = round(lag) # ensures integer apb = scaled_precip.shift(lag) return mse(apb, streamflow) # Run the optimization bounds = (0, max_lag - 1) # minimize_scalar only accepts bounds as a tuple, not as Bounds class res = minimize_scalar(mse_apb, bounds=bounds, method="bounded") # Extract the best lag and MSE best_lag = round(res.x) best_mse = res.fun return best_lag, best_mse
# # differential_evolution optimization routine for adjusted precipitation benchmark # def differential_evolution_apb(scaled_precip, streamflow, max_lag=30): # # Define the optimization function # def mse_apb(lag): # apb = scaled_precip.shift(lag) # lag as integer enforced elsewhere # return mse(apb, streamflow) # # Custom integer mutation function # def integer_mutation(xk, **kwargs): # xk_new = np.round(xk).astype(int) # return xk_new # # Run the optimization # bounds = Bounds(0, max_lag - 1) # Bounds([lower], [upper]) # bounds = [(0, max_lag - 1)] # res = differential_evolution( # mse, # bounds, # args=(obs,), # strategy="best1bin", # mutation=(0.5, 1), # recombination=0.7, # tol=0.01, # seed=42, # workers=1, # updating="deferred", # disp=True, # polish=False, # init="random", # callback=integer_mutation, # ) # return "Not implemented yet" # --- Optimization functions for adjusted smoothed precipitation benchmark (ASPB) # Basic brute force optimization routine for adjusted smoothed precipitation benchmark
[docs] def brute_force_aspb(scaled_precip, streamflow, max_lag=30, max_window=90): """Optimize the lag and window for adjusted smoothed precipitation benchmark model using brute force. Parameters ---------- scaled_precip : pandas Series Scaled precipitation data. streamflow : pandas Series Streamflow data. max_lag : int, optional Maximum lag to consider. Default is 30. max_window: int, optional Maximum smoothing window length to consider. Default is 90. Returns ------- best_lag : int Best lag value. best_window: int Best window value. best_mse : float Best mean squared error value. """ # Initialize an array to store MSE scores all_mse = np.full([max_lag, max_window], np.inf) # Loop over all possible lags for lag in range(0, max_lag): for window in range(1, max_window): # window=0 is undefined # Shift the precipitation data by the lag and smooth aspb = scaled_precip.shift(lag).rolling(window=window).mean() # Calculate the MSE for the shifted precipitation mse_val = mse(aspb, streamflow) if not np.isnan(mse_val): # might get NaN if lag/window > time series all_mse[lag, window] = mse_val # Find the lowest MSE. If multiple, by default selects the smallest lag and window. best_lag, best_window = np.unravel_index(all_mse.argmin(), all_mse.shape) best_mse = all_mse[best_lag, best_window] return best_lag, best_window, best_mse
# minimize optimization routine for adjusted precipitation benchmark
[docs] def minimize_aspb(scaled_precip, streamflow, max_lag=30, max_window=90, method="Powell"): """Optimize the lag and window for the ASPB model using scipy.optimize.minimize. Parameters ---------- scaled_precip : pandas Series Scaled precipitation data. streamflow : pandas Series Streamflow data. max_lag : int, optional Maximum lag to consider. Default is 30. max_window: int, optional Maximum smoothing window length to consider. Default is 90. method: str, optional Optimization method to use. Default is 'Powell'. See scipy.optimize.minimize for more options. Returns ------- best_lag : int Best lag value. best_window: int Best window value. best_mse : float Best mean squared error value. Notes ----- scipy.optimize.minimize is not designed for use with integer-only solutions. Here we use the round function to enforce integer solutions. The 'Powell' optimization method seems to return appropriate lag and window values in simple test cases, but results for real data may vary. User caution is advised. Use brute force optimization if 100% accurate solutions are required. """ # Define the optimization function def mse_aspb(params): lag, window = params lag = round(lag) # ensures integer window = round(window) # Calculate the adjusted smoothed precipitation benchmark aspb = scaled_precip.shift(lag).rolling(window=window).mean() return mse(aspb, streamflow) # Run the optimization init = [0, 1] # initial guess for lag and window bounds = Bounds([0, 1], [max_lag - 1, max_window - 1]) res = minimize(mse_aspb, init, bounds=bounds, method=method) # Extract the best lag, window, and MSE best_lag, best_window = res.x.round() best_mse = res.fun return int(best_lag), int(best_window), best_mse
# --- Eckhardt Baseflow utility functions # Parameter estimation for Eckhardt
[docs] def estimate_eckhardt_parameters(streamflow, precip, precip_window_days=3, precip_threshold=0.1): """ Estimate both recession coefficient (k) and maximum baseflow index (BFI_max) which are required for baseflow separation as outlined by Eckhardt (2005). This function combines recession analysis to estimate k with the backward filter method from Collischonn & Fan (2013) to estimate BFI_max. Automatically detects the timestep from the data and adjusts time window accordingly. Parameters ---------- streamflow : pandas Series Observed streamflow with DatetimeIndex. precip : pandas Series Precipitation data with DatetimeIndex. precip_window_days : float, optional Number of DAYS to check for precipitation when identifying recessions. Automatically converted to appropriate number of timesteps based on data frequency. Default is 3 days. precip_threshold : float, optional Precipitation threshold in same units as precip data (e.g., mm/day or kg/m²/s). Default is 0.1. Returns ------- k : float Recession coefficient estimated from recession periods (for native timestep). BFI_max : float Maximum baseflow index estimated using backward filter method. References ---------- Eckhardt, K. (2005). How to construct recursive digital filters for baseflow separation. Hydrological Processes, 19(2), 507-515. Collischonn, W., & Fan, F. M. (2013). Defining parameters for Eckhardt's digital baseflow filter. Hydrological Processes, 27(18), 2614–2622. https://doi.org/10.1002/hyp.9391 """ # Detect timestep time_diff = streamflow.index.to_series().diff() median_timestep = time_diff.median() if isinstance(median_timestep, pd.Timedelta): delta_t_days = median_timestep.total_seconds() / 86400 else: delta_t_days = float(median_timestep) # Convert precipitation window from days to number of periods precip_window_periods = max(1, int(np.ceil(precip_window_days / delta_t_days))) # =========================== # Step 1: Estimate k from recession periods # =========================== k_values = [] # Identify periods with no precipitation no_precip = precip.rolling(window=precip_window_periods, min_periods=1).sum() < precip_threshold # Flow is declining declining = streamflow.diff() < 0 # Combined condition: recession period recession_mask = no_precip & declining # Calculate k for recession days: k = Q[t] / Q[t-1] for t in range(1, len(streamflow)): if recession_mask.iloc[t] and streamflow.iloc[t] > 0 and streamflow.iloc[t - 1] > 0: k_t = streamflow.iloc[t] / streamflow.iloc[t - 1] k_values.append(k_t) # Calculate the final value of k k = np.median(k_values) # =========================== # Step 2: Estimate BFI_max using backward filter # =========================== Q_values = streamflow.values n = len(Q_values) # Initialize backward baseflow array baseflow_backward = np.zeros(n) # Start from the end: assume last timestep is all baseflow baseflow_backward[-1] = Q_values[-1] # Apply backward filter: b'[t] = b'[t+1] / k for t in range(n - 2, -1, -1): # Go backward from second-to-last to first if Q_values[t] > 0: baseflow_backward[t] = baseflow_backward[t + 1] / k # Baseflow cannot exceed total flow baseflow_backward[t] = min(baseflow_backward[t], Q_values[t]) else: baseflow_backward[t] = 0 # BFI_max is the ratio of total baseflow to total streamflow (Equation 11) BFI_max = np.sum(baseflow_backward) / np.sum(Q_values) return k, BFI_max
# Calculation function for Eckhardt
[docs] def eckhardt_filter(Q, BFI_max, k): """ Eckhardt two-parameter digital filter for baseflow separation. The Eckhardt filter was found to be the best of 9 evaluated baseflow separation methods in Xie et al. (2020), showing superior performance across diverse catchment conditions. Parameters ---------- Q : pandas Series or numpy array Streamflow time series. BFI_max : float Maximum baseflow index. k : float Recession constant. Returns ------- baseflow : pandas Series or numpy array Separated baseflow component. References ---------- Eckhardt, K. (2005). How to construct recursive digital filters for baseflow separation. Hydrological Processes, 19(2), 507-515. Xie, J., Liu, X., Wang, K., Yang, T., Liang, K., & Liu, C. (2020). Evaluation of typical methods for baseflow separation in the contiguous United States. Journal of Hydrology, 583, 124628. https://doi.org/10.1016/j.jhydrol.2020.124628 """ # Check input type and extract values is_series = isinstance(Q, pd.Series) if is_series: Q_values = Q.values index = Q.index else: Q_values = np.asarray(Q) baseflow = np.zeros(len(Q_values)) baseflow[0] = Q_values[0] * BFI_max # Initialize with fraction of first flow # Apply Eckhardt filter for t in range(1, len(Q_values)): if Q_values[t] > 0: baseflow[t] = ((1 - BFI_max) * k * baseflow[t - 1] + (1 - k) * BFI_max * Q_values[t]) / ( 1 - k * BFI_max ) # Baseflow cannot exceed total flow baseflow[t] = min(baseflow[t], Q_values[t]) else: baseflow[t] = 0 # Return in same format as input if is_series: return pd.Series(baseflow, index=index) else: return baseflow
# --- Snow accumulation and melt model
[docs] def rain_to_melt( data, precipitation="precipitation", temperature="temperature", snow_and_melt_temp=0.0, snow_and_melt_rate=3.0 ): """Calculate snow accumulation and melt based on temperature thresholds. Parameters ---------- data : pandas DataFrame Input data containing precipitation and temperature columns. precipitation : str, optional Name of the precipitation column in the input data. Default is 'precipitation'. temperature : str, optional' Name of the temperature column in the input data. Default is 'temperature'. snow_and_melt_temp : float, optional Temperature threshold for snow accumulation and melt. Default is 0.0 [C]. snow_and_melt_rate : float, optional Snow melt rate if temperature above threshold. Default is 3.0 [mm/timestep/degree C]. Returns ------- data : pandas DataFrame Input data with additional columns for snow depth and rain plus melt. Notes ----- The default values for snow_and_melt_temp and snow_and_melt_rate are given in units of degrees Celsius and millimeters per time step per degree Celsius, respectively. These are not used in the code however, as the function is designed to work with any units. For example, providing the input data in Kelvin and setting snow_and_melt_temp to 273.15 will work as expected. Similarly, if the input precipitation data is not in millimeters, simply providing the snow_and_melt_rate in those same units will yield the correct output. """ # Docs: 3 degrees C is a conservative estimate (see e.g.: https://tc.copernicus.org/articles/17/211/2023/) # Check that melt rate is not negative if snow_and_melt_rate < 0: raise ValueError(f"Snow melt rate must be non-negative. Currently set to: {snow_and_melt_rate}.") # Run a really simple time-stepping scheme to account for snow accumulation and melt. # We'll deal with the time step implicitly, simply assuming that delta t = 1. # We can get away with Explicit Euler (Snew = Sold + snowfall - snowmelt) because fall # and melt are mutually exclusive: no problems with ad-hoc operator splitting here. snow_depth = [] rain_plus_melt = [] Sold = 0 for _, row in data.iterrows(): # Determine snowfall or melt if row[temperature] > snow_and_melt_temp: melt = np.min([Sold, snow_and_melt_rate * (row[temperature] - snow_and_melt_temp)]) rain = row[precipitation] snow = 0 else: melt = 0 rain = 0 snow = row[precipitation] # Update the snow pack Snew = Sold + snow - melt # Retain the values snow_depth.append(Snew) rain_plus_melt.append(rain + melt) # Prepare for the next time step Sold = Snew # Outputs data["snow_depth"] = snow_depth data["rain_plus_melt"] = rain_plus_melt return data
# --- BME calculation functions
[docs] def bme_nse(q_obs, q_sim, q_bm, cal_mask, val_mask=None): """Calculate NSE-based Benchmark Efficiency (BME) for cal and val periods. The formulation can be found in Seibert (2001) and Schaefli and Gupta (2007). BME = 1 - sum((q_obs - q_sim)^2) / sum((q_obs - q_bm)^2) Parameters ---------- q_obs : pandas Series Observed streamflow. q_sim : pandas Series Simulated streamflow. q_bm : pandas Series Benchmark streamflow. cal_mask : pandas Series Boolean mask for the calibration period. val_mask : pandas Series, optional Boolean mask for the validation period. Default is None (no val score returned). Returns ------- cal_score : float NSE-based BME score for the calibration period. val_score : float NSE-based BME score for the validation period. NaN if no val_mask specified. References ---------- Seibert, J. (2001). On the need for benchmarks in hydrological modelling. Hydrological Processes, 15(6), 1063–1064. https://doi.org/10.1002/hyp.446 Schaefli, B., & Gupta, H. V. (2007). Do Nash values have value? Hydrological Processes, 21(15), 2075–2080. https://doi.org/10.1002/hyp.6825 """ def _nse_bme_score(mask): obs = q_obs[mask].values sim = q_sim[mask].values bm = q_bm[mask].values # filter nan for 3 arrays if np.all(np.isnan(obs)) or np.all(np.isnan(sim)) or np.all(np.isnan(bm)): return np.nan nan_mask = ~np.isnan(obs) & ~np.isnan(sim) & ~np.isnan(bm) obs, sim, bm = obs[nan_mask], sim[nan_mask], bm[nan_mask] denominator = np.sum((obs - bm) ** 2) if denominator == 0: return np.nan return 1 - np.sum((obs - sim) ** 2) / denominator cal_score = _nse_bme_score(cal_mask) val_score = _nse_bme_score(val_mask) if val_mask is not None else np.nan return cal_score, val_score
[docs] def bme_kge(q_obs, q_sim, q_bm, cal_mask, val_mask=None): """Calculate KGE-based Benchmark Model Efficiency (KGE skill score) for cal and val periods. This skill score formulation can be found in Knoben et al. (2019) among others. KGE_skill = (KGE_model - KGE_benchmark) / (1 - KGE_benchmark) Parameters ---------- q_obs : pandas Series Observed streamflow. q_sim : pandas Series Simulated streamflow. q_bm : pandas Series Benchmark streamflow. cal_mask : pandas Series Boolean mask for the calibration period. val_mask : pandas Series, optional Boolean mask for the validation period. Default is None (no val score returned). Returns ------- cal_score : float KGE skill score for the calibration period. val_score : float KGE skill score for the validation period. NaN if no val_mask specified. References ---------- Knoben, W. J. M., Freer, J. E., & Woods, R. A. (2019). Technical note: Inherent benchmark or not? Comparing Nash–Sutcliffe and Kling–Gupta efficiency scores. Hydrology and Earth System Sciences, 23(10), 4323–4331. https://doi.org/10.5194/hess-23-4323-2019 """ def _bme_kge_score(mask): obs = q_obs[mask].values sim = q_sim[mask].values bm = q_bm[mask].values kge_model = kge(obs, sim) kge_benchmark = kge(obs, bm) if np.isclose(kge_benchmark, 1.0): return np.nan return (kge_model - kge_benchmark) / (1 - kge_benchmark) cal_score = _bme_kge_score(cal_mask) val_score = _bme_kge_score(val_mask) if val_mask is not None else np.nan return cal_score, val_score