diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..954fc0a --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.idea +/.gitignore +venv +logs +Test +data +/__pycache__/*.pyc +*.pyc +/config/PiHub.json diff --git a/Globals.py b/Globals.py index 8b09347..9c0cf65 100644 --- a/Globals.py +++ b/Globals.py @@ -1,11 +1,5 @@ -################################################## -# PiHub global variables -################################################## -# Author: Simon Brière, Eng. MASc. -################################################## - from libs.config.ConfigManager import ConfigManager config_man = ConfigManager() -version_string = '1.1.0' +version_string = '1.2.0' diff --git a/config/PiHub.json b/config/PiHub_Defaults.json similarity index 78% rename from config/PiHub.json rename to config/PiHub_Defaults.json index df1a8c2..8f025a4 100644 --- a/config/PiHub.json +++ b/config/PiHub_Defaults.json @@ -10,8 +10,7 @@ "hostname": "0.0.0.0", "port": 8118, "data_path": "data/watch", - "sftp_transfer": true, - "opentera_transfer": false, + "transfer_type": "opentera", "server_base_folder": "Watch", "send_logs_only": false, "minimal_dataset_duration": 10 @@ -30,13 +29,13 @@ "sensor_ID": "Sensor_0" }, "SFTP": { - "hostname": "telesante.cdrv.ca", - "port": 40091, - "username": "sftp_dev", - "password": "LS9PwJx7$829so" + "hostname": "127.0.0.1", + "port": 22, + "username": "sftp", + "password": "sftp" }, "OpenTera": { - "hostname": "localhost", + "hostname": "127.0.0.1", "port": 40075 } } \ No newline at end of file diff --git a/libs/config/ConfigManager.py b/libs/config/ConfigManager.py index dd60c97..5a5fade 100644 --- a/libs/config/ConfigManager.py +++ b/libs/config/ConfigManager.py @@ -41,8 +41,9 @@ def load_config(self, filename) -> bool: self.opentera_config = config_json["OpenTera"] if self.validate_config("WatchServer", config_json['WatchServer'], ['hostname', 'port', 'data_path', - 'sftp_transfer', 'opentera_transfer', - 'server_base_folder', 'send_logs_only']): + 'transfer_type', 'server_base_folder', + 'send_logs_only', 'minimal_dataset_duration' + ]): self.watch_server_config = config_json["WatchServer"] if self.validate_config("BedServer", config_json['BedServer'], ['hostname', 'port', 'data_path', diff --git a/libs/servers/WatchServer.py b/libs/servers/WatchServer.py deleted file mode 100644 index eeef906..0000000 --- a/libs/servers/WatchServer.py +++ /dev/null @@ -1,352 +0,0 @@ -################################################## -# PiHub Apple Watch communication server -################################################## -# Authors: Simon Brière, Eng. MASc. -################################################## -from libs.servers.BaseServer import BaseServer -from libs.uploaders.SFTPUploader import SFTPUploader - -from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -from pathlib import Path - -import logging -import os -import threading - - -class WatchServer(BaseServer): - - server = None - processed_files = [] - - def __init__(self, server_config: dict, sftp_config: dict): - super().__init__(server_config=server_config) - self.sftp_config = sftp_config - self.server_base_folder = server_config['server_base_folder'] - self.sftp_transfer = server_config['sftp_transfer'] - self.opentera_transfer = server_config['opentera_transfer'] - self.send_logs_only = server_config['send_logs_only'] - self.minimal_dataset_duration = server_config['minimal_dataset_duration'] - - self.synching_files = False - - # Set file synching after a few seconds without receiving any data - self.file_syncher_timer = threading.Timer(20, self.sync_files) - - def run(self): - logging.info('Apple Watch Server starting...') - - # Check if all files are on sync on the server - # self.sync_files(check_internet=False) # Don't explicitely check internet connection on startup - - request_handler = AppleWatchRequestHandler - request_handler.base_server = self - - self.server = ThreadingHTTPServer((self.hostname, self.port), request_handler) - self.server.timeout = 5 # 5 seconds timeout should be ok since we are usually on local network - self.is_running = True - logging.info('Apple Watch Server started on port ' + str(self.port)) - - # Check if all files are on sync on the server (after the main server has started) - self.file_syncher_timer = threading.Timer(1, self.sync_files, [False]) - self.file_syncher_timer.start() - - # Thread will wait here - self.server.serve_forever() - self.server.server_close() - - logging.info("Apple Watch Server stopped.") - - def stop(self): - super().stop() - - if self.server: - self.server.shutdown() - self.server.server_close() - - def sync_files(self, check_internet: bool = True): - logging.info("WatchServer: Synchronizing files with server...") - if self.synching_files: - logging.info("*** WatchServer: Already synching files. Will wait for next time.") - return - - self.synching_files = True - # Build list of files to transfer - base_folder = self.data_path + '/ToProcess/' - base_folder = base_folder.replace('/', os.sep) - files = [] - full_files = [] - file_folders = [] - for (dp, dn, f) in os.walk(base_folder): - if f: - dp = dp.replace('/', os.sep) - if self.send_logs_only: - # Filter list of files to keep only log files - folder_files = [file for file in f if file.lower().endswith("txt") or file.lower().endswith("oimi")] - else: - if self.minimal_dataset_duration > 0: - # Filter dataset that are too small (<10 seconds) - if 'watch_logs.txt' in f: - import csv - try: - with open(os.path.join(dp, 'watch_logs.txt'), newline='') as csvfile: - log_reader = csv.reader(csvfile, delimiter='\t') - first_timestamp = None - for row in log_reader: - if len(row) == 0: - continue - if not first_timestamp: - first_timestamp = row[0] - last_timestamp = row[0] - duration = float(last_timestamp) - float(first_timestamp) - if duration <= self.minimal_dataset_duration: - # Must reject! Too short! - self.move_files([os.path.join(dp, file) for file in f], 'Rejected') - logging.info('Rejected folder ' + dp + ': dataset too small.') - continue # Move to next folder - except IOError: - pass # Ignore error and move on! - except AssertionError: - pass - - folder_files = f - files.extend(folder_files) - full_files.extend([os.path.join(dp, file) for file in folder_files]) - file_folder = dp.replace(base_folder, '') - file_folders.extend(self.server_base_folder + "/" + file_folder.replace(os.sep, '/') - for _ in folder_files) - - # Filter duplicates - # full_files = list(set(full_files)) - - if full_files: - logging.info('About to sync files...') - - if self.sftp_transfer: - # Send files using sftp - # Sending files - success = SFTPUploader.sftp_send(sftp_config=self.sftp_config, files_to_transfer=full_files, - files_directory_on_server=file_folders, - file_transferred_callback=self.file_was_processed, - check_internet=check_internet) - - # Set files as processed - if success: - self.move_processed_files() - else: - # Something occurred... Try again in 5 minutes - self.file_syncher_timer = threading.Timer(300, self.sync_files) - self.file_syncher_timer.start() - - # for file in full_files: - # WatchServer.file_was_processed(file) - else: - logging.info('No file to sync!') - # Clean up empty folders - WatchServer.remove_empty_folders(Path(base_folder).absolute()) - logging.info("WatchServer: Synchronization done.") - self.synching_files = False - - def file_was_processed(self, full_filepath: str): - # Mark file as processed - will be moved later on to prevent conflicts - self.processed_files.append(full_filepath) - - def move_files(self, source_files, target_folder): - for full_filepath in source_files: - # Move file from "ToProcess" to the target folder - target_file = full_filepath.replace(os.sep + 'ToProcess' + os.sep, os.sep + target_folder + os.sep) - - # Create directory, if needed - target_dir = os.path.dirname(target_file) - try: - os.makedirs(name=target_dir, exist_ok=True) - except OSError as exc: - logging.error('Error creating ' + target_dir + ': ' + exc.strerror) - continue - # raise - - try: - os.rename(full_filepath, target_file) - except (OSError, IOError) as exc: - logging.error('Error moving ' + full_filepath + ' to ' + target_file + ': ' + exc.strerror) - continue - # raise - - def move_processed_files(self): - self.move_files(self.processed_files, 'Processed') - self.processed_files.clear() - - @staticmethod - def remove_empty_folders(path_abs): - walk = list(os.walk(path_abs)) - for path, _, _ in walk[::-1]: - if len(os.listdir(path)) == 0: - os.rmdir(path.replace("/", os.sep)) - - -class AppleWatchRequestHandler(BaseHTTPRequestHandler): - - base_server: WatchServer = None - - def setup(self): - BaseHTTPRequestHandler.setup(self) - - # Simple get to show what to do for file transfer - def do_GET(self): - - # Ping requests can be answered directly - content_type = self.headers['Content-Type'] - if content_type == 'cdrv-cmd/Connect': - # self.streamer.add_log.emit("Connexion de " + self.headers['Device-Name'], LogTypes.LOGTYPE_INFO) - logging.info(self.headers['Device-Name'] + ' connected.') - self.send_response(202) - self.send_header('Content-type', 'cdrv-cmd/Connect') - self.end_headers() - return - - if content_type == 'cdrv-cmd/Disconnect': - # self.streamer.add_log.emit("Déconnexion de " + self.headers['Device-Name'], LogTypes.LOGTYPE_INFO) - logging.info(self.headers['Device-Name'] + ' disconnected.') - self.send_response(202) - self.send_header('Content-type', 'cdrv-cmd/Disconnect') - self.end_headers() - # self.base_server.sync_files() - return - - self.send_response(200) - self.send_header('Content-type', 'text/html') - self.end_headers() - - def do_POST(self): - - # Unpack metadata - content_type = self.headers['Content-Type'] - content_length = int(self.headers['Content-Length']) - file_type = self.headers['File-Type'] - device_type = self.headers['Device-Type'] - device_name = self.headers['Device-Name'] - file_path = self.headers['File-Path'] - file_name = self.headers['File-Name'] - - if None in [file_type, device_type, device_name, file_path, file_name]: - logging.error(device_name + " - Badly formatted request - missing some headers.") - self.send_response(400) - self.end_headers() - return - - if content_type != 'cdrv-cmd/File-Upload': - logging.warning(device_name + " - Unknown command: " + content_type) - self.send_response(400) - self.end_headers() - return - - # Stop timer to send data, since we received new data - if self.base_server.file_syncher_timer and self.base_server.file_syncher_timer.is_alive(): - self.base_server.file_syncher_timer.cancel() - self.base_server.file_syncher_timer = None - - # Prepare to receive data - destination_dir = (self.base_server.data_path + '/ToProcess/' + device_name + '/' + file_path + '/')\ - .replace('//', '/').replace('/', os.sep) - destination_path = destination_dir + file_name - - file_name = device_name + file_path + '/' + file_name - logging.info(device_name + " - Receiving: " + file_name + " (" + str(content_length) + " bytes)") - - # Check if file exists and size matches - file_infos = Path(destination_path) - if file_infos.exists(): - file_infos = os.stat(destination_path) - if file_infos.st_size < content_length: - logging.warning(device_name + ": " + file_name + " - Existing file, but incomplete (" + - str(file_infos.st_size) + "/" + str(content_length) + " bytes), resending.") - else: - logging.warning(device_name + ": " + file_name + " - Existing file - replacing file.") - - # Destination directory if it doesn't exist - Path(destination_dir).mkdir(parents=True, exist_ok=True) - - # Gets the data and save to file - buffer_size = 4 * 1024 - content_size_remaining = content_length - # last_pc = -1 - - # Supported file type? - if file_type.lower() in ['data', 'dat', 'csv', 'txt', 'oimi']: - - if file_type.lower() in ['data', 'dat']: - # Binary file - fh = open(destination_path, 'wb') - text_format = False - else: - # Text file - fh = open(destination_path, 'w') - text_format = True - - while content_size_remaining > 0: - if buffer_size > content_size_remaining: - buffer_size = content_size_remaining - try: - data = self.rfile.read(buffer_size) - except OSError as err: - err_desc = err.strerror - if not err_desc and len(err.args) > 0: - err_desc = err.args[0] - logging.error(device_name + " - Error occured while transferring " + file_name + ": " + - str(err_desc)) - return - - if text_format: - fh.write(data.decode()) - else: - fh.write(data) - content_size_remaining -= buffer_size - # content_received = (content_length - content_size_remaining) - # pc = math.floor((content_received / content_length) * 100) - # if pc != last_pc: - # self.streamer.update_progress.emit(file_name, " (" + str(content_received) + "/ " + - # str(content_length) + ")", (content_length - - # content_size_remaining), - # content_length) - # last_pc = pc - fh.close() - else: - # self.streamer.add_log.emit(device_name + ": " + file_name + " - Type de fichier non-supporté: " + - # file_type.lower(), LogTypes.LOGTYPE_ERROR) - logging.error(device_name + " - " + file_name + " - Unsupported file type: " + file_type.lower()) - self.send_response(400) - self.send_header('Content-type', 'file-transfer/invalid-file-type') - self.end_headers() - return - - # Check if everything was received correctly - file_infos = os.stat(destination_path) - if file_infos.st_size < content_length: - # Missing data?!?! - error = "Transfer error: " + str(file_infos.st_size) + " bytes received, " + str(content_length) + \ - " expected." - logging.error(device_name + " - " + file_name + " - " + error) - self.send_response(400) - self.send_header('Content-type', 'file-transfer/error') - self.end_headers() - return - - if content_length == 0 or (file_infos.st_size == 0 and content_length != 0): - error = "Transfer error: 0 byte received." - logging.error(device_name + " - " + file_name + " - " + error) - self.send_response(400) - self.send_header('Content-type', 'file-transfer/error') - self.end_headers() - return - - # All is good! - logging.info(device_name + " - " + file_name + ": transfer complete.") - - self.send_response(200) - self.send_header('Content-type', 'file-transfer/ack') - self.end_headers() - - # Start timer to sync data, if no other transfer occurs until timeout - self.base_server.file_syncher_timer = threading.Timer(20, self.base_server.sync_files) - self.base_server.file_syncher_timer.start() - diff --git a/libs/servers/WatchServerBase.py b/libs/servers/WatchServerBase.py new file mode 100644 index 0000000..b2e6600 --- /dev/null +++ b/libs/servers/WatchServerBase.py @@ -0,0 +1,79 @@ +from libs.servers.BaseServer import BaseServer +from http.server import ThreadingHTTPServer + +import logging +import os + + +class WatchServerBase(BaseServer): + server: ThreadingHTTPServer | None = None + _request_handler = None + processed_files = [] + + def __init__(self, server_config: dict, request_handler): + super().__init__(server_config=server_config) + self.server_base_folder = server_config['server_base_folder'] + self.send_logs_only = server_config['send_logs_only'] + self.minimal_dataset_duration = server_config['minimal_dataset_duration'] + + self._request_handler = request_handler + self._request_handler.base_server = self + + def run(self): + logging.info(self.__class__.__name__ + ' starting...') + self.server = ThreadingHTTPServer((self.hostname, self.port), self._request_handler) + self.server.timeout = 5 # 5 seconds timeout should be ok since we are usually on local network + self.is_running = True + logging.info(self.__class__.__name__ + ' started on port ' + str(self.port)) + + # Thread will wait here + self.server.serve_forever() + self.server.server_close() + logging.info(self.__class__.__name__ + ' stopped.') + + def stop(self): + super().stop() + + if self.server: + self.server.shutdown() + self.server.server_close() + + def new_file_received(self, filename: str): + logging.error(self.__class__.__name__ + ' - unhandled new file received') + + @staticmethod + def move_files(source_files, target_folder): + for full_filepath in source_files: + # Move file from "ToProcess" to the target folder + target_file = full_filepath.replace(os.sep + 'ToProcess' + os.sep, os.sep + target_folder + os.sep) + + # Create directory, if needed + target_dir = os.path.dirname(target_file) + try: + os.makedirs(name=target_dir, exist_ok=True) + except OSError as exc: + logging.error('Error creating ' + target_dir + ': ' + exc.strerror) + continue + # raise + + try: + os.rename(full_filepath, target_file) + except (OSError, IOError) as exc: + logging.error('Error moving ' + full_filepath + ' to ' + target_file + ': ' + exc.strerror) + continue + # raise + + def file_was_processed(self, full_filepath: str): + # Mark file as processed - will be moved later on to prevent conflicts + self.processed_files.append(full_filepath) + + def move_processed_files(self): + self.move_files(self.processed_files, 'Processed') + self.processed_files.clear() + + @staticmethod + def remove_empty_folders(path_abs): + walk = list(os.walk(path_abs)) + for path, _, _ in walk[::-1]: + if len(os.listdir(path)) == 0: + os.rmdir(path.replace("/", os.sep)) diff --git a/libs/servers/WatchServerOpenTera.py b/libs/servers/WatchServerOpenTera.py new file mode 100644 index 0000000..9bad2fa --- /dev/null +++ b/libs/servers/WatchServerOpenTera.py @@ -0,0 +1,23 @@ +from libs.servers.WatchServerBase import WatchServerBase +from libs.servers.handlers.OpenTeraAppleWatchRequestHandler import OpenTeraAppleWatchRequestHandler + +import logging +import os +import threading + + +class WatchServerOpenTera(WatchServerBase): + + def __init__(self, server_config: dict, opentera_config: dict): + + # Setup request handler + request_handler = OpenTeraAppleWatchRequestHandler + + super().__init__(server_config=server_config, request_handler=request_handler) + self.opentera_config = opentera_config + + def run(self): + super().run() + + def new_file_received(self, filename: str): + logging.error(self.__class__.__name__ + ' - TODO - Handle file!') diff --git a/libs/servers/WatchServerSFTP.py b/libs/servers/WatchServerSFTP.py new file mode 100644 index 0000000..575c8d2 --- /dev/null +++ b/libs/servers/WatchServerSFTP.py @@ -0,0 +1,121 @@ +from libs.servers.WatchServerBase import WatchServerBase +from libs.servers.handlers.SFTPAppleWatchRequestHandler import SFTPAppleWatchRequestHandler +from libs.uploaders.SFTPUploader import SFTPUploader + +from pathlib import Path + +import logging +import os +import threading + + +class WatchServerSFTP(WatchServerBase): + + server = None + + def __init__(self, server_config: dict, sftp_config: dict): + + # Setup request handler + request_handler = SFTPAppleWatchRequestHandler + + super().__init__(server_config=server_config, request_handler=request_handler) + self.sftp_config = sftp_config + + self.synching_files = False + + # Set file synching after a few seconds without receiving any data + self.file_syncher_timer = threading.Timer(20, self.sync_files) + + def run(self): + # Check if all files are on sync on the server (after the main server has started) + self.file_syncher_timer = threading.Timer(1, self.sync_files, [False]) + self.file_syncher_timer.start() + + super().run() + + def sync_files(self, check_internet: bool = True): + logging.info("WatchServerSFTP: Synchronizing files with server...") + if self.synching_files: + logging.info("*** WatchServerSFTP: Already synching files. Will wait for next time.") + return + + self.synching_files = True + # Build list of files to transfer + base_folder = self.data_path + '/ToProcess/' + base_folder = base_folder.replace('/', os.sep) + files = [] + full_files = [] + file_folders = [] + for (dp, dn, f) in os.walk(base_folder): + if f: + dp = dp.replace('/', os.sep) + if self.send_logs_only: + # Filter list of files to keep only log files + folder_files = [file for file in f if file.lower().endswith("txt") or file.lower().endswith("oimi")] + else: + if self.minimal_dataset_duration > 0: + # Filter dataset that are too small (<10 seconds) + if 'watch_logs.txt' in f: + import csv + try: + with open(os.path.join(dp, 'watch_logs.txt'), newline='') as csvfile: + log_reader = csv.reader(csvfile, delimiter='\t') + first_timestamp = None + for row in log_reader: + if len(row) == 0: + continue + if not first_timestamp: + first_timestamp = row[0] + last_timestamp = row[0] + duration = float(last_timestamp) - float(first_timestamp) + if duration <= self.minimal_dataset_duration: + # Must reject! Too short! + self.move_files([os.path.join(dp, file) for file in f], 'Rejected') + logging.info('Rejected folder ' + dp + ': dataset too small.') + continue # Move to next folder + except IOError: + pass # Ignore error and move on! + except AssertionError: + pass + + folder_files = f + files.extend(folder_files) + full_files.extend([os.path.join(dp, file) for file in folder_files]) + file_folder = dp.replace(base_folder, '') + file_folders.extend(self.server_base_folder + "/" + file_folder.replace(os.sep, '/') + for _ in folder_files) + + # Filter duplicates + # full_files = list(set(full_files)) + + if full_files: + logging.info('WatchServerSFTP: About to sync files...') + + # Send files using sftp + # Sending files + success = SFTPUploader.sftp_send(sftp_config=self.sftp_config, files_to_transfer=full_files, + files_directory_on_server=file_folders, + file_transferred_callback=self.file_was_processed, + check_internet=check_internet) + + # Set files as processed + if success: + self.move_processed_files() + else: + # Something occurred... Try again in 5 minutes + self.file_syncher_timer = threading.Timer(300, self.sync_files) + self.file_syncher_timer.start() + + # for file in full_files: + # WatchServer.file_was_processed(file) + else: + logging.info('WatchServerSFTP: No file to sync!') + # Clean up empty folders + self.remove_empty_folders(Path(base_folder).absolute()) + logging.info("WatchServerSFTP: Synchronization done.") + self.synching_files = False + + def new_file_received(self, filename: str): + # Start timer to batch transfer files in 20 seconds + self.file_syncher_timer = threading.Timer(20, self.sync_files) + self.file_syncher_timer.start() diff --git a/libs/servers/handlers/BaseAppleWatchRequestHandler.py b/libs/servers/handlers/BaseAppleWatchRequestHandler.py new file mode 100644 index 0000000..4927d34 --- /dev/null +++ b/libs/servers/handlers/BaseAppleWatchRequestHandler.py @@ -0,0 +1,165 @@ +from http.server import BaseHTTPRequestHandler +from pathlib import Path + +import logging +import os + + +class BaseAppleWatchRequestHandler(BaseHTTPRequestHandler): + base_server = None + + def setup(self): + BaseHTTPRequestHandler.setup(self) + + # Simple get to show what to do for file transfer + def do_GET(self): + # Ping requests can be answered directly + content_type = self.headers['Content-Type'] + if content_type == 'cdrv-cmd/Connect': + # self.streamer.add_log.emit("Connexion de " + self.headers['Device-Name'], LogTypes.LOGTYPE_INFO) + logging.info(self.headers['Device-Name'] + ' connected.') + self.send_response(202) + self.send_header('Content-type', 'cdrv-cmd/Connect') + self.end_headers() + return + + if content_type == 'cdrv-cmd/Disconnect': + # self.streamer.add_log.emit("Déconnexion de " + self.headers['Device-Name'], LogTypes.LOGTYPE_INFO) + logging.info(self.headers['Device-Name'] + ' disconnected.') + self.send_response(202) + self.send_header('Content-type', 'cdrv-cmd/Disconnect') + self.end_headers() + # self.base_server.sync_files() + return + + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + + def do_POST(self): + + # Unpack metadata + content_type = self.headers['Content-Type'] + content_length = int(self.headers['Content-Length']) + file_type = self.headers['File-Type'] + device_type = self.headers['Device-Type'] + device_name = self.headers['Device-Name'] + file_path = self.headers['File-Path'] + file_name = self.headers['File-Name'] + + if None in [file_type, device_type, device_name, file_path, file_name]: + logging.error(device_name + " - Badly formatted request - missing some headers.") + self.send_response(400) + self.end_headers() + return + + if content_type != 'cdrv-cmd/File-Upload': + logging.warning(device_name + " - Unknown command: " + content_type) + self.send_response(400) + self.end_headers() + return + + # Prepare to receive data + destination_dir = (self.base_server.data_path + '/ToProcess/' + device_name + '/' + file_path + '/')\ + .replace('//', '/').replace('/', os.sep) + destination_path = destination_dir + file_name + + file_name = device_name + file_path + '/' + file_name + logging.info(device_name + " - Receiving: " + file_name + " (" + str(content_length) + " bytes)") + + # Check if file exists and size matches + file_infos = Path(destination_path) + if file_infos.exists(): + file_infos = os.stat(destination_path) + if file_infos.st_size < content_length: + logging.warning(device_name + ": " + file_name + " - Existing file, but incomplete (" + + str(file_infos.st_size) + "/" + str(content_length) + " bytes), resending.") + else: + logging.warning(device_name + ": " + file_name + " - Existing file - replacing file.") + + # Destination directory if it doesn't exist + Path(destination_dir).mkdir(parents=True, exist_ok=True) + + # Gets the data and save to file + buffer_size = 4 * 1024 + content_size_remaining = content_length + # last_pc = -1 + + # Supported file type? + if file_type.lower() in ['data', 'dat', 'csv', 'txt', 'oimi']: + + if file_type.lower() in ['data', 'dat']: + # Binary file + fh = open(destination_path, 'wb') + text_format = False + else: + # Text file + fh = open(destination_path, 'w') + text_format = True + + while content_size_remaining > 0: + if buffer_size > content_size_remaining: + buffer_size = content_size_remaining + try: + data = self.rfile.read(buffer_size) + except OSError as err: + err_desc = err.strerror + if not err_desc and len(err.args) > 0: + err_desc = err.args[0] + logging.error(device_name + " - Error occured while transferring " + file_name + ": " + + str(err_desc)) + return + + if text_format: + fh.write(data.decode()) + else: + fh.write(data) + content_size_remaining -= buffer_size + # content_received = (content_length - content_size_remaining) + # pc = math.floor((content_received / content_length) * 100) + # if pc != last_pc: + # self.streamer.update_progress.emit(file_name, " (" + str(content_received) + "/ " + + # str(content_length) + ")", (content_length - + # content_size_remaining), + # content_length) + # last_pc = pc + fh.close() + else: + # self.streamer.add_log.emit(device_name + ": " + file_name + " - Type de fichier non-supporté: " + + # file_type.lower(), LogTypes.LOGTYPE_ERROR) + logging.error(device_name + " - " + file_name + " - Unsupported file type: " + file_type.lower()) + self.send_response(400) + self.send_header('Content-type', 'file-transfer/invalid-file-type') + self.end_headers() + return + + # Check if everything was received correctly + file_infos = os.stat(destination_path) + if file_infos.st_size < content_length: + # Missing data?!?! + error = "Transfer error: " + str(file_infos.st_size) + " bytes received, " + str(content_length) + \ + " expected." + logging.error(device_name + " - " + file_name + " - " + error) + self.send_response(400) + self.send_header('Content-type', 'file-transfer/error') + self.end_headers() + return + + if content_length == 0 or (file_infos.st_size == 0 and content_length != 0): + error = "Transfer error: 0 byte received." + logging.error(device_name + " - " + file_name + " - " + error) + self.send_response(400) + self.send_header('Content-type', 'file-transfer/error') + self.end_headers() + return + + # All is good! + logging.info(device_name + " - " + file_name + ": transfer complete.") + + self.send_response(200) + self.send_header('Content-type', 'file-transfer/ack') + self.end_headers() + + # Signal base server that we got new files + if self.base_server: + self.base_server.new_file_received(file_name) diff --git a/libs/servers/handlers/OpenTeraAppleWatchRequestHandler.py b/libs/servers/handlers/OpenTeraAppleWatchRequestHandler.py new file mode 100644 index 0000000..25218b8 --- /dev/null +++ b/libs/servers/handlers/OpenTeraAppleWatchRequestHandler.py @@ -0,0 +1,18 @@ +from libs.servers.handlers.BaseAppleWatchRequestHandler import BaseAppleWatchRequestHandler +import logging + + +class OpenTeraAppleWatchRequestHandler(BaseAppleWatchRequestHandler): + + def setup(self): + super().setup() + + def do_GET(self): + if self.path.startswith('/api/device'): + if self.path.endswith('register'): + # Do a device register + pass + super().do_GET() + + def do_POST(self): + super().do_POST() diff --git a/libs/servers/handlers/SFTPAppleWatchRequestHandler.py b/libs/servers/handlers/SFTPAppleWatchRequestHandler.py new file mode 100644 index 0000000..a37c5a2 --- /dev/null +++ b/libs/servers/handlers/SFTPAppleWatchRequestHandler.py @@ -0,0 +1,19 @@ +from libs.servers.handlers.BaseAppleWatchRequestHandler import BaseAppleWatchRequestHandler +import logging + + +class SFTPAppleWatchRequestHandler(BaseAppleWatchRequestHandler): + + def setup(self): + super().setup() + + def do_GET(self): + super().do_GET() + + def do_POST(self): + # Stop timer to send data, since we received new data + if self.base_server.file_syncher_timer and self.base_server.file_syncher_timer.is_alive(): + self.base_server.file_syncher_timer.cancel() + self.base_server.file_syncher_timer = None + + super().do_POST() diff --git a/piHub.py b/piHub.py index 7b4ad43..10397d6 100644 --- a/piHub.py +++ b/piHub.py @@ -1,16 +1,12 @@ -################################################## -# PiHub project main script -################################################## -# Authors: Simon Brière, Eng. MASc. -# Mathieu Hamel, Eng. MASc. -################################################## import time import logging import sys +import os from libs.config.ConfigManager import ConfigManager from libs.servers.BedServer import BedServer -from libs.servers.WatchServer import WatchServer +from libs.servers.WatchServerSFTP import WatchServerSFTP +from libs.servers.WatchServerOpenTera import WatchServerOpenTera from libs.servers.folderWatcher import FolderWatcher from libs.hardware.PiHubHardware import PiHubHardware @@ -40,7 +36,13 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Load config file logging.info("Starting up PiHub v" + version_string + "...") - if not config_man.load_config('config/PiHub.json'): + config_file = 'config/PiHub_Defaults.json' + if os.path.isfile('config/PiHub.json'): + config_file = 'config/PiHub.json' + else: + logging.warning('No specific config file - using default config!') + logging.info('Using config file: ' + config_file) + if not config_man.load_config(config_file): logging.critical("Invalid config - system halted.") exit(1) @@ -69,12 +71,20 @@ def handle_exception(exc_type, exc_value, exc_traceback): # Apple Watch server if config_man.general_config["enable_watch_server"]: # Start Apple Watch server - watch_server = WatchServer(server_config=config_man.watch_server_config, - sftp_config=config_man.sftp_config) - - # Start server - watch_server.start() - servers.append(watch_server) + watch_server = None + if config_man.watch_server_config['transfer_type'] == 'sftp': + watch_server = WatchServerSFTP(server_config=config_man.watch_server_config, + sftp_config=config_man.sftp_config) + if config_man.watch_server_config['transfer_type'] == 'opentera': + watch_server = WatchServerOpenTera(server_config=config_man.watch_server_config, + opentera_config=config_man.opentera_config) + if not watch_server: + logging.critical('Unknown Watch Server Transfer type "' + config_man.watch_server_config['transfer_type'] + + '" - will not start server') + else: + # Start server + watch_server.start() + servers.append(watch_server) # Folder Watcher server if config_man.general_config["enable_folderWatcher_server"]: diff --git a/requirements.txt b/requirements.txt index 2e2911e..5a16096 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ fabric == 3.2.2 -paramiko ~= 3.3.1 \ No newline at end of file +paramiko == 3.4.0 \ No newline at end of file