# Source code for SeqMetrics.utils

import os
import warnings
import itertools
from typing import Union
from types import FunctionType
from collections import OrderedDict

import scipy
import numpy as np
from scipy.special import xlogy
from scipy.stats import skew, kurtosis, variation, gmean

try:
    import plotly.graph_objects as go
except ModuleNotFoundError:
    go = None


def take(st, en, d):
    """Return a new dict holding the slice ``[st:en]`` of *d*'s items,
    preserving insertion order."""
    return dict(list(d.items())[st:en])


def plot_metrics(
        metrics: dict,
        ranges: tuple = ((0.0, 1.0), (1.0, 10), (10, 1000)),
        exclude: list = None,
        plot_type: str = 'bar',
        max_metrics_per_fig: int = 15,
        show: bool = True,
        save: bool = False,
        save_path: str = '',
        **kwargs):
    """
    Plots the metrics given as dictionary as radial or bar plot between
    specified ranges.

    Arguments:
        metrics: dictionary whose keys are names of errors and values are
            error values.
        ranges: tuple of tuples defining range of errors to plot in one plot
        exclude: List of metrics to be excluded from plotting.
        max_metrics_per_fig: maximum number of metrics to show in one figure.
        plot_type: either of ``radial`` or ``bar``.
        show: if True, figure will be shown/drawn
        save: if True, the figure will be saved.
        save_path: if given, the figure will be saved at this location.
        kwargs: keyword arguments for plotting

    Examples:
        >>> import numpy as np
        >>> from SeqMetrics import RegressionMetrics
        >>> from SeqMetrics import plot_metrics
        >>> t = np.random.random((20, 1))
        >>> p = np.random.random((20, 1))
        >>> er = RegressionMetrics(t, p)
        >>> all_errors = er.calculate_all()
        >>> plot_metrics(all_errors, plot_type='bar', max_metrics_per_fig=50)
        >>> # or draw the radial plot
        >>> plot_metrics(all_errors, plot_type='radial', max_metrics_per_fig=50)
    """
    for idx, rng in enumerate(ranges):
        # check the length first: indexing rng[1] on a malformed 1-tuple
        # would otherwise raise IndexError before the intended message
        assert len(rng) == 2, f"Range number {idx} has length {len(rng)}. It must be a tuple of length 2."
        assert rng[1] > rng[0], f'For range {idx}, second value: {rng[1]} is not greater than first value: {rng[0]}. '

    if exclude is None:
        exclude = []

    # work on a copy so the caller's dictionary is not mutated
    _metrics = metrics.copy()
    for k in metrics.keys():
        if k in exclude:
            _metrics.pop(k)

    assert plot_type in ['bar', 'radial'], f'plot_type must be either `bar` or `radial`.'

    # draw one (or more) figures per requested value range
    for _range in ranges:
        plot_metrics_between(
            _metrics,
            *_range,
            plot_type=plot_type,
            max_metrics_per_fig=max_metrics_per_fig,
            show=show,
            save=save,
            save_path=save_path,
            **kwargs)
    return
def plot_metrics_between(
        errors: dict,
        lower: int,
        upper: int,
        plot_type: str = 'bar',
        max_metrics_per_fig: int = 15,
        save=False,
        show=True,
        save_path='',
        **kwargs):
    """Draws the metrics in ``errors`` whose values lie strictly between
    ``lower`` and ``upper``, split across figures of at most
    ``max_metrics_per_fig`` metrics each."""
    import matplotlib.pyplot as plt

    if plot_type == "bar":
        from easy_mpl import circular_bar_plot

    # keep only metrics with a non-None value inside the open interval
    selected_metrics = {
        key: value for key, value in errors.items()
        if value is not None and lower < value < upper
    }

    st = 0
    n = len(selected_metrics)
    # figure boundaries between 0 and n, one chunk per figure
    sequence = np.linspace(0, n, int(n / max_metrics_per_fig) + 1)
    if len(sequence) == 1 and n > 0:
        # fewer metrics than max_metrics_per_fig -> a single figure
        sequence = np.array([0, len(selected_metrics)])

    for i in np.array(sequence, dtype=np.int32):
        if i == 0:
            continue
        en = i
        d = take(st, en, selected_metrics)
        if plot_type == 'radial':
            plot_radial(d, lower, upper, save=save, show=show,
                        save_path=save_path, **kwargs)
        elif len(d) < 10:
            # too few metrics for a readable circular bar plot
            pass
        else:
            plt.close('all')
            _ = circular_bar_plot(d, show=False, **kwargs)
            if save:
                plt.savefig(
                    os.path.join(save_path, f"errors_{lower}_{upper}_{st}_{en}.png"),
                    bbox_inches="tight")
            if show:
                plt.show()
        # advance the window for the next figure
        st = i
    return


def plot_radial(errors: dict, low: int, up: int, save=True, save_path=None, **kwargs):
    """Plots all the errors in errors dictionary.

    low and up are used to draw the limits of radial plot.
    """
    if go is None:
        print("can not plot radial plot because plotly is not installed.")
        return

    fill = kwargs.get('fill', None)
    fillcolor = kwargs.get('fillcolor', None)
    line = kwargs.get('line', None)
    marker = kwargs.get('marker', None)

    # NOTE(review): the sorted mapping is discarded here (same as the original
    # code); kept only for behavioural parity
    OrderedDict(sorted(errors.items(), key=lambda kv: kv[1]))

    lower = round(np.min(list(errors.values())), 4)
    upper = round(np.max(list(errors.values())), 4)

    fig = go.Figure()
    categories = list(errors.keys())

    fig.add_trace(go.Scatterpolar(
        r=list(errors.values()),
        theta=categories,  # angular coordinates
        fill=fill,
        fillcolor=fillcolor,
        line=line,
        marker=marker,
        name='errors'
    ))

    fig.update_layout(
        title_text=f"Errors from {lower} to {upper}",
        polar=dict(
            radialaxis=dict(
                visible=True,
                range=[low, up]
            )),
        showlegend=False
    )

    fig.show()

    if save:
        fname = f"radial_errors_from_{lower}_to_{upper}.png"
        if save_path is not None:
            fname = os.path.join(save_path, fname)
        fig.write_image(fname)
    return


def plot1d(true, predicted, save=True, name="plot", show=False):
    """Simple line plot of the true and predicted series against their index."""
    import matplotlib.pyplot as plt

    _, axis = plt.subplots()
    axis.plot(np.arange(len(true)), true, label="True")
    axis.plot(np.arange(len(predicted)), predicted, label="Predicted")
    axis.legend(loc="best")

    if save:
        plt.savefig(name, dpi=300, bbox_inches='tight')
    if show:
        plt.show()

    plt.close('all')
    return


def _foo(denominator, numerator):
    # elementwise 1 - numerator/denominator where both are non-zero;
    # 0 where only the numerator is non-zero; 1 everywhere else
    nonzero_numerator = numerator != 0
    nonzero_denominator = denominator != 0
    valid_score = nonzero_numerator & nonzero_denominator
    output_scores = np.ones(1)
    output_scores[valid_score] = 1 - (numerator[valid_score] / denominator[valid_score])
    output_scores[nonzero_numerator & ~nonzero_denominator] = 0.
    return output_scores


def _mean_tweedie_deviance(y_true, y_pred, power=0, weights=None):
    # copying from
    # https://github.com/scikit-learn/scikit-learn/blob/95d4f0841d57e8b5f6b2a570312e9d832e69debc/sklearn/metrics/_regression.py#L659
    message = ("Mean Tweedie deviance error with power={} can only be used on "
               .format(power))
    if power < 0:
        # 'Extreme stable', y_true any real number, y_pred > 0
        if (y_pred <= 0).any():
            raise ValueError(message + "strictly positive y_pred.")
        dev = 2 * (np.power(np.maximum(y_true, 0), 2 - power)
                   / ((1 - power) * (2 - power))
                   - y_true * np.power(y_pred, 1 - power) / (1 - power)
                   + np.power(y_pred, 2 - power) / (2 - power))
    elif power == 0:
        # Normal distribution, y_true and y_pred any real number
        dev = (y_true - y_pred) ** 2
    elif power < 1:
        raise ValueError("Tweedie deviance is only defined for power<=0 and "
                         "power>=1.")
    elif power == 1:
        # Poisson distribution, y_true >= 0, y_pred > 0
        if (y_true < 0).any() or (y_pred <= 0).any():
            raise ValueError(message + "non-negative y_true and strictly "
                                       "positive y_pred.")
        dev = 2 * (xlogy(y_true, y_true / y_pred) - y_true + y_pred)
    elif power == 2:
        # Gamma distribution, y_true and y_pred > 0
        if (y_true <= 0).any() or (y_pred <= 0).any():
            raise ValueError(message + "strictly positive y_true and y_pred.")
        dev = 2 * (np.log(y_pred / y_true) + y_true / y_pred - 1)
    else:
        if power < 2:
            # 1 < p < 2 is Compound Poisson, y_true >= 0, y_pred > 0
            if (y_true < 0).any() or (y_pred <= 0).any():
                raise ValueError(message + "non-negative y_true and strictly "
                                           "positive y_pred.")
        else:
            if (y_true <= 0).any() or (y_pred <= 0).any():
                raise ValueError(message + "strictly positive y_true and "
                                           "y_pred.")
        dev = 2 * (np.power(y_true, 2 - power) / ((1 - power) * (2 - power))
                   - y_true * np.power(y_pred, 1 - power) / (1 - power)
                   + np.power(y_pred, 2 - power) / (2 - power))

    return float(np.average(dev, weights=weights))


def _geometric_mean(a, axis=0, dtype=None):
    """Geometric mean"""
    if not isinstance(a, np.ndarray):
        # not an ndarray object: attempt to convert it
        log_a = np.log(np.array(a, dtype=dtype))
    elif dtype:
        # must change the default dtype allowing array type
        if isinstance(a, np.ma.MaskedArray):
            log_a = np.log(np.ma.asarray(a, dtype=dtype))
        else:
            log_a = np.log(np.asarray(a, dtype=dtype))
    else:
        log_a = np.log(a)
    return float(np.exp(log_a.mean(axis=axis)))


def listMethods(cls):
    """Names of functions/classmethods/staticmethods defined directly on cls."""
    return {attr_name for attr_name, attr in cls.__dict__.items()
            if isinstance(attr, (FunctionType, classmethod, staticmethod))}


def listParentMethods(cls):
    """Names of methods defined anywhere in cls's ancestry (excluding cls itself)."""
    return set(itertools.chain.from_iterable(
        listMethods(base).union(listParentMethods(base)) for base in cls.__bases__))


def list_subclass_methods(cls, is_narrow, ignore_underscore=True, additional_ignores=None):
    """Finds all methods of a child class"""
    methods = listMethods(cls)
    if is_narrow:
        # drop anything also defined on a parent class
        parent_methods = listParentMethods(cls)
        methods = {m for m in methods if m not in parent_methods}
    if additional_ignores is not None:
        methods = methods - set(additional_ignores)
    if ignore_underscore:
        methods = {m for m in methods if not m.startswith('_')}
    return methods


def features(data: Union[np.ndarray, list],
             precision: int = 3,
             name: str = '',
             st: int = 0,
             en: int = None,
             features: Union[list, str] = None
             ) -> dict:
    """
    Extracts point (scalar) features from 1d time series data.

    Arguments
    ---------
        data: array like
        precision: number of significant figures
        name: str, only for error or warning messages
        st: starting index of data to be considered
        en: end index of data to be considered
        features: name/names of features to extract from data
    """
    point_features = {
        'Skew': skew,
        'Kurtosis': kurtosis,
        'Mean': np.nanmean,
        'Geometric Mean': gmean,
        'Standard error of mean': scipy.stats.sem,
        'Median': np.nanmedian,
        'Variance': np.nanvar,
        'Coefficient of Variation': variation,
        'Std': np.nanstd,
        'Non Zeros': np.count_nonzero,
        'Min': np.nanmin,
        'Max': np.nanmax,
        'Sum': np.nansum,
        'Counts': np.size
    }

    point_features_lambda = {
        # 'Shannon entropy': lambda x: np.round(scipy.stats.entropy(pd.Series(x).value_counts()), precision),
        'Negative counts': lambda x: int(np.sum(x < 0.0)),
        '90th percentile': lambda x: round(np.nanpercentile(x, 90), precision),
        '75th percentile': lambda x: round(np.nanpercentile(x, 75), precision),
        '50th percentile': lambda x: round(np.nanpercentile(x, 50), precision),
        '25th percentile': lambda x: round(np.nanpercentile(x, 25), precision),
        '10th percentile': lambda x: round(np.nanpercentile(x, 10), precision),
    }

    if not isinstance(data, np.ndarray):
        if hasattr(data, '__len__'):
            data = np.array(data)
        else:
            raise TypeError(f"{name} must be array like but it is of type {data.__class__.__name__}")

    if np.array(data).dtype.type is np.str_:
        warnings.warn(f"{name} contains string values")
        return {}

    if 'int' not in data.dtype.name:
        if 'float' not in data.dtype.name:
            warnings.warn(f"changing the dtype of {name} from {data.dtype.name} to float")
            data = data.astype(np.float64)

    assert data.size == len(data), f"""
data must be 1 dimensional array but it has shape {np.shape(data)}
"""
    data = data[st:en]

    stats = dict()

    if features is None:
        features = list(point_features.keys()) + list(point_features_lambda.keys())
    elif isinstance(features, str):
        features = [features]

    for feat in features:
        if feat in point_features:
            stats[feat] = np.round(point_features[feat](data), precision)
        elif feat in point_features_lambda:
            stats[feat] = point_features_lambda[feat](data)

    # normalise numpy scalars to plain Python ints/floats
    for k, v in stats.items():
        if 'int' in v.__class__.__name__:
            stats[k] = int(v)
        else:
            stats[k] = round(float(v), precision)

    return stats


def maybe_to_oneD_array(array_like):
    """converts x to 1D array if possible otherwise returns as it is
    """
    if array_like.__class__.__name__ in ['list', 'tuple', 'Series', 'int', 'float']:
        return np.array(array_like)

    if isinstance(array_like, np.ndarray):
        # only flatten when the array holds a single row/column
        if len(array_like) == array_like.size:
            return array_like.reshape(-1, )

    return array_like


def to_oneD_array(array_like):
    """converts x to 1D array and if not possible, raises ValueError
    Returned array will have shape (n,)
    """
    cls_name = array_like.__class__.__name__

    if cls_name in ['list', 'tuple', 'Series', 'int', 'float']:
        return np.array(array_like)

    elif cls_name == 'ndarray':
        if array_like.ndim == 1:
            return array_like
        if array_like.size != len(array_like):
            raise ValueError(f'cannot convert multidim array of shape {array_like.shape} to 1d')
        return array_like.reshape(-1, )

    elif cls_name == 'DataFrame' and array_like.ndim == 2:
        assert len(array_like) == array_like.size
        return array_like.values.reshape(-1, )

    elif cls_name == "Series":
        return array_like.values.reshape(-1, )

    elif isinstance(array_like, float) or isinstance(array_like, int):
        return np.array([array_like])

    else:
        raise ValueError(f'cannot convert object {array_like.__class__.__name__} to 1d ')


# For every metric: whether a larger value is better ("max") or worse ("min").
METRIC_TYPES = {
    "r2": "max",
    "nse": "max",
    "nse_alpha": "max",
    "nse_beta": "max",
    "nse_mod": "max",
    "nse_rel": "max",
    "nse_bound": "max",
    "r2_score": "max",
    "adjusted_r2": "max",
    "kge": "max",
    "kge_bound": "max",
    "kge_mod": "max",
    "kge_np": "max",
    'log_nse': 'max',
    "corr_coeff": "max",
    'accuracy': "max",
    'f1_score': 'max',
    "mse": "min",
    "rmse": "min",
    "rmsle": "min",
    "mape": "min",
    "nrmse": "min",
    "pbias": "min",
    "bias": "min",
    "med_seq_error": "min",
    "mae": "min",
    "abs_pbias": "min",
    "gmae": "min",
    "inrse": "min",
    "irmse": "min",
    "mase": "min",
    "mare": "min",
    "msle": "min",
    "decomposed_mse": "min",
    "euclid_distance": "min",
    'exp_var_score': 'max',
    'expanded_uncertainty': 'min',
    'fdc_fhv': 'min',
    'fdc_flv': 'min',
    'gmean_diff': 'min',
    'gmrae': 'min',
    'JS': 'min',
    'kendaull_tau': 'max',
    'kgeprime_c2m': 'max',
    'kgenp_bound': 'max',
    'kl_sym': 'min',
    'lm_index': 'max',
    'maape': 'min',
    'mbe': 'min',
    'mbrae': 'min',
    'mapd': 'min',
    'max_error': 'min',
    'rse': 'min',
    'rrse': 'min',
    'rae': 'min',
    'ref_agreement_index': 'max',
    'rel_agreement_index': 'max',
    'relative_rmse': 'min',
    'rmspe': 'min',
    'rsr': 'min',
    'rmsse': 'min',
    'sa': 'min',
    'sc': 'min',
    'sga': 'min',
    'smape': 'min',
    'smdape': 'min',
    'sid': 'min',
    'skill_score_murphy': 'max',
    'std_ratio': 'min',
    'umbrae': 'min',
    've': 'min',
    'volume_error': 'min',
    'wape': 'min',
    'watt_m': 'min',
    'wmape': 'min',
    'norm_ape': 'min',
    'post_process_kge': 'min',
    'spearmann_corr': 'min',
    'log1p': 'min',
    'covariance': 'min',
    'brier_score': 'min',
    'bic': 'min',
    'sse': 'min',
    'amemiya_pred_criterion': 'min',
    'amemiya_adj_r2': 'min',
    'aitchison': 'min',
    'log_t': 'min',
    'log_p': 'min',
    '_assert_greater_than_one': 'min',
    'acc': 'min',
    'agreement_index': 'min',
    'aic': 'min',
    'cronbach_alpha': 'min',
    'centered_rms_dev': 'min',
    'cosine_similarity': 'min',
    '_error': 'min',
    '_relative_error': 'min',
    '_naive_prognose': 'min',
    '_minimal': 'min',
    '_hydro_metrics': 'min',
    'calculate_hydro_metrics': 'min',
    '_bounded_relative_error': 'min',
    '_ae': 'min',
    'mb_r': 'min',
    'mda': 'min',
    'mde': 'min',
    'mdape': 'min',
    'mdrae': 'min',
    'me': 'min',
    'mean_bias_error': 'min',
    'mean_var': 'min',
    'mean_poisson_deviance': 'min',
    'mean_gamma_deviance': 'min',
    'median_abs_error': 'min',
    'mle': 'min',
    'mod_agreement_index': 'min',
    'mpe': 'min',
    'mrae': 'min',
    'norm_euclid_distance': 'min',
    'nrmse_range': 'min',
    'nrmse_ipercentile': 'min',
    'nrmse_mean': 'min',
    'norm_ae': 'min',
    'log_prob': 'min',
    'rmdspe': 'min',
    'variability_ratio': 'min',
}


def _assert_1darray(array_like, metric_type: str) -> np.ndarray:
    """Makes sure that the provided `array_like` is 1D numpy array"""
    # this will convert tensorflow and torch tensors to numpy
    if hasattr(array_like, 'numpy') and callable(getattr(array_like, 'numpy')):
        array_like = getattr(array_like, 'numpy')()

    # this will cover xarray DataArray
    if hasattr(array_like, 'to_numpy') and callable(getattr(array_like, 'to_numpy')):
        array_like = getattr(array_like, 'to_numpy')()

    if metric_type == "regression":
        # regression requires a strict 1D array
        return to_oneD_array(array_like)
    return maybe_to_oneD_array(array_like)


def maybe_treat_arrays(
        preprocess: bool = None,
        true=None,
        predicted=None,
        metric_type: str = None,
        **process_kws
):
    """
    This function is applied by default at the start/at the time of initiating
    the class. However, it can be used any time after that. This can be handy
    if we want to calculate error first by ignoring nan and then by no ignoring
    nan. Adopting from HydroErr_ . Removes the nan, negative, and inf values
    in two numpy arrays

    .. _HydroErr:
        https://github.com/BYU-Hydroinformatics/HydroErr/blob/master/HydroErr/HydroErr.py#L6210

    parameters
    ----------
    preprocess: bool, default None
        if True, preprocess the true and predicted arrays
    true: array_like
        array of true/actual/observed values
    predicted: array_like
        array of predicted/simulated/calculated values
    metric_type: str
        type of metric, either "regression" or "classification"
    process_kws:
        keyword arguments for preprocessing
            - remove_nan: bool, default True
            - remove_inf : bool,
            - replace_nan: float, default None
            - remove_zero: bool, default None
            - remove_neg: bool, default None
            - replace_inf: float, default None
    """
    if preprocess:
        predicted = _assert_1darray(predicted, metric_type)
        true = _assert_1darray(true, metric_type)
        assert len(predicted) == len(true), """
lengths of provided arrays mismatch, predicted array: {}, true array: {}
""".format(len(predicted), len(true))

    if metric_type == 'regression':  # todo
        true, predicted = treat_arrays(true, predicted, **process_kws)

    return true, predicted


def treat_arrays(
        true,
        predicted,
        remove_nan: bool = True,
        remove_inf: bool = True,
        replace_nan=None,
        remove_zero=None,
        remove_neg=None,
        replace_inf=None
):
    """Applies replacement and removal treatments to true/predicted pairs.

    Returns (observed, simulated) after treatment.
    """
    sim_copy = np.copy(predicted)
    obs_copy = np.copy(true)

    sim_copy, obs_copy = maybe_replace(sim_copy, obs_copy, replace_nan,
                                       replace_inf, remove_zero, remove_neg)

    if remove_nan:
        sim_copy, obs_copy = maybe_remove_nan(sim_copy, obs_copy)

    if remove_inf:
        sim_copy, obs_copy = maybe_remove_inf(sim_copy, obs_copy)

    return obs_copy, sim_copy


def maybe_remove_nan(sim_copy, obs_copy):
    """Drops every (sim, obs) pair in which either value is NaN."""
    data = np.array([sim_copy.flatten(), obs_copy.flatten()])
    data = np.transpose(data)
    if data.dtype.kind not in {'U', 'S'}:
        # TODO check NaNs in an array containing strings
        data = data[~np.isnan(data).any(1)]
    sim_copy, obs_copy = data[:, 0], data[:, 1]
    return sim_copy, obs_copy


def maybe_remove_inf(sim_copy, obs_copy):
    """Drops every (sim, obs) pair in which either value is infinite."""
    data = np.array([sim_copy.flatten(), obs_copy.flatten()])
    data = np.transpose(data)
    if data.dtype.kind not in {'U', 'S'}:
        # TODO infinity NaNs in an array containing strings
        data = data[~np.isinf(data).any(1)]
    sim_copy, obs_copy = data[:, 0], data[:, 1]
    return sim_copy, obs_copy


def maybe_replace(sim_copy, obs_copy, replace_nan, replace_inf, remove_zero, remove_neg):
    # Treat missing data in observed_array and simulated_array, rows in
    # simulated_array or observed_array that contain nan values
    all_treatment_array = np.ones(obs_copy.size, dtype=bool)

    if replace_nan and (np.any(np.isnan(obs_copy)) or np.any(np.isnan(sim_copy))):
        # Finding the NaNs
        sim_nan = np.isnan(sim_copy)
        obs_nan = np.isnan(obs_copy)
        # Replacing the NaNs with the input
        sim_copy[sim_nan] = replace_nan
        obs_copy[obs_nan] = replace_nan

        warnings.warn("Elements(s) {} contained NaN values in the simulated array and "
                      "elements(s) {} contained NaN values in the observed array and have been "
                      "replaced (Elements are zero indexed).".format(np.where(sim_nan)[0],
                                                                     np.where(obs_nan)[0]),
                      UserWarning)

    if replace_inf and (np.any(np.isinf(obs_copy)) or np.any(np.isinf(sim_copy))):
        # Finding the NaNs
        sim_inf = np.isinf(sim_copy)
        obs_inf = np.isinf(obs_copy)
        # Replacing the NaNs with the input
        sim_copy[sim_inf] = replace_inf
        obs_copy[obs_inf] = replace_inf

        warnings.warn("Elements(s) {} contained Inf values in the simulated array and "
                      "elements(s) {} contained Inf values in the observed array and have been "
                      "replaced (Elements are zero indexed).".format(np.where(sim_inf)[0],
                                                                     np.where(obs_inf)[0]),
                      UserWarning)

    # Treat zero data in observed_array and simulated_array, rows in
    # simulated_array or observed_array that contain zero values
    if remove_zero:
        if (obs_copy == 0).any() or (sim_copy == 0).any():
            zero_indices_fcst = ~(sim_copy == 0)
            zero_indices_obs = ~(obs_copy == 0)
            all_zero_indices = np.logical_and(zero_indices_fcst, zero_indices_obs)
            all_treatment_array = np.logical_and(all_treatment_array, all_zero_indices)

            warnings.warn(
                "Row(s) {} contained zero values and the row(s) have been removed (Rows are "
                "zero indexed).".format(np.where(~all_zero_indices)[0]),
                UserWarning
            )

    # Treat negative data in observed_array and simulated_array, rows in
    # simulated_array or observed_array that contain negative values
    # Ignore runtime warnings from comparing
    if remove_neg:
        with np.errstate(invalid='ignore'):
            obs_copy_bool = obs_copy < 0
            sim_copy_bool = sim_copy < 0

        if obs_copy_bool.any() or sim_copy_bool.any():
            neg_indices_fcst = ~sim_copy_bool
            neg_indices_obs = ~obs_copy_bool
            all_neg_indices = np.logical_and(neg_indices_fcst, neg_indices_obs)
            all_treatment_array = np.logical_and(all_treatment_array, all_neg_indices)

            warnings.warn("Row(s) {} contained negative values and the row(s) have been "
                          "removed (Rows are zero indexed).".format(np.where(~all_neg_indices)[0]),
                          UserWarning)

    obs_copy = obs_copy[all_treatment_array]
    sim_copy = sim_copy[all_treatment_array]

    return sim_copy, obs_copy