From 60ff84f0735eef98ee3234d84bac3e51cddd77f5 Mon Sep 17 00:00:00 2001 From: mark Date: Mon, 2 Dec 2024 19:41:48 +0800 Subject: [PATCH] [py] add log tools (gz_log & split log) --- ZBin/py_playground/rocos/log/README.md | 7 + ZBin/py_playground/rocos/log/__init__.py | 0 ZBin/py_playground/rocos/log/data/__init__.py | 0 ZBin/py_playground/rocos/log/data/loader.py | 17 ++ .../rocos/log/data/tracker_vision.py | 119 +++++++++ ZBin/py_playground/rocos/log/tools/.gitignore | 2 + .../rocos/log/tools/logdefine.py | 8 + ZBin/py_playground/rocos/log/tools/logread.py | 74 ++++++ .../py_playground/rocos/log/tools/logsplit.py | 144 ++++++++++ .../py_playground/rocos/robot/model_casadi.py | 166 ++++++++++++ ZBin/py_playground/rocos/robot/test_gcn.py | 111 ++++++++ ZBin/py_playground/rocos/robot/test_lie.py | 146 +++++++++++ ZBin/py_playground/rocos/robot/test_manif.py | 247 ++++++++++++++++++ .../py_playground/rocos/robot/test_manif2d.py | 246 +++++++++++++++++ ZBin/py_playground/rocos/robot/test_pypose.py | 29 ++ 15 files changed, 1316 insertions(+) create mode 100644 ZBin/py_playground/rocos/log/README.md create mode 100644 ZBin/py_playground/rocos/log/__init__.py create mode 100644 ZBin/py_playground/rocos/log/data/__init__.py create mode 100644 ZBin/py_playground/rocos/log/data/loader.py create mode 100644 ZBin/py_playground/rocos/log/data/tracker_vision.py create mode 100644 ZBin/py_playground/rocos/log/tools/.gitignore create mode 100644 ZBin/py_playground/rocos/log/tools/logdefine.py create mode 100644 ZBin/py_playground/rocos/log/tools/logread.py create mode 100644 ZBin/py_playground/rocos/log/tools/logsplit.py create mode 100644 ZBin/py_playground/rocos/robot/model_casadi.py create mode 100644 ZBin/py_playground/rocos/robot/test_gcn.py create mode 100644 ZBin/py_playground/rocos/robot/test_lie.py create mode 100644 ZBin/py_playground/rocos/robot/test_manif.py create mode 100644 ZBin/py_playground/rocos/robot/test_manif2d.py create mode 100644 ZBin/py_playground/rocos/robot/test_pypose.py diff --git a/ZBin/py_playground/rocos/log/README.md b/ZBin/py_playground/rocos/log/README.md new file mode 100644 index 0000000..dd54835 --- /dev/null +++ b/ZBin/py_playground/rocos/log/README.md @@ -0,0 +1,7 @@ +# log tools & dataloader +* store xxx.log.gz in `__log/` dir +```bash +python tools/logsplit.py __log/xxx.log.gz # generate split log +python tools/logread.py __log/xxx.log/ # check log +python data/tracker_vision.py __log/ # load all log in __log dir +``` diff --git a/ZBin/py_playground/rocos/log/__init__.py b/ZBin/py_playground/rocos/log/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ZBin/py_playground/rocos/log/data/__init__.py b/ZBin/py_playground/rocos/log/data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ZBin/py_playground/rocos/log/data/loader.py b/ZBin/py_playground/rocos/log/data/loader.py new file mode 100644 index 0000000..7174f4a --- /dev/null +++ b/ZBin/py_playground/rocos/log/data/loader.py @@ -0,0 +1,17 @@ +from torch.utils.data import DataLoader + +def data_loader(args, path): + dset = TrackerVisionDataset( + path, + obs_len=args.obs_len, + pred_len=args.pred_len, + skip=args.skip, + delim=args.delim) + + loader = DataLoader( + dset, + batch_size=args.batch_size, + shuffle=True, + num_workers=args.loader_num_workers, + collate_fn=seq_collate) + return dset, loader \ No newline at end of file diff --git a/ZBin/py_playground/rocos/log/data/tracker_vision.py b/ZBin/py_playground/rocos/log/data/tracker_vision.py new file mode 100644 index 0000000..cf7b2bb --- /dev/null +++ b/ZBin/py_playground/rocos/log/data/tracker_vision.py @@ -0,0 +1,119 @@ +import sys, os, copy +from torch.utils.data import Dataset +import numpy as np +from tzcp.ssl.vision.messages_robocup_ssl_wrapper_tracked_pb2 import TrackerWrapperPacket +from tzcp.ssl.vision.messages_robocup_ssl_detection_tracked_pb2 import TrackedFrame, TeamColor +sys.path.append('../../..') +from rocos.log.tools.logread import get_content, read_log, MSG_INDEX as I +from rocos.log.tools.logdefine import TYPE + +LOG_MIN_LEN = 100 + +class TrackerVisionDataset(Dataset): + def __init__(self, data_dir, obs_len = 8, pred_len = 12, skip = 5): + super(TrackerVisionDataset, self).__init__() + self.data_dir = data_dir + self.obs_len = obs_len + self.pred_len = pred_len + self.skip = skip + self.seq_len = self.obs_len + self.pred_len + + self.data = [] + + all_log_files = [] + # get log files + for root, dirs, files in os.walk(self.data_dir): + for file in files: + all_log_files.append(os.path.join(root, file)) + + + for log_file in all_log_files: + content = get_content(log_file) + msgs, type = read_log(content) + assert type == TYPE.SSL_VISION_TRACKER_2020, "Unknown message type" + data_seqs = self.generate_seq(msgs, TrackerWrapperPacket) + self.data.extend(data_seqs) + + print(f"Data Dir : {data_dir}, Total search {len(all_log_files)} log files, found {len(self.data)} sequences") + + def generate_seq(self, msgs, MsgType): + if len(msgs) < LOG_MIN_LEN: + return [] + + dataset_seqs = [] + + data_seq = [] + for data in msgs: + msg = MsgType() + msg.ParseFromString(data[I.MSG]) + data = self.parse_single_msg(msg) + if data is None: + print("parse error in msg, maybe ball not detected or robot number not correct. skip") + break + data_seq.append(data) + + for i in range(0, len(data_seq) - self.seq_len, self.skip): + dataset_seqs.extend(self.generate_single_seq(data_seq[i:i+self.seq_len])) + + return dataset_seqs + + def parse_single_msg(self, msg: TrackerWrapperPacket): + frame = msg.tracked_frame + if len(frame.balls) == 0: + return None + ball = frame.balls[0] + robot_blue = {} + robot_yellow = {} + + for r in frame.robots: + # robot_data + rd = np.array([r.pos.x, r.pos.y, r.orientation, r.vel.x, r.vel.y, r.vel_angular, r.visibility]) + if r.robot_id.team_color == TeamColor.TEAM_COLOR_BLUE: + robot_blue[r.robot_id.id] = rd + elif r.robot_id.team_color == TeamColor.TEAM_COLOR_YELLOW: + robot_yellow[r.robot_id.id] = rd + return { + "ball": np.array([ball.pos.x, ball.pos.y, ball.pos.z, ball.vel.x, ball.vel.y, ball.vel.z, ball.visibility]), + "blue": robot_blue, + "yellow": robot_yellow + } + + def generate_single_seq(self, seqs): + if len(seqs) == 0: + return [] + blue_id = list(seqs[0]["blue"].keys()) + yellow_id = list(seqs[0]["yellow"].keys()) + checked_blue_id = copy.deepcopy(blue_id) + checked_yellow_id = copy.deepcopy(yellow_id) + for seq in seqs: + checked_blue_id = list(set(checked_blue_id) & set(seq["blue"].keys())) + checked_yellow_id = list(set(checked_yellow_id) & set(seq["yellow"].keys())) + assert len(checked_blue_id) == len(blue_id) and len(checked_yellow_id) == len(yellow_id), "robot number not correct" + + _ball_seq = np.empty((0, seqs[0]["ball"].shape[0])) + _blue_seq = np.empty((0, len(blue_id), seqs[0]["blue"][blue_id[0]].shape[0])) + _yellow_seq = np.empty((0, len(yellow_id), seqs[0]["yellow"][yellow_id[0]].shape[0])) + + for seq in seqs: + ball = seq["ball"] + blue = np.array([seq["blue"][i] for i in blue_id]) + yellow = np.array([seq["yellow"][i] for i in yellow_id]) + _ball_seq = np.vstack((_ball_seq, ball)) + _blue_seq = np.vstack((_blue_seq, blue[None, :, :])) + _yellow_seq = np.vstack((_yellow_seq, yellow[None, :, :])) + + return [{ + "ball": _ball_seq, + "blue": _blue_seq, + "yellow": _yellow_seq + }] + + def __len__(self): + return len(self.data) + + def __getitem__(self, idx): + return self.data[idx] + +if __name__ == "__main__": + data_dir = sys.argv[1] + dataset = TrackerVisionDataset(data_dir) \ No newline at end of file diff --git a/ZBin/py_playground/rocos/log/tools/.gitignore b/ZBin/py_playground/rocos/log/tools/.gitignore new file mode 100644 index 0000000..1f576f4 --- /dev/null +++ b/ZBin/py_playground/rocos/log/tools/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +__temp__* diff --git a/ZBin/py_playground/rocos/log/tools/logdefine.py b/ZBin/py_playground/rocos/log/tools/logdefine.py new file mode 100644 index 0000000..62c3393 --- /dev/null +++ b/ZBin/py_playground/rocos/log/tools/logdefine.py @@ -0,0 +1,8 @@ +class TYPE: + BLANK = 0 + UNKNOWN = 1 + SSL_VISION_2010 = 2 + SSL_REFBOX_2013 = 3 + SSL_VISION_2014 = 4 + SSL_VISION_TRACKER_2020 = 5 + SSL_INDEX_2021 = 6 \ No newline at end of file diff --git a/ZBin/py_playground/rocos/log/tools/logread.py b/ZBin/py_playground/rocos/log/tools/logread.py new file mode 100644 index 0000000..964e637 --- /dev/null +++ b/ZBin/py_playground/rocos/log/tools/logread.py @@ -0,0 +1,74 @@ +import os, sys, struct, gzip + +sys.path.append('./proto_gen') +from tzcp.ssl.vision.messages_robocup_ssl_wrapper_tracked_pb2 import TrackerWrapperPacket +from tzcp.ssl.vision.messages_robocup_ssl_wrapper_pb2 import SSL_WrapperPacket +sys.path.append('../../..') +from rocos.log.tools.logdefine import TYPE +def get_content(filename): + content = None + if filename.endswith('.gz'): + with gzip.open(filename, 'rb') as f: + content = f.read() + else: + with open(filename, 'rb') as f: + content = f.read() + return content + +class MSG_INDEX: + TIMESTAMP = 0 + TYPE = 1 + SIZE = 2 + MSG = 3 + +def read_log(content): + if content[:12] != b'TZ_SPLIT_LOG': + print('Not a valid log file') + return None + msgs = [] + data = content[12:] + data_index = 0 + while data_index < len(data): + timestamp, type, size = struct.unpack('>qii', data[data_index:data_index+16]) + data_index += 16 + msg = data[data_index:data_index+size] + data_index += size + msgs.append((timestamp, type, size, msg)) + return msgs, type + +def check_log(filename): + content = get_content(filename) + msgs, type = read_log(content) + + MsgType = None + if type == TYPE.SSL_VISION_TRACKER_2020: + MsgType = TrackerWrapperPacket + elif type == TYPE.SSL_VISION_2014: + MsgType = SSL_WrapperPacket + else: + print('Unknown message type') + sys.exit(1) + for msg in msgs: + pack = MsgType() + pack.ParseFromString(msg[3]) + + if type == TYPE.SSL_VISION_TRACKER_2020: + frame_number = pack.tracked_frame.frame_number + ball_size = pack.tracked_frame.balls.__len__() + robot_size = pack.tracked_frame.robots.__len__() + if ball_size != 1 or robot_size != 22: + print(filename , " - frame_num: ", frame_number, 'ball size:', ball_size, 'robot size:', robot_size) + # delete file + os.remove(filename) + return + print(filename, ' - OK') +if __name__ == '__main__': + dir_name = sys.argv[1] + all_log_files = [] + # get all file in dir_name with walk + for root, dirs, files in os.walk(dir_name): + for file in files: + all_log_files.append(os.path.join(root, file)) + + for log_file in all_log_files: + check_log(log_file) \ No newline at end of file diff --git a/ZBin/py_playground/rocos/log/tools/logsplit.py b/ZBin/py_playground/rocos/log/tools/logsplit.py new file mode 100644 index 0000000..acd5355 --- /dev/null +++ b/ZBin/py_playground/rocos/log/tools/logsplit.py @@ -0,0 +1,144 @@ +import os, sys, struct +from typing import Optional +import gzip +import numpy as np + +# File Format +# Each log file starts with the following header: + +# 1: String – File type (“SSL_LOG_FILE”) +# 2: Int32 – Log file format version + +# Format version 1 encodes the protobuf messages in the following format: + +# 1: Int64 – Receiver timestamp in ns +# 2: Int32 – Message type +# 3: Int32 – Size of binary protobuf message +# 4: String – Binary protobuf message + +# The message types are: + +# MESSAGE_BLANK = 0 (ignore message) +# MESSAGE_UNKNOWN = 1 (try to guess message type by parsing the data) +# MESSAGE_SSL_VISION_2010 = 2 +# MESSAGE_SSL_REFBOX_2013 = 3 +# MESSAGE_SSL_VISION_2014 = 4 +# MESSAGE_SSL_VISION_TRACKER_2020 = 5 +# MESSAGE_SSL_INDEX_2021 = 6 + +from tzcp.ssl.vision.messages_robocup_ssl_wrapper_tracked_pb2 import TrackerWrapperPacket +from tzcp.ssl.vision.messages_robocup_ssl_wrapper_pb2 import SSL_WrapperPacket +from tzcp.ssl.ref.ssl_referee_pb2 import Referee +sys.path.append('../../..') +from rocos.log.tools.logdefine import TYPE +class SplitLog: + def __init__(self, name, compress=False): + self.filename = name+".splitlog" + (".gz" if compress else "") + # get path prefix + path = os.path.dirname(self.filename) + if not os.path.exists(path): + os.makedirs(path) + self.file = gzip.open(self.filename, 'wb') if compress else open(self.filename, 'wb') + self.file.write(b'TZ_SPLIT_LOG') + def write(self,timestamp,type,size,data): + self.file.write(struct.pack('>qii',timestamp,type,size)) + self.file.write(data) + def close(self): + self.file.close() + def __del__(self): + self.close() +class LogSplitter: + class Config: + def __init__(self): + self.skip_not_running_stages = True + self.tracker_source_name_filter = ['TIGERs'] + self.record_ref_commands = [ + Referee.Command.NORMAL_START, + Referee.Command.FORCE_START, + Referee.Command.DIRECT_FREE_YELLOW, + Referee.Command.DIRECT_FREE_BLUE, + ] + def __init__(self, config: Config = Config()): + self.config = config + self.counter = np.zeros(7, dtype=int) + self.current_stage = None + self.current_ref_command = None + self.current_vision_tracker = None + self.current_vision = None + # self.split_vision = None + self.split_tracker = None + def new_log(self, filename): + # if self.split_vision: + # self.split_vision.close() + if self.split_tracker: + self.split_tracker.close() + # self.split_vision = SplitLog(filename + '_vision') + self.split_tracker = SplitLog(filename + '_tracker') + def parse_msg(self, type, data, timestamp, size): + msg = None + if type == TYPE.SSL_REFBOX_2013: + msg = Referee() + msg.ParseFromString(data) + self.current_ref_command = msg.command + self.current_stage = msg.stage + pass + elif type == TYPE.SSL_VISION_2014: + msg = SSL_WrapperPacket() + msg.ParseFromString(data) + self.current_vision = msg + self.counter[type] += 1 + # self.split_vision.write(timestamp, type, size, data) + pass + elif type == TYPE.SSL_VISION_TRACKER_2020: + msg = TrackerWrapperPacket() + msg.ParseFromString(data) + if msg.source_name in self.config.tracker_source_name_filter: + self.counter[type] += 1 + self.current_vision_tracker = msg + self.split_tracker.write(timestamp, type, size, data) + pass + else: + # MESSAGE_BLANK + # MESSAGE_UNKNOWN + # SSL_VISION_2010 + # SSL_INDEX_2021 + # 'Unknown message type' + return + def split(self,filename,store_prefix=None): + if store_prefix is None: + store_prefix = os.path.splitext(filename)[0] + if not os.path.exists(store_prefix): + os.makedirs(store_prefix) + with gzip.open(filename, 'rb') as f: + content = f.read() + # get header + header, msgs = content[:16], content[16:] + # check header + if header[:12] != b'SSL_LOG_FILE': + print('Not a valid log file') + return + version = struct.unpack('>I', header[12:])[0] + # read messages + msg_index = 0 + while msg_index < len(msgs): + timestamp, msg_type, msg_size = struct.unpack('>qii', msgs[msg_index:msg_index+16]) + msg_index += 16 + if msg_type == 0: + continue + msg = msgs[msg_index:msg_index+msg_size] + msg_index = msg_index + msg_size + last_ref_command = self.current_ref_command + if self.current_ref_command not in self.config.record_ref_commands and msg_type != TYPE.SSL_REFBOX_2013: + continue + self.parse_msg(msg_type, msg, timestamp, msg_size) + if self.current_ref_command != last_ref_command and self.current_ref_command in self.config.record_ref_commands: + if last_ref_command is not None: + print(Referee.Command.Name(last_ref_command), '->', Referee.Command.Name(self.current_ref_command)) + path_prefix = os.path.join(store_prefix, str(self.current_stage) + '-' + Referee.Stage.Name(self.current_stage) + '/' +str(timestamp) + '-' + Referee.Command.Name(self.current_ref_command)) + self.new_log(path_prefix) + +if __name__ == '__main__': + splitter = LogSplitter() + # set name as arg[1] + log_name = sys.argv[1] + splitter.split(log_name) diff --git a/ZBin/py_playground/rocos/robot/model_casadi.py b/ZBin/py_playground/rocos/robot/model_casadi.py new file mode 100644 index 0000000..8269d89 --- /dev/null +++ b/ZBin/py_playground/rocos/robot/model_casadi.py @@ -0,0 +1,166 @@ +import casadi as ca +import numpy as np + +class RobotConfig: + max_speed = 3.4 # m/s + mass = 2.2 # kg + degree1 = 45 + degree2 = 45 + wheels_degree = np.array([degree1, 180-degree2, 180+degree2, 360-degree1]) + +class WheelConfig: + power = 30 # w + radius = 58/2.0 # mm + gear_ratio = 3.18 + stall_torque = 0.15 # Nm + nomimal_coef = 280 # rpm * Nm + degree = 45 # degree + +def wheelGetAccLimit(speed, degree: float): + max_rpm = RobotConfig.max_speed * np.sin(degree/180*np.pi) * WheelConfig.gear_ratio * 1000 / WheelConfig.radius / (2*np.pi) * 60 + friction_torque = WheelConfig.nomimal_coef / max_rpm + friction_ratio = friction_torque / max_rpm + # from m/s -> rpm + rpm = speed * WheelConfig.gear_ratio * 1000 / WheelConfig.radius / (2*np.pi) * 60 + + rpm_i1 = ca.if_else(ca.fabs(rpm) < 1e-3, 1e-3, 1/rpm) + # from rpm -> Nm + torque = ca.fmin(ca.fmax(-WheelConfig.stall_torque, WheelConfig.nomimal_coef * rpm_i1), WheelConfig.stall_torque) + # from Nm -> force (N) + friction_torque = -friction_ratio * rpm + mm_torque = ca.horzcat(torque, -torque) + mm_friction_torque = ca.horzcat(friction_torque, friction_torque) + force = (mm_friction_torque + mm_torque) / (WheelConfig.radius / 1000) * WheelConfig.gear_ratio + return ca.vertcat(ca.mmin(force), ca.mmax(force)) / RobotConfig.mass + +def robotGetAccLimit(v:ca.MX) -> ca.MX: + wheels = RobotConfig.wheels_degree + wheel_matrix = np.array([[-np.sin(np.deg2rad(w)), np.cos(np.deg2rad(w))] for w in wheels]) + wheel_ratio = np.average([-np.sin(np.deg2rad(wheels[-1])),np.cos(np.deg2rad(wheels[-1]))]) + simple_mx = wheel_ratio * 2 * np.array([[-1,1],[-1,-1]]) + simple_mx_inv = np.linalg.inv(simple_mx) + + speed_wheel = ca.mtimes(v.T,wheel_matrix.T) + acc_wheel = ca.horzcat(*[wheelGetAccLimit(speed_wheel[i],w) for i,w in enumerate(wheels)]).T + acc_limit = acc_wheel[:2,:] * 2 + acc_limit0 = ca.fmin(acc_limit[:,0],-1e-4) + acc_limit1 = ca.fmax(acc_limit[:,1],1e-4) + acc_limit_clip = ca.horzcat(acc_limit0,acc_limit1) + print("acc_limit ", acc_limit_clip, acc_limit_clip.shape) + return acc_limit_clip + +if __name__ == "__main__": + import time + import matplotlib.pyplot as plt + import matplotx + def testWheel(): + opti = ca.Opti() + + T = opti.variable(1) + N = 10 + dt = T/N + + acc = opti.variable(N) + opt_state = opti.variable(N+1,3) + vel = opt_state[:,0] + + vel0 = opti.parameter() + vel1 = opti.parameter() + + opti.subject_to(opti.bounded(0, T, 10)) + opti.subject_to(opt_state[0] == vel0) + for i in range(N): + current_acc = wheelGetAccLimit(vel[i], RobotConfig.degree1) + vel_next = vel[i]+acc[i]*dt + print("vel_next : ",vel_next, vel_next.shape) + opti.subject_to(vel[i+1] == vel_next) + opti.subject_to(opti.bounded(current_acc[0],acc[i],current_acc[1])) + opti.subject_to(opt_state[i,1] == current_acc[0]) + opti.subject_to(opt_state[i,2] == current_acc[1]) + + opti.subject_to(opti.bounded(-RobotConfig.max_speed,vel,RobotConfig.max_speed)) + + obj = 0 + for i in range(N): + obj = obj + (vel[i+1]-vel1)**2 + 0.01*acc[i]**2 + obj = obj + T + + print("obj : ",obj) + + opti.minimize(obj) + + opts_setting = {'ipopt.max_iter':1000, 'ipopt.print_level':0, 'print_time':0, 'ipopt.acceptable_tol':1e-2, 'ipopt.acceptable_obj_change_tol':1e-2} + + opti.solver('ipopt', opts_setting) + + _end = 3.0 + + start_time = time.time() + + opti.set_value(vel0, 0.0) + opti.set_value(vel1, _end) + opti.set_initial(acc, 0) + opti.set_initial(opt_state, 0.0) + + # print all parameters + + sol = opti.solve() + + print(f"Time: {time.time()-start_time}") + + acc_sol = sol.value(acc) + vel_sol = sol.value(vel) + acc_min = sol.value(opt_state[:-1,1]) + acc_max = sol.value(opt_state[:-1,2]) + + plt.plot(vel_sol, label="velocity") + plt.plot(acc_sol, label="acceleration") + plt.plot(acc_min, label="min") + plt.plot(acc_max, label="max") + matplotx.line_labels() + plt.show() + + def testWheelFunc(): + a = np.arange(-3,3,0.1) + acc_limit = [] + for acc in a: + _limit = wheelGetAccLimit(acc,45) + acc_limit.append(_limit) + acc_limit = np.array(acc_limit).reshape(-1,2) + print(acc_limit[:,0]) + print(acc_limit[:,1]) + plt.plot(a, acc_limit[:,0], label="min") + plt.plot(a, acc_limit[:,1], label="max") + plt.legend() + plt.show() + + # testWheelFunc() + testWheel() + + def testRobot(): + T = 0.1 + N = 20 + + opti = ca.Opti() + # vx, vy, w + opt_controls = opti.variable(N, 3) + vx = opt_controls[:,0] + vy = opt_controls[:,1] + w = opt_controls[:,2] + opt_states = opti.variable(N+1, 3) + x = opt_states[:,0] + y = opt_states[:,1] + theta = opt_states[:,2] + + opt_x0 = opti.parameter(6) + opt_xf = opti.parameter(6) + + # def f() + + + def testRobotFunc(): + v = np.mgrid[-0.1:0.2:0.1,-0.1:0.2:0.1].reshape(2,-1).T + for vel in v: + acc = robotGetAccLimit(vel.reshape(2,1)) + + testRobotFunc() \ No newline at end of file diff --git a/ZBin/py_playground/rocos/robot/test_gcn.py b/ZBin/py_playground/rocos/robot/test_gcn.py new file mode 100644 index 0000000..d01ac42 --- /dev/null +++ b/ZBin/py_playground/rocos/robot/test_gcn.py @@ -0,0 +1,111 @@ +import argparse +import os.path as osp +import time + +import torch +import torch.nn.functional as F + +import torch_geometric +import torch_geometric.transforms as T +from torch_geometric.datasets import Planetoid +from torch_geometric.logging import init_wandb, log +from torch_geometric.nn import GCNConv, GPSConv + +parser = argparse.ArgumentParser() +parser.add_argument('--dataset', type=str, default='Cora') +parser.add_argument('--hidden_channels', type=int, default=16) +parser.add_argument('--lr', type=float, default=0.01) +parser.add_argument('--epochs', type=int, default=200) +parser.add_argument('--use_gdc', action='store_true', help='Use GDC') +parser.add_argument('--wandb', action='store_true', help='Track experiment') +args = parser.parse_args() + +device = torch_geometric.device('auto') + +init_wandb( + name=f'GCN-{args.dataset}', + lr=args.lr, + epochs=args.epochs, + hidden_channels=args.hidden_channels, + device=device, +) + +path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', 'Planetoid') +dataset = Planetoid(path, args.dataset, transform=T.NormalizeFeatures()) +data = dataset[0].to(device) + +if args.use_gdc: + print("Using GDC") + transform = T.GDC( + self_loop_weight=1, + normalization_in='sym', + normalization_out='col', + diffusion_kwargs=dict(method='ppr', alpha=0.05), + sparsification_kwargs=dict(method='topk', k=128, dim=0), + exact=True, + ) + data = transform(data) +else: + print("Not using GDC") + +class GCN(torch.nn.Module): + def __init__(self, in_channels, hidden_channels, out_channels): + super().__init__() + self.conv1 = GCNConv(in_channels, hidden_channels, + normalize=not args.use_gdc) + self.conv2 = GCNConv(hidden_channels, out_channels, + normalize=not args.use_gdc) + + def forward(self, x, edge_index, edge_weight=None): + x = F.dropout(x, p=0.5, training=self.training) + x = self.conv1(x, edge_index, edge_weight).relu() + x = F.dropout(x, p=0.5, training=self.training) + x = self.conv2(x, edge_index, edge_weight) + return x + + +model = GCN( + in_channels=dataset.num_features, + hidden_channels=args.hidden_channels, + out_channels=dataset.num_classes, +).to(device) + +optimizer = torch.optim.Adam([ + dict(params=model.conv1.parameters(), weight_decay=5e-4), + dict(params=model.conv2.parameters(), weight_decay=0) +], lr=args.lr) # Only perform weight-decay on first convolution. + + +def train(): + model.train() + optimizer.zero_grad() + out = model(data.x, data.edge_index, data.edge_attr) + loss = F.cross_entropy(out[data.train_mask], data.y[data.train_mask]) + loss.backward() + optimizer.step() + return float(loss) + + +@torch.no_grad() +def test(): + model.eval() + pred = model(data.x, data.edge_index, data.edge_attr).argmax(dim=-1) + + accs = [] + for mask in [data.train_mask, data.val_mask, data.test_mask]: + accs.append(int((pred[mask] == data.y[mask]).sum()) / int(mask.sum())) + return accs + + +best_val_acc = test_acc = 0 +times = [] +for epoch in range(1, args.epochs + 1): + start = time.time() + loss = train() + train_acc, val_acc, tmp_test_acc = test() + if val_acc > best_val_acc: + best_val_acc = val_acc + test_acc = tmp_test_acc + log(Epoch=epoch, Loss=loss, Train=train_acc, Val=val_acc, Test=test_acc) + times.append(time.time() - start) +print(f'Median time per epoch: {torch.tensor(times).median():.4f}s') \ No newline at end of file diff --git a/ZBin/py_playground/rocos/robot/test_lie.py b/ZBin/py_playground/rocos/robot/test_lie.py new file mode 100644 index 0000000..ea70327 --- /dev/null +++ b/ZBin/py_playground/rocos/robot/test_lie.py @@ -0,0 +1,146 @@ +# Please note that for running this example you need to install `matplotlib` and `scipy`. +# You can do this by running the following command in your terminal: +# pip install matplotlib scipy +# If you are using anaconda, you can also run the following command: +# conda install matplotlib scipy + +import casadi as cs +import matplotlib + +import matplotlib.pyplot as plt +import numpy as np +from matplotlib import animation +from mpl_toolkits import mplot3d + +from liecasadi import SO3, SO3Tangent, SE3, SE3Tangent + +opti = cs.Opti() + +T = opti.variable(1) +N = 100 + +quat = [opti.variable(4, 1) for _ in range(N + 1)] +pos = [opti.variable(3, 1) for _ in range(N + 1)] +vel = [opti.variable(6, 1) for _ in range(N + 1)] +dt = T / N + + +for k in range(N): + # vector_SO3 = SO3Tangent(vel[k] * dt) + # rotation_SO3 = SO3(quat[k]) + # opti.subject_to(quat[k + 1] == (vector_SO3 + rotation_SO3).as_quat()) + + trans_SE3 = SE3Tangent(vel[k] * dt) + pose_SE3 = SE3(quat[k], pos[k]) + opti.subject_to(quat[k + 1] == (trans_SE3 + pose_SE3).rotation().as_quat()) + + + +C = sum(cs.sumsqr(vel[i]) for i in range(N)) + T + +# Initial rotation and velocity +opti.subject_to(quat[0] == SO3.Identity().as_quat()) +opti.subject_to(vel[0] == 0) +opti.subject_to(opti.bounded(0, T, 10)) + +# Set random initial guess +quat_rnd = np.random.randn(4, 1) +quat_rnd = quat_rnd / np.linalg.norm(quat_rnd) +for k in range(N + 1): + opti.set_initial(quat[k], quat_rnd) +for k in range(N): + opti.set_initial(vel[k], np.zeros([3, 1])) + + +opti.subject_to(vel[N - 1] == 0) +final_delta_increment = SO3Tangent([cs.pi / 3, cs.pi / 6, cs.pi / 2]) + +opti.subject_to(quat[N] == (final_delta_increment + SO3.Identity()).as_quat()) + +opti.minimize(C) + +opts_setting = {'ipopt.max_iter':1000, 'ipopt.print_level':0, 'print_time':0, 'ipopt.acceptable_tol':1e-2, 'ipopt.acceptable_obj_change_tol':1e-2} + +opti.solver('ipopt', opts_setting) + +try: + sol = opti.solve() +except: + print("Can't solve the problem!") + + +fig1 = plt.figure() +fig1.suptitle("Problem sparsity") +plt.spy(opti.debug.value(cs.jacobian(opti.g, opti.x))) + + +x = [sol.value(quat[i]) for i in range(N + 1)] +v = [sol.value(vel[i]) for i in range(N)] +time = sol.value(T) +plt.figure() +plt.suptitle("Velocity") +plt.plot(np.linspace(0, time, N), v) + +figure = plt.figure() +axes = figure.add_subplot(projection="3d") + +x_cords = np.array([1, 0, 0]) +y_cords = np.array([0, 1, 0]) +z_cords = np.array([0, 0, 1]) + +axes.set_box_aspect((1, 1, 1)) + +(xax,) = axes.plot([0, 1], [0, 0], [0, 0], "red") +(yax,) = axes.plot([0, 0], [0, 1], [0, 0], "green") +(zax,) = axes.plot([0, 0], [0, 0], [0, 1], "blue") + + +# final orientation +x_N = np.array(SO3(x[N]).act(x_cords)).reshape( + 3, +) +y_N = np.array(SO3(x[N]).act(y_cords)).reshape( + 3, +) +z_N = np.array(SO3(x[N]).act(z_cords)).reshape( + 3, +) + +(xaxN,) = axes.plot([0, x_N[0]], [0, x_N[1]], [0, x_N[2]], "red") +(yaxN,) = axes.plot([0, y_N[0]], [0, y_N[1]], [0, y_N[2]], "green") +(zaxN,) = axes.plot([0, z_N[0]], [0, z_N[1]], [0, z_N[2]], "blue") + + +def update_points(i): + x_i = np.array(SO3(x[i]).act(x_cords)).reshape( + 3, + ) + y_i = np.array(SO3(x[i]).act(y_cords)).reshape( + 3, + ) + z_i = np.array(SO3(x[i]).act(z_cords)).reshape( + 3, + ) + # update properties + xax.set_data(np.array([[0, x_i[0]], [0, x_i[1]]])) + xax.set_3d_properties(np.array([0, x_i[2]]), "z") + + yax.set_data(np.array([[0, y_i[0]], [0, y_i[1]]])) + yax.set_3d_properties(np.array([0, y_i[2]]), "z") + + zax.set_data(np.array([[0, z_i[0]], [0, z_i[1]]])) + zax.set_3d_properties(np.array([0, z_i[2]]), "z") + + # return modified axis + return ( + xax, + yax, + zax, + ) + + +ani = animation.FuncAnimation(figure, update_points, frames=N, repeat=False) +writergif = animation.PillowWriter(fps=5) +ani.save("animation.gif", writer=writergif) + +plt.show() \ No newline at end of file diff --git a/ZBin/py_playground/rocos/robot/test_manif.py b/ZBin/py_playground/rocos/robot/test_manif.py new file mode 100644 index 0000000..86294d1 --- /dev/null +++ b/ZBin/py_playground/rocos/robot/test_manif.py @@ -0,0 +1,247 @@ +r""" + +\file se3_localization.py. + +Created on: Jan 11, 2021 +\author: Jeremie Deray + +--------------------------------------------------------- +This file is: +(c) 2021 Jeremie Deray + +This file is part of `manif`, a C++ template-only library +for Lie theory targeted at estimation for robotics. +Manif is: +(c) 2021 Jeremie Deray +--------------------------------------------------------- + +--------------------------------------------------------- +Demonstration example: + + 3D Robot localization based on fixed beacons. + + See se3_sam.py for a more advanced example performing smoothing and mapping. +--------------------------------------------------------- + +This demo corresponds to the 3D version of the application +in chapter V, section A, in the paper Sola-18, +[https://arxiv.org/abs/1812.01537]. + +The following is an abstract of the content of the paper. +Please consult the paper for better reference. + +We consider a robot in 3D space surrounded by a small +number of punctual landmarks or _beacons_. +The robot receives control actions in the form of axial +and angular velocities, and is able to measure the location +of the beacons w.r.t its own reference frame. + +The robot pose X is in SE(3) and the beacon positions b_k in R^3, + + X = | R t | // position and orientation + | 0 1 | + + b_k = (bx_k, by_k, bz_k) // lmk coordinates in world frame + +The control signal u is a twist in se(3) comprising longitudinal +velocity vx and angular velocity wz, with no other velocity +components, integrated over the sampling time dt. + + u = (vx*dt, 0, 0, 0, 0, w*dt) + +The control is corrupted by additive Gaussian noise u_noise, +with covariance + + Q = diagonal(sigma_x^2, sigma_y^2, sigma_z^2, sigma_roll^2, sigma_pitch^2, sigma_yaw^2). + +This noise accounts for possible lateral and rotational slippage +through a non-zero values of sigma_y, sigma_z, sigma_roll and sigma_pitch. + +At the arrival of a control u, the robot pose is updated +with X <-- X * Exp(u) = X + u. + +Landmark measurements are of the range and bearing type, +though they are put in Cartesian form for simplicity. +Their noise n is zero mean Gaussian, and is specified +with a covariances matrix R. +We notice the rigid motion action y = h(X,b) = X^-1 * b +(see appendix D), + + y_k = (brx_k, bry_k, brz_k) // lmk coordinates in robot frame + +We consider the beacons b_k situated at known positions. +We define the pose to estimate as X in SE(3). +The estimation error dx and its covariance P are expressed +in the tangent space at X. + +All these variables are summarized again as follows + + X : robot pose, SE(3) + u : robot control, (v*dt; 0; 0; 0; 0; w*dt) in se(3) + Q : control perturbation covariance + b_k : k-th landmark position, R^3 + y : Cartesian landmark measurement in robot frame, R^3 + R : covariance of the measurement noise + +The motion and measurement models are + + X_(t+1) = f(X_t, u) = X_t * Exp ( w ) // motion equation + y_k = h(X, b_k) = X^-1 * b_k // measurement equation + +The algorithm below comprises first a simulator to +produce measurements, then uses these measurements +to estimate the state, using a Lie-based error-state Kalman filter. + +This file has plain code with only one main() function. +There are no function calls other than those involving `manif`. + +Printing simulated state and estimated state together +with an unfiltered state (i.e. without Kalman corrections) +allows for evaluating the quality of the estimates. +""" + + +from manifpy import SE3, SE3Tangent + +import numpy as np +from numpy.linalg import inv + + +Vector = np.array + + +def Covariance(): + return np.zeros((SE3.DoF, SE3.DoF)) + + +def Jacobian(): + return np.zeros((SE3.DoF, SE3.DoF)) + + +if __name__ == '__main__': + + # START CONFIGURATION + + NUMBER_OF_LMKS_TO_MEASURE = 5 + + # Define the robot pose element and its covariance + X_simulation = SE3.Identity() + X = SE3.Identity() + X_unfiltered = SE3.Identity() + P = Covariance() + + u_nom = Vector([0.1, 0.0, 0.0, 0.0, 0.0, 0.05]) + u_sigmas = Vector([0.1, 0.1, 0.1, 0.1, 0.1, 0.1]) + U = np.diagflat(np.square(u_sigmas)) + + # Declare the Jacobians of the motion wrt robot and control + J_x = Jacobian() + J_u = Jacobian() + + # Define five landmarks in R^3 + landmarks = [] + landmarks.append(Vector([2.0, 0.0, 0.0])) + landmarks.append(Vector([3.0, -1.0, -1.0])) + landmarks.append(Vector([2.0, -1.0, 1.0])) + landmarks.append(Vector([2.0, 1.0, 1.0])) + landmarks.append(Vector([2.0, 1.0, -1.0])) + + # Define the beacon's measurements + measurements = [Vector([0, 0, 0])] * NUMBER_OF_LMKS_TO_MEASURE + + y_sigmas = Vector([0.01, 0.01, 0.01]) + R = np.diagflat(np.square(y_sigmas)) + + # Declare some temporaries + J_xi_x = Jacobian() + J_e_xi = np.zeros((SE3.Dim, SE3.DoF)) + + # CONFIGURATION DONE + + # pretty print + np.set_printoptions(precision=3, suppress=True) + + # DEBUG + print('X STATE : X Y Z TH_x TH_y TH_z ') + print('-------------------------------------------------------') + print('X initial : ', X_simulation.log().coeffs()) + print('-------------------------------------------------------') + # END DEBUG + + # START TEMPORAL LOOP + + # Make 10 steps. Measure up to three landmarks each time. + for t in range(10): + # I. Simulation + + # simulate noise + u_noise = u_sigmas * np.random.rand(SE3.DoF) # control noise + u_noisy = u_nom + u_noise # noisy control + + u_simu = SE3Tangent(u_nom) + u_est = SE3Tangent(u_noisy) + u_unfilt = SE3Tangent(u_noisy) + + # first we move + X_simulation = X_simulation + u_simu # overloaded X.rplus(u) = X * exp(u) + + # then we measure all landmarks + for i in range(NUMBER_OF_LMKS_TO_MEASURE): + b = landmarks[i] # lmk coordinates in world frame + + # simulate noise + y_noise = y_sigmas * np.random.rand(SE3.Dim) # measurement noise + + y = X_simulation.inverse().act(b) # landmark measurement, before adding noise + + y = y + y_noise # landmark measurement, noisy + measurements[i] = y # store for the estimator just below + + # II. Estimation + + # First we move + + X = X.plus(u_est, J_x, J_u) # X * exp(u), with Jacobians + + P = J_x * P * J_x.transpose() + J_u * U * J_u.transpose() + + # Then we correct using the measurements of each lmk + for i in range(NUMBER_OF_LMKS_TO_MEASURE): + # landmark + b = landmarks[i] # lmk coordinates in world frame + + # measurement + y = measurements[i] # lmk measurement, noisy + + # expectation + e = X.inverse(J_xi_x).act(b, J_e_xi) # note: e = R.tr * ( b - t ), for X = (R,t). + H = J_e_xi @ J_xi_x # Jacobian of the measurements wrt the robot pose. note: H = J_e_x = J_e_xi * J_xi_x + E = H @ P @ H.transpose() + + # innovation + z = y - e + Z = E + R + + # Kalman gain + K = P @ H.transpose() @ inv(Z) # K = P * H.tr * ( H * P * H.tr + R).inv + + # Correction step + dx = K @ z # dx is in the tangent space at X + + # Update + X = X + SE3Tangent(dx) # overloaded X.rplus(dx) = X * exp(dx) + P = P - K @ Z @ K.transpose() + + # III. Unfiltered + + # move also an unfiltered version for comparison purposes + X_unfiltered = X_unfiltered + u_unfilt + + # IV. Results + + # DEBUG + print('X simulated : ', X_simulation.log().coeffs().transpose()) + print('X estimated : ', X.log().coeffs().transpose()) + print('X unfilterd : ', X_unfiltered.log().coeffs().transpose()) + print('-------------------------------------------------------') + # END DEBUG \ No newline at end of file diff --git a/ZBin/py_playground/rocos/robot/test_manif2d.py b/ZBin/py_playground/rocos/robot/test_manif2d.py new file mode 100644 index 0000000..29c543e --- /dev/null +++ b/ZBin/py_playground/rocos/robot/test_manif2d.py @@ -0,0 +1,246 @@ +r""" + +\file se2_localization.py. + +Created on: Jan 11, 2021 +\author: Jeremie Deray + +--------------------------------------------------------- +This file is: +(c) 2021 Jeremie Deray + +This file is part of `manif`, a C++ template-only library +for Lie theory targeted at estimation for robotics. +Manif is: +(c) 2021 Jeremie Deray +--------------------------------------------------------- + +--------------------------------------------------------- +Demonstration example: + + 2D Robot localization based on fixed beacons. + + See se3_localization.cpp for the 3D equivalent. + See se3_sam.cpp for a more advanced example performing smoothing and mapping. +--------------------------------------------------------- + +This demo corresponds to the application in chapter V, section A, +in the paper Sola-18, [https://arxiv.org/abs/1812.01537]. + +The following is an abstract of the content of the paper. +Please consult the paper for better reference. + +We consider a robot in the plane surrounded by a small +number of punctual landmarks or _beacons_. +The robot receives control actions in the form of axial +and angular velocities, and is able to measure the location +of the beacons w.r.t its own reference frame. + +The robot pose X is in SE(2) and the beacon positions b_k in R^2, + + | cos th -sin th x | + X = | sin th cos th y | // position and orientation + | 0 0 1 | + + b_k = (bx_k, by_k) // lmk coordinates in world frame + +The control signal u is a twist in se(2) comprising longitudinal +velocity v and angular velocity w, with no lateral velocity +component, integrated over the sampling time dt. + + u = (v*dt, 0, w*dt) + +The control is corrupted by additive Gaussian noise u_noise, +with covariance + + Q = diagonal(sigma_v^2, sigma_s^2, sigma_w^2). + +This noise accounts for possible lateral slippage u_s +through a non-zero value of sigma_s, + +At the arrival of a control u, the robot pose is updated +with X <-- X * Exp(u) = X + u. + +Landmark measurements are of the range and bearing type, +though they are put in Cartesian form for simplicity. +Their noise n is zero mean Gaussian, and is specified +with a covariances matrix R. +We notice the rigid motion action y = h(X,b) = X^-1 * b +(see appendix C), + + y_k = (brx_k, bry_k) // lmk coordinates in robot frame + +We consider the beacons b_k situated at known positions. +We define the pose to estimate as X in SE(2). +The estimation error dx and its covariance P are expressed +in the tangent space at X. + +All these variables are summarized again as follows + + X : robot pose, SE(2) + u : robot control, (v*dt ; 0 ; w*dt) in se(2) + Q : control perturbation covariance + b_k : k-th landmark position, R^2 + y : Cartesian landmark measurement in robot frame, R^2 + R : covariance of the measurement noise + +The motion and measurement models are + + X_(t+1) = f(X_t, u) = X_t * Exp ( w ) // motion equation + y_k = h(X, b_k) = X^-1 * b_k // measurement equation + +The algorithm below comprises first a simulator to +produce measurements, then uses these measurements +to estimate the state, using a Lie-based error-state Kalman filter. + +This file has plain code with only one main() function. +There are no function calls other than those involving `manif`. + +Printing simulated state and estimated state together +with an unfiltered state (i.e. without Kalman corrections) +allows for evaluating the quality of the estimates. +""" + + +from manifpy import SE2, SE2Tangent + +import numpy as np +from numpy.linalg import inv + + +Vector = np.array + + +def Covariance(): + return np.zeros((SE2.DoF, SE2.DoF)) + + +def Jacobian(): + return np.zeros((SE2.DoF, SE2.DoF)) + + +if __name__ == '__main__': + + # START CONFIGURATION + + NUMBER_OF_LMKS_TO_MEASURE = 3 + + # Define the robot pose element and its covariance + X_simulation = SE2.Identity() + X = SE2.Identity() + X_unfiltered = SE2.Identity() + P = Covariance() + + u_nom = Vector([0.1, 0.0, 0.05]) + u_sigmas = Vector([0.1, 0.1, 0.1]) + U = np.diagflat(np.square(u_sigmas)) + + # Declare the Jacobians of the motion wrt robot and control + J_x = Jacobian() + J_u = Jacobian() + + # Define five landmarks in R^2 + landmarks = [] + landmarks.append(Vector([2.0, 0.0])) + landmarks.append(Vector([2.0, 1.0])) + landmarks.append(Vector([2.0, -1.0])) + + # Define the beacon's measurements + measurements = [Vector([0, 0])] * NUMBER_OF_LMKS_TO_MEASURE + + y_sigmas = Vector([0.01, 0.01]) + R = np.diagflat(np.square(y_sigmas)) + + # Declare some temporaries + J_xi_x = Jacobian() + J_e_xi = np.zeros((SE2.Dim, SE2.DoF)) + + # CONFIGURATION DONE + + # pretty print + np.set_printoptions(precision=3, suppress=True) + + # DEBUG + print('X STATE : X Y Z TH_x TH_y TH_z ') + print('-------------------------------------------------------') + print('X initial : ', X_simulation.log().coeffs()) + print('-------------------------------------------------------') + # END DEBUG + + # START TEMPORAL LOOP + + # Make 10 steps. Measure up to three landmarks each time. + for t in range(10): + # I. Simulation + + # simulate noise + u_noise = u_sigmas * np.random.rand(SE2.DoF) # control noise + u_noisy = u_nom + u_noise # noisy control + + u_simu = SE2Tangent(u_nom) + u_est = SE2Tangent(u_noisy) + u_unfilt = SE2Tangent(u_noisy) + + # first we move + X_simulation = X_simulation + u_simu # overloaded X.rplus(u) = X * exp(u) + + # then we measure all landmarks + for i in range(NUMBER_OF_LMKS_TO_MEASURE): + b = landmarks[i] # lmk coordinates in world frame + + # simulate noise + y_noise = y_sigmas * np.random.rand(SE2.Dim) # measurement noise + + y = X_simulation.inverse().act(b) # landmark measurement, before adding noise + + y = y + y_noise # landmark measurement, noisy + measurements[i] = y # store for the estimator just below + + # II. Estimation + + # First we move + + X = X.plus(u_est, J_x, J_u) # X * exp(u), with Jacobians + + P = J_x @ P @ J_x.transpose() + J_u @ U @ J_u.transpose() + + # Then we correct using the measurements of each lmk + for i in range(NUMBER_OF_LMKS_TO_MEASURE): + # landmark + b = landmarks[i] # lmk coordinates in world frame + + # measurement + y = measurements[i] # lmk measurement, noisy + + # expectation + e = X.inverse(J_xi_x).act(b, J_e_xi) # note: e = R.tr * ( b - t ), for X = (R,t). + H = J_e_xi @ J_xi_x # Jacobian of the measurements wrt the robot pose. note: H = J_e_x = J_e_xi * J_xi_x + E = H @ P @ H.transpose() + + # innovation + z = y - e + Z = E + R + + # Kalman gain + K = P @ H.transpose() @ inv(Z) # K = P * H.tr * ( H * P * H.tr + R).inv + + # Correction step + dx = K @ z # dx is in the tangent space at X + + # Update + X = X + SE2Tangent(dx) # overloaded X.rplus(dx) = X * exp(dx) + P = P - K @ Z @ K.transpose() + + # III. Unfiltered + + # move also an unfiltered version for comparison purposes + X_unfiltered = X_unfiltered + u_unfilt + + # IV. Results + + # DEBUG + print('X simulated : ', X_simulation.log().coeffs().transpose()) + print('X estimated : ', X.log().coeffs().transpose()) + print('X unfilterd : ', X_unfiltered.log().coeffs().transpose()) + print('-------------------------------------------------------') + # END DEBUG \ No newline at end of file diff --git a/ZBin/py_playground/rocos/robot/test_pypose.py b/ZBin/py_playground/rocos/robot/test_pypose.py new file mode 100644 index 0000000..1e3608e --- /dev/null +++ b/ZBin/py_playground/rocos/robot/test_pypose.py @@ -0,0 +1,29 @@ +import torch +import pypose as pp + +data = torch.randn(2, 3, requires_grad=True, device='cuda:0') +a = pp.LieTensor(data, ltype=pp.so3_type) +print('a:', a) +b = pp.so3(data) +print('b:', b) + +x = pp.identity_SE3(2,1) +y = pp.randn_se3(2,2) +print('x.shape:', x.shape, '\nx.gshape:', x.lshape) +print(x.lview(2)) +print(y) + +print("-------------------") +res = (x * y.Exp()).Inv().Log() +print("res : ",res) + +print("-------------------") +X = pp.randn_Sim3(6, dtype=torch.double) +a = pp.randn_sim3(6, dtype=torch.double) +b = X.AdjT(a) +print((X * b.Exp() - a.Exp() * X).abs().mean() < 1e-7) + +X = pp.randn_SE3(8) +a = pp.randn_se3(8) +b = X.Adj(a) +print((b.Exp() * X - X * a.Exp()).abs().mean() < 1e-7) \ No newline at end of file