"""
Spectral Residual
-------------------
"""
# Copyright (c) 2019 Takahiro Yoshinaga
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
import numpy as np
from numpy.typing import ArrayLike, NDArray
from scipy import stats
from oats.preprocessor._base import Preprocessor
def _series_filter(values, kernel_size=3):
"""
Filter a time series. Practically, calculated mean value inside kernel size.
As math formula, see https://docs.opencv.org/2.4/modules/imgproc/doc/filtering.html.
Args:
values:
kernel_size:
Returns:
The list of filtered average
"""
filter_values = np.cumsum(values, dtype=float)
filter_values[kernel_size:] = (
filter_values[kernel_size:] - filter_values[:-kernel_size]
)
filter_values[kernel_size:] = filter_values[kernel_size:] / kernel_size
for i in range(1, kernel_size):
filter_values[i] /= i + 1
return filter_values
def _extrapolate_next(values):
"""
Extrapolates the next value by sum up the slope of the last value with previous values.
:param values: a list or numpy array of time-series
:return: the next value of time-series
"""
last_value = values[-1]
slope = [(last_value - v) / i for (i, v) in enumerate(values[::-1])]
slope[0] = 0
next_values = last_value + np.cumsum(slope)
return next_values
def _merge_series(values, extend_num=5, forward=5):
next_value = _extrapolate_next(values)[forward]
extension = [next_value] * extend_num
if isinstance(values, list):
marge_values = values + extension
else:
marge_values = np.append(values, extension)
return marge_values
[docs]class SpectralResidual(Preprocessor):
"""Generates a saliency map via spectral residual.
Inspired from:
Hansheng Ren, Bixiong Xu, Yujing Wang, Chao Yi, Congrui Huang, Xiaoyu Kou, Tony Xing, Mao Yang, Jie Tong, Qi Zhang.
"Time-Series Anomaly Detection Service at Microsoft." arXiv preprint arXiv:1906.03821 (2019).
Implementation: https://github.com/y-bar/ml-based-anomaly-detection
"""
def __init__(self, amp_window_size=16, series_window_size=16, score_window_size=32):
self.amp_window_size = amp_window_size
self.series_window_size = series_window_size
self.score_window_size = score_window_size
def _transform_silency_map(self, values):
"""
Transform a time-series into spectral residual, a method adopted from computer vision.
For example, See https://github.com/uoip/SpectralResidualSaliency.
Args:
values: a list or numpy array of float values.
Returns:
Silency map and spectral residual
"""
freq = np.fft.fft(values)
mag = np.sqrt(freq.real**2 + freq.imag**2)
spectral_residual = np.exp(
np.log(mag) - _series_filter(np.log(mag), self.amp_window_size)
)
freq.real = freq.real * spectral_residual / mag
freq.imag = freq.imag * spectral_residual / mag
silency_map = np.fft.ifft(freq)
return silency_map
def _transform_spectral_residual(self, values):
silency_map = self._transform_silency_map(values)
spectral_residual = np.sqrt(silency_map.real**2 + silency_map.imag**2)
return spectral_residual
[docs] def fit(self, *arg, **kwargs) -> None:
return