Skip to content

Commit

Permalink
[ObjectTracker] use moving average to calculate motion
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelrhmanBassiouny committed Dec 26, 2024
1 parent 773bd29 commit 3d5c0a0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
41 changes: 25 additions & 16 deletions src/episode_segmenter/event_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from .object_tracker import ObjectTracker, ObjectTrackerFactory
from .utils import get_angle_between_vectors, calculate_euclidean_distance, calculate_quaternion_difference, \
check_if_object_is_supported, check_if_object_is_supported_using_contact_points, \
check_if_object_is_supported_by_another_object, check_if_in_contact_with_support
check_if_object_is_supported_by_another_object, check_if_in_contact_with_support, calculate_translation


class PrimitiveEventDetector(threading.Thread, ABC):
Expand Down Expand Up @@ -374,7 +374,7 @@ class MotionDetector(PrimitiveEventDetector, ABC):
A string that is used as a prefix for the thread ID.
"""

def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, velocity_threshold: float = 0.07,
def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, velocity_threshold: float = 0.25,
wait_time: Optional[float] = 0.1,
time_between_frames: Optional[timedelta] = timedelta(milliseconds=50),
*args, **kwargs):
Expand All @@ -392,14 +392,15 @@ def __init__(self, logger: EventLogger, starter_event: NewObjectEvent, velocity_
self.latest_time = time.time()
self.velocity_threshold = velocity_threshold

self.measure_timestep: timedelta = timedelta(milliseconds=450)
self.measure_timestep: timedelta = timedelta(milliseconds=300)
# frames per measure timestep
self.measure_frame_rate: float = ceil(self.measure_timestep.total_seconds() /
time_between_frames.total_seconds()) + 0.5
self.measure_timestep = time_between_frames * self.measure_frame_rate

self.distance_threshold: float = self.velocity_threshold * self.measure_timestep.total_seconds()
self.was_moving: bool = False
self.latest_distances: List[float] = []

def update_latest_pose_and_time(self):
"""
Expand All @@ -420,10 +421,10 @@ def detect_events(self) -> List[Union[TranslationEvent, StopTranslationEvent]]:
:return: An instance of the TranslationEvent class that represents the event if the object is moving, else None.
"""
events = []
if self.time_since_last_event < self.measure_timestep.total_seconds():
time.sleep(self.measure_timestep.total_seconds() - self.time_since_last_event)
# if self.time_since_last_event < self.measure_timestep.total_seconds():
# time.sleep(self.measure_timestep.total_seconds() - self.time_since_last_event)
is_moving = self.is_moving()
if is_moving != self.was_moving:
if is_moving is not None and is_moving != self.was_moving:
self.update_object_motion_state(is_moving)
self.was_moving = not self.was_moving
events.append(self.create_event())
Expand All @@ -441,14 +442,24 @@ def update_object_motion_state(self, moving: bool) -> None:
"""
pass

def is_moving(self) -> bool:
def is_moving(self) -> Optional[bool]:
"""
Check if the object is moving by comparing the current pose with the previous pose.
:return: A boolean value that represents if the object is moving.
"""
distance = self.calculate_distance(self.tracked_object.pose)
return distance > self.distance_threshold
self.latest_distances.append(distance)
self.latest_distances = list(map(lambda x: [x_i*0.9 for x_i in x], self.latest_distances))
if len(self.latest_distances) < self.measure_frame_rate:
return None
else:
self.latest_distances = self.latest_distances[-int(self.measure_frame_rate):]
return self._is_motion_condition_met

@property
def _is_motion_condition_met(self):
return np.linalg.norm(np.sum(self.latest_distances)) > self.distance_threshold

def create_event(self) -> Union[TranslationEvent, StopTranslationEvent]:
"""
Expand Down Expand Up @@ -485,7 +496,8 @@ def calculate_distance(self, current_pose: Pose):
:param current_pose: The current pose of the object.
"""
return calculate_euclidean_distance(self.latest_pose.position_as_list(), current_pose.position_as_list())
# return calculate_euclidean_distance(self.latest_pose.position_as_list(), current_pose.position_as_list())
return calculate_translation(self.latest_pose.position_as_list(), current_pose.position_as_list())

def get_event_type(self):
return TranslationEvent if self.was_moving else StopTranslationEvent
Expand All @@ -495,12 +507,9 @@ class RotationDetector(MotionDetector):
thread_prefix = "rotation_"

def __init__(self, logger: EventLogger, starter_event: NewObjectEvent,
angular_velocity_threshold: float = 10 * np.pi / 180,
wait_time: Optional[float] = 0.1,
time_between_frames: Optional[timedelta] = timedelta(milliseconds=50),
angular_velocity_threshold: float = 30 * np.pi / 180,
*args, **kwargs):
super().__init__(logger, starter_event, velocity_threshold=angular_velocity_threshold, wait_time=wait_time,
time_between_frames=time_between_frames, *args, **kwargs)
super().__init__(logger, starter_event, velocity_threshold=angular_velocity_threshold, *args, **kwargs)

def update_object_motion_state(self, moving: bool) -> None:
"""
Expand All @@ -516,8 +525,8 @@ def calculate_distance(self, current_pose: Pose):
"""
quat_diff = calculate_quaternion_difference(self.latest_pose.orientation_as_list(),
current_pose.orientation_as_list())
angle = 2 * np.arccos(quat_diff[0])
return angle
# angle = 2 * np.arccos(quat_diff[0])
return euler_from_quaternion(quat_diff)[:2]

def get_event_type(self):
return RotationEvent if self.was_moving else StopRotationEvent
Expand Down
15 changes: 13 additions & 2 deletions src/episode_segmenter/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def calculate_translation_difference_and_check(trans_1: List[float], trans_2: Li
:return: A boolean value that represents the condition for the translation of the object to be considered as
picked up.
"""
translation_diff = calculate_translation_difference(trans_1, trans_2)
translation_diff = calculate_abs_translation_difference(trans_1, trans_2)
return is_translation_difference_small(translation_diff, threshold)


Expand All @@ -225,7 +225,18 @@ def is_translation_difference_small(trans_diff: List[float], threshold: float) -
# return all([diff <= threshold for diff in trans_diff])


def calculate_translation_difference(trans_1: List[float], trans_2: List[float]) -> List[float]:
def calculate_translation(position_1: List[float], position_2: List[float]) -> List:
"""
calculate the translation between two positions.
:param position_1: The first position.
:param position_2: The second position.
:return: A list of float values that represent the translation between the two positions.
"""
return [p2 - p1 for p1, p2 in zip(position_1, position_2)]


def calculate_abs_translation_difference(trans_1: List[float], trans_2: List[float]) -> List[float]:
"""
Calculate the translation difference.
Expand Down

0 comments on commit 3d5c0a0

Please sign in to comment.