Skip to content

Commit

Permalink
[ObjectTracker] some cleaning and refactoring of motion detection.
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelrhmanBassiouny committed Jan 14, 2025
1 parent c85fdb9 commit d89ed4b
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 18 deletions.
11 changes: 11 additions & 0 deletions src/episode_segmenter/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from enum import Enum, auto


class DistanceFilter(Enum):
MOVING_AVERAGE = auto()
LOW_PASS = auto()


class MotionDetectionMethod(Enum):
CONSISTENT_GRADIENT = auto()
DISTANCE = auto()
60 changes: 42 additions & 18 deletions src/episode_segmenter/event_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pycram.ros.logging import logdebug
from pycram.world_concepts.world_object import Object
from pycrap import PhysicalObject
from .enums import DistanceFilter, MotionDetectionMethod
from .event_logger import EventLogger
from .events import Event, ContactEvent, LossOfContactEvent, PickUpEvent, AgentContactEvent, \
AgentLossOfContactEvent, EventUnion, LossOfSurfaceEvent, TranslationEvent, StopTranslationEvent, NewObjectEvent, \
Expand Down Expand Up @@ -387,27 +388,34 @@ class MotionDetector(PrimitiveEventDetector, ABC):
A string that is used as a prefix for the thread ID.
"""

def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, distance_threshold: float = 0.05,
wait_time: Optional[float] = 0.1,
time_between_frames: Optional[timedelta] = timedelta(milliseconds=50),
def __init__(self, logger: EventLogger, starter_event: NewObjectEvent,
distance_filter_method: Optional[DistanceFilter] = None,
detection_method: MotionDetectionMethod = MotionDetectionMethod.CONSISTENT_GRADIENT,
distance_threshold: float = 0.05,
cut_off_frequency: float = 2,
moving_average_decay: float = 0.99,
measure_timestep: timedelta = timedelta(milliseconds=100),
time_between_frames: timedelta = timedelta(milliseconds=50),
window_timeframe: timedelta = timedelta(milliseconds=700),
*args, **kwargs):
"""
:param logger: An instance of the EventLogger class that is used to log the events.
:param starter_event: An instance of the NewObjectEvent class that represents the event to start the event.
:param distance_threshold: An optional float value that represents the distance threshold to consider the object
as moving.
:param wait_time: An optional float value that introduces a delay between calls to the event detector.
:param time_between_frames: An optional timedelta value that represents the time between frames.
:param measure_timestep: The time between calls to the event detector.
:param time_between_frames: The time between frames of episode player.
"""
super().__init__(logger, wait_time, *args, **kwargs)
super().__init__(logger, measure_timestep.total_seconds(), *args, **kwargs)
self.tracked_object = starter_event.tracked_object
self.latest_pose = self.tracked_object.pose
self.latest_time = time.time()
self.event_time: float = self.latest_time
self.start_pose: Pose = self.latest_pose
self.distance_threshold = distance_threshold
self.time_between_frames: Optional[timedelta] = time_between_frames
self.time_between_frames: timedelta = time_between_frames
self.measure_timestep: timedelta = measure_timestep

self.measure_timestep: timedelta = timedelta(seconds=max(timedelta(milliseconds=50).total_seconds(),
self.wait_time))
# frames per measure timestep
self.measure_frame_rate: float = ceil(self.measure_timestep.total_seconds() /
time_between_frames.total_seconds())
Expand All @@ -416,21 +424,21 @@ def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, distance_

self.velocity_threshold: float = self.distance_threshold * self.measure_timestep.total_seconds()
self.was_moving: bool = False
self.use_decay: bool = False
self.gamma: float = 0.99
self.cut_off_frequency: float = 2
self.use_low_pass_filter: bool = False
self.use_average_distance: bool = False
self.use_consistent_gradient: bool = True

# Window size for checking motion
self.window_size: int = ceil(timedelta(milliseconds=700).total_seconds() /
self.measure_timestep.total_seconds())

# Configurations
self.distance_filter: Optional[DistanceFilter] = distance_filter_method
self.detection_method: MotionDetectionMethod = detection_method
self.gamma: float = moving_average_decay
self.cut_off_frequency: float = cut_off_frequency

# Data
self.latest_distances: List[float] = []
self.latest_poses: List[Pose] = []
self.latest_times: List[float] = []
self.event_time: float = self.latest_time
self.start_pose: Pose = self.latest_pose

# for plotting purposes
self.original_distances: List[List[List[float]]] = []
Expand All @@ -444,6 +452,22 @@ def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, distance_
self.plot_distance_windows: bool = False
self.plot_frequencies: bool = False

@property
def use_decay(self) -> bool:
return self.distance_filter == DistanceFilter.MOVING_AVERAGE

@property
def use_low_pass_filter(self) -> bool:
return self.distance_filter == DistanceFilter.LOW_PASS

@property
def use_average_distance(self) -> bool:
return self.detection_method == MotionDetectionMethod.DISTANCE

@property
def use_consistent_gradient(self) -> bool:
return self.detection_method == MotionDetectionMethod.CONSISTENT_GRADIENT

def update_latest_pose_and_time(self):
"""
Update the latest pose and time of the object.
Expand Down Expand Up @@ -546,7 +570,7 @@ def _is_motion_condition_met(self):

if self.use_low_pass_filter:
self._apply_low_pass_filter()
distances = self.all_filtered_distances[-1]
distances = self.all_filtered_distances[-1].tolist()

if self.use_average_distance:
return self._check_motion_using_average_distance(distances)
Expand Down
52 changes: 52 additions & 0 deletions src/episode_segmenter/motion_detection_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from abc import abstractmethod, ABC
from math import ceil, floor

import numpy as np
from typing_extensions import List, Tuple


class MotionDetectionMethod(ABC):
"""
Interface for motion detection methods.
"""

@abstractmethod
def is_moving(self, latest_distances: List[List[float]]) -> Tuple[bool, int]:
"""
Check if the object is moving.
:param latest_distances: List of the latest distances.
:return: True if the object is moving, False if it is not moving,
and return the index in the given list of distances where the object started moving.
"""
pass


class ConsistentGradient(MotionDetectionMethod):

def __init__(self, threshold: float = 1e-4):
self.threshold = threshold

def is_moving(self, latest_distances: List[List[float]]) -> Tuple[bool, int]:
"""
Check if the object is moving by checking if the distance between the current and the previous position is
consistently positive or negative in at least one axis during the latest steps (the number of latest distances).
"""
distance_arr = np.array(latest_distances)
n_axes = distance_arr.shape[1]
return any(np.all(distance_arr[:, i] > self.threshold) or np.all(distance_arr[:, i] < -self.threshold)
for i in range(n_axes)), 0


class Displacement(MotionDetectionMethod):

def __init__(self, threshold: float):
self.threshold = threshold

def is_moving(self, latest_distances: List[List[float]]) -> Tuple[bool, int]:
"""
Check if the object is moving by checking if the displacement between latest position and the start position is
above a certain threshold.
"""
avg_distance = np.linalg.norm(np.sum(np.array(latest_distances)))
return avg_distance > self.threshold, floor((len(latest_distances) / 2) - 1)
51 changes: 51 additions & 0 deletions test/test_event_detectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from unittest import TestCase

import numpy as np

from episode_segmenter.motion_detection_helpers import ConsistentGradient, Displacement


class TestEventDetectors(TestCase):

@classmethod
def setUpClass(cls):
pass

@classmethod
def tearDownClass(cls):
pass

def setUp(self):
pass

def tearDown(self):
pass

def test_consistent_gradient_motion_detection_method(self):
for i in range(3):
a = np.zeros((3, 3))
a[:, i] = 1
cg = ConsistentGradient()
self.assertTrue(cg.is_moving(a.tolist()) == (True, 0))
a = np.zeros((3, 3))
a[:, i] = -1
self.assertTrue(cg.is_moving(a.tolist()) == (True, 0))
a = np.zeros((3, 3))
a[:, i] = -1
a[1, i] = 1
self.assertFalse(cg.is_moving(a.tolist()) == (True, 0))

def test_displacement_motion_detection_method(self):
for i in range(3):
a = np.zeros((3, 3))
a[:, i] = 1
disp = Displacement(1.5)
self.assertTrue(disp.is_moving(a.tolist()) == (True, 0))
a = np.zeros((3, 3))
a[:, i] = -1
self.assertTrue(disp.is_moving(a.tolist()) == (True, 0))
a = np.zeros((3, 3))
a[:, i] = -1
a[1, i] = 1
self.assertTrue(disp.is_moving(a.tolist()) == (False, 0))

0 comments on commit d89ed4b

Please sign in to comment.