
"""
FluxEV
-----------------
"""

import time

from oats.models._base import Model
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from sklearn.neighbors import KernelDensity
from sklearn.model_selection import GridSearchCV
from scipy.stats import norm, genpareto
from statsmodels.robust.scale import huber  # joint location/scale estimator, used by _SPOTMoM.get_gpd_params(robust=True)


class _FluxEV2000:
    def __init__(
        self,
        window,
        q,
        level,
        alpha=0.7,
        method="all",
        p=None,
        memory=2000,
        bw_reset_every=2000,
        support=0,
        init_cutoff=1,
        mle=False,
        **kwargs,
    ):
        if method.lower() not in ["all", "above", "below"]:
            raise ValueError(
                f"method parameter expects one of ['all', 'above', 'below'] but got {method}"
            )
        self.bw_reset_every = bw_reset_every
        self.support = support
        self.init_cutoff = init_cutoff

        self.s = window
        self.q = q
        self.level = level
        self.p = 1 if p is None else p
        self.memory = memory

        # EWMA Decay Factor
        # alpha -> 0 => simple moving average (uniform weights)
        # alpha -> 1 => EWMA -> X[-1] (all weight on the newest sample)
        self.alpha = alpha

        # Method: Literal["all", "above", "below"]
        # all    => admit all points
        # above  => admit only those above moving avg
        # below  => admit only those below moving average
        self.method = method

        # Raw Data
        self.X = np.zeros(window)

        # Residual after EWMA
        self.E = np.zeros(window)

        # Residual after First Smoothing
        self.F = np.zeros(self.p)

        # Peak Set
        self.Y = np.array([])

        # F Memory
        self.F_mem = np.array([])

        # S -> Residual after second smoothing
        self.S = np.zeros(1)

        # len(data set)
        self.n = 0

        # I -> indices of peaks in order to maintain the correct n_peak/n ratio
        self.I = np.array([0])

        # thresholds
        self.percentile_thres = 0
        self.spot_thres = 0

        # timer
        self.perf_counter = 0

        # Use MLE Estimation (more stable)
        self.mle = mle

    @property
    def adjusted_n(self):
        return self.n - self.I[0]

    def load_initial(self, data: np.ndarray) -> None:
        if data.size <= max(2 * self.s, self.p):
            raise ValueError(
                f"Expects initial series of length > max(2*s, p) == {max(2 * self.s, self.p)}, passed in {data.size}"
            )
        t = time.perf_counter()

        self.X = data[self.s : 2 * self.s]

        s_window = sliding_window_view(data[: 2 * self.s], self.s)[:-1]
        self.E = data[self.s : 2 * self.s] - EWMA(s_window.T, self.alpha)

        Q = np.array([])
        for x_t in data[2 * self.s :]:
            # initialize SPOT
            self._step_train(x_t)
            Q = np.append(Q, self.F[-1])

        self.spot_thres = self._spot_initialize(Q)
        self.perf_counter += time.perf_counter() - t

    def _step_train(self, x_t: float) -> None:
        """
        Step without updating the SPOT threshold.
        """
        # fluctuation removal
        E_i = x_t - EWMA(self.X, self.alpha)

        if self.method == "above":
            E_i = np.maximum(0.0, E_i)
        if self.method == "below":
            E_i = np.minimum(0, E_i)

        # first smoothing
        E_new = np.append(self.E, E_i)
        E_old = self.E.copy()
        F_i = (np.std(E_new, ddof=1) - np.std(E_old, ddof=1)).clip(min=0)
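        # Note: F_i above measures how much the newest residual inflates the
        # local standard deviation; the clip at 0 keeps only sudden increases
        # in fluctuation, which is the quantity the first smoothing tracks.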

        self._add_mem(F_i)

        # second smoothing
        m = np.max(self.F)
        self.S = np.where(F_i - m > 0, np.array([F_i]), np.array([0]))

        # smoothing calculations done; advance one step
        self._array_step(x_t, E_i, F_i)

    def step_predict(self, x_t) -> int:
        """
        Step and update the SPOT threshold.
        """
        t = time.perf_counter()
        if self.n % self.bw_reset_every == 0:
            # periodically refit the KDE bandwidth on the F memory
            self.bw = _SPOTMoM.fit_kde_bw(self.F_mem)

        self._step_train(x_t)
        new_p_thres = _SPOTMoM.get_tail_threshold(
            self.F_mem, self.level, samples=100, bw=self.bw
        )
        self.spot_thres += new_p_thres - self.percentile_thres
        self.percentile_thres = new_p_thres

        out = np.int32(0)
        if self.S[-1] > self.spot_thres:
            # erase the anomalous data point from history
            out = np.int32(1)
            self.F_mem[-1] = 0

        elif self.percentile_thres < self.F[-1] < self.spot_thres:
            # add observation to the peak set
            y_i = self.F[-1] - self.percentile_thres
            self._add_peak(y_i)

            """ vanilla calc
            sigma, gamma = SPOTMoM.get_gpd_params(self.Y)
            self.spot_thres = SPOTMoM.calc_spot_threshold(self.percentile_thres, sigma, gamma, self.n, self.Y.size, self.q)
            """

            sigma, gamma = _SPOTMoM.get_gpd_params(self.Y, mle=self.mle)
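            # Guard against an unreliable MoM fit: take the max of the GPD
            # threshold and a `support`-scaled half-normal threshold so a bad
            # fit cannot drag the SPOT threshold too low (see the `support`
            # parameter docs on FluxEVModel below).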
            self.spot_thres = max(
                _SPOTMoM.calc_spot_threshold(
                    self.percentile_thres,
                    sigma,
                    gamma,
                    self.adjusted_n,
                    self.Y.size,
                    self.q,
                ),
                _SPOTMoM.calc_half_normal_threshold(
                    self.percentile_thres,
                    self.Y.std(ddof=1),
                    self.q,
                    support=self.support,
                ),
            )

        elif self.F[-1] > self.spot_thres:
            self.F_mem[-1] = 0

        self.perf_counter += time.perf_counter() - t
        return out

    def _array_step(self, X_i, E_i, F_i):
        self.X = np.roll(self.X, -1)
        self.X[-1] = X_i

        self.E = np.roll(self.E, -1)
        self.E[-1] = E_i

        self.F = np.roll(self.F, -1)
        self.F[-1] = F_i

        self.n += 1

    @property
    def n_peaks(self):
        return self.Y.size

    def _spot_initialize(self, S):
        self.bw = _SPOTMoM.fit_kde_bw(S)
        self.percentile_thres = _SPOTMoM.get_tail_threshold(S, self.level, bw=self.bw)

        Y = _SPOTMoM.get_peaks(S, self.percentile_thres)
        I = _SPOTMoM.get_peaks_idx(S, self.percentile_thres)

        if self.init_cutoff < 1:
            bw = _SPOTMoM.fit_kde_bw(Y)
            cutoff = _SPOTMoM.get_tail_threshold(Y, self.init_cutoff, bw=bw)
            # filter Y and I together so peaks stay aligned with their indices
            keep = Y < cutoff
            Y, I = Y[keep], I[keep]

        for y, i in zip(Y, I):
            self._add_peak(y, i)

        sigma, gamma = _SPOTMoM.get_gpd_params(Y, mle=self.mle)

        n_y = len(Y)
        n = len(S)

        spot_thres = max(
            _SPOTMoM.calc_spot_threshold(
                self.percentile_thres, sigma, gamma, n, n_y, self.q
            ),
            _SPOTMoM.calc_half_normal_threshold(
                self.percentile_thres, Y.std(ddof=1), self.q, support=self.support
            ),
        )

        return spot_thres

    def _add_peak(self, y, index=None):
        if self.Y.size <= self.memory:
            self.Y = np.append(self.Y, y)
        else:
            self.Y = np.roll(self.Y, -1)
            self.Y[-1] = y

        index = self.n if index is None else index
        if self.I.size <= self.memory + 1:
            self.I = np.append(self.I, index)
        else:
            self.I = np.roll(self.I, -1)
            self.I[-1] = index

    def _add_mem(self, f):
        if self.F_mem.size <= self.memory:
            self.F_mem = np.append(self.F_mem, f)
        else:
            self.F_mem = np.roll(self.F_mem, -1)
            self.F_mem[-1] = f


def EWMA(x, alpha, flip_x=True):
    if flip_x:
        x = np.flip(x)
    s = len(x)
    a = np.full(s, 1 - alpha)
    power = np.arange(0, s)
    weights = np.power(a, power)
    res = np.dot(weights.T, x) / np.sum(weights)
    return np.flip(res)
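

# A minimal worked example of EWMA (values chosen for illustration): the most
# recent sample gets weight 1, the previous one (1 - alpha), then
# (1 - alpha)**2, and so on, normalized by the weight sum. With alpha = 0.5:
#
#   >>> EWMA(np.array([1.0, 2.0, 4.0]), alpha=0.5)
#   3.0   # (4*1.0 + 2*0.5 + 1*0.25) / (1.0 + 0.5 + 0.25)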


class _SPOTMoM:
    @classmethod
    def fit_kde_bw(cls, data, kernel="gaussian"):
        params = {"bandwidth": np.logspace(-5, 10, 100)}
        grid = GridSearchCV(KernelDensity(kernel=kernel, rtol=0.01), params, n_jobs=-1)
        grid.fit(data[:, np.newaxis])
        return grid.best_params_["bandwidth"]

    @classmethod
    def get_tail_threshold(
        cls, data, level, samples=1000, bw=0.5, kernel="gaussian", rtol=0.02
    ) -> float:
        # estimate the `level` quantile via a KDE-based CDF
        data = np.append(data, 1e-3)  # added for stability in case data is all zeros
        x = np.linspace(data.min() + 1e-3, data.max(), samples)
        kde = KernelDensity(kernel=kernel, rtol=rtol, bandwidth=bw).fit(
            data[:, np.newaxis]
        )
        pdf = np.exp(kde.score_samples(x[:, np.newaxis]))
        cdf = np.cumsum(pdf)
        cdf /= cdf.max()
        if len(np.where(cdf <= level)[0]) == 0:
            # grid too coarse to resolve the quantile; retry with a finer grid
            return cls.get_tail_threshold(data, level, samples * 5, bw, kernel)
        idx = np.where(cdf <= level)[0][-1]
        return x[idx]

    @classmethod
    def get_peaks(cls, data, threshold, n_sample_min=5):
        threshold = cls._adjust_threshold(data, threshold, n_sample_min)
        return data[data >= threshold] - threshold

    @classmethod
    def get_peaks_idx(cls, data, threshold, n_sample_min=5):
        threshold = cls._adjust_threshold(data, threshold, n_sample_min)
        return np.where(data >= threshold)[0]

    @classmethod
    def _adjust_threshold(cls, data, threshold, n_sample_min):
        # lower the threshold until at least n_sample_min peaks remain
        x = data.copy()
        x = x[x >= threshold]
        if len(x) < n_sample_min:
            threshold *= 0.95
            threshold = 0 if threshold < 1e-3 else threshold
            return cls._adjust_threshold(data, threshold, n_sample_min)
        return threshold

    @classmethod
    def get_gpd_params_mv(cls, mean, var):
        if var == 0:
            return 1, 1
        sigma = mean / 2 * (1 + mean**2 / var)
        gamma = 1 / 2 * (1 - (mean**2 / var))
        return sigma, gamma

    @classmethod
    def get_gpd_params(cls, peaks, robust=False, mle=False):
        y = peaks.copy()

        if mle:
            # scipy's genpareto.fit returns (shape, loc, scale)
            gamma, mu, sigma = genpareto.fit(y)
            return sigma, gamma

        if robust:
            huber.maxiter = 100
            huber.tol = 5e-2
            mu, std = huber(y)
            var_y = std**2
        else:
            mu = y.mean()
            var_y = y.var(ddof=1)

        sigma = mu / 2 * (1 + mu**2 / var_y)
        gamma = 1 / 2 * (1 - (mu**2 / var_y))
        return sigma, gamma

    @classmethod
    def calc_spot_threshold(cls, initial_threshold, sigma, gamma, n, n_peaks, q):
        delta = sigma / gamma * ((q * n / n_peaks) ** (-gamma) - 1)
        new_thres = initial_threshold + delta
        return new_thres

    @classmethod
    def calc_half_normal_threshold(cls, initial_threshold, std_dev, q, support=1):
        # (1 - q) quantile of a half-normal distribution, scaled by `support`
        return initial_threshold + std_dev * norm.ppf(((1 - q) + 1) / 2) * support
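
# Sanity sketch for the method-of-moments estimator in get_gpd_params (a
# derivation note, not executed by the model): for a GPD with shape gamma < 1/2,
#   mean = sigma / (1 - gamma)
#   var  = sigma**2 / ((1 - gamma)**2 * (1 - 2*gamma))
# so mean**2 / var = 1 - 2*gamma, and inverting recovers the formulas above:
#   gamma_hat = (1 - mean**2 / var) / 2        == gamma
#   sigma_hat = mean / 2 * (1 + mean**2 / var) == mean * (1 - gamma) == sigma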


class FluxEVModel(Model):
    """FluxEV implementation from the paper, with QoL improvements for production.

    Li, Jia and Di, Shimin and Shen, Yanyan and Chen, Lei.
    "FluxEV: A Fast and Effective Unsupervised Framework for Time-Series Anomaly Detection"
    https://doi.org/10.1145/3437963.3441823
    """

    def __init__(
        self, window: int = 10, window_smoothing=None, q=1e-4, level=0.95, **kwargs
    ):
        """
        Args:
            window (int, optional): Main window length. Defaults to 10.
            window_smoothing (int, optional): Window length for the smoothing
                operation; if None, the main window length is used. Defaults to None.
            q (float, optional): Risk level such that ``P(X > threshold) < q``.
                Defaults to 1e-4.
            level (float, optional): Percentile level at which the tail
                distribution is fitted. Defaults to 0.95.
            memory (int, optional): How many data points are kept for the tails.
                Defaults to 2000.
            bw_reset_every (int, optional): Used for method-of-moments estimation
                to prevent a bad fit (MoM is not robust against outliers); the
                tail level is computed from a KDE-estimated distribution, and
                this parameter dictates how often the KDE bandwidth is refitted.
                Defaults to 2000.
            support (float, optional): Used for method-of-moments estimation to
                prevent a bad fit (MoM is fast but unreliable); the threshold is
                computed as the maximum of the GPD threshold and
                ``support * HalfNormal``. Defaults to 0.
            init_cutoff (float, optional): Used for method-of-moments estimation
                to prevent a bad fit (MoM is not robust against outliers); only
                the bottom ``init_cutoff`` quantile of peaks is used to fit the
                GPD. Defaults to 1.
            mle (bool, optional): Whether to use MLE for GPD parameter
                estimation; setting this to True slows down performance.
                Defaults to False.
        """
        self.window = window
        self.window_smoothing = window if window_smoothing is None else window_smoothing
        self.q = q
        self.level = level
        self.params = kwargs

    def fit(self, data, **kwargs):
        if data.ndim > 1 and data.shape[1] == 1:
            data = data.flatten()
        if data.ndim > 1 and data.shape[1] > 1:
            self._models = self._pseudo_mv_train(data)
            return

        self.model = _FluxEV2000(
            self.window,
            q=self.q,
            level=self.level,
            p=self.window_smoothing,
            **self.params,
        )
        self.model.load_initial(data)

    def get_scores(self, data):
        multivar = data.ndim > 1 and data.shape[1] > 1
        if multivar:
            return self._handle_multivariate(data, self._models)
        if data.ndim > 1 and data.shape[1] == 1:
            data = data.flatten()

        anoms = np.zeros(len(data))
        for idx, value in enumerate(data):
            anoms[idx] = self.model.step_predict(value)
        return anoms
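

# A minimal usage sketch (the synthetic data, split point, and injected anomaly
# are assumptions for illustration only):
#
#   import numpy as np
#
#   rng = np.random.default_rng(0)
#   series = rng.normal(size=1_000)
#   series[700] += 10.0                       # inject a point anomaly
#
#   model = FluxEVModel(window=10, q=1e-4, level=0.95)
#   model.fit(series[:500])                   # warm-up / SPOT initialization
#   scores = model.get_scores(series[500:])   # binary anomaly flag per point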