Skip to content

Commit

Permalink
[ObjectTracker] better support detection.
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelrhmanBassiouny committed Dec 16, 2024
1 parent 7997826 commit a2cadb4
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 62 deletions.
3 changes: 2 additions & 1 deletion src/episode_segmenter/episode_segmenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ def run_initial_event_detectors(self) -> None:
"""
set_of_objects = set()
for obj in World.current_world.objects:
if not obj.is_an_environment and (obj not in self.objects_to_avoid):
if (not obj.is_an_environment and not issubclass(obj.obj_type, pycrap.Supporter)
and (obj.name not in self.objects_to_avoid)):
set_of_objects.add(obj)
try:
if not check_if_object_is_supported(obj):
Expand Down
88 changes: 38 additions & 50 deletions src/episode_segmenter/event_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pycram.datastructures.dataclasses import ContactPointsList
from pycram.datastructures.pose import Pose
from pycram.datastructures.world import UseProspectionWorld
from pycram.datastructures.world_entity import PhysicalBody
from pycram.ros.logging import logdebug
from pycram.world_concepts.world_object import Object
from pycrap import PhysicalObject
Expand All @@ -26,7 +27,7 @@
from .object_tracker import ObjectTracker, ObjectTrackerFactory
from .utils import get_angle_between_vectors, calculate_euclidean_distance, calculate_quaternion_difference, \
check_if_object_is_supported, check_if_object_is_supported_using_contact_points, \
check_if_object_is_supported_by_another_object
check_if_object_is_supported_by_another_object, check_if_in_contact_with_support


class PrimitiveEventDetector(threading.Thread, ABC):
Expand Down Expand Up @@ -320,24 +321,26 @@ def trigger_events(self, contact_points: ContactPointsList) -> Union[List[LossOf
:return: An instance of the LossOfContactEvent/AgentLossOfContactEvent class that represents the event if the
object lost contact, else None.
"""
objects_that_lost_contact = self.get_objects_that_lost_contact(contact_points)
if len(objects_that_lost_contact) == 0:
bodies_that_lost_contact = self.get_bodies_that_lost_contact(contact_points)
if len(bodies_that_lost_contact) == 0:
return []
event_type = AgentLossOfContactEvent if issubclass(self.obj_type, pycrap.Agent) else LossOfContactEvent
return [event_type(contact_points, self.latest_contact_points, of_object=self.tracked_object, with_object=obj)
for obj in objects_that_lost_contact]
return [event_type(contact_points, self.latest_contact_points, of_object=self.tracked_object,
with_object=body.parent_entity)
for body in bodies_that_lost_contact]

def get_objects_that_lost_contact(self, contact_points: ContactPointsList) -> List[Object]:
def get_bodies_that_lost_contact(self, contact_points: ContactPointsList) -> List[PhysicalBody]:
"""
Get the objects that lost contact with the object to track.
:param contact_points: The current contact points.
:return: A list of Object instances that represent the objects that lost contact with the object to track.
"""
objects_that_lost_contact = contact_points.get_objects_that_got_removed(self.latest_contact_points)
bodies_that_lost_contact = contact_points.get_bodies_that_got_removed(self.latest_contact_points)
if self.with_object is not None:
objects_that_lost_contact = [obj for obj in objects_that_lost_contact if obj == self.with_object]
return objects_that_lost_contact
bodies_that_lost_contact = [body for body in bodies_that_lost_contact
if body.parent_entity == self.with_object]
return bodies_that_lost_contact


class LossOfSurfaceDetector(LossOfContactDetector):
Expand All @@ -350,11 +353,11 @@ def trigger_events(self, contact_points: ContactPointsList) -> List[LossOfSurfac
:return: An instance of the LossOfSurfaceEvent class that represents the event if the object lost contact with
the surface, else None.
"""
objects_that_lost_contact = self.get_objects_that_lost_contact(contact_points)
if len(objects_that_lost_contact) == 0:
bodies_that_lost_contact = self.get_bodies_that_lost_contact(contact_points)
if len(bodies_that_lost_contact) == 0:
return []
supporting_surface = check_for_supporting_surface(self.tracked_object,
objects_that_lost_contact)
supporting_surface = check_if_in_contact_with_support(self.tracked_object,
bodies_that_lost_contact)
if supporting_surface is None:
return []
return [LossOfSurfaceEvent(contact_points, self.latest_contact_points, of_object=self.tracked_object,
Expand Down Expand Up @@ -823,11 +826,12 @@ def detect_events(self) -> List[EventUnion]:

self.interaction_event.end_timestamp = self.end_timestamp
event = self.interaction_event

break

if event:
rospy.loginfo(f"{self.__class__.__name__} detected an interaction with: {self.tracked_object.name}")
return [event]

return []

@abstractmethod
Expand Down Expand Up @@ -946,27 +950,25 @@ def start_condition_checker(cls, event: Event) -> bool:
:param event: The ContactEvent instance that represents the contact event.
"""
return (isinstance(event, LossOfContactEvent)
and any(select_transportable_objects_from_loss_of_contact_event(event)))
and any(select_transportable_objects_from_loss_of_contact_event(event))
and check_if_in_contact_with_support(event.tracked_object, event.links))

def interaction_checks(self) -> bool:
"""
Check for upward motion after the object lost contact with the surface.
"""
if check_for_supporting_surface(self.tracked_object,
self.starter_event.latest_objects_that_got_removed) is not None:
# wait for the object to be lifted TODO: Should be replaced with a wait on a lifting event
dt = timedelta(milliseconds=400)
time.sleep(dt.total_seconds())
latest_event = self.check_for_event_near_starter_event(TranslationEvent, dt)

if not latest_event:
return False
print(f"checking if {self.tracked_object.name} was picked up")
# wait for the object to be lifted TODO: Should be replaced with a wait on a lifting event
dt = timedelta(milliseconds=400)
time.sleep(dt.total_seconds())
print(f"checking for translation event for {self.tracked_object.name}")
latest_event = self.check_for_event_near_starter_event(TranslationEvent, dt)

# z_motion = latest_event.current_pose.position.z - latest_event.start_pose.position.z
# if z_motion >= 0.005:
if latest_event:
self.end_timestamp = max(latest_event.timestamp, self.start_timestamp)
return True

self.kill_event.set()
return False


Expand Down Expand Up @@ -994,8 +996,8 @@ def start_condition_checker(cls, event: Event) -> bool:
:param event: The ContactEvent instance that represents the contact event.
"""
if (isinstance(event, ContactEvent)
and any(select_transportable_objects([event.tracked_object]))):
if (isinstance(event, ContactEvent) and any(select_transportable_objects([event.tracked_object]))
and check_if_in_contact_with_support(event.tracked_object, event.links)):
return True
return False

Expand All @@ -1007,30 +1009,14 @@ def initial_interaction_checkers(self) -> bool:
time.sleep(dt.total_seconds())
event = self.check_for_event_near_starter_event(StopMotionEvent, dt)
if event is not None:
print(f"found event: {event} for {self.tracked_object.name}")
print(f"with history: {self.object_tracker.get_event_history()}")
if not check_if_object_is_supported(self.tracked_object, 0.01):
self.kill_event.set()
return False
self.end_timestamp = self.start_timestamp
print(f"end_timestamp: {self.end_timestamp}")
self.end_timestamp = event.timestamp
return True
self.kill_event.set()
return False

def check_if_contact_event_is_with_surface(self, contact_event: ContactEvent) -> bool:
"""
Check if the contact event is with a surface.
:param contact_event: The ContactEvent instance that represents the contact event.
"""
return check_if_object_is_supported_by_another_object(self.tracked_object, contact_event.with_object,
contact_event.contact_points)
# return check_if_object_is_supported(self.tracked_object)


def check_for_supporting_surface(tracked_object: Object,
possible_surfaces: List[Object]) -> Optional[Object]:
possible_surfaces: Optional[List[Object]] = None) -> Optional[Object]:
"""
Check if any of the possible surfaces are supporting the tracked_object.
Expand All @@ -1046,8 +1032,11 @@ def check_for_supporting_surface(tracked_object: Object,
contacted_bodies = contact_points.get_objects_that_have_points()
contacted_body_names = [body.name for body in contacted_bodies]
contacted_bodies = dict(zip(contacted_body_names, contacted_bodies))
possible_surface_names = [obj.name for obj in possible_surfaces]
possible_surface_names = list(set(contacted_body_names).intersection(possible_surface_names))
if possible_surfaces is None:
possible_surface_names = contacted_body_names
else:
possible_surface_names = [obj.name for obj in possible_surfaces]
possible_surface_names = list(set(contacted_body_names).intersection(possible_surface_names))
supporting_surface = None
opposite_gravity = [0, 0, 1]
smallest_angle = np.pi / 8
Expand Down Expand Up @@ -1129,8 +1118,7 @@ def select_transportable_objects(objects: List[Object]) -> List[Object]:
:param objects: A list of Object instances.
"""
transportable_objects = [obj for obj in objects
if
not issubclass(obj.obj_type, (pycrap.Agent, pycrap.Location, pycrap.Floor, pycrap.Genobj))]
if not issubclass(obj.obj_type, (pycrap.Agent, pycrap.Location, pycrap.Supporter))]
return transportable_objects


Expand Down
3 changes: 2 additions & 1 deletion src/episode_segmenter/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pycram.datastructures.dataclasses import ContactPointsList, Color, TextAnnotation, ObjectState
from pycram.datastructures.pose import Pose
from pycram.datastructures.world import World
from pycram.datastructures.world_entity import PhysicalBody
from pycram.world_concepts.world_object import Object, Link


Expand Down Expand Up @@ -355,7 +356,7 @@ def __init__(self, contact_points: ContactPointsList,
surface: Optional[Object] = None,
timestamp: Optional[float] = None):
super().__init__(contact_points, latest_contact_points, of_object, surface, timestamp)
self.surface: Optional[Object] = surface
self.surface: Optional[PhysicalBody] = surface


class AbstractAgentObjectInteractionEvent(HasTwoTrackedObjects, ABC):
Expand Down
49 changes: 39 additions & 10 deletions src/episode_segmenter/utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
from __future__ import annotations

import math

import numpy as np
from tf.transformations import quaternion_inverse, quaternion_multiply
from typing_extensions import List, Optional
from typing_extensions import List, Optional, TYPE_CHECKING

import pycrap
from pycram.datastructures.dataclasses import ContactPointsList
from pycram.datastructures.pose import Transform
from pycram.datastructures.world import World, UseProspectionWorld
from pycram.datastructures.world_entity import PhysicalBody
from pycram.world_concepts.world_object import Object
from pycram.ros.logging import logdebug
from pycram.object_descriptors.generic import ObjectDescription as GenericObjectDescription

if TYPE_CHECKING:
from .events import LossOfContactEvent


def check_if_object_is_supported(obj: Object, distance: Optional[float] = 0.03) -> bool:
"""
Expand All @@ -24,14 +30,17 @@ def check_if_object_is_supported(obj: Object, distance: Optional[float] = 0.03)
supported = True
with UseProspectionWorld():
prospection_obj = World.current_world.get_prospection_object_for_object(obj)
current_position = prospection_obj.get_position_as_list()
# current_position = prospection_obj.get_position_as_list()
dt = math.sqrt(2 * distance / 9.81) + 0.01 # time to fall distance
World.current_world.simulate(dt)
new_position = prospection_obj.get_position_as_list()
print("change in position", current_position[2] - new_position[2])
if current_position[2] - new_position[2] > distance:
logdebug(f"Object {obj.name} is not supported")
supported = False
cp = prospection_obj.contact_points
if not check_if_in_contact_with_support(prospection_obj, cp.get_bodies_in_contact()):
return False
# new_position = prospection_obj.get_position_as_list()
# print("change in position", current_position[2] - new_position[2])
# if current_position[2] - new_position[2] > distance:
# logdebug(f"Object {obj.name} is not supported")
# supported = False
return supported


Expand All @@ -48,6 +57,23 @@ def check_if_object_is_supported_using_contact_points(obj: Object, contact_point
return True


def check_if_in_contact_with_support(obj: Object, contact_bodies: List[PhysicalBody]) -> Optional[PhysicalBody]:
"""
Check if the object is in contact with a supporting surface.
:param obj: The object to check if it is in contact with a supporting surface.
:param contact_bodies: The bodies in contact with the object.
"""
for body in contact_bodies:
if issubclass(body.parent_entity.obj_type, pycrap.Supporter):
body_aabb = body.get_axis_aligned_bounding_box()
surface_z = body_aabb.max_z
tracked_object_base = obj.get_base_origin().position
if tracked_object_base.z >= surface_z and body_aabb.min_x <= tracked_object_base.x <= body_aabb.max_x and \
body_aabb.min_y <= tracked_object_base.y <= body_aabb.max_y:
return body


def check_if_object_is_supported_by_another_object(obj: Object, support_obj: Object,
contact_points: Optional[ContactPointsList] = None) -> bool:
"""
Expand All @@ -60,8 +86,11 @@ def check_if_object_is_supported_by_another_object(obj: Object, support_obj: Obj
"""
if contact_points is None:
contact_points = obj.get_contact_points_with_body(support_obj)
average_normal = np.mean([cp.normal for cp in contact_points], axis=0)
return is_vector_opposite_to_gravity(average_normal)
normals = [cp.normal for cp in contact_points if any(cp.normal)]
if len(normals) > 0:
average_normal = np.mean(normals, axis=0)
return is_vector_opposite_to_gravity(average_normal)
return False


def is_vector_opposite_to_gravity(vector: List[float], gravity_vector: Optional[List[float]] = None) -> bool:
Expand Down Expand Up @@ -97,7 +126,7 @@ def imagine_support(cls, obj: Object, support_thickness: Optional[float] = 0.005
obj_aabb = obj.get_axis_aligned_bounding_box()
support = GenericObjectDescription(f"imagined_support_{cls.latest_surface_idx}",
[0, 0, 0], [obj_aabb.width, obj_aabb.depth, support_thickness*0.5])
support_obj = Object(f"imagined_support_{cls.latest_surface_idx}", pycrap.Genobj, None, support)
support_obj = Object(f"imagined_support_{cls.latest_surface_idx}", pycrap.Supporter, None, support)
support_position = obj_base_position.copy()
support_position[2] -= support_thickness
support_obj.set_position(support_position)
Expand Down

0 comments on commit a2cadb4

Please sign in to comment.