diff --git a/astrobee/survey_manager/CMakeLists.txt b/astrobee/survey_manager/CMakeLists.txt index 93daf02c..c83f14e4 100644 --- a/astrobee/survey_manager/CMakeLists.txt +++ b/astrobee/survey_manager/CMakeLists.txt @@ -52,5 +52,5 @@ include_directories( ## Install ## ############# -catkin_install_python(PROGRAMS scripts/command_astrobee +catkin_install_python(PROGRAMS scripts/command_astrobee scripts/monitor_astrobee DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}) \ No newline at end of file diff --git a/astrobee/survey_manager/scripts/command_astrobee b/astrobee/survey_manager/scripts/command_astrobee index 585c3e97..577c61c2 100755 --- a/astrobee/survey_manager/scripts/command_astrobee +++ b/astrobee/survey_manager/scripts/command_astrobee @@ -21,51 +21,11 @@ import argparse import subprocess import sys +import os +import threading - - -# command = 'rosrun inspection inspection_tool -panorama -panorama_poses /resources/panorama_iss.txt -panorama_mode "5_mapper_and_hugin"' - - -def send_command_with_input(command): - print(command) - - # Start the process - process = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, stdout=sys.stdout, stderr=subprocess.PIPE, text=True) - - try: - while process.poll() is None: - - # Get user input dynamically - user_input = input() - - # Check if the user wants to exit - if user_input.lower().strip() == 'exit': - break - - # Send the user input to the process - process.stdin.write(user_input + "\n") - process.stdin.flush() - - # Close stdin to indicate no more input will be sent - process.stdin.close() - - # Read output and error (if any) - output, error = process.communicate() - - # Get the final exit code - return process.returncode - - except: - return -1 - - -def send_command(command): - print(command) - process = subprocess.Popen(command, shell=True, stdout=sys.stdout, stderr=sys.stdout, text=True) - - # Get the output and error (if any) - return process.wait() +import socket +import select def get_position(bay): @@ -84,43 +44,234 @@ def get_position(bay): if bay == "jem_bay7": return "'11, -9.7, 4.8'" +class SurveyExecutor: + + def __init__(self, robot_name, goal, bay, run): + self.robot = robot_name + self.input_path = '/tmp/input_' + robot_name + self.output_path = '/tmp/output_' + robot_name + + # Check if the file exists + if os.path.exists(self.input_path): + os.remove(self.input_path) + if os.path.exists(self.output_path): + os.remove(self.output_path) + + # Declare socket for process input + self.sock_input = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock_input.settimeout(1) # Set a timeout for socket operations + self.sock_input.bind(self.input_path) + self.sock_input.listen(1) # Listen for one connection + # self.sock_input_connected = False + # self.sock_input.accept(connect_input_callback) + + # Declare socket for process output + self.sock_output = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock_output.settimeout(1) # Set a timeout for socket operations + self.sock_output.bind(self.output_path) + self.sock_output.listen(1) # Listen for one connection + # self.sock_output_connected = False + # self.sock_output.accept(connect_output_callback) + + # Open the input FIFO file descriptor with non-blocking flag + # self.input_fifo_fd = os.open(self.input_fifo_path, os.O_RDONLY | os.O_NONBLOCK) + # self.input_fifo = os.fdopen(self.input_fifo_fd) + + # Open the output FIFO file descriptor with non-blocking flag + # self.output_fifo_fd = os.open(self.output_fifo_path, os.O_WRONLY) + # self.output_fifo = os.fdopen(self.output_fifo_fd, 'w') + + + self._stop_event = threading.Event() + + + self.survey_manager_executor(goal, bay, run) + + self.sock_input.close() + self.sock_output.close() + + # def connect_input_callback(sock, addr): + # """Callback function that is called when a connection is made to the socket.""" + # print("Input connection made from", addr) + # sock_input_connected = False + + # def connect_output_callback(sock, addr): + # """Callback function that is called when a connection is made to the socket.""" + # print("Output connection made from", addr) + # sock_output_connected = False + + def thread_write_output(self, process): + print("starting thread_write_output...") + # Store commulative output + output_total = "" + connected = False + try: + while True: + # Get output from process + output = process.stdout.readline() + if (output == '' and process.poll() is not None) or self._stop_event.is_set(): + break + if output: + output_total += output + + # If socket is not connected try to connect + try: + if not connected: + print("trying to connect") + conn, addr = self.sock_output.accept() + conn.setblocking(False) + + connected = True + conn.send(output_total.encode("utf-8")[:1024]) + except socket.timeout: + continue + except (socket.error, BrokenPipeError): + print("Error sending data. Receiver may have disconnected.") + connected = False + + # If socket is already connected, send output + if connected: + try: + conn.send(output.encode("utf-8")[:1024]) + except (socket.error, BrokenPipeError): + print("Error sending data. Receiver may have disconnected.") + connected = False + + # print(output.strip()) + # self.sock_output.send(output.encode("utf-8")[:1024]) + except Exception as e: + print("exit output:") + print(e) + # finally: + # # Save total output into a log + # print(output_total) + + def thread_read_input(self, process): + print("starting thread_read_input...") + try: + while True: + while not self._stop_event.is_set(): + print("waiting for connection") + try: + client_socket, client_address = self.sock_input.accept() + break + except socket.timeout: + continue + if self._stop_event.is_set(): + break + client_socket.settimeout(1) # Set a timeout for socket operations + + + while True: + print("accepted connection:") + print(client_address) + + while not self._stop_event.is_set(): + print("waiting to receive") + try: + request = client_socket.recv(1024).decode("utf-8") + break + except socket.timeout: + continue + if self._stop_event.is_set(): + break + + if not request: + break + print("got: " + request) + + print(request) + process.stdin.write(request + "\n") + process.stdin.flush() + except Exception as e: + print("exit input:") + print(e) + + + + def send_command_with_input(self, command): + print(command) + return_code = -1 + + try: + # Start the process + process = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + # Start input and output threads + input_thread = threading.Thread(target=self.thread_read_input, args=(process,)) + input_thread.start() + output_thread = threading.Thread(target=self.thread_write_output, args=(process,)) + output_thread.start() + + output_thread.join() + # Get the return code of the process + return_code = process.poll() + + except Exception as e: + print("exit main:") + print(e) + # Get the return code of the process + process.kill() + finally: + # Forcefully stop the thread (not recommended) + print("Killing input thread...") + self._stop_event.set() + input_thread.join() + + # Get the final exit code + return return_code + + def send_command(self, command): + print(command) + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + # Get the output and error (if any) + return process.wait() + + + def repeat_inspection(self): + while True: + user_input = input("Do you want to continue? (y/n): ").lower().strip() + if user_input == 'y': + if send_command_with_input("rosrun inspection inspection_tool -resume") != 0: + repeat_inspection() # Continue recursively + break + elif user_input == 'n': + print("Exiting.") + break + else: + print("Invalid input. Please enter 'y' or 'n'.") + + + def survey_manager_executor(self, goal, bay, run): -def repeat_inspection(): - while True: - user_input = input("Do you want to continue? (y/n): ").lower().strip() - if user_input == 'y': - if send_command_with_input("rosrun inspection inspection_tool -resume") != 0: - repeat_inspection() # Continue recursively - break - elif user_input == 'n': - print("Exiting.") - break - else: - print("Invalid input. Please enter 'y' or 'n'.") + if goal == "dock": + self.send_command("rosrun executive teleop -dock") + elif goal == "dock": + self.send_command("rosrun executive teleop -undock") -def survey_manager_executer(goal, bay, run): + elif goal == "move": + self.send_command("rosrun executive teleop -move -pos " + bay) - if goal == "dock": - send_command("rosrun executive teleop -dock") + # Change exposure if needed - elif goal == "dock": - send_command("rosrun executive teleop -undock") + # Change map if needed - elif goal == "move": - send_command("rosrun executive teleop -move -pos " + bay) + elif goal == "panorama": - elif goal == "panorama": - send_command("python gds_helper_batch.py -i cmd -- bagger -start pano_" + bay + "_" + run) - if send_command_with_input("rosrun inspection inspection_tool -panorama -panorama_mode '5_mapper_and_hugin' -pos " + str(get_position(bay))) != 0: - repeat_inspection() - send_command("python gds_helper_batch.py -i cmd -- bagger -stop") + self.send_command("python gds_helper_batch.py -i cmd -- bagger -start pano_" + bay + "_" + run) + if self.send_command_with_input("rosrun inspection inspection_tool -panorama -panorama_mode '5_mapper_and_hugin' -pos " + str(get_position(bay))) != 0: + self.repeat_inspection() + self.send_command("python gds_helper_batch.py -i cmd -- bagger -stop") - elif goal == "stereo": - send_command("python gds_helper_batch.py -i cmd -- bagger -start stereo_" + bay + "_" + run) - send_command("python gds_helper_batch.py -i cmd -- plan -load plans/ISAAC/" + bay + "_stereo_mapping.fplan") - send_command("python gds_helper_batch.py -i cmd -- plan -run") - send_command("python gds_helper_batch.py -i cmd -- bagger -stop") + + + elif goal == "stereo": + self.send_command("python gds_helper_batch.py -i cmd -- bagger -start stereo_" + bay + "_" + run) + self.send_command("python gds_helper_batch.py -i cmd -- plan -load plans/ISAAC/" + bay + "_stereo_mapping.fplan") + self.send_command("python gds_helper_batch.py -i cmd -- plan -run") + self.send_command("python gds_helper_batch.py -i cmd -- bagger -stop") class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter): @@ -132,6 +283,11 @@ if __name__ == "__main__": description=__doc__, formatter_class=CustomFormatter ) + parser.add_argument( + "robot_name", + default="", + help="Robot name executing the command.", + ) parser.add_argument( "command_name", help="Prefix for bagfiles to merge. Bags should all be in the current working directory.", @@ -149,4 +305,4 @@ if __name__ == "__main__": ) args = parser.parse_args() - survey_manager_executer(args.command_name, args.bay, args.run) + e = SurveyExecutor(args.robot_name, args.command_name, args.bay, args.run) diff --git a/astrobee/survey_manager/scripts/monitor_astrobee b/astrobee/survey_manager/scripts/monitor_astrobee new file mode 100755 index 00000000..9776ed49 --- /dev/null +++ b/astrobee/survey_manager/scripts/monitor_astrobee @@ -0,0 +1,97 @@ +#!/usr/bin/env python +# +# Copyright (c) 2021, United States Government, as represented by the +# Administrator of the National Aeronautics and Space Administration. +# +# All rights reserved. +# +# The "ISAAC - Integrated System for Autonomous and Adaptive Caretaking +# platform" software is licensed under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with the +# License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import sys +import os +import threading +import socket + +def thread_write_input(sock_input): + print("starting thread_write_input...") + # Store commulative output + output_total = "" + try: + while True: + # Get user input dynamically + print("get input") + user_input = input() + print("user input: " + user_input) + # Check if the user wants to exit + if user_input.lower().strip() == 'exit': + break + sock_input.send(user_input.encode("utf-8")[:1024]) + + except: + print("exit output") + finally: + # Save total output into a log + print(output_total) + +def thread_read_output(sock_output): + print("starting thread_read_output...") + try: + while True: + request = sock_output.recv(1024) + request = request.decode("utf-8") # convert bytes tro str + + print(request) + except: + print("exit input") + +def survey_monitor(robot_name): + input_path = '/tmp/input_' + robot_name + output_path = '/tmp/output_' + robot_name + + sock_client_input = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock_client_input.connect(input_path) + sock_client_output = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock_client_output.connect(output_path) + + # Start input and output threads + input_thread = threading.Thread(target=thread_write_input, args=(sock_client_input,)) + input_thread.start() + output_thread = threading.Thread(target=thread_read_output, args=(sock_client_output,)) + output_thread.start() + + # Wait for the thread + input_thread.join() + output_thread.join() + + # Close the sockets + sock_client_input.close() + sock_client_output.close() + +class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter): + pass + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=CustomFormatter + ) + + parser.add_argument( + "robot_name", + help="Name of the robot that needs monitoring.", + ) + args = parser.parse_args() + + survey_monitor(args.robot_name)