Skip to content

Commit

Permalink
Add mean, stdev and quantile trackers with tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Feb 14, 2025
1 parent 46945de commit f9e9e32
Show file tree
Hide file tree
Showing 9 changed files with 887 additions and 0 deletions.
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/univariate/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
50 changes: 50 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/univariate/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import abc
from enum import Enum
from collections import deque


class BaseTracker(abc.ABC):
@abc.abstractmethod
def push(self, x):
raise NotImplementedError()

@abc.abstractmethod
def get(self, **kwargs):
raise NotImplementedError()


class WindowMode(Enum):
LANDMARK = 1
SLIDING = 2


class IncrementalTracker(BaseTracker):
def __init__(self, window_mode, **kwargs):
if window_mode == WindowMode.SLIDING:
self._window_size = kwargs.get("window_size", 100)
self._queue = deque(maxlen=self._window_size)
self._n = 0
self._window_mode = window_mode

def push(self, x):
self._queue.append(x)

def pop(self):
return self._queue.popleft()
83 changes: 83 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/univariate/mean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import math
import warnings

import numpy as np

from apache_beam.ml.anomaly.univariate.base import IncrementalTracker
from apache_beam.ml.anomaly.univariate.base import WindowMode

__all__ = [
"LandmarkMeanTracker", "SimpleSlidingMeanTracker", "SlidingMeanTracker"
]


class SimpleSlidingMeanTracker(IncrementalTracker):
def __init__(self, window_size):
super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)

def get(self):
if len(self._queue) == 0:
return float('nan')

with warnings.catch_warnings(record=False):
warnings.simplefilter("ignore")
return np.nanmean(self._queue)


class IncrementalMeanTracker(IncrementalTracker):
def __init__(self, window_mode, **kwargs):
super().__init__(window_mode, **kwargs)
self._mean = 0

def push(self, x):
if not math.isnan(x):
self._n += 1
delta = x - self._mean
else:
delta = 0

if self._window_mode == WindowMode.SLIDING:
if len(self._queue) >= self._window_size and \
not math.isnan(old_x := self.pop()):
self._n -= 1
delta += (self._mean - old_x)

super().push(x)

if self._n > 0:
self._mean += delta / self._n
else:
self._mean = 0

def get(self):
if self._n < 1:
# keep it consistent with numpy
return float("nan")
return self._mean


class LandmarkMeanTracker(IncrementalMeanTracker):
def __init__(self):
super().__init__(window_mode=WindowMode.LANDMARK)


class SlidingMeanTracker(IncrementalMeanTracker):
def __init__(self, window_size):
super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size)
161 changes: 161 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/univariate/mean_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import math
import random
import time
import unittest
import warnings

from parameterized import parameterized

from apache_beam.ml.anomaly.univariate.mean import LandmarkMeanTracker
from apache_beam.ml.anomaly.univariate.mean import SimpleSlidingMeanTracker
from apache_beam.ml.anomaly.univariate.mean import SlidingMeanTracker

FLOAT64_MAX = 1.79769313486231570814527423731704356798070e+308


class LandmarkMeanTest(unittest.TestCase):
def test_without_nan(self):
t = LandmarkMeanTracker()
self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty

t.push(1)
self.assertEqual(t.get(), 1.0)
t.push(3)
self.assertEqual(t.get(), 2.0)
t.push(8)
self.assertEqual(t.get(), 4.0)
t.push(16)
self.assertEqual(t.get(), 7.0)
t.push(-3)
self.assertEqual(t.get(), 5.0)

def test_with_nan(self):
t = LandmarkMeanTracker()

t.push(float('nan'))
self.assertTrue(math.isnan(t.get())) # NaN is ignored
t.push(1)
self.assertEqual(t.get(), 1.0)
t.push(float('nan'))
self.assertEqual(t.get(), 1.0)
t.push(float('nan'))
self.assertEqual(t.get(), 1.0)
t.push(float('nan'))
self.assertEqual(t.get(), 1.0)

def test_with_float64_max(self):
t = LandmarkMeanTracker()
t.push(FLOAT64_MAX)
self.assertEqual(t.get(), FLOAT64_MAX)
t.push(FLOAT64_MAX)
self.assertEqual(t.get(), FLOAT64_MAX)

def test_accuracy_fuzz(self):
seed = int(time.time())
random.seed(seed)
print("Random seed: %d" % seed)

for _ in range(10):
numbers = []
for _ in range(5000):
numbers.append(random.randint(0, 1000))

with warnings.catch_warnings(record=False):
warnings.simplefilter("ignore")
t1 = LandmarkMeanTracker()
t2 = SimpleSlidingMeanTracker(len(numbers))
for v in numbers:
t1.push(v)
t2.push(v)
self.assertTrue(abs(t1.get() - t2.get()) < 1e-9)


class SlidingMeanTest(unittest.TestCase):
@parameterized.expand([SimpleSlidingMeanTracker, SlidingMeanTracker])
def test_without_nan(self, tracker):
t = tracker(3)
self.assertTrue(math.isnan(t.get())) # Returns NaN if tracker is empty

t.push(1)
self.assertEqual(t.get(), 1.0)
t.push(3)
self.assertEqual(t.get(), 2.0)
t.push(8)
self.assertEqual(t.get(), 4.0)
t.push(16)
self.assertEqual(t.get(), 9.0)
t.push(-3)
self.assertEqual(t.get(), 7.0)

@parameterized.expand([SimpleSlidingMeanTracker, SlidingMeanTracker])
def test_with_nan(self, tracker):
t = tracker(3)

t.push(float('nan'))
self.assertTrue(math.isnan(t.get())) # NaN is ignored
t.push(1)
self.assertEqual(t.get(), 1.0)

# flush the only number out
t.push(float('nan'))
self.assertEqual(t.get(), 1.0)
t.push(float('nan'))
self.assertEqual(t.get(), 1.0)
t.push(float('nan'))
self.assertTrue(math.isnan(t.get())) # All values in the tracker are NaN
t.push(4)
self.assertEqual(t.get(), 4.0)

@parameterized.expand([SimpleSlidingMeanTracker, SlidingMeanTracker])
def test_with_float64_max(self, tracker):
t = tracker(2)
t.push(FLOAT64_MAX)
self.assertEqual(t.get(), FLOAT64_MAX)
t.push(FLOAT64_MAX)
if tracker is SlidingMeanTracker:
self.assertEqual(t.get(), FLOAT64_MAX)
self.assertFalse(math.isinf(t.get()))
else:
# SimpleSlidingMean (using Numpy) returns inf when it computes the
# average of [float64_max, float64_max].
self.assertTrue(math.isinf(t.get()))

def test_accuracy_fuzz(self):
seed = int(time.time())
random.seed(seed)
print("Random seed: %d" % seed)

for _ in range(10):
numbers = []
for _ in range(5000):
numbers.append(random.randint(0, 1000))

t1 = SlidingMeanTracker(100)
t2 = SimpleSlidingMeanTracker(100)
for v in numbers:
t1.push(v)
t2.push(v)
self.assertTrue(abs(t1.get() - t2.get()) < 1e-9)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
77 changes: 77 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/univariate/perf_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import random
import time
import timeit
import statistics
import unittest
import warnings

from apache_beam.ml.anomaly.univariate.mean import *
from apache_beam.ml.anomaly.univariate.stdev import *
from apache_beam.ml.anomaly.univariate.quantile import *

seed_value_time = int(time.time())
random.seed(seed_value_time)
print(f"{'Seed value':30s}{seed_value_time}")

numbers = []
for _ in range(50000):
numbers.append(random.randint(0, 1000))


def run_tracker(tracker, numbers):
for i in range(len(numbers)):
tracker.push(numbers[i])
_ = tracker.get()


def print_result(tracker, number=10, repeat=5):
runtimes = timeit.repeat(
lambda: run_tracker(tracker, numbers), number=number, repeat=repeat)
mean = statistics.mean(runtimes)
sd = statistics.stdev(runtimes)
print(f"{tracker.__class__.__name__:30s}{mean:.6f} ± {sd:.6f}")


class PerfTest(unittest.TestCase):
def test_mean_perf(self):
print()
print_result(LandmarkMeanTracker())
print_result(SlidingMeanTracker(100))
print_result(SimpleSlidingMeanTracker(100), number=1)

def test_stdev_perf(self):
print()
print_result(LandmarkStdevTracker())
print_result(SlidingStdevTracker(100))
print_result(SimpleSlidingStdevTracker(100), number=1)

def test_quantile_perf(self):
print()
with warnings.catch_warnings(record=False):
warnings.simplefilter("ignore")
print_result(LandmarkQuantileTracker(0.5))
print_result(SlidingQuantileTracker(100, 0.5))
print_result(SimpleSlidingQuantileTracker(100, 0.5), number=1)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
Loading

0 comments on commit f9e9e32

Please sign in to comment.