Source code for oats.scorer.supervised_scorer
"""
Supervised Metrics
-----------------
"""
import numpy as np
from oats.scorer._base import Scorer
[docs]class SupervisedScorer(Scorer):
"""Scorer for traditional supervised metrics; only useful if actual anomalies are known"""
def __init__(self, delay: int = None):
"""
Args:
delay (int, optional): For pattern anomalies, how much tolerance to give for prediction; e.g. delay of 10 means only those predictions in the first 10 time steps of a pattern anomalies are counted as success. None if no delay needed. Defaults to None.
"""
self.tp = 0
self.fp = 0
self.tn = 0
self.fn = 0
self.delay = delay
[docs] def process(self, preds, labels):
preds = preds.copy()
labels = labels.copy()
ground_truth_ones = np.where(labels == 1)[0]
pred_ones = np.where(preds == 1)[0]
ranges = self._consecutive(ground_truth_ones)
tp, fp, tn, fn = 0, 0, 0, 0
for idx, r in enumerate(ranges):
intersect = np.intersect1d(r, pred_ones, assume_unique=True)
# if alert delay more than 100 timesteps, count that as bad!
if intersect.size != 0:
cond = (
intersect[0] < r[0] + self.delay if self.delay is not None else True
)
if cond:
tp += r.size
else:
fn += r.size
preds[intersect] = 0
pred_ones = np.where(preds == 1)[0]
else:
fn += r.size
fp += pred_ones.size
tn += preds.size - tp - fp - fn
self.tp += tp
self.fp += fp
self.tn += tn
self.fn += fn
def _consecutive(self, data, stepsize=1):
return np.split(data, np.where(np.diff(data) != stepsize)[0] + 1)
@property
def tpr(self):
"""True Positive Rate"""
return self.tp / (self.fn + self.tp)
@property
def fpr(self):
"""False Positive Rate"""
return self.fp / (self.tn + self.fp)
@property
def tnr(self):
"""True Negative Rate"""
if self.tn + self.fp == 0:
return 0
return self.tn / (self.tn + self.fp)
@property
def fnr(self):
"""False Negative Rate"""
if self.fn + self.tp == 0:
return 0
return self.fn / (self.fn + self.tp)
@property
def precision(self):
"""Precision"""
if self.tp + self.fp == 0:
return 0
return self.tp / (self.tp + self.fp)
@property
def recall(self):
"""Recall"""
if self.tp + self.fn == 0:
return 0
return self.tp / (self.tp + self.fn)
@property
def f1(self):
"""F-1 Score"""
if self.recall + self.precision == 0:
return 0
return (2 * self.precision * self.recall) / (self.precision + self.recall)
def __str__(self):
return f"{self.tp}, {self.fp}, {self.tn}, {self.fn}, {self.tpr}, {self.fpr}, {self.tnr}, {self.fnr}, {self.precision}, {self.recall}, {self.f1}"