[AnomalyDetection] Add threshold and aggregation functions. #34018

Status: Open · wants to merge 5 commits into base: master · Changes from 3 commits
263 changes: 263 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/aggregations.py
@@ -0,0 +1,263 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import collections
import math
import statistics
from typing import Callable
from typing import Iterable

from apache_beam.ml.anomaly.base import AggregationFn
from apache_beam.ml.anomaly.base import AnomalyPrediction
from apache_beam.ml.anomaly.specifiable import specifiable


class LabelAggregation(AggregationFn):
  """Aggregates anomaly predictions based on their labels.

  This is an abstract base class for `AggregationFn`s that combine multiple
  `AnomalyPrediction` objects into a single `AnomalyPrediction` based on
  the labels of the input predictions.

  Args:
    agg_func (Callable[[Iterable[int]], int]): A function that aggregates
      a collection of anomaly labels (integers) into a single label.
    include_history (bool): If True, include the input predictions in the
      `agg_history` of the output. Defaults to False.
[Review thread on the `include_history` / `agg_history` naming]

Contributor: Is "history" a standard term here, or something we came up with? It seems a little odd to me if it's not already a standard; to me it implies including historical predictions (e.g., what did I predict for the last data point?).

Collaborator (author): I use it to store the pre-aggregated predictions so that users can always go back and check what led to the aggregated result. It is a kind of history to me, but I agree that it may be misleading in other contexts. If you have a better term, I am more than happy to hear it.

Contributor: Maybe just include_input_predictions? And input_predictions instead of agg_history?

Collaborator (author): How about include_source_predictions and source_predictions?
"""
def __init__(
self,
agg_func: Callable[[Iterable[int]], int],
include_history: bool = False):
self._agg = agg_func
self._include_history = include_history
self._agg_model_id = None

  def apply(
      self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction:
    """Applies the label aggregation function to a list of predictions.

    Args:
      predictions (Iterable[AnomalyPrediction]): A collection of
        `AnomalyPrediction` objects to be aggregated.

    Returns:
      AnomalyPrediction: A single `AnomalyPrediction` object with the
        aggregated label.
    """
    labels = [
        prediction.label for prediction in predictions
        if prediction.label is not None
    ]

    if len(labels) == 0:
      return AnomalyPrediction(model_id=self._agg_model_id)

    label = self._agg(labels)

    history = list(predictions) if self._include_history else None

    return AnomalyPrediction(
        model_id=self._agg_model_id, label=label, agg_history=history)
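
Although the docstring describes this as an abstract base class, it is directly instantiable; the concrete subclasses further down are the intended entry points. As a minimal sketch of the `apply` contract (illustrative, not part of this diff), passing the built-in `max` as `agg_func` means any single outlier label (1) wins:

```python
# Illustrative only: aggregate labels with max(), keeping the inputs.
agg = LabelAggregation(agg_func=max, include_history=True)
result = agg.apply(
    [AnomalyPrediction(label=0), AnomalyPrediction(label=1)])
# result.label == 1 (max of [0, 1]); result.agg_history keeps both inputs;
# result.model_id is None because no aggregation model id was set.
```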


class ScoreAggregation(AggregationFn):
  """Aggregates anomaly predictions based on their scores.

  This is an abstract base class for `AggregationFn`s that combine multiple
  `AnomalyPrediction` objects into a single `AnomalyPrediction` based on
  the scores of the input predictions.

  Args:
    agg_func (Callable[[Iterable[float]], float]): A function that aggregates
      a collection of anomaly scores (floats) into a single score.
    include_history (bool): If True, include the input predictions in the
      `agg_history` of the output. Defaults to False.
  """
  def __init__(
      self,
      agg_func: Callable[[Iterable[float]], float],
      include_history: bool = False):
    self._agg = agg_func
    self._include_history = include_history
    self._agg_model_id = None

  def apply(
      self, predictions: Iterable[AnomalyPrediction]) -> AnomalyPrediction:
    """Applies the score aggregation function to a list of predictions.

    Args:
      predictions (Iterable[AnomalyPrediction]): A collection of
        `AnomalyPrediction` objects to be aggregated.

    Returns:
      AnomalyPrediction: A single `AnomalyPrediction` object with the
        aggregated score.
    """
    scores = [
        prediction.score for prediction in predictions
        if prediction.score is not None and not math.isnan(prediction.score)
[Review thread on handling None/NaN scores]

Contributor: I think it might make sense to define a score for predictions which have no score but have a label (and vice versa). Or maybe we can just throw? I guess this relates to my question above: when would we expect this scoreless condition to happen?

Collaborator (author, shunping, Feb 20, 2025): Thanks for raising this important point. Let me take a step back and clarify the workflow we implemented here:

  [input] -> detector -> [score] -> threshold_fn -> [label] -> aggregation_fn -> [aggregated label]

  • First, we are trying to address scenarios where a detector generates a score of None or NaN. In my opinion, we can distinguish between two cases:

    • The detector is NOT ready to give a prediction. This could imply that the detector needs some warm-up time before the first inference can be made.
    • The detector is ready to predict, but something goes wrong during the prediction process. For example, the input data could be ill-formatted, or the detector is simply not able to make a prediction on this input.

    We can use None to represent the first case and NaN for the second. The rationale is that a None value is something we don't know yet but can recover (if we feed the input to a detector that is ready to score), whereas NaN comes from an error during prediction and can never be recovered.

  • Once None and NaN scores exist, the threshold_fn needs to decide how to assign labels to them.

    • In the current implementation, I only consider None and assign it a normal label, which may be OK because we don't want to flag outliers while the detector is still warming up. Alternatively, we could set the label to None, meaning we defer the decision to other detectors.
    • For the irrecoverable NaN score, I think we can assign an outlier label.

  • When multiple labels are ready for aggregation, it is reasonable to apply the aggregation_fn to all the non-None labels. If they are all None (the situation you mentioned in the previous comment), maybe we can expose another parameter in the aggregation function as an undecided default.

WDYT?

Contributor: Generally, I like this approach and think that using NaN/None for unexpected failure vs. intentional no-output is reasonable.

> We can use None to represent the first case and NaN for the second. [...]

I'd actually flip these. I think None is more likely to happen because of some user mistake (e.g., I'm using a predictor that outputs a label in a situation that expects a score, or vice versa), whereas NaN is an intentional choice.

> When multiple labels are ready for aggregation, it is reasonable to apply the aggregation_fn to all the non-None labels. [...]

I think if all the detectors agree (whether on None or NaN), it makes sense to match whatever they are outputting. If they're all inconclusive, an inconclusive result makes sense. If they're all errors, an error result makes sense. Thoughts?

Collaborator (author): I am fine with flipping the meanings of None and NaN. On matching what the detectors output: are you saying that, besides the normal and outlier labels, we would have to add two more labels for the cases of score=None and score=NaN, respectively? That could get a bit complicated; for example, one model may say None while another says NaN.

Collaborator (author): On second thought, maybe it is not as complicated as it seemed. Let me see if it can be sorted out.

Collaborator (author): OK. I made the changes. PTAL.

    ]
    if len(scores) == 0:
      return AnomalyPrediction(model_id=self._agg_model_id)

    score = self._agg(scores)

    history = list(predictions) if self._include_history else None

    return AnomalyPrediction(
        model_id=self._agg_model_id, score=score, agg_history=history)
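
The thread above leaves the None/NaN policy to the threshold step, which is outside this file. As a hedged sketch only (the actual `ThresholdFn` API is not shown in this diff, and the function name and 0.5 cutoff below are hypothetical), one of the policies discussed might look like:

```python
import math

def label_from_score(score, normal_label=0, outlier_label=1):
  # Hypothetical policy from the discussion above (before the proposed
  # "flip" of the two meanings):
  #   None -> detector still warming up; defer by returning None.
  #   NaN  -> unrecoverable prediction error; flag as outlier.
  if score is None:
    return None
  if math.isnan(score):
    return outlier_label
  # 0.5 is an arbitrary illustrative cutoff, not the PR's default.
  return outlier_label if score > 0.5 else normal_label
```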


@specifiable
class MajorityVote(LabelAggregation):
  """Aggregates anomaly labels using majority voting.

  This `AggregationFn` implements a majority voting strategy to combine
  anomaly labels from multiple `AnomalyPrediction` objects. It counts the
  occurrences of normal and outlier labels and selects the label with the
  higher count as the aggregated label. In case of a tie, a tie-breaker
  label is used.

  Example:
    If input labels are [normal, outlier, outlier, normal, outlier], and
    normal_label=0, outlier_label=1, then the aggregated label will be
    outlier (1) because outliers have a majority (3 vs 2).

  Args:
    normal_label (int): The integer label for normal predictions. Defaults
      to 0.
    outlier_label (int): The integer label for outlier predictions. Defaults
      to 1.
    tie_breaker (int): The label to return if there is a tie in votes.
      Defaults to 0 (normal_label).
    **kwargs: Additional keyword arguments to pass to the base
      `LabelAggregation` class.
  """
  def __init__(self, normal_label=0, outlier_label=1, tie_breaker=0, **kwargs):
    self._tie_breaker = tie_breaker
    self._normal_label = normal_label
    self._outlier_label = outlier_label

    def inner(predictions: Iterable[int]) -> int:
      counters = collections.Counter(predictions)
      if counters[self._normal_label] < counters[self._outlier_label]:
        vote = self._outlier_label
      elif counters[self._normal_label] > counters[self._outlier_label]:
        vote = self._normal_label
      else:
        vote = self._tie_breaker
      return vote

    super().__init__(agg_func=inner, **kwargs)
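
A short usage sketch (illustrative, mirroring the unit tests further down): three detectors vote, and the outlier label wins 2 to 1:

```python
vote = MajorityVote().apply
result = vote([
    AnomalyPrediction(label=1),
    AnomalyPrediction(label=0),
    AnomalyPrediction(label=1),
])
assert result.label == 1  # outliers outnumber normals 2 to 1
```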


# AND scheme
@specifiable
class AllVote(LabelAggregation):
  """Aggregates anomaly labels using an "all vote" (AND) scheme.

  This `AggregationFn` implements an "all vote" strategy. It aggregates
  anomaly labels such that the result is considered an outlier only if all
  input `AnomalyPrediction` objects are labeled as outliers.

  Example:
    If input labels are [outlier, outlier, outlier], and outlier_label=1,
    then the aggregated label will be outlier (1).
    If input labels are [outlier, normal, outlier], and outlier_label=1,
    then the aggregated label will be normal (0).

  Args:
    normal_label (int): The integer label for normal predictions. Defaults
      to 0.
    outlier_label (int): The integer label for outlier predictions. Defaults
      to 1.
    **kwargs: Additional keyword arguments to pass to the base
      `LabelAggregation` class.
  """
  def __init__(self, normal_label=0, outlier_label=1, **kwargs):
    self._normal_label = normal_label
    self._outlier_label = outlier_label

    def inner(predictions: Iterable[int]) -> int:
      return self._outlier_label if all(
          map(lambda p: p == self._outlier_label,
              predictions)) else self._normal_label

    super().__init__(agg_func=inner, **kwargs)


# OR scheme
@specifiable
class AnyVote(LabelAggregation):
  """Aggregates anomaly labels using an "any vote" (OR) scheme.

  This `AggregationFn` implements an "any vote" strategy. It aggregates
  anomaly labels such that the result is considered an outlier if at least
  one of the input `AnomalyPrediction` objects is labeled as an outlier.

  Example:
    If input labels are [normal, normal, outlier], and outlier_label=1,
    then the aggregated label will be outlier (1).
    If input labels are [normal, normal, normal], and outlier_label=1,
    then the aggregated label will be normal (0).

  Args:
    normal_label (int): The integer label for normal predictions. Defaults
      to 0.
    outlier_label (int): The integer label for outlier predictions. Defaults
      to 1.
    **kwargs: Additional keyword arguments to pass to the base
      `LabelAggregation` class.
  """
  def __init__(self, normal_label=0, outlier_label=1, **kwargs):
    self._normal_label = normal_label
    self._outlier_label = outlier_label

    def inner(predictions: Iterable[int]) -> int:
      return self._outlier_label if any(
          map(lambda p: p == self._outlier_label,
              predictions)) else self._normal_label

    super().__init__(agg_func=inner, **kwargs)
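
The two schemes differ only in strictness. A quick contrast (illustrative, same inputs, different verdicts):

```python
preds = [
    AnomalyPrediction(label=1),
    AnomalyPrediction(label=0),
    AnomalyPrediction(label=1),
]
assert AllVote().apply(preds).label == 0  # AND: one normal vote vetoes
assert AnyVote().apply(preds).label == 1  # OR: one outlier vote suffices
```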


@specifiable
class AverageScore(ScoreAggregation):
  """Aggregates anomaly scores by calculating their average.

  This `AggregationFn` computes the average of the anomaly scores from a
  collection of `AnomalyPrediction` objects.

  Args:
    **kwargs: Additional keyword arguments to pass to the base
      `ScoreAggregation` class.
  """
  def __init__(self, **kwargs):
    super().__init__(agg_func=statistics.mean, **kwargs)


@specifiable
class MaxScore(ScoreAggregation):
  """Aggregates anomaly scores by selecting the maximum score.

  This `AggregationFn` selects the highest anomaly score from a collection
  of `AnomalyPrediction` objects as the aggregated score.

  Args:
    **kwargs: Additional keyword arguments to pass to the base
      `ScoreAggregation` class.
  """
  def __init__(self, **kwargs):
    super().__init__(agg_func=max, **kwargs)
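
And the score-based counterparts, again mirroring the tests below (illustrative); `include_history` behaves the same way as for label aggregation:

```python
preds = [AnomalyPrediction(score=1.0), AnomalyPrediction(score=2.0)]
assert AverageScore().apply(preds).score == 1.5
result = MaxScore(include_history=True).apply(preds)
assert result.score == 2.0
assert result.agg_history == preds  # inputs preserved for inspection
```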
120 changes: 120 additions & 0 deletions sdks/python/apache_beam/ml/anomaly/aggregations_test.py
@@ -0,0 +1,120 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import unittest

from apache_beam.ml.anomaly import aggregations
from apache_beam.ml.anomaly.base import AnomalyPrediction


class MajorityVoteTest(unittest.TestCase):
  def test_default(self):
    normal = AnomalyPrediction(label=0)
    outlier = AnomalyPrediction(label=1)
    vote = aggregations.MajorityVote().apply

    self.assertEqual(vote([]), AnomalyPrediction())

    self.assertEqual(vote([normal]), normal)

    self.assertEqual(vote([outlier]), outlier)

    self.assertEqual(vote([outlier, normal, normal]), normal)

    self.assertEqual(vote([outlier, normal, outlier]), outlier)

    # use normal to break ties by default
    self.assertEqual(vote([outlier, normal]), normal)

  def test_tie_breaker(self):
    normal = AnomalyPrediction(label=0)
    outlier = AnomalyPrediction(label=1)
    vote = aggregations.MajorityVote(tie_breaker=1).apply

    self.assertEqual(vote([outlier, normal]), outlier)


class AllVoteTest(unittest.TestCase):
  def test_default(self):
    normal = AnomalyPrediction(label=0)
    outlier = AnomalyPrediction(label=1)
    vote = aggregations.AllVote().apply

    self.assertEqual(vote([]), AnomalyPrediction())

    self.assertEqual(vote([normal]), normal)

    self.assertEqual(vote([outlier]), outlier)

    # outlier is only labeled when everyone is outlier
    self.assertEqual(vote([normal, normal, normal]), normal)
    self.assertEqual(vote([outlier, normal, normal]), normal)
    self.assertEqual(vote([outlier, normal, outlier]), normal)
    self.assertEqual(vote([outlier, outlier, outlier]), outlier)


class AnyVoteTest(unittest.TestCase):
  def test_default(self):
    normal = AnomalyPrediction(label=0)
    outlier = AnomalyPrediction(label=1)
    vote = aggregations.AnyVote().apply

    self.assertEqual(vote([]), AnomalyPrediction())

    self.assertEqual(vote([normal]), normal)

    self.assertEqual(vote([outlier]), outlier)

    # outlier is labeled when at least one is outlier
    self.assertEqual(vote([normal, normal, normal]), normal)
    self.assertEqual(vote([outlier, normal, normal]), outlier)
    self.assertEqual(vote([outlier, normal, outlier]), outlier)
    self.assertEqual(vote([outlier, outlier, outlier]), outlier)


class AverageScoreTest(unittest.TestCase):
  def test_default(self):
    avg = aggregations.AverageScore().apply

    self.assertEqual(avg([]), AnomalyPrediction())

    self.assertEqual(
        avg([AnomalyPrediction(score=1)]), AnomalyPrediction(score=1))

    self.assertEqual(
        avg([AnomalyPrediction(score=1), AnomalyPrediction(score=2)]),
        AnomalyPrediction(score=1.5))


class MaxScoreTest(unittest.TestCase):
  def test_default(self):
    agg = aggregations.MaxScore().apply

    self.assertEqual(agg([]), AnomalyPrediction())

    self.assertEqual(
        agg([AnomalyPrediction(score=1)]), AnomalyPrediction(score=1))

    self.assertEqual(
        agg([AnomalyPrediction(score=1), AnomalyPrediction(score=2)]),
        AnomalyPrediction(score=2))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/ml/anomaly/base.py
@@ -61,8 +61,10 @@ class AnomalyResult():
"""A dataclass for the anomaly detection results"""
#: The original input data.
example: beam.Row
#: The `AnomalyPrediction` object containing the prediction.
prediction: AnomalyPrediction
#: The iterable of `AnomalyPrediction` objects containing the predictions.
#: Expect length 1 if it is a result for a non-ensemble detector or an
#: ensemble detector with an aggregation strategy applied.
predictions: Iterable[AnomalyPrediction]


class ThresholdFn(abc.ABC):
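
For context, a hedged sketch of constructing the renamed field (assuming `AnomalyResult` stays a plain dataclass and `beam.Row` keyword construction as in Beam today; the detector code that consumes this is not part of the diff, and the field values below are made up):

```python
import apache_beam as beam
from apache_beam.ml.anomaly.base import AnomalyPrediction, AnomalyResult

result = AnomalyResult(
    example=beam.Row(x=1.0, y=2.0),
    predictions=[AnomalyPrediction(model_id="m1", score=0.9, label=1)])
# A non-ensemble detector (or an ensemble with an AggregationFn applied)
# is expected to emit exactly one element in `predictions`.
```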