Skip to content

Commit

Permalink
Add y-tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
KenH2 committed Nov 12, 2023
1 parent d6f3e29 commit b326cba
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 153 deletions.
239 changes: 160 additions & 79 deletions people_tracking/src/UKFclass.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,133 @@
# import numpy as np
# from filterpy.kalman import MerweScaledSigmaPoints
# from filterpy.kalman import UnscentedKalmanFilter
# from filterpy.common.discretization import Q_discrete_white_noise
# # from numpy.random import randn
# #
# # import random
# # import matplotlib.pyplot as plt
# import scipy
#
# # def sqrt_func(x):
# # """ Sqrt functions that prevents covariance matrix P to become slowly not symmetric or positive definite"""
# # try:
# # result = scipy.linalg.cholesky(x)
# # except scipy.linalg.LinAlgError:
# # x = (x + x.T)/2
# # result = scipy.linalg.cholesky(x)
# # return result
#
#
# class UKF:
#
# def __init__(self):
# self.current_time = 0
#
# dt = 0.1 # standard dt
#
# # Create sigma points
# self.points = MerweScaledSigmaPoints(6, alpha=0.1, beta=2.0, kappa=-1)#, sqrt_method=sqrt_func)
#
# self.kf = UnscentedKalmanFilter(dim_x=6, dim_z=3, dt=dt, fx=self.fx, hx=self.hx, points=self.points)
#
# self.kf.x = np.array([1., 0, 1., 0, 1., 0]) # Initial state
# self.kf.P *= 1 # Initial uncertainty
#
# z_std = 0.2
# self.kf.R = np.diag([z_std ** 2, z_std ** 2, z_std ** 2]) # Measurement noise covariance matrix
#
# # Assuming standard deviations for process noise in x, y, and z
# std_x = 3.0
# std_y = 3.0
# std_z = 3.0
# process_noise_cov = np.diag([std_x ** 2, std_y ** 2, std_z ** 2, std_x ** 2, std_y ** 2, std_z ** 2])
# # Set the process noise covariance matrix
# self.kf.Q = process_noise_cov
#
#
# # self.kf.Q = Q_discrete_white_noise(dim=4, dt=dt, var= 3 ** 2, block_size=2) #Process noise
#
# # https://filterpy.readthedocs.io/en/latest/kalman/UnscentedKalmanFilter.html
# def fx(self, x, dt):
# """ Function that returns the state x transformed by the state transition function. (cv model)
# Assumption:
# * x = [x, vx, y, vy z, vz]^T
# """
# F = np.array([[1, dt, 0, 0, 0, 0],
# [0, 1, 0, 0, 0, 0],
# [0, 0, 1, dt, 0, 0],
# [0, 0, 0, 1, 0, 0],
# [0, 0, 0, 0, 1, dt],
# [0, 0, 0, 0, 0, 1]], dtype=float)
# return np.dot(F, x)
#
# def hx(self, x):
# """Measurement function - convert the state into a measurement where measurements are [x_pos, y_pos, z_pos] """
# return np.array([x[0], x[2], x[4]])
#
# def predict(self, time):
# """ update the kalman filter by predicting value."""
# delta_t = time - self.current_time
# self.kf.predict(dt=delta_t)
# self.current_time = time
#
# def update(self, time, z):
# """ Update filter with measurements z."""
# self.predict(time)
# self.kf.update(z)
#
#
# # # Example with varying dt
# # z_std = 0.1
# # zs = [[i + randn() * z_std, i + randn() * z_std] for i in range(50)] # Measurements
# #
# # start = 0
# # step = 0.1
# # end = 10
# # times = [start + step * i for i in range(int((end - start) / step) + 1)]
# # times = [x + random.uniform(0, 0.05) for x in times]
# #
# # print(times)
# # xs = []
# # ys = []
# # xk = []
# # yk = []
# #
# # i = -1
# # time_last = 0
# #
# # ukf = UKF()
# #
# # for z in zs:
# # i = i + 1
# # # delta_t = times[i] - time_last
# # # print(times[i], delta_t)
# # ukf.predict(times[i])
# # ukf.update(times[i], z)
# # # time_last = times[i]
# # # kf.update(z)
# # # print(kf.x, 'log-likelihood', kf.log_likelihood)
# # xs.append(z[0])
# # ys.append(z[1])
# # xk.append(ukf.kf.x[0])
# # yk.append(ukf.kf.x[2])
# # # print(xk)
# #
# # fig, ax = plt.subplots()
# # # measured = [list(t) for t in zs]
# # ax.scatter(xs, ys, s=5)
# # ax.plot(xk, yk)
# #
# # ax.set_xlabel('X-axis')
# # ax.set_ylabel('Z-axis')
# #
# # # Display the plot
# # plt.show()

import numpy as np
from filterpy.kalman import MerweScaledSigmaPoints
from filterpy.kalman import UnscentedKalmanFilter
from filterpy.common.discretization import Q_discrete_white_noise
# from numpy.random import randn
#
# import random
# import matplotlib.pyplot as plt
import scipy

# def sqrt_func(x):
# """ Sqrt functions that prevents covariance matrix P to become slowly not symmetric or positive definite"""
# try:
# result = scipy.linalg.cholesky(x)
# except scipy.linalg.LinAlgError:
# x = (x + x.T)/2
# result = scipy.linalg.cholesky(x)
# return result

from filterpy.common import Q_continuous_white_noise

class UKF:

Expand All @@ -26,89 +137,59 @@ def __init__(self):
dt = 0.1 # standard dt

# Create sigma points
self.points = MerweScaledSigmaPoints(4, alpha=0.1, beta=2.0, kappa=-1)#, sqrt_method=sqrt_func)
self.points = MerweScaledSigmaPoints(6, alpha=0.1, beta=2.0, kappa=-1)

self.kf = UnscentedKalmanFilter(dim_x=4, dim_z=2, dt=dt, fx=self.fx, hx=self.hx, points=self.points)
self.kf = UnscentedKalmanFilter(dim_x=6, dim_z=3, dt=dt, fx=self.fx, hx=self.hx, points=self.points)

self.kf.x = np.array([1., 0, 1., 0]) # Initial state
self.kf.P *= 1 # Initial uncertainty
# Initial state: [x, vx, y, vy, z, vz]
self.kf.x = np.array([1., 0, 300., 0, 1., 0])

# Initial uncertainty
self.kf.P *= 1

# Measurement noise covariance matrix
z_std = 0.2
self.kf.R = np.diag([z_std ** 2, z_std ** 2]) # Measurement noise covariance matrix
self.kf.R = np.diag([z_std ** 2, z_std ** 2, z_std ** 2])

# Process noise covariance matrix
process_noise_stds = [3.0 ** 2, 0.1 ** 2, 3 ** 2, 0.1 ** 2, 0.1 ** 2, 0.1 ** 2]

self.kf.Q = Q_discrete_white_noise(dim=2, dt=dt, var= 3 ** 2, block_size=2) #
self.kf.Q = np.diag(process_noise_stds)

# https://filterpy.readthedocs.io/en/latest/kalman/UnscentedKalmanFilter.html
def fx(self, x, dt):
""" Function that returns the state x transformed by the state transition function. (cv model)
Assumption:
* x = [x, vx, z, vz]^T
"""
F = np.array([[1, dt, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, dt],
[0, 0, 0, 1]], dtype=float)
""" State transition function """
F = np.array([[1, dt, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[0, 0, 1, dt, 0, 0],
[0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 1, dt],
[0, 0, 0, 0, 0, 1]], dtype=float)
return np.dot(F, x)

def hx(self, x):
"""Measurement function - convert the state into a measurement where measurements are [x_pos, z_pos] """
return np.array([x[0], x[2]])
""" Measurement function """
return np.array([x[0], x[2], x[4]])

def predict(self, time):
""" update the kalman filter by predicting value."""
""" Predict the next state """
delta_t = time - self.current_time
self.kf.predict(dt=delta_t)
self.current_time = time

def update(self, time, z):
""" Update filter with measurements z."""
""" Update filter with measurements z """
self.predict(time)
self.kf.update(z)


# # Example with varying dt
# z_std = 0.1
# zs = [[i + randn() * z_std, i + randn() * z_std] for i in range(50)] # Measurements
#
# start = 0
# step = 0.1
# end = 10
# times = [start + step * i for i in range(int((end - start) / step) + 1)]
# times = [x + random.uniform(0, 0.05) for x in times]
#
# print(times)
# xs = []
# ys = []
# xk = []
# yk = []
# Example usage
# ukf = UKF()
#
# i = -1
# time_last = 0
# # Assuming you have measurements z (replace with your actual measurements)
# z = np.array([1.2, 0.9, 1.8])
#
# ukf = UKF()
# # Update the filter with the measurements
# ukf.update(current_time, z)
#
# for z in zs:
# i = i + 1
# # delta_t = times[i] - time_last
# # print(times[i], delta_t)
# ukf.predict(times[i])
# ukf.update(times[i], z)
# # time_last = times[i]
# # kf.update(z)
# # print(kf.x, 'log-likelihood', kf.log_likelihood)
# xs.append(z[0])
# ys.append(z[1])
# xk.append(ukf.kf.x[0])
# yk.append(ukf.kf.x[2])
# # print(xk)
#
# fig, ax = plt.subplots()
# # measured = [list(t) for t in zs]
# ax.scatter(xs, ys, s=5)
# ax.plot(xk, yk)
#
# ax.set_xlabel('X-axis')
# ax.set_ylabel('Z-axis')
#
# # Display the plot
# plt.show()
# # Access the current state estimate
# current_state_estimate = ukf.kf.x
# print("Current State Estimate:", current_state_estimate)
26 changes: 10 additions & 16 deletions people_tracking/src/colour_check.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python

import rospy
import cv2
import numpy as np
Expand All @@ -9,20 +8,17 @@
from people_tracking.msg import ColourCheckedTarget
from people_tracking.msg import DetectedPerson

from sensor_msgs.msg import Image


NODE_NAME = 'HoC'
TOPIC_PREFIX = '/hero/'

class HOC:
def __init__(self) -> None:

# ROS Initialize
rospy.init_node(NODE_NAME, anonymous=True)
self.subscriber = rospy.Subscriber(TOPIC_PREFIX + 'person_detections', DetectedPerson, self.callback, queue_size = 1)
self.publisher = rospy.Publisher(TOPIC_PREFIX + 'HoC', ColourCheckedTarget, queue_size=2)
# self.publisher_debug = rospy.Publisher(TOPIC_PREFIX + 'HOCdebug', Image, queue_size=10)
# self.publisher_debug = rospy.Publisher(TOPIC_PREFIX + 'debug/HoC_debug', Image, queue_size=10)

# Variables
self.HoC_detections = []
Expand All @@ -35,17 +31,14 @@ def get_vector(image, bins=32):
:param bins: amount of bins in histogram
:return: HSV-colour histogram vector from image
"""
# Convert to HSV
hsv_image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
hsv_image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV) # Convert to HSV

# Get color histograms
histograms = [cv2.calcHist([hsv_image], [i], None, [bins], [0, 256]) for i in range(3)]
histograms = [cv2.calcHist([hsv_image], [i], None, [bins], [0, 256])
for i in range(3)] # Get color histograms

# Normalize histograms
histograms = [hist / hist.sum() for hist in histograms]
histograms = [hist / hist.sum() for hist in histograms] # Normalize histograms

# Create Vector
vector = np.concatenate(histograms, axis=0).reshape(-1)
vector = np.concatenate(histograms, axis=0).reshape(-1) # Create colour histogram vector
return vector

@staticmethod
Expand All @@ -58,7 +51,6 @@ def cosine(a, b):
""" Cosine distance between two vectors. Closer to 1 means a better match."""
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))


def compare_hoc(self, detected_persons):
""" Compare newly detected persons to previously detected target."""
bridge = CvBridge()
Expand Down Expand Up @@ -96,6 +88,8 @@ def callback(self, data):
nr_persons = data.nr_persons
detected_persons = data.detected_persons
x_positions = data.x_positions
y_positions = data.y_positions
z_positions = data.z_positions

match = False
idx_match = None
Expand All @@ -108,8 +102,8 @@ def callback(self, data):
msg.batch_nr = int(nr_batch)
msg.idx_person = int(idx_match)
msg.x_position = x_positions[idx_match]
msg.y_position = 0
msg.z_position = 0
msg.y_position = y_positions[idx_match]
msg.z_position = 0 #z_positions[idx_match]

self.publisher.publish(msg)
self.last_batch_processed = nr_batch
Expand Down
Loading

0 comments on commit b326cba

Please sign in to comment.