Source code for oats.generator.univariate_wave

"""
Univariate Sine Wave
-----------------
"""

import numpy as np
from oats.generator._base import Generator


def _series_segmentation(data, stepsize=1):
    return np.split(data, np.where(np.diff(data) != stepsize)[0] + 1)


def _sine(length, freq=0.04, coef=1.5, offset=0.0, noise_amp=0.05):
    # timestamp = np.linspace(0, 10, length)
    timestamp = np.arange(length)
    value = np.sin(2 * np.pi * freq * timestamp)
    if noise_amp != 0:
        noise = np.random.normal(0, 1, length)
        value = value + noise_amp * noise
    value = coef * value + offset
    return value


def _square_sine(level=5, length=500, freq=0.04, coef=1.5, offset=0.0, noise_amp=0.05):
    value = np.zeros(length)
    for i in range(level):
        value += (
            1
            / (2 * i + 1)
            * _sine(
                length=length,
                freq=freq * (2 * i + 1),
                coef=coef,
                offset=offset,
                noise_amp=noise_amp,
            )
        )
    return value


def _collective_global_synthetic(length, base, coef=1.5, noise_amp=0.005):
    value = []
    norm = np.linalg.norm(base)
    base = base / norm
    num = int(length / len(base))
    for i in range(num):
        value.extend(base)
    residual = length - len(value)
    value.extend(base[:residual])
    value = np.array(value)
    noise = np.random.normal(0, 1, length)
    value = coef * value + noise_amp * noise
    return value


[docs]class UnivariateWaveGenerator(Generator): """Generates a univariate signal and append anomalies as desired. Attributes: train: numpy array of generated train data test: numpy array of generated test data label: numpy array of labels Example: ``` generator = UnivariateWaveGenerator(1000) generator.point_global_outliers(0.05, 2, 50) train, test, label = generator.get_dataset() ``` """ _BEHAVIOR_CONFIG = {"freq": 0.04, "coef": 1.5, "offset": 0.0, "noise_amp": 0.05} def __init__( self, stream_length: int, train_ratio: float = 0.2, behavior=_sine, behavior_config: dict = _BEHAVIOR_CONFIG, ): """ Args: stream_length (int): Total length of `train` and `test` combined train_ratio (float, optional): Length of train relative to `stream_length`. Defaults to 0.2. behavior (_type_, optional): Underlying signal generating function. Defaults to sine. behavior_config (dict, optional): Default config for . Defaults to BEHAVIOR_CONFIG. """ self.STREAM_LENGTH = stream_length self.train_ratio = train_ratio self.split = int(self.STREAM_LENGTH * self.train_ratio) self.behavior = behavior self.behavior_config = behavior_config if behavior_config is not None else {} self.train = None self.data = None self.label = None self.data_origin = None self.timestamp = np.arange(self.STREAM_LENGTH) self._generate_timeseries() def _generate_timeseries(self): self.behavior_config["length"] = self.STREAM_LENGTH self.data = self.behavior(**self.behavior_config) self.train = self.data[: self.split] self.test = self.data[self.split :] self.TEST_LENGTH = len(self.test) self.test_orig = self.test.copy() self.label = np.zeros(self.TEST_LENGTH, dtype=int)
[docs] def point_global_outliers(self, ratio, factor, radius): """Add point global outliers to original data Args: ratio: what ratio outliers will be added factor: the larger, the outliers are farther from inliers radius: the radius of collective outliers range """ position = ( np.random.rand(round(self.TEST_LENGTH * ratio)) * self.TEST_LENGTH ).astype(int) maximum, minimum = max(self.test), min(self.test) for i in position: local_std = self.test_orig[ max(0, i - radius) : min(i + radius, self.TEST_LENGTH) ].std() self.test[i] = self.test_orig[i] * factor * local_std if 0 <= self.test[i] < maximum: self.test[i] = maximum if 0 > self.test[i] > minimum: self.test[i] = minimum self.label[i] = 1
[docs] def point_contextual_outliers(self, ratio, factor, radius): """Add point contextual outliers to original data Args: ratio: what ratio outliers will be added factor: the larger, the outliers are farther from inliers Notice: point contextual outliers will not exceed the range of [min, max] of original data radius: the radius of collective outliers range """ position = ( np.random.rand(round(self.TEST_LENGTH * ratio)) * self.TEST_LENGTH ).astype(int) maximum, minimum = max(self.test), min(self.test) for i in position: local_std = self.test[ max(0, i - radius) : min(i + radius, self.TEST_LENGTH) ].std() self.test[i] = self.test[i] * factor * local_std if self.test[i] > maximum: self.test[i] = maximum * min( 0.95, abs(np.random.normal(0, 0.5)) ) # previous(0, 1) if self.test[i] < minimum: self.test[i] = minimum * min(0.95, abs(np.random.normal(0, 0.5))) self.label[i] = 1
[docs] def collective_shapelet_outliers( self, ratio, radius, option="square", coef=3.0, noise_amp=0.0, level=5, freq=0.04, offset=0.0, # only used when option=='square' base=[ 0.0, ], ): # only used when option=='other' """Add collective global outliers to original data Args: ratio: what ratio outliers will be added radius: the radius of collective outliers range option: if 'square': 'level' 'freq' and 'offset' are used to generate square sine wave if 'other': 'base' is used to generate outlier shape level: how many sine waves will square_wave synthesis base: a list of values that we want to substitute inliers when we generate outliers """ position = ( np.random.rand(round(self.TEST_LENGTH * ratio / (2 * radius))) * self.TEST_LENGTH ).astype(int) valid_option = {"square", "other"} if option not in valid_option: raise ValueError("'option' must be one of %r." % valid_option) if option == "square": sub_data = _square_sine( level=level, length=self.TEST_LENGTH, freq=freq, coef=coef, offset=offset, noise_amp=noise_amp, ) else: sub_data = _collective_global_synthetic( length=self.TEST_LENGTH, base=base, coef=coef, noise_amp=noise_amp ) for i in position: start, end = max(0, i - radius), min(self.TEST_LENGTH, i + radius) self.test[start:end] = sub_data[start:end] self.label[start:end] = 1
[docs] def collective_trend_outliers(self, ratio, factor, radius): """Add collective trend outliers to original data Args: ratio: what ratio outliers will be added factor: how dramatic will the trend be radius: the radius of collective outliers range """ position = ( np.random.rand(round(self.TEST_LENGTH * ratio / (2 * radius))) * self.TEST_LENGTH ).astype(int) for i in position: start, end = max(0, i - radius), min(self.TEST_LENGTH, i + radius) slope = np.random.choice([-1, 1]) * factor * np.arange(end - start) self.test[start:end] = self.test_orig[start:end] + slope self.test[end:] = self.test[end:] + slope[-1] self.label[start:end] = 1
[docs] def collective_seasonal_outliers(self, ratio, factor, radius): """Add collective seasonal outliers to original data Args: ratio: what ratio outliers will be added factor: how many times will frequency multiple radius: the radius of collective outliers range """ position = ( np.random.rand(round(self.TEST_LENGTH * ratio / (2 * radius))) * self.TEST_LENGTH ).astype(int) seasonal_config = self.behavior_config seasonal_config["freq"] = factor * self.behavior_config["freq"] for i in position: start, end = max(0, i - radius), min(self.TEST_LENGTH, i + radius) self.test[start:end] = self.behavior(**seasonal_config)[start:end] self.label[start:end] = 1
[docs] def get_dataset(self): """ Returns: train: numpy array of train data test: numpy array of test data label: numpy array of labels """ return self.train, self.test, self.label