Source code for oats.scorer.qualitative_metrics
"""
Qualitative Metrics
-------------------
"""
import numpy as np
import scipy.signal as signal
from oats.scorer._base import Scorer
[docs]class QualitativeMetrics(Scorer):
"""Unsupervised qualitative metrics used to access the quality of the anomaly detection algorithm"""
def __init__(self, window=10):
"""
Args:
window (int, optional): Window sized used to compute `diff_mean_trend` and `diff_mid_avg`. Defaults to 10.
"""
self.data = np.array([])
self.preds = np.array([])
self.window = window
[docs] def process(self, data, preds):
if data.ndim > 1 and data.shape[1] == 1:
data = data.flatten()
if len(data) != len(preds):
raise ValueError(
f"Data of length {len(data)} does not match preds of length {len(preds)}"
)
existing_n_feat = self.data.shape[1] if self.data.ndim > 1 else 1
incoming_n_feat = data.shape[1] if data.ndim > 1 else 1
if len(self.data) > 0 and existing_n_feat != incoming_n_feat:
raise ValueError(
f"Unable to process incoming data of shape {data.shape} with existing data of shape {self.data.shape}"
)
if data.ndim > 1 and data.shape[1] > 1:
self.data = np.vstack([self.data] * data.shape[1]).T
self.data = np.append(self.data, data, axis=0)
self.preds = np.append(self.preds, preds)
@property
def num_anom(self):
"""Number of predicted anomalies, should be low (as anomalies are rare)"""
return self.preds.sum()
@property
def pct_anom(self):
"""Percentage of predicted anomalies, should be low (as anomalies are rare)"""
return self.num_anom / len(self.preds)
@property
def _pred_anomalies(self):
return self.data[self.preds == 1]
@property
def _pred_non_anomalies(self):
return self.data[self.preds == 0]
@property
def avg_anom_dist_from_mean(self):
"""Distance of predicted anomalies to the mean of original data, should be high; useful for series with a lot of global point anomalies"""
return np.abs(self._pred_anomalies - self.data.mean(axis=0)).mean()
@property
def avg_cycles_delta_between_anom(self):
"""Average time between anomalies, should be high, as anomalies should be occuring far apart (for point anomalies)"""
if self.num_anom in (0, 1, len(self.data)):
return 0
return np.diff(np.where(self.preds == 1)[0]).mean()
@property
def max_range_non_anom(self):
"""The tightness of data from predicted non-anomalies, similar to the idea of `avg_anom_dist_from_mean`; should be low"""
if self.num_anom in (0, len(self.data)):
return 1e5
return (
np.abs(self._pred_non_anomalies.max() - self._pred_non_anomalies.min())
).mean()
def _get_mid_avg_filter(self):
# make sure window is odd
if self.window % 2 == 0:
self.window += 1
padding = self.window // 2
# local difference filter
fil = self.window
fil = np.full((self.window), -1 / (self.window - 1))
fil[padding] = 1
# if self.data.ndim > 1:
# fil = np.tile(fil, (self.data.shape[1], 1)).T
return fil
@property
def diff_mean_trend(self):
"""The trend (gradient) of predicted anomalies vs the trend of surrounding points, should be high"""
if not 0 < self.num_anom < len(self.preds):
return 0
fil = self._get_mid_avg_filter()
padding = self.window // 2
# abs-trend avg
diffs = []
data = self.data if self.data.ndim > 1 else self.data[:, np.newaxis]
for arr in data.T:
grads = signal.savgol_filter(arr, self.window, 1, deriv=1, axis=0)
grads = np.abs(grads)
conv = np.abs(signal.convolve(grads, fil, mode="valid"))
conv = np.pad(conv, (padding, padding), mode="edge")
diffs.append(
(
conv[self.preds == 1].mean(axis=0)
- conv[self.preds == 0].mean(axis=0)
).sum()
)
return np.mean(diffs)
@property
def diff_mid_avg(self):
"""The value of predicted anomalies vs the average of surrounding values, should be high"""
if not 0 < self.num_anom < len(self.preds):
return 0
fil = self._get_mid_avg_filter()
padding = self.window // 2
# ~= midpoint minus avg. of sides
diffs = []
data = self.data if self.data.ndim > 1 else self.data[:, np.newaxis]
for arr in data.T:
conv = np.abs(signal.convolve(arr, fil, mode="valid"))
conv = np.pad(conv, (padding, padding), mode="edge")
diffs.append(
conv[self.preds == 1].mean(axis=0)
- conv[self.preds == 0].mean(axis=0).sum()
)
return np.mean(diffs)