diff --git a/src/episode_segmenter/episode_segmenter.py b/src/episode_segmenter/episode_segmenter.py index 77fb744..091d6df 100644 --- a/src/episode_segmenter/episode_segmenter.py +++ b/src/episode_segmenter/episode_segmenter.py @@ -323,7 +323,8 @@ def run_initial_event_detectors(self) -> None: """ set_of_objects = set() for obj in World.current_world.objects: - if not obj.is_an_environment and (obj not in self.objects_to_avoid): + if (not obj.is_an_environment and not issubclass(obj.obj_type, pycrap.Supporter) + and (obj.name not in self.objects_to_avoid)): set_of_objects.add(obj) try: if not check_if_object_is_supported(obj): diff --git a/src/episode_segmenter/event_detectors.py b/src/episode_segmenter/event_detectors.py index d914955..93b16ae 100644 --- a/src/episode_segmenter/event_detectors.py +++ b/src/episode_segmenter/event_detectors.py @@ -16,6 +16,7 @@ from pycram.datastructures.dataclasses import ContactPointsList from pycram.datastructures.pose import Pose from pycram.datastructures.world import UseProspectionWorld +from pycram.datastructures.world_entity import PhysicalBody from pycram.ros.logging import logdebug from pycram.world_concepts.world_object import Object from pycrap import PhysicalObject @@ -26,7 +27,7 @@ from .object_tracker import ObjectTracker, ObjectTrackerFactory from .utils import get_angle_between_vectors, calculate_euclidean_distance, calculate_quaternion_difference, \ check_if_object_is_supported, check_if_object_is_supported_using_contact_points, \ - check_if_object_is_supported_by_another_object + check_if_object_is_supported_by_another_object, check_if_in_contact_with_support class PrimitiveEventDetector(threading.Thread, ABC): @@ -320,24 +321,26 @@ def trigger_events(self, contact_points: ContactPointsList) -> Union[List[LossOf :return: An instance of the LossOfContactEvent/AgentLossOfContactEvent class that represents the event if the object lost contact, else None. """ - objects_that_lost_contact = self.get_objects_that_lost_contact(contact_points) - if len(objects_that_lost_contact) == 0: + bodies_that_lost_contact = self.get_bodies_that_lost_contact(contact_points) + if len(bodies_that_lost_contact) == 0: return [] event_type = AgentLossOfContactEvent if issubclass(self.obj_type, pycrap.Agent) else LossOfContactEvent - return [event_type(contact_points, self.latest_contact_points, of_object=self.tracked_object, with_object=obj) - for obj in objects_that_lost_contact] + return [event_type(contact_points, self.latest_contact_points, of_object=self.tracked_object, + with_object=body.parent_entity) + for body in bodies_that_lost_contact] - def get_objects_that_lost_contact(self, contact_points: ContactPointsList) -> List[Object]: + def get_bodies_that_lost_contact(self, contact_points: ContactPointsList) -> List[PhysicalBody]: """ Get the objects that lost contact with the object to track. :param contact_points: The current contact points. :return: A list of Object instances that represent the objects that lost contact with the object to track. """ - objects_that_lost_contact = contact_points.get_objects_that_got_removed(self.latest_contact_points) + bodies_that_lost_contact = contact_points.get_bodies_that_got_removed(self.latest_contact_points) if self.with_object is not None: - objects_that_lost_contact = [obj for obj in objects_that_lost_contact if obj == self.with_object] - return objects_that_lost_contact + bodies_that_lost_contact = [body for body in bodies_that_lost_contact + if body.parent_entity == self.with_object] + return bodies_that_lost_contact class LossOfSurfaceDetector(LossOfContactDetector): @@ -350,11 +353,11 @@ def trigger_events(self, contact_points: ContactPointsList) -> List[LossOfSurfac :return: An instance of the LossOfSurfaceEvent class that represents the event if the object lost contact with the surface, else None. """ - objects_that_lost_contact = self.get_objects_that_lost_contact(contact_points) - if len(objects_that_lost_contact) == 0: + bodies_that_lost_contact = self.get_bodies_that_lost_contact(contact_points) + if len(bodies_that_lost_contact) == 0: return [] - supporting_surface = check_for_supporting_surface(self.tracked_object, - objects_that_lost_contact) + supporting_surface = check_if_in_contact_with_support(self.tracked_object, + bodies_that_lost_contact) if supporting_surface is None: return [] return [LossOfSurfaceEvent(contact_points, self.latest_contact_points, of_object=self.tracked_object, @@ -823,11 +826,12 @@ def detect_events(self) -> List[EventUnion]: self.interaction_event.end_timestamp = self.end_timestamp event = self.interaction_event - break + if event: rospy.loginfo(f"{self.__class__.__name__} detected an interaction with: {self.tracked_object.name}") return [event] + return [] @abstractmethod @@ -946,27 +950,25 @@ def start_condition_checker(cls, event: Event) -> bool: :param event: The ContactEvent instance that represents the contact event. """ return (isinstance(event, LossOfContactEvent) - and any(select_transportable_objects_from_loss_of_contact_event(event))) + and any(select_transportable_objects_from_loss_of_contact_event(event)) + and check_if_in_contact_with_support(event.tracked_object, event.links)) def interaction_checks(self) -> bool: """ Check for upward motion after the object lost contact with the surface. """ - if check_for_supporting_surface(self.tracked_object, - self.starter_event.latest_objects_that_got_removed) is not None: - # wait for the object to be lifted TODO: Should be replaced with a wait on a lifting event - dt = timedelta(milliseconds=400) - time.sleep(dt.total_seconds()) - latest_event = self.check_for_event_near_starter_event(TranslationEvent, dt) - - if not latest_event: - return False + print(f"checking if {self.tracked_object.name} was picked up") + # wait for the object to be lifted TODO: Should be replaced with a wait on a lifting event + dt = timedelta(milliseconds=400) + time.sleep(dt.total_seconds()) + print(f"checking for translation event for {self.tracked_object.name}") + latest_event = self.check_for_event_near_starter_event(TranslationEvent, dt) - # z_motion = latest_event.current_pose.position.z - latest_event.start_pose.position.z - # if z_motion >= 0.005: + if latest_event: self.end_timestamp = max(latest_event.timestamp, self.start_timestamp) return True + self.kill_event.set() return False @@ -994,8 +996,8 @@ def start_condition_checker(cls, event: Event) -> bool: :param event: The ContactEvent instance that represents the contact event. """ - if (isinstance(event, ContactEvent) - and any(select_transportable_objects([event.tracked_object]))): + if (isinstance(event, ContactEvent) and any(select_transportable_objects([event.tracked_object])) + and check_if_in_contact_with_support(event.tracked_object, event.links)): return True return False @@ -1007,30 +1009,14 @@ def initial_interaction_checkers(self) -> bool: time.sleep(dt.total_seconds()) event = self.check_for_event_near_starter_event(StopMotionEvent, dt) if event is not None: - print(f"found event: {event} for {self.tracked_object.name}") - print(f"with history: {self.object_tracker.get_event_history()}") - if not check_if_object_is_supported(self.tracked_object, 0.01): - self.kill_event.set() - return False - self.end_timestamp = self.start_timestamp - print(f"end_timestamp: {self.end_timestamp}") + self.end_timestamp = event.timestamp return True self.kill_event.set() return False - def check_if_contact_event_is_with_surface(self, contact_event: ContactEvent) -> bool: - """ - Check if the contact event is with a surface. - - :param contact_event: The ContactEvent instance that represents the contact event. - """ - return check_if_object_is_supported_by_another_object(self.tracked_object, contact_event.with_object, - contact_event.contact_points) - # return check_if_object_is_supported(self.tracked_object) - def check_for_supporting_surface(tracked_object: Object, - possible_surfaces: List[Object]) -> Optional[Object]: + possible_surfaces: Optional[List[Object]] = None) -> Optional[Object]: """ Check if any of the possible surfaces are supporting the tracked_object. @@ -1046,8 +1032,11 @@ def check_for_supporting_surface(tracked_object: Object, contacted_bodies = contact_points.get_objects_that_have_points() contacted_body_names = [body.name for body in contacted_bodies] contacted_bodies = dict(zip(contacted_body_names, contacted_bodies)) - possible_surface_names = [obj.name for obj in possible_surfaces] - possible_surface_names = list(set(contacted_body_names).intersection(possible_surface_names)) + if possible_surfaces is None: + possible_surface_names = contacted_body_names + else: + possible_surface_names = [obj.name for obj in possible_surfaces] + possible_surface_names = list(set(contacted_body_names).intersection(possible_surface_names)) supporting_surface = None opposite_gravity = [0, 0, 1] smallest_angle = np.pi / 8 @@ -1129,8 +1118,7 @@ def select_transportable_objects(objects: List[Object]) -> List[Object]: :param objects: A list of Object instances. """ transportable_objects = [obj for obj in objects - if - not issubclass(obj.obj_type, (pycrap.Agent, pycrap.Location, pycrap.Floor, pycrap.Genobj))] + if not issubclass(obj.obj_type, (pycrap.Agent, pycrap.Location, pycrap.Supporter))] return transportable_objects diff --git a/src/episode_segmenter/events.py b/src/episode_segmenter/events.py index f0f0063..dfc7676 100644 --- a/src/episode_segmenter/events.py +++ b/src/episode_segmenter/events.py @@ -8,6 +8,7 @@ from pycram.datastructures.dataclasses import ContactPointsList, Color, TextAnnotation, ObjectState from pycram.datastructures.pose import Pose from pycram.datastructures.world import World +from pycram.datastructures.world_entity import PhysicalBody from pycram.world_concepts.world_object import Object, Link @@ -355,7 +356,7 @@ def __init__(self, contact_points: ContactPointsList, surface: Optional[Object] = None, timestamp: Optional[float] = None): super().__init__(contact_points, latest_contact_points, of_object, surface, timestamp) - self.surface: Optional[Object] = surface + self.surface: Optional[PhysicalBody] = surface class AbstractAgentObjectInteractionEvent(HasTwoTrackedObjects, ABC): diff --git a/src/episode_segmenter/utils.py b/src/episode_segmenter/utils.py index 62efca9..6f89eb4 100644 --- a/src/episode_segmenter/utils.py +++ b/src/episode_segmenter/utils.py @@ -1,17 +1,23 @@ +from __future__ import annotations + import math import numpy as np from tf.transformations import quaternion_inverse, quaternion_multiply -from typing_extensions import List, Optional +from typing_extensions import List, Optional, TYPE_CHECKING import pycrap from pycram.datastructures.dataclasses import ContactPointsList from pycram.datastructures.pose import Transform from pycram.datastructures.world import World, UseProspectionWorld +from pycram.datastructures.world_entity import PhysicalBody from pycram.world_concepts.world_object import Object from pycram.ros.logging import logdebug from pycram.object_descriptors.generic import ObjectDescription as GenericObjectDescription +if TYPE_CHECKING: + from .events import LossOfContactEvent + def check_if_object_is_supported(obj: Object, distance: Optional[float] = 0.03) -> bool: """ @@ -24,14 +30,17 @@ def check_if_object_is_supported(obj: Object, distance: Optional[float] = 0.03) supported = True with UseProspectionWorld(): prospection_obj = World.current_world.get_prospection_object_for_object(obj) - current_position = prospection_obj.get_position_as_list() + # current_position = prospection_obj.get_position_as_list() dt = math.sqrt(2 * distance / 9.81) + 0.01 # time to fall distance World.current_world.simulate(dt) - new_position = prospection_obj.get_position_as_list() - print("change in position", current_position[2] - new_position[2]) - if current_position[2] - new_position[2] > distance: - logdebug(f"Object {obj.name} is not supported") - supported = False + cp = prospection_obj.contact_points + if not check_if_in_contact_with_support(prospection_obj, cp.get_bodies_in_contact()): + return False + # new_position = prospection_obj.get_position_as_list() + # print("change in position", current_position[2] - new_position[2]) + # if current_position[2] - new_position[2] > distance: + # logdebug(f"Object {obj.name} is not supported") + # supported = False return supported @@ -48,6 +57,23 @@ def check_if_object_is_supported_using_contact_points(obj: Object, contact_point return True +def check_if_in_contact_with_support(obj: Object, contact_bodies: List[PhysicalBody]) -> Optional[PhysicalBody]: + """ + Check if the object is in contact with a supporting surface. + + :param obj: The object to check if it is in contact with a supporting surface. + :param contact_bodies: The bodies in contact with the object. + """ + for body in contact_bodies: + if issubclass(body.parent_entity.obj_type, pycrap.Supporter): + body_aabb = body.get_axis_aligned_bounding_box() + surface_z = body_aabb.max_z + tracked_object_base = obj.get_base_origin().position + if tracked_object_base.z >= surface_z and body_aabb.min_x <= tracked_object_base.x <= body_aabb.max_x and \ + body_aabb.min_y <= tracked_object_base.y <= body_aabb.max_y: + return body + + def check_if_object_is_supported_by_another_object(obj: Object, support_obj: Object, contact_points: Optional[ContactPointsList] = None) -> bool: """ @@ -60,8 +86,11 @@ def check_if_object_is_supported_by_another_object(obj: Object, support_obj: Obj """ if contact_points is None: contact_points = obj.get_contact_points_with_body(support_obj) - average_normal = np.mean([cp.normal for cp in contact_points], axis=0) - return is_vector_opposite_to_gravity(average_normal) + normals = [cp.normal for cp in contact_points if any(cp.normal)] + if len(normals) > 0: + average_normal = np.mean(normals, axis=0) + return is_vector_opposite_to_gravity(average_normal) + return False def is_vector_opposite_to_gravity(vector: List[float], gravity_vector: Optional[List[float]] = None) -> bool: @@ -97,7 +126,7 @@ def imagine_support(cls, obj: Object, support_thickness: Optional[float] = 0.005 obj_aabb = obj.get_axis_aligned_bounding_box() support = GenericObjectDescription(f"imagined_support_{cls.latest_surface_idx}", [0, 0, 0], [obj_aabb.width, obj_aabb.depth, support_thickness*0.5]) - support_obj = Object(f"imagined_support_{cls.latest_surface_idx}", pycrap.Genobj, None, support) + support_obj = Object(f"imagined_support_{cls.latest_surface_idx}", pycrap.Supporter, None, support) support_position = obj_base_position.copy() support_position[2] -= support_thickness support_obj.set_position(support_position)