Source code for oats.threshold.spot

"""
Streaming Peaks-Over-Threshold (SPOT)
-----------------
"""
import time
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from statsmodels.robust.scale import huber
from scipy.stats import genpareto
from scipy.stats import norm

from oats.threshold._base import Threshold


[docs]class SPOTThreshold(Threshold): """Streaming version of POT Fit the tails of the data with Generalized Pareto Distribution (GPD). Find the threshold where `P(thres) < q`. Usual values for q is 1e-3 to 1e-6. `fit()` is required to compute the initial threshold for the first `n` points. Siffer, Alban and Fouque, Pierre-Alain and Termier, Alexandre and Largouet, Christine "Anomaly Detection in Streams with Extreme Value Theory" https://doi.org/10.1145/3097983.3098144 """ def __init__( self, q: float = 1e-4, level: float = 0.95, memory: int = 2000, support: float = 0, init_cutoff: float = 1, robust: bool = False, estimator: str = "MoM", **kwargs ): """ Args: q (float, optional): q level such that `P(threshold) < q`. Defaults to 1e-4. level (float, optional): threshold to fit tail distribution. Defaults to 0.95. memory (int, optional): how many data points for the tails. Defaults to 2000. support (float, optional): used for method of moments estimation to prevent a bad fit (MoM is fast but unreliable); threshold is computed as the maximum of `GPD` and `support * HalfNormal`. Defaults to 0. init_cutoff (float, optional): used for method of moments estimation to prevent a bad fit (MoM is not robust against outliers); only bottom `init_cutoff` deciles are used to fit the GPD distribution. Defaults to 1. robust (bool, optional): (experimental) using robust estimates of `mean` and `variance` to fit GPD using MoM. Defaults to False. estimator (str, optional): estimator used to fit GPD distribution; accepts 'MoM' or 'MLE'. Defaults to "MoM". """ self.support = support self.init_cutoff = init_cutoff self.robust = robust self.q = q self.level = level self.memory = memory # Rolling Data self.X = np.array([]) # Peak Set self.Y = np.array([]) # len(data set) self.n = 0 # I -> indices of peaks in order to maintain the correct n_peak/n ratio self.I = np.array([]) # thresholds self.percentile_thres = 0 self.spot_thres = 0 self.estimator = estimator self._thresholders = None @property def adjusted_n(self): return self.n - self.I[0] # TODO: typing # data -> np.NDArray
[docs] def fit(self, data): multivar = True if data.ndim > 1 and data.shape[1] > 1 else False if multivar: self._thresholders = self._pseudo_mv_fit(data) return for x in data: self._add_mem(x) self.spot_thres = self._spot_initialize(data)
def _step(self, x_t) -> float: """ Stepping + Updating SPOT """ self._add_mem(x_t) new_p_thres = _GPDThreshold.get_tail_threshold(self.X, self.level) self.spot_thres += new_p_thres - self.percentile_thres self.percentile_thres = new_p_thres if self.X[-1] > self.spot_thres: """erasing anomalous data point from history""" self.X[-1] = 0 elif self.X[-1] > self.percentile_thres: # add obs to peak set y_i = self.X[-1] - self.percentile_thres self._add_peak(y_i) sigma, gamma = _GPDThreshold.get_gpd_params( self.Y, robust=self.robust, estimator=self.estimator ) self.spot_thres = max( _GPDThreshold.calc_spot_threshold( self.percentile_thres, sigma, gamma, self.adjusted_n, self.Y.size, self.q, ), _GPDThreshold.calc_half_normal_threshold( self.percentile_thres, self.Y.std(ddof=1), self.q, support=self.support, ), ) return self.spot_thres
[docs] def get_threshold(self, data): multivar = True if data.ndim > 1 and data.shape[1] > 1 else False if multivar: if not self._thresholders: self._thresholders = [self] * data.shape[1] return self._handle_multivariate(data, self._thresholders) ret = np.array([]) for x in data: ret = np.append(ret, self._step(x)) return ret
@property def n_peaks(self): return self.Y.size def _spot_initialize(self, S): self.percentile_thres = _GPDThreshold.get_tail_threshold(S, self.level) Y = _GPDThreshold.get_peaks(S, self.percentile_thres) if self.init_cutoff < 1: cutoff = _GPDThreshold.get_tail_threshold(Y, self.init_cutoff) Y = Y[Y < cutoff] for y in Y: self._add_peak(y) sigma, gamma = _GPDThreshold.get_gpd_params( Y, robust=self.robust, estimator=self.estimator ) n_y = len(Y) n = len(S) spot_thres = max( _GPDThreshold.calc_spot_threshold( self.percentile_thres, sigma, gamma, n, n_y, self.q ), _GPDThreshold.calc_half_normal_threshold( self.percentile_thres, Y.std(ddof=1), self.q, support=self.support ), ) return spot_thres def _add_peak(self, y): if self.Y.size <= self.memory: self.Y = np.append(self.Y, y) else: self.Y = np.roll(self.Y, -1) self.Y[-1] = y if self.I.size <= self.memory + 1: self.I = np.append(self.I, self.n) else: self.I = np.roll(self.I, -1) self.I[-1] = self.n def _add_mem(self, x): if self.X.size <= self.memory: self.X = np.append(self.X, x) else: self.X = np.roll(self.X, -1) self.X[-1] = x self.n += 1
# SPOT based on MoM-Estimation Helper Class class _GPDThreshold: @classmethod def get_tail_threshold(cls, data, level) -> float: return np.quantile(data, level) @classmethod def get_peaks(cls, data, threshold, n_sample_min=5): x = data.copy() x = x[x >= threshold] if len(x) < n_sample_min: threshold *= 0.95 threshold = 0 if threshold < 1e-3 else threshold return cls.get_peaks(data, threshold, n_sample_min) return x - threshold @classmethod def get_gpd_params_mv(cls, mean, var): if estimator == "MoM": if var == 0: return 1, 1 sigma = mean / 2 * (1 + mean**2 / var) gamma = 1 / 2 * (1 - (mean**2 / var)) return sigma, gamma @classmethod def get_gpd_params(cls, peaks, robust=False, estimator="MoM"): y = peaks.copy() if estimator != "MoM": mu, sigma, gamma = genpareto.fit(y) return sigma, gamma if robust: try: huber.maxiter = 1000 huber.tol = 5e-2 mu, std = huber(y) var_y = std**2 except ValueError: mu, var_y = y.mean(), y.var(ddof=1) else: mu, var_y = y.mean(), y.var(ddof=1) sigma = mu / 2 * (1 + mu**2 / var_y) gamma = 1 / 2 * (1 - (mu**2 / var_y)) return sigma, gamma @classmethod def calc_spot_threshold(cls, initial_threshold, sigma, gamma, n, n_peaks, q): delta = sigma / gamma * ((q * n / n_peaks) ** (-gamma) - 1) new_thres = initial_threshold + delta return new_thres @classmethod def calc_half_normal_threshold(cls, initial_threshold, std_dev, q, support=1): return initial_threshold + std_dev * norm.ppf(((1 - q) + 1) / 2) * support