Skip to content

Commit

Permalink
[ObjectTracker] Placing and picking are working properly.
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelrhmanBassiouny committed Dec 16, 2024
1 parent 3bbd339 commit 7997826
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 59 deletions.
7 changes: 2 additions & 5 deletions src/episode_segmenter/episode_segmenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .events import ContactEvent, Event, AgentContactEvent, PickUpEvent, EventUnion, StopMotionEvent, MotionEvent, \
NewObjectEvent, RotationEvent, StopRotationEvent, PlacingEvent, HasTrackedObject
from .object_tracker import ObjectTracker
from .utils import check_if_object_is_supported, add_imaginary_support_for_object, adjust_imaginary_support_for_object
from .utils import check_if_object_is_supported, Imaginator


class EpisodeSegmenter(ABC):
Expand Down Expand Up @@ -327,10 +327,7 @@ def run_initial_event_detectors(self) -> None:
set_of_objects.add(obj)
try:
if not check_if_object_is_supported(obj):
if World.current_world.get_object_by_name('imagined_support') is not None:
adjust_imaginary_support_for_object(obj)
else:
add_imaginary_support_for_object(obj)
Imaginator.imagine_support(obj)
except NotImplementedError:
logwarn("Support detection is not implemented for this simulator.")
for obj in set_of_objects:
Expand Down
172 changes: 157 additions & 15 deletions src/episode_segmenter/event_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .event_logger import EventLogger
from .events import Event, ContactEvent, LossOfContactEvent, PickUpEvent, AgentContactEvent, \
AgentLossOfContactEvent, EventUnion, LossOfSurfaceEvent, TranslationEvent, StopTranslationEvent, NewObjectEvent, \
RotationEvent, StopRotationEvent, PlacingEvent, MotionEvent, StopMotionEvent
RotationEvent, StopRotationEvent, PlacingEvent, MotionEvent, StopMotionEvent, LiftingEvent
from .object_tracker import ObjectTracker, ObjectTrackerFactory
from .utils import get_angle_between_vectors, calculate_euclidean_distance, calculate_quaternion_difference, \
check_if_object_is_supported, check_if_object_is_supported_using_contact_points, \
Expand Down Expand Up @@ -598,6 +598,154 @@ def start_timestamp(self) -> float:
return self.starter_event.timestamp


class LiftingDetector(EventDetector):
"""
A detector that detects if the tracked_object was lifted.
"""

thread_prefix = "lifting_"

def __init__(self, logger: EventLogger, starter_event: LossOfSurfaceEvent, *args, **kwargs):
"""
:param logger: An instance of the EventLogger class that is used to log the events.
:param starter_event: An instance of the Event class that represents the event to start the event detector.
"""
super().__init__(logger, starter_event, *args, **kwargs)
self.tracked_object = self.get_object_to_lift_from_event(starter_event)
self.lifting_event: Optional[LiftingEvent] = None
self.run_once = True

@classmethod
def get_object_to_lift_from_event(cls, event: LossOfSurfaceEvent) -> Object:
"""
Get the tracked_object to lift from the event.
"""
return event.tracked_object

@classmethod
def start_condition_checker(cls, event: Event) -> bool:
"""
Check if the event is a starter event.
:param event: The Event instance that represents the event.
"""
return isinstance(event, LossOfSurfaceEvent)

def detect_events(self) -> List[LiftingEvent]:
"""
Detect if the tracked_object was lifted.
:return: An instance of the LiftingEvent class if the tracked_object was lifted, else None.
"""
event = None
start_time = time.time()
while not self.kill_event.is_set() and (time.time() - start_time < 1):
if not self.lifting_checks():
time.sleep(0.01)
continue
event = self.lifting_event
break
if event:
rospy.loginfo(f"{self.__class__.__name__} detected a lifting of: {self.tracked_object.name}")
return [event]
return []

def lifting_checks(self) -> bool:
"""
Perform checks to determine if the object was lifted.
:return: A boolean value that represents if all the checks passed and the object was lifted.
"""
translation_event = self.check_for_event_near_starter_event(TranslationEvent, timedelta(seconds=1))
if translation_event and self.translation_event_condition(translation_event):
self.lifting_event = LiftingEvent(self.tracked_object, translation_event.start_pose,
translation_event.current_pose, timestamp=translation_event.timestamp,
from_surface=self.starter_event.surface)
return True
return False

def translation_event_condition(self, translation_event: TranslationEvent) -> bool:
"""
Check if the translation event condition is met.
:param translation_event: The translation event instance.
"""
return translation_event.current_pose.position.z > translation_event.start_pose.position.z


class LoweringDetector(EventDetector):
"""
A detector that detects if the tracked_object was lowered.
"""

thread_prefix = "lowering_"

def __init__(self, logger: EventLogger, starter_event: LiftingEvent, *args, **kwargs):
"""
:param logger: An instance of the EventLogger class that is used to log the events.
:param starter_event: An instance of the Event class that represents the event to start the event detector.
"""
super().__init__(logger, starter_event, *args, **kwargs)
self.tracked_object = self.get_object_to_lower_from_event(starter_event)
self.lowering_event: Optional[TranslationEvent] = None
self.run_once = True

@classmethod
def get_object_to_lower_from_event(cls, event: LiftingEvent) -> Object:
"""
Get the tracked_object to lower from the event.
"""
return event.tracked_object

@classmethod
def start_condition_checker(cls, event: Event) -> bool:
"""
Check if the event is a starter event.
:param event: The Event instance that represents the event.
"""
return isinstance(event, LiftingEvent)

def detect_events(self) -> List[TranslationEvent]:
"""
Detect if the tracked_object was lowered.
:return: An instance of the TranslationEvent class if the tracked_object was lowered, else None.
"""
event = None
start_time = time.time()
while not self.kill_event.is_set() and (time.time() - start_time < 1):
if not self.lowering_checks():
time.sleep(0.01)
continue
event = self.lowering_event
break
if event:
rospy.loginfo(f"{self.__class__.__name__} detected a lowering of: {self.tracked_object.name}")
return [event]
return []

def lowering_checks(self) -> bool:
"""
Perform checks to determine if the object was lowered.
:return: A boolean value that represents if all the checks passed and the object was lowered.
"""
translation_event = self.check_for_event_near_starter_event(TranslationEvent, timedelta(seconds=1))
if translation_event and self.translation_event_condition(translation_event):
self.lowering_event = translation_event
return True
return False

def translation_event_condition(self, translation_event: TranslationEvent) -> bool:
"""
Check if the translation event condition is met.
:param translation_event: The translation event instance.
"""
return translation_event.current_pose.position.z < translation_event.start_pose.position.z


class MotionPlacingDetector(EventDetector, ABC):
"""
A detector that detects if the tracked_object was placed on a surface by using motion and contact.
Expand Down Expand Up @@ -807,17 +955,17 @@ def interaction_checks(self) -> bool:
if check_for_supporting_surface(self.tracked_object,
self.starter_event.latest_objects_that_got_removed) is not None:
# wait for the object to be lifted TODO: Should be replaced with a wait on a lifting event
dt = timedelta(milliseconds=300)
dt = timedelta(milliseconds=400)
time.sleep(dt.total_seconds())
latest_event = self.check_for_event_near_starter_event(TranslationEvent, dt)

if not latest_event:
return False

z_motion = latest_event.current_pose.position.z - latest_event.start_pose.position.z
if z_motion >= 0.005:
self.end_timestamp = max(latest_event.timestamp, self.start_timestamp)
return True
# z_motion = latest_event.current_pose.position.z - latest_event.start_pose.position.z
# if z_motion >= 0.005:
self.end_timestamp = max(latest_event.timestamp, self.start_timestamp)
return True

return False

Expand Down Expand Up @@ -857,16 +1005,10 @@ def initial_interaction_checkers(self) -> bool:
"""
dt = timedelta(milliseconds=1000)
time.sleep(dt.total_seconds())
event = self.check_for_event_near_starter_event(StopTranslationEvent, dt)
event = self.check_for_event_near_starter_event(StopMotionEvent, dt)
if event is not None:
print(f"found translation event: {event} for {self.tracked_object.name}")
print(f"object with history: {self.object_tracker.get_event_history()}")
if (event.current_pose.position.z - event.start_pose.position.z) > -0.001:
self.kill_event.set()
return False
# wait for the object to stop moving
dt = timedelta(milliseconds=500)
time.sleep(dt.total_seconds())
print(f"found event: {event} for {self.tracked_object.name}")
print(f"with history: {self.object_tracker.get_event_history()}")
if not check_if_object_is_supported(self.tracked_object, 0.01):
self.kill_event.set()
return False
Expand Down
10 changes: 10 additions & 0 deletions src/episode_segmenter/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ def set_color(self, color: Optional[Color] = None):
def __str__(self):
return f"{self.__class__.__name__}: {self.tracked_object.name} - {self.timestamp}"

def __repr__(self):
return self.__str__()


class TranslationEvent(MotionEvent):
@property
Expand Down Expand Up @@ -177,6 +180,13 @@ class StopRotationEvent(StopMotionEvent):
...


class LiftingEvent(TranslationEvent):
def __init__(self, tracked_object: Object, start_pose: Pose, current_pose: Pose,
timestamp: Optional[float] = None, from_surface: Optional[Object] = None):
super().__init__(tracked_object, start_pose, current_pose, timestamp)
self.from_surface: Optional[Object] = from_surface


class HasTwoTrackedObjects(HasTrackedObject, ABC):
"""
A mixin class that provides the tracked object and the object with which it interacts for the event.
Expand Down
67 changes: 28 additions & 39 deletions src/episode_segmenter/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,46 +76,35 @@ def is_vector_opposite_to_gravity(vector: List[float], gravity_vector: Optional[
return np.dot(vector, gravity_vector) < 0


def adjust_imaginary_support_for_object(obj: Object,
support_name: Optional[str] = f"imagined_support",
support_thickness: Optional[float] = 0.005) -> None:
"""
Adjust the imaginary support for the object such that it is at the base of the object.
:param obj: The object to check if it is supported.
:param support_name: The name of the support object.
:param support_thickness: The thickness of the support.
"""
support_obj = World.current_world.get_object_by_name(support_name)
floor = World.current_world.get_object_by_name('floor')
obj_base_position = obj.get_base_position_as_list()
support_position = support_obj.get_position_as_list()
if obj_base_position[2] <= (support_position[2] + support_thickness * 0.5):
floor.detach(support_obj)
support_position[2] = obj_base_position[2] - support_thickness * 0.5
class Imaginator:

"""
A class that provides methods for imagining objects.
"""
surfaces_created: List[Object] = []
latest_surface_idx: int = 0

@classmethod
def imagine_support(cls, obj: Object, support_thickness: Optional[float] = 0.005) -> Object:
"""
Imagine a support for the object.
:param obj: The object for which the support should be imagined.
:param support_thickness: The thickness of the support.
:return: The support object.
"""
obj_base_position = obj.get_base_position_as_list()
obj_aabb = obj.get_axis_aligned_bounding_box()
support = GenericObjectDescription(f"imagined_support_{cls.latest_surface_idx}",
[0, 0, 0], [obj_aabb.width, obj_aabb.depth, support_thickness*0.5])
support_obj = Object(f"imagined_support_{cls.latest_surface_idx}", pycrap.Genobj, None, support)
support_position = obj_base_position.copy()
support_position[2] -= support_thickness
support_obj.set_position(support_position)
floor.attach(support_obj)


def add_imaginary_support_for_object(obj: Object,
support_name: Optional[str] = f"imagined_support",
support_thickness: Optional[float] = 0.005) -> Object:
"""
Add an imaginary support for the object.
:param obj: The object for which the support should be added.
:param support_name: The name of the support object.
:param support_thickness: The thickness of the support.
:return: The support object.
"""
obj_base_position = obj.get_base_position_as_list()
support = GenericObjectDescription(support_name, [0, 0, 0], [1, 1, support_thickness*0.5])
support_obj = Object(support_name, pycrap.Genobj, None, support)
support_position = obj_base_position.copy()
support_position[2] -= support_thickness
support_obj.set_position(support_position)
World.current_world.get_object_by_name('floor').attach(support_obj)
return support_obj
World.current_world.get_object_by_name('floor').attach(support_obj)
cls.surfaces_created.append(support_obj)
cls.latest_surface_idx += 1
return support_obj


def get_angle_between_vectors(vector_1: List[float], vector_2: List[float]) -> float:
Expand Down

0 comments on commit 7997826

Please sign in to comment.