From d89ed4b76bd87a216277fae0010ef5a036ef3227 Mon Sep 17 00:00:00 2001 From: Abdelrhman Bassiouny Date: Tue, 14 Jan 2025 16:49:05 +0100 Subject: [PATCH] [ObjectTracker] some cleaning and refactoring of motion detection. --- src/episode_segmenter/enums.py | 11 ++++ src/episode_segmenter/event_detectors.py | 60 +++++++++++++------ .../motion_detection_helpers.py | 52 ++++++++++++++++ test/test_event_detectors.py | 51 ++++++++++++++++ 4 files changed, 156 insertions(+), 18 deletions(-) create mode 100644 src/episode_segmenter/enums.py create mode 100644 src/episode_segmenter/motion_detection_helpers.py create mode 100644 test/test_event_detectors.py diff --git a/src/episode_segmenter/enums.py b/src/episode_segmenter/enums.py new file mode 100644 index 0000000..57f0b4f --- /dev/null +++ b/src/episode_segmenter/enums.py @@ -0,0 +1,11 @@ +from enum import Enum, auto + + +class DistanceFilter(Enum): + MOVING_AVERAGE = auto() + LOW_PASS = auto() + + +class MotionDetectionMethod(Enum): + CONSISTENT_GRADIENT = auto() + DISTANCE = auto() diff --git a/src/episode_segmenter/event_detectors.py b/src/episode_segmenter/event_detectors.py index bdcbdaa..c457416 100644 --- a/src/episode_segmenter/event_detectors.py +++ b/src/episode_segmenter/event_detectors.py @@ -21,6 +21,7 @@ from pycram.ros.logging import logdebug from pycram.world_concepts.world_object import Object from pycrap import PhysicalObject +from .enums import DistanceFilter, MotionDetectionMethod from .event_logger import EventLogger from .events import Event, ContactEvent, LossOfContactEvent, PickUpEvent, AgentContactEvent, \ AgentLossOfContactEvent, EventUnion, LossOfSurfaceEvent, TranslationEvent, StopTranslationEvent, NewObjectEvent, \ @@ -387,27 +388,34 @@ class MotionDetector(PrimitiveEventDetector, ABC): A string that is used as a prefix for the thread ID. """ - def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, distance_threshold: float = 0.05, - wait_time: Optional[float] = 0.1, - time_between_frames: Optional[timedelta] = timedelta(milliseconds=50), + def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, + distance_filter_method: Optional[DistanceFilter] = None, + detection_method: MotionDetectionMethod = MotionDetectionMethod.CONSISTENT_GRADIENT, + distance_threshold: float = 0.05, + cut_off_frequency: float = 2, + moving_average_decay: float = 0.99, + measure_timestep: timedelta = timedelta(milliseconds=100), + time_between_frames: timedelta = timedelta(milliseconds=50), + window_timeframe: timedelta = timedelta(milliseconds=700), *args, **kwargs): """ :param logger: An instance of the EventLogger class that is used to log the events. :param starter_event: An instance of the NewObjectEvent class that represents the event to start the event. :param distance_threshold: An optional float value that represents the distance threshold to consider the object as moving. - :param wait_time: An optional float value that introduces a delay between calls to the event detector. - :param time_between_frames: An optional timedelta value that represents the time between frames. + :param measure_timestep: The time between calls to the event detector. + :param time_between_frames: The time between frames of episode player. """ - super().__init__(logger, wait_time, *args, **kwargs) + super().__init__(logger, measure_timestep.total_seconds(), *args, **kwargs) self.tracked_object = starter_event.tracked_object self.latest_pose = self.tracked_object.pose self.latest_time = time.time() + self.event_time: float = self.latest_time + self.start_pose: Pose = self.latest_pose self.distance_threshold = distance_threshold - self.time_between_frames: Optional[timedelta] = time_between_frames + self.time_between_frames: timedelta = time_between_frames + self.measure_timestep: timedelta = measure_timestep - self.measure_timestep: timedelta = timedelta(seconds=max(timedelta(milliseconds=50).total_seconds(), - self.wait_time)) # frames per measure timestep self.measure_frame_rate: float = ceil(self.measure_timestep.total_seconds() / time_between_frames.total_seconds()) @@ -416,21 +424,21 @@ def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, distance_ self.velocity_threshold: float = self.distance_threshold * self.measure_timestep.total_seconds() self.was_moving: bool = False - self.use_decay: bool = False - self.gamma: float = 0.99 - self.cut_off_frequency: float = 2 - self.use_low_pass_filter: bool = False - self.use_average_distance: bool = False - self.use_consistent_gradient: bool = True + # Window size for checking motion self.window_size: int = ceil(timedelta(milliseconds=700).total_seconds() / self.measure_timestep.total_seconds()) + # Configurations + self.distance_filter: Optional[DistanceFilter] = distance_filter_method + self.detection_method: MotionDetectionMethod = detection_method + self.gamma: float = moving_average_decay + self.cut_off_frequency: float = cut_off_frequency + + # Data self.latest_distances: List[float] = [] self.latest_poses: List[Pose] = [] self.latest_times: List[float] = [] - self.event_time: float = self.latest_time - self.start_pose: Pose = self.latest_pose # for plotting purposes self.original_distances: List[List[List[float]]] = [] @@ -444,6 +452,22 @@ def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, distance_ self.plot_distance_windows: bool = False self.plot_frequencies: bool = False + @property + def use_decay(self) -> bool: + return self.distance_filter == DistanceFilter.MOVING_AVERAGE + + @property + def use_low_pass_filter(self) -> bool: + return self.distance_filter == DistanceFilter.LOW_PASS + + @property + def use_average_distance(self) -> bool: + return self.detection_method == MotionDetectionMethod.DISTANCE + + @property + def use_consistent_gradient(self) -> bool: + return self.detection_method == MotionDetectionMethod.CONSISTENT_GRADIENT + def update_latest_pose_and_time(self): """ Update the latest pose and time of the object. @@ -546,7 +570,7 @@ def _is_motion_condition_met(self): if self.use_low_pass_filter: self._apply_low_pass_filter() - distances = self.all_filtered_distances[-1] + distances = self.all_filtered_distances[-1].tolist() if self.use_average_distance: return self._check_motion_using_average_distance(distances) diff --git a/src/episode_segmenter/motion_detection_helpers.py b/src/episode_segmenter/motion_detection_helpers.py new file mode 100644 index 0000000..91119ff --- /dev/null +++ b/src/episode_segmenter/motion_detection_helpers.py @@ -0,0 +1,52 @@ +from abc import abstractmethod, ABC +from math import ceil, floor + +import numpy as np +from typing_extensions import List, Tuple + + +class MotionDetectionMethod(ABC): + """ + Interface for motion detection methods. + """ + + @abstractmethod + def is_moving(self, latest_distances: List[List[float]]) -> Tuple[bool, int]: + """ + Check if the object is moving. + + :param latest_distances: List of the latest distances. + :return: True if the object is moving, False if it is not moving, + and return the index in the given list of distances where the object started moving. + """ + pass + + +class ConsistentGradient(MotionDetectionMethod): + + def __init__(self, threshold: float = 1e-4): + self.threshold = threshold + + def is_moving(self, latest_distances: List[List[float]]) -> Tuple[bool, int]: + """ + Check if the object is moving by checking if the distance between the current and the previous position is + consistently positive or negative in at least one axis during the latest steps (the number of latest distances). + """ + distance_arr = np.array(latest_distances) + n_axes = distance_arr.shape[1] + return any(np.all(distance_arr[:, i] > self.threshold) or np.all(distance_arr[:, i] < -self.threshold) + for i in range(n_axes)), 0 + + +class Displacement(MotionDetectionMethod): + + def __init__(self, threshold: float): + self.threshold = threshold + + def is_moving(self, latest_distances: List[List[float]]) -> Tuple[bool, int]: + """ + Check if the object is moving by checking if the displacement between latest position and the start position is + above a certain threshold. + """ + avg_distance = np.linalg.norm(np.sum(np.array(latest_distances))) + return avg_distance > self.threshold, floor((len(latest_distances) / 2) - 1) diff --git a/test/test_event_detectors.py b/test/test_event_detectors.py new file mode 100644 index 0000000..3dca1f1 --- /dev/null +++ b/test/test_event_detectors.py @@ -0,0 +1,51 @@ +from unittest import TestCase + +import numpy as np + +from episode_segmenter.motion_detection_helpers import ConsistentGradient, Displacement + + +class TestEventDetectors(TestCase): + + @classmethod + def setUpClass(cls): + pass + + @classmethod + def tearDownClass(cls): + pass + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_consistent_gradient_motion_detection_method(self): + for i in range(3): + a = np.zeros((3, 3)) + a[:, i] = 1 + cg = ConsistentGradient() + self.assertTrue(cg.is_moving(a.tolist()) == (True, 0)) + a = np.zeros((3, 3)) + a[:, i] = -1 + self.assertTrue(cg.is_moving(a.tolist()) == (True, 0)) + a = np.zeros((3, 3)) + a[:, i] = -1 + a[1, i] = 1 + self.assertFalse(cg.is_moving(a.tolist()) == (True, 0)) + + def test_displacement_motion_detection_method(self): + for i in range(3): + a = np.zeros((3, 3)) + a[:, i] = 1 + disp = Displacement(1.5) + self.assertTrue(disp.is_moving(a.tolist()) == (True, 0)) + a = np.zeros((3, 3)) + a[:, i] = -1 + self.assertTrue(disp.is_moving(a.tolist()) == (True, 0)) + a = np.zeros((3, 3)) + a[:, i] = -1 + a[1, i] = 1 + self.assertTrue(disp.is_moving(a.tolist()) == (False, 0)) +