Source code for hydrobm.calculate

import pandas as pd
import xarray as xr

from .benchmarks import create_bm, evaluate_bm
from .utils import bme_kge, bme_nse, rain_to_melt


# Main function to calculate metric scores for a given set of benchmark models
[docs] def calc_bm( data, # Time period selection cal_mask, val_mask=[], # Variable names in 'data' precipitation="precipitation", streamflow="streamflow", # Benchmark choices benchmarks=["daily_mean_flow"], metrics=["rmse"], optimization_method="brute_force", # Snow model inputs calc_snowmelt=False, temperature="temperature", snowmelt_threshold=0.0, snowmelt_rate=3.0, ): """Calculate benchmark model scores for a given set of benchmark models and metrics. Parameters ---------- data : pandas DataFrame or xarray Dataset Input data containing precipitation and streamflow columns. cal_mask : pandas Series Boolean mask for the calculation period. val_mask : pandas Series, optional Boolean mask for the validation period. Default is [] (no validation scores returned). precipitation : str, optional Name of the precipitation column in the input data. Default is 'precipitation'. streamflow : str, optional Name of the streamflow column in the input data. Default is 'streamflow'. benchmarks : list, optional List of benchmark models to calculate. Default is ['daily_mean_flow']. metrics : list, optional List of metrics to calculate. Default is ['rmse']. optimization_method : str, optional Optimization method to use for benchmark model calibration. Default is 'brute_force'. calc_snowmelt : bool, optional Flag to run a basic snow accumulation and melt model. Default is False. temperature : str, optional Name of the temperature column in the input data. Default is 'temperature'. snowmelt_threshold : float, optional Threshold temperature for snowmelt calculation. Default is 0.0 [C]. Returns ------- benchmark_flows : pandas DataFrame DataFrame containing benchmark flows for each benchmark model. metrics : dict Dictionary containing metric scores for each benchmark model. """ # Input handling: if xarray dataset is provided, convert to DataFrame if isinstance(data, xr.Dataset): data = data.to_dataframe() # Input check: data is a pandas DataFrame. if not isinstance(data, pd.DataFrame): raise TypeError("Input data must be a pandas DataFrame or xarray Dataset") # Input check: precipitation and streamflow columns are present in the DataFrame if precipitation not in data.columns: raise ValueError(f"Precipitation column {precipitation} not found in the input data") if streamflow not in data.columns: raise ValueError(f"Streamflow column {streamflow} not found in the input data") # Input check: DataFrame has a datetime index if not isinstance(data.index, pd.DatetimeIndex): raise ValueError("Input data must have a datetime index") # Input check: calibration and validation masks same length as the data index if len(cal_mask) != len(data.index): raise ValueError("Benchmark calculation mask does not match length of data index") if len(val_mask) != len(data.index): raise ValueError("benchmark evaluation mask does not match length of data index") # Run a basic snow model if requested if calc_snowmelt: # Input check: temperature column is present in input data if temperature not in data.columns: raise ValueError(f"Temperature column {temperature} not found in the input data") # Calculate snowmelt data = rain_to_melt( data, precipitation=precipitation, temperature=temperature, snow_and_melt_temp=snowmelt_threshold, snow_and_melt_rate=snowmelt_rate, ) # Update the precipitation variable to instead use rain_plus_melt precipitation = "rain_plus_melt" # First create the benchmark flows as a one-off benchmark_flow_list = ( [] ) # list to store DataFrames of benchmark flows, merged later if multiple benchmarks are requested for benchmark in benchmarks: _, qbm = create_bm( data, benchmark, cal_mask, precipitation=precipitation, streamflow=streamflow, optimization_method=optimization_method, ) # Create the benchmark flow for calibration period benchmark_flow_list.append(qbm) # Then loop over the metrics to calculate scores for each benchmark flow results = {"benchmarks": benchmarks} # dictionary to store metric scores; tracks the benchmark models used for metric in metrics: cal_scores = [] val_scores = [] for benchmark_flow in benchmark_flow_list: [cal_score, val_score] = evaluate_bm( data, benchmark_flow, metric, cal_mask, val_mask=val_mask, streamflow=streamflow ) cal_scores.append(cal_score) val_scores.append(val_score) results.update({metric + "_cal": cal_scores, metric + "_val": val_scores}) return pd.concat(benchmark_flow_list, axis=1), results
# Function to calculate benchmark efficiency (BME) scores
[docs] def calc_bme( data, # Time period selection cal_mask, val_mask=[], # Variable names in 'data' precipitation="precipitation", streamflow="streamflow", simulated_flow="simulated_flow", # Benchmark choices benchmarks=["daily_mean_flow"], metrics=["rmse"], optimization_method="brute_force", # BME formulation formulation="bme_nse", # Snow model inputs calc_snowmelt=False, temperature="temperature", snowmelt_threshold=0.0, snowmelt_rate=3.0, ): """Calculate Benchmark Efficiency (BME) scores alongside standard metric scores. Parameters ---------- data : pandas DataFrame or xarray Dataset Input data containing precipitation, streamflow, and simulated flow columns. cal_mask : pandas Series Boolean mask for the calibration period. val_mask : pandas Series, optional Boolean mask for the validation period. Default is [] (no validation scores returned). precipitation : str, optional Name of the precipitation column in the input data. Default is 'precipitation'. streamflow : str, optional Name of the streamflow column in the input data. Default is 'streamflow'. simulated_flow : str, optional Name of the simulated flow column in the input data. Default is 'simulated_flow'. benchmarks : list, optional List of benchmark models to calculate. Default is ['daily_mean_flow']. metrics : list, optional List of metrics to calculate via calc_bm. Default is ['rmse']. optimization_method : str, optional Optimization method for benchmark model calibration. Default is 'brute_force'. formulation : str, optional BME formulation. Options: - 'bme_nse' (default): BME = 1 - sum((q_obs-q_sim)^2) / sum((q_obs-q_b)^2) - 'bme_kge': BME = (KGE_model - KGE_benchmark) / (1 - KGE_benchmark) calc_snowmelt : bool, optional Flag to run a basic snow accumulation and melt model. Default is False. temperature : str, optional Name of the temperature column in the input data. Default is 'temperature'. snowmelt_threshold : float, optional Threshold temperature for snowmelt. Default is 0.0 [C]. snowmelt_rate : float, optional Rate of snowmelt. Default is 3.0. Returns ------- bme_scores : dict Dictionary of BME scores for each benchmark. benchmark_flows : pandas DataFrame DataFrame containing benchmark flows for each benchmark model (from calc_bm). results : dict Dictionary of standard metric scores for each benchmark (from calc_bm). """ # Input check: convert xarray Dataset to DataFrame if isinstance(data, xr.Dataset): data = data.to_dataframe() # Input check: data is a pandas DataFrame if not isinstance(data, pd.DataFrame): raise TypeError("Input data must be a pandas DataFrame or xarray Dataset") # Input check: simulated flow column is present if simulated_flow not in data.columns: raise ValueError(f"Simulated flow column '{simulated_flow}' not found in the input data") # Input check: valid formulation valid_formulations = ["bme_nse", "bme_kge"] if formulation not in valid_formulations: raise ValueError(f"formulation must be one of {valid_formulations}, got '{formulation}'") # Call calc_bm to obtain benchmark flows and metric scores benchmark_flows, results = calc_bm( data, cal_mask=cal_mask, val_mask=val_mask, precipitation=precipitation, streamflow=streamflow, benchmarks=benchmarks, metrics=metrics, optimization_method=optimization_method, calc_snowmelt=calc_snowmelt, temperature=temperature, snowmelt_threshold=snowmelt_threshold, snowmelt_rate=snowmelt_rate, ) # Compute BME scores per benchmark q_obs = data[streamflow] q_sim = data[simulated_flow] bme_cal_scores = [] bme_val_scores = [] for i, benchmark in enumerate(benchmarks): q_bm = benchmark_flows["bm_" + benchmark] if formulation == "bme_nse": bme_cal, bme_val = bme_nse(q_obs, q_sim, q_bm, cal_mask, val_mask=val_mask) elif formulation == "bme_kge": bme_cal, bme_val = bme_kge(q_obs, q_sim, q_bm, cal_mask, val_mask=val_mask) bme_cal_scores.append(bme_cal) bme_val_scores.append(bme_val) # Package BME scores bme_scores = { "benchmarks": benchmarks, f"{formulation}_cal": bme_cal_scores, f"{formulation}_val": bme_val_scores, } return bme_scores, benchmark_flows, results