From b326cba51e52145831fa632a47f9cab095fcc458 Mon Sep 17 00:00:00 2001
From: Ken Haagh
Date: Sun, 12 Nov 2023 16:33:21 +0100
Subject: [PATCH] Add y-tracking

---
 people_tracking/src/UKFclass.py         | 239 ++++++++++++++++--------
 people_tracking/src/colour_check.py     |  26 +--
 people_tracking/src/people_tracker.py   | 108 ++++++-----
 people_tracking/src/person_detection.py |  18 +-
 4 files changed, 238 insertions(+), 153 deletions(-)

diff --git a/people_tracking/src/UKFclass.py b/people_tracking/src/UKFclass.py
index 2f102b8..a567654 100755
--- a/people_tracking/src/UKFclass.py
+++ b/people_tracking/src/UKFclass.py
@@ -1,22 +1,133 @@
+# import numpy as np
+# from filterpy.kalman import MerweScaledSigmaPoints
+# from filterpy.kalman import UnscentedKalmanFilter
+# from filterpy.common.discretization import Q_discrete_white_noise
+# # from numpy.random import randn
+# #
+# # import random
+# # import matplotlib.pyplot as plt
+# import scipy
+#
+# # def sqrt_func(x):
+# #     """ Sqrt functions that prevents covariance matrix P to become slowly not symmetric or positive definite"""
+# #     try:
+# #         result = scipy.linalg.cholesky(x)
+# #     except scipy.linalg.LinAlgError:
+# #         x = (x + x.T)/2
+# #         result = scipy.linalg.cholesky(x)
+# #     return result
+#
+#
+# class UKF:
+#
+#     def __init__(self):
+#         self.current_time = 0
+#
+#         dt = 0.1  # standard dt
+#
+#         # Create sigma points
+#         self.points = MerweScaledSigmaPoints(6, alpha=0.1, beta=2.0, kappa=-1)#, sqrt_method=sqrt_func)
+#
+#         self.kf = UnscentedKalmanFilter(dim_x=6, dim_z=3, dt=dt, fx=self.fx, hx=self.hx, points=self.points)
+#
+#         self.kf.x = np.array([1., 0, 1., 0, 1., 0])  # Initial state
+#         self.kf.P *= 1  # Initial uncertainty
+#
+#         z_std = 0.2
+#         self.kf.R = np.diag([z_std ** 2, z_std ** 2, z_std ** 2])  # Measurement noise covariance matrix
+#
+#         # Assuming standard deviations for process noise in x, y, and z
+#         std_x = 3.0
+#         std_y = 3.0
+#         std_z = 3.0
+#         process_noise_cov = np.diag([std_x ** 2, std_y ** 2, std_z ** 2, std_x ** 2, std_y ** 2, std_z ** 2])
+#         # Set the process noise covariance matrix
+#         self.kf.Q = process_noise_cov
+#
+#
+#         # self.kf.Q = Q_discrete_white_noise(dim=4, dt=dt, var= 3 ** 2, block_size=2) #Process noise
+#
+#     # https://filterpy.readthedocs.io/en/latest/kalman/UnscentedKalmanFilter.html
+#     def fx(self, x, dt):
+#         """ Function that returns the state x transformed by the state transition function. (cv model)
+#         Assumption:
+#             * x = [x, vx, y, vy z, vz]^T
+#         """
+#         F = np.array([[1, dt, 0, 0, 0, 0],
+#                       [0, 1, 0, 0, 0, 0],
+#                       [0, 0, 1, dt, 0, 0],
+#                       [0, 0, 0, 1, 0, 0],
+#                       [0, 0, 0, 0, 1, dt],
+#                       [0, 0, 0, 0, 0, 1]], dtype=float)
+#         return np.dot(F, x)
+#
+#     def hx(self, x):
+#         """Measurement function - convert the state into a measurement where measurements are [x_pos, y_pos, z_pos] """
+#         return np.array([x[0], x[2], x[4]])
+#
+#     def predict(self, time):
+#         """ update the kalman filter by predicting value."""
+#         delta_t = time - self.current_time
+#         self.kf.predict(dt=delta_t)
+#         self.current_time = time
+#
+#     def update(self, time, z):
+#         """ Update filter with measurements z."""
+#         self.predict(time)
+#         self.kf.update(z)
+#
+#
+# # # Example with varying dt
+# # z_std = 0.1
+# # zs = [[i + randn() * z_std, i + randn() * z_std] for i in range(50)]  # Measurements
+# #
+# # start = 0
+# # step = 0.1
+# # end = 10
+# # times = [start + step * i for i in range(int((end - start) / step) + 1)]
+# # times = [x + random.uniform(0, 0.05) for x in times]
+# #
+# # print(times)
+# # xs = []
+# # ys = []
+# # xk = []
+# # yk = []
+# #
+# # i = -1
+# # time_last = 0
+# #
+# # ukf = UKF()
+# #
+# # for z in zs:
+# #     i = i + 1
+# #     # delta_t = times[i] - time_last
+# #     # print(times[i], delta_t)
+# #     ukf.predict(times[i])
+# #     ukf.update(times[i], z)
+# #     # time_last = times[i]
+# #     # kf.update(z)
+# #     # print(kf.x, 'log-likelihood', kf.log_likelihood)
+# #     xs.append(z[0])
+# #     ys.append(z[1])
+# #     xk.append(ukf.kf.x[0])
+# #     yk.append(ukf.kf.x[2])
+# # # print(xk)
+# #
+# # fig, ax = plt.subplots()
+# # # measured = [list(t) for t in zs]
+# # ax.scatter(xs, ys, s=5)
+# # ax.plot(xk, yk)
+# #
+# # ax.set_xlabel('X-axis')
+# # ax.set_ylabel('Z-axis')
+# #
+# # # Display the plot
+# # plt.show()
+
 import numpy as np
 from filterpy.kalman import MerweScaledSigmaPoints
 from filterpy.kalman import UnscentedKalmanFilter
-from filterpy.common.discretization import Q_discrete_white_noise
-# from numpy.random import randn
-#
-# import random
-# import matplotlib.pyplot as plt
-import scipy
-
-# def sqrt_func(x):
-#     """ Sqrt functions that prevents covariance matrix P to become slowly not symmetric or positive definite"""
-#     try:
-#         result = scipy.linalg.cholesky(x)
-#     except scipy.linalg.LinAlgError:
-#         x = (x + x.T)/2
-#         result = scipy.linalg.cholesky(x)
-#     return result
-
+from filterpy.common import Q_continuous_white_noise
 
 class UKF:
 
@@ -26,89 +137,59 @@ def __init__(self):
         dt = 0.1  # standard dt
 
         # Create sigma points
-        self.points = MerweScaledSigmaPoints(4, alpha=0.1, beta=2.0, kappa=-1)#, sqrt_method=sqrt_func)
+        self.points = MerweScaledSigmaPoints(6, alpha=0.1, beta=2.0, kappa=-1)
 
-        self.kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=self.fx, hx=self.hx, points=self.points)
+        self.kf = UnscentedKalmanFilter(dim_x=6, dim_z=3, dt=dt, fx=self.fx, hx=self.hx, points=self.points)
 
-        self.kf.x = np.array([1., 0, 1., 0])  # Initial state
-        self.kf.P *= 1  # Initial uncertainty
+        # Initial state: [x, vx, y, vy, z, vz]
+        self.kf.x = np.array([1., 0, 300., 0, 1., 0])
 
+        # Initial uncertainty
+        self.kf.P *= 1
+
+        # Measurement noise covariance matrix
         z_std = 0.2
-        self.kf.R = np.diag([z_std ** 2, z_std ** 2])  # Measurement noise covariance matrix
+        self.kf.R = np.diag([z_std ** 2, z_std ** 2, z_std ** 2])
+
+        # Process noise covariance matrix
+        process_noise_stds = [3.0 ** 2, 0.1 ** 2, 3 ** 2, 0.1 ** 2, 0.1 ** 2, 0.1 ** 2]
 
-        self.kf.Q = Q_discrete_white_noise(dim=2, dt=dt, var= 3 ** 2, block_size=2)  #
+        self.kf.Q = np.diag(process_noise_stds)
 
-    # https://filterpy.readthedocs.io/en/latest/kalman/UnscentedKalmanFilter.html
     def fx(self, x, dt):
-        """ Function that returns the state x transformed by the state transition function. (cv model)
-        Assumption:
-            * x = [x, vx, z, vz]^T
-        """
-        F = np.array([[1, dt, 0, 0],
-                      [0, 1, 0, 0],
-                      [0, 0, 1, dt],
-                      [0, 0, 0, 1]], dtype=float)
+        """ State transition function """
+        F = np.array([[1, dt, 0, 0, 0, 0],
+                      [0, 1, 0, 0, 0, 0],
+                      [0, 0, 1, dt, 0, 0],
+                      [0, 0, 0, 1, 0, 0],
+                      [0, 0, 0, 0, 1, dt],
+                      [0, 0, 0, 0, 0, 1]], dtype=float)
         return np.dot(F, x)
 
     def hx(self, x):
-        """Measurement function - convert the state into a measurement where measurements are [x_pos, z_pos] """
-        return np.array([x[0], x[2]])
+        """ Measurement function """
+        return np.array([x[0], x[2], x[4]])
 
     def predict(self, time):
-        """ update the kalman filter by predicting value."""
+        """ Predict the next state """
        delta_t = time - self.current_time
         self.kf.predict(dt=delta_t)
         self.current_time = time
 
     def update(self, time, z):
-        """ Update filter with measurements z."""
+        """ Update filter with measurements z """
         self.predict(time)
         self.kf.update(z)
 
-
-# # Example with varying dt
-# z_std = 0.1
-# zs = [[i + randn() * z_std, i + randn() * z_std] for i in range(50)]  # Measurements
-#
-# start = 0
-# step = 0.1
-# end = 10
-# times = [start + step * i for i in range(int((end - start) / step) + 1)]
-# times = [x + random.uniform(0, 0.05) for x in times]
-#
-# print(times)
-# xs = []
-# ys = []
-# xk = []
-# yk = []
+# Example usage
+# ukf = UKF()
 #
-# i = -1
-# time_last = 0
+# # Assuming you have measurements z (replace with your actual measurements)
+# z = np.array([1.2, 0.9, 1.8])
 #
-# ukf = UKF()
+# # Update the filter with the measurements
+# ukf.update(current_time, z)
 #
-# for z in zs:
-#     i = i + 1
-#     # delta_t = times[i] - time_last
-#     # print(times[i], delta_t)
-#     ukf.predict(times[i])
-#     ukf.update(times[i], z)
-#     # time_last = times[i]
-#     # kf.update(z)
-#     # print(kf.x, 'log-likelihood', kf.log_likelihood)
-#     xs.append(z[0])
-#     ys.append(z[1])
-#     xk.append(ukf.kf.x[0])
-#     yk.append(ukf.kf.x[2])
-# # print(xk)
-#
-# fig, ax = plt.subplots()
-# # measured = [list(t) for t in zs]
-# ax.scatter(xs, ys, s=5)
-# ax.plot(xk, yk)
-#
-# ax.set_xlabel('X-axis')
-# ax.set_ylabel('Z-axis')
-#
-# # Display the plot
-# plt.show()
+# # Access the current state estimate
+# current_state_estimate = ukf.kf.x
+# print("Current State Estimate:", current_state_estimate)
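
Note on the new process noise: the patch sets Q to a hand-tuned diagonal and imports Q_continuous_white_noise without using it yet. For reference, a minimal sketch (illustrative only, not part of the patch) of building a block-diagonal Q for the [x, vx, y, vy, z, vz] constant-velocity model with filterpy's Q_discrete_white_noise; dt and the variance are assumed tuning placeholders.

    import numpy as np
    from filterpy.common import Q_discrete_white_noise

    dt = 0.1          # nominal time step, matching UKFclass.py
    q_var = 3.0 ** 2  # per-axis process noise variance (assumed tuning value)

    # dim=2 gives one [position, velocity] block; block_size=3 repeats it for x, y and z,
    # so Q is 6x6 and ordered [x, vx, y, vy, z, vz], matching fx() above.
    Q = Q_discrete_white_noise(dim=2, dt=dt, var=q_var, block_size=3)
    print(Q.shape)  # (6, 6)
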
diff --git a/people_tracking/src/colour_check.py b/people_tracking/src/colour_check.py
index 0f3f53a..8febbca 100755
--- a/people_tracking/src/colour_check.py
+++ b/people_tracking/src/colour_check.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env python
-
 import rospy
 import cv2
 import numpy as np
@@ -9,20 +8,17 @@
 from people_tracking.msg import ColourCheckedTarget
 from people_tracking.msg import DetectedPerson
 
-from sensor_msgs.msg import Image
-
 NODE_NAME = 'HoC'
 TOPIC_PREFIX = '/hero/'
 
 
 class HOC:
     def __init__(self) -> None:
-        # ROS Initialize
         rospy.init_node(NODE_NAME, anonymous=True)
         self.subscriber = rospy.Subscriber(TOPIC_PREFIX + 'person_detections', DetectedPerson, self.callback, queue_size = 1)
         self.publisher = rospy.Publisher(TOPIC_PREFIX + 'HoC', ColourCheckedTarget, queue_size=2)
-        # self.publisher_debug = rospy.Publisher(TOPIC_PREFIX + 'HOCdebug', Image, queue_size=10)
+        # self.publisher_debug = rospy.Publisher(TOPIC_PREFIX + 'debug/HoC_debug', Image, queue_size=10)
 
         # Variables
         self.HoC_detections = []
@@ -35,17 +31,14 @@ def get_vector(image, bins=32):
         :param bins: amount of bins in histogram
         :return: HSV-colour histogram vector from image
         """
-        # Convert to HSV
-        hsv_image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
+        hsv_image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)  # Convert to HSV
 
-        # Get color histograms
-        histograms = [cv2.calcHist([hsv_image], [i], None, [bins], [0, 256]) for i in range(3)]
+        histograms = [cv2.calcHist([hsv_image], [i], None, [bins], [0, 256])
+                      for i in range(3)]  # Get color histograms
 
-        # Normalize histograms
-        histograms = [hist / hist.sum() for hist in histograms]
+        histograms = [hist / hist.sum() for hist in histograms]  # Normalize histograms
 
-        # Create Vector
-        vector = np.concatenate(histograms, axis=0).reshape(-1)
+        vector = np.concatenate(histograms, axis=0).reshape(-1)  # Create colour histogram vector
         return vector
 
     @staticmethod
@@ -58,7 +51,6 @@ def cosine(a, b):
         """ Cosine distance between two vectors. Closer to 1 means a better match."""
         return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
 
-
     def compare_hoc(self, detected_persons):
         """ Compare newly detected persons to previously detected target."""
         bridge = CvBridge()
@@ -96,6 +88,8 @@ def callback(self, data):
         nr_persons = data.nr_persons
         detected_persons = data.detected_persons
         x_positions = data.x_positions
+        y_positions = data.y_positions
+        z_positions = data.z_positions
 
         match = False
         idx_match = None
@@ -108,8 +102,8 @@ def callback(self, data):
             msg.batch_nr = int(nr_batch)
             msg.idx_person = int(idx_match)
             msg.x_position = x_positions[idx_match]
-            msg.y_position = 0
-            msg.z_position = 0
+            msg.y_position = y_positions[idx_match]
+            msg.z_position = 0  # z_positions[idx_match]
 
             self.publisher.publish(msg)
         self.last_batch_processed = nr_batch
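
For context, a minimal sketch of how the get_vector and cosine helpers above compare two person crops; the random arrays stand in for the RGB crops carried in DetectedPerson and are not real data.

    import cv2
    import numpy as np

    def get_vector(image, bins=32):
        hsv_image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
        histograms = [cv2.calcHist([hsv_image], [i], None, [bins], [0, 256]) for i in range(3)]
        histograms = [hist / hist.sum() for hist in histograms]
        return np.concatenate(histograms, axis=0).reshape(-1)

    def cosine(a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    crop_a = np.random.randint(0, 255, (128, 64, 3), dtype=np.uint8)  # stand-in person crops
    crop_b = np.random.randint(0, 255, (128, 64, 3), dtype=np.uint8)
    similarity = cosine(get_vector(crop_a), get_vector(crop_b))
    print(similarity)  # closer to 1 means a better colour match
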
diff --git a/people_tracking/src/people_tracker.py b/people_tracking/src/people_tracker.py
index 0fa2c7a..c91f152 100755
--- a/people_tracking/src/people_tracker.py
+++ b/people_tracking/src/people_tracker.py
@@ -1,10 +1,8 @@
 #!/usr/bin/env python
-
 import rospy
 import cv2
-from cv_bridge import CvBridge
 import math
-
+from cv_bridge import CvBridge
 from UKFclass import *
 
 
@@ -20,35 +18,6 @@
 name_subscriber_RGB = '/hero/head_rgbd_sensor/rgb/image_raw' if not laptop else 'video_frames'
 
 
-def euclidean_distance(point1, point2):
-    """ Calculate the Euclidean distance between two points.
-
-    :param point1: A tuple or list representing the coordinates of the first point.
-    :param point2: A tuple or list representing the coordinates of the second point.
-    :return: The Euclidean distance between the two points.
-    """
-    if len(point1) != len(point2):
-        raise ValueError("Not the same dimensions")
-
-    squared_sum = sum((coord2 - coord1) ** 2 for coord1, coord2 in zip(point1, point2))
-    distance = math.sqrt(squared_sum)
-    return distance
-
-
-def element_exists(lst, element):
-    """ Check if element is in list.
-
-    :param lst: List to check element against.
-    :param element: Element to check if it is in the list.
-    :return: True, index element if in the list, False, None if element not in list
-    """
-    try:  # Try to find element
-        idx = lst.index(element)
-        return True, idx
-    except ValueError:  # If element is not in the list
-        return False, None
-
-
 class PeopleTracker:
     def __init__(self) -> None:
@@ -58,7 +27,6 @@ def __init__(self) -> None:
                                                    self.callback_HoC, queue_size=1)
         self.subscriber_persons = rospy.Subscriber(TOPIC_PREFIX + 'person_detections', DetectedPerson,
                                                    self.callback_persons, queue_size=1)
-
         self.subscriber_frames = rospy.Subscriber(name_subscriber_RGB, Image, self.get_latest_image, queue_size=1)
 
         self.publisher_debug = rospy.Publisher(TOPIC_PREFIX + 'debug/people_tracker', Image, queue_size=10)
@@ -66,13 +34,44 @@ def __init__(self) -> None:
 
         # Variables
         self.latest_image = None
-
         self.tracked_data = [[0, 0, 0, 0, 0, 0]]
 
         self.ukf_confirmed = UKF()
         self.ukf_prediction = UKF()
 
+    @staticmethod
+    def euclidean_distance(point1, point2):
+        """ Calculate the Euclidean distance between two points.
+
+        :param point1: A tuple or list representing the coordinates of the first point.
+        :param point2: A tuple or list representing the coordinates of the second point.
+        :return: The Euclidean distance between the two points.
+        """
+        if len(point1) != len(point2):
+            raise ValueError("Not the same dimensions")
+
+        squared_sum = sum((coord2 - coord1) ** 2 for coord1, coord2 in zip(point1, point2))
+        distance = math.sqrt(squared_sum)
+        return distance
+
+    @staticmethod
+    def element_exists(lst, element):
+        """ Check if element is in list.
+
+        :param lst: List to check element against.
+        :param element: Element to check if it is in the list.
+        :return: True, index element if in the list, False, None if element not in list
+        """
+        try:  # Try to find element
+            idx = lst.index(element)
+            return True, idx
+        except ValueError:  # If element is not in the list
+            return False, None
+
     def callback_HoC(self, data):
+        """ Update the ukf_confirmed with the HoC data, as well as check that the ukf_prediction is still
+        following the correct target.
+        """
         time = data.time
         batch_nr = data.batch_nr
         idx_person = data.idx_person
@@ -80,27 +79,27 @@ def callback_HoC(self, data):
         y_position = data.y_position
         z_position = data.z_position
 
-
-        exist, idx = element_exists(self.tracked_data, [batch_nr, idx_person, time, x_position, y_position, z_position])
+        exists, idx = self.element_exists(self.tracked_data,
+                                          [batch_nr, idx_person, time, x_position, y_position, z_position])
 
         # rospy.loginfo("exist: %s, idx: %s", exist, idx)
-        if exist:
+        if exists:
             update_data = self.tracked_data[:idx + 1][:]
             for entry in update_data:
-                z = [entry[3], entry[5]]
+                z = [entry[3], entry[4], 0]
                 self.ukf_confirmed.update(entry[2], z)
             self.tracked_data = self.tracked_data[idx:][:]
+
 
     def callback_persons(self, data):
-        """ Update the ukf_prediciton using the closest image."""
+        """ Update the ukf_prediction using the closest image."""
         time = data.time
         nr_batch = data.nr_batch
         nr_persons = data.nr_persons
         x_positions = data.x_positions
         y_positions = data.y_positions
-        z_positions = data.z_positions
-        y_positions = [0] * nr_persons
+        z_positions = data.z_positions
         z_positions = [0] * nr_persons
 
         if time > self.tracked_data[-1][0]:
@@ -109,8 +108,8 @@ def callback_persons(self, data):
             person = None
             for idx in range(nr_persons):
                 tracked = tuple(self.tracked_data[-1][-3:])
-                distance = euclidean_distance(tracked,
-                                              tuple([x_positions[idx], y_positions[idx], z_positions[idx]]))
+                distance = self.euclidean_distance(tracked,
+                                                   tuple([x_positions[idx], y_positions[idx], z_positions[idx]]))
                 if smallest_distance is None:
                     person = idx
                     smallest_distance = distance
@@ -120,8 +119,8 @@ def callback_persons(self, data):
 
             self.tracked_data.append(
                 [nr_batch, person, time, x_positions[person], y_positions[person], z_positions[person]])
-            self.ukf_prediction.update(time, [x_positions[person], 0])
-            # rospy.loginfo([nr_batch, person, time , x_positions[person], y_positions[person], z_positions[person]])
+            self.ukf_prediction.update(time, [x_positions[person], y_positions[person], 0])
+
 
     def get_latest_image(self, data):
         """ Get the most recent frame/image from the camera."""
@@ -140,16 +139,23 @@ def plot_tracker(self):
             self.ukf_prediction.predict(current_time)
 
             x_hoc = int(self.ukf_confirmed.kf.x[0])
+            y_hoc = int(self.ukf_confirmed.kf.x[2])
+
             x_position = int(self.ukf_prediction.kf.x[0])
-            # rospy.loginfo('predict: time: ' + str(float(rospy.get_time())) + 'x: ' + str(x_position))
+            y_position = int(self.ukf_prediction.kf.x[2])
 
-            x_position = 0 if x_position < 0 else x_position
-            x_position = 639 if x_position > 639 else x_position
-            cv2.circle(cv_image, (x_position, 200), 5, (0, 0, 255), -1)  # plot ukf prediction measurement red
-            cv2.circle(cv_image, (self.tracked_data[-1][-3], 200), 5, (0, 255, 0),
+            # Red = UKF prediction location
+            # Blue = HoC ukf latest input measurement
+            # Green = Data association
+
+            cv2.circle(cv_image, (x_position, y_position), 5, (0, 0, 255), -1)  # plot ukf prediction measurement red
+            cv2.circle(cv_image, (self.tracked_data[-1][-3], self.tracked_data[-1][-2]), 5, (0, 255, 0),
                        -1)  # plot latest data ass. measurement green
-            cv2.circle(cv_image, (x_hoc, 200), 5, (255, 0, 0), -1)  # plot latest hoc measurement blue
+            cv2.circle(cv_image, (x_hoc, y_hoc), 5, (255, 0, 0), -1)  # plot latest hoc measurement blue
 
             tracker_image = bridge.cv2_to_imgmsg(cv_image, encoding="passthrough")
+
+            # rospy.loginfo("predict x: %s, y: %s; measured x: %s, y: %s, HoC x: %s, y: %s", x_position, y_position, self.tracked_data[-1][-3], self.tracked_data[-1][-2], x_hoc, y_hoc)
+
             self.publisher_debug.publish(tracker_image)
 
     def loop(self):
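
A minimal sketch of the nearest-neighbour association that callback_persons performs with the new y coordinate (z is still forced to 0 in this patch); the centroid values below are made-up examples.

    import math

    def euclidean_distance(p, q):
        return math.sqrt(sum((b - a) ** 2 for a, b in zip(p, q)))

    last_tracked = (320, 240, 0)               # previous (x, y, z) entry, example values
    detections = [(300, 250, 0), (50, 60, 0)]  # candidate centroids, example values

    best = min(range(len(detections)), key=lambda i: euclidean_distance(last_tracked, detections[i]))
    x, y, _ = detections[best]
    measurement = [x, y, 0]                    # what gets passed to ukf_prediction.update(time, ...)
    print(best, measurement)
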
diff --git a/people_tracking/src/person_detection.py b/people_tracking/src/person_detection.py
index 47b2ddd..11c1f13 100755
--- a/people_tracking/src/person_detection.py
+++ b/people_tracking/src/person_detection.py
@@ -2,13 +2,14 @@
 import rospy
 import cv2
 import numpy as np
-from ultralytics import YOLO
 from cv_bridge import CvBridge
+from ultralytics import YOLO
 
 # MSGS
 from sensor_msgs.msg import Image
 from people_tracking.msg import DetectedPerson
 
+
 NODE_NAME = 'person_detection'
 TOPIC_PREFIX = '/hero/'
 
@@ -28,8 +29,8 @@ def __init__(self) -> None:
         rospy.init_node(NODE_NAME, anonymous=True)
         self.subscriber = rospy.Subscriber(name_subscriber_RGB, Image, self.image_callback, queue_size=1)
-        # self.publisher_debug = rospy.Publisher(TOPIC_PREFIX + 'segmented_image', Image, queue_size=10)
-        self.publisher = rospy.Publisher(TOPIC_PREFIX + 'person_detections', DetectedPerson, queue_size= 10)
+        self.publisher = rospy.Publisher(TOPIC_PREFIX + 'person_detections', DetectedPerson, queue_size=5)
+        # self.publisher_debug = rospy.Publisher(TOPIC_PREFIX + 'debug/segmented_image', Image, queue_size=5)
 
         # Initialize variables
         self.batch_nr = 0
@@ -80,6 +81,7 @@ def process_latest_image(self):
 
         detected_persons = []
         x_positions = []
+        y_positions = []
         nr_persons = 0
 
         self.batch_nr += 1
@@ -96,15 +98,17 @@ def process_latest_image(self):
             image_message = bridge.cv2_to_imgmsg(cropped_image, encoding="passthrough")
 
             detected_persons.append(image_message)
-            x_positions.append(int( x1 + ((x2-x1) / 2)))
+            x_positions.append(int(x1 + ((x2 - x1) / 2)))
+            y_positions.append(int(y1 + ((y2 - y1) / 2)))
 
-        # Create person_detections msg
+        # Create and Publish person_detections msg
         msg = DetectedPerson()
         msg.time = self.latest_image_time
         msg.nr_batch = self.batch_nr
         msg.nr_persons = nr_persons
         msg.detected_persons = detected_persons
-        msg. x_positions = x_positions
+        msg.x_positions = x_positions
+        msg.y_positions = y_positions
 
         self.publisher.publish(msg)
         self.latest_image = None  # Clear the latest image after processing
@@ -114,7 +118,7 @@ def process_latest_image(self):
             # self.publisher_debug.publish(image_message)
 
     def main_loop(self):
-        """ Main loop that makes sure only the latest images are processed"""
+        """ Main loop that makes sure only the latest images are processed. """
         while not rospy.is_shutdown():
             self.process_latest_image()
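
A minimal end-to-end sketch of feeding the new (x, y) centroids into the 6-state filter from UKFclass.py; the timestamps and pixel values are made-up examples and the snippet assumes UKFclass.py is importable.

    from UKFclass import UKF

    ukf = UKF()
    measurements = [(1.0, [320, 240, 0]),  # (time, [x, y, z]) with z fixed to 0, example values
                    (1.1, [322, 243, 0]),
                    (1.2, [325, 247, 0])]

    for t, z in measurements:
        ukf.update(t, z)  # predicts to t, then updates with the measurement

    # State is [x, vx, y, vy, z, vz]; indices 0 and 2 hold the tracked pixel position.
    print(ukf.kf.x[0], ukf.kf.x[2])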