diff --git a/Globals.py b/Globals.py index dc342cd..8dcbb60 100644 --- a/Globals.py +++ b/Globals.py @@ -8,4 +8,4 @@ config_man = ConfigManager() -version_string = '1.0.1' +version_string = '1.0.3' diff --git a/config/PiHub.json b/config/PiHub.json index e5a0585..bd96f32 100644 --- a/config/PiHub.json +++ b/config/PiHub.json @@ -1,9 +1,9 @@ { "General": { - "enable_logging": false, + "enable_logging": true, "enable_bed_server": false, - "enable_watch_server": false, - "enable_folderWatcher_server": true, + "enable_watch_server": true, + "enable_folderWatcher_server": false, "logs_path": "logs" }, "WatchServer": { @@ -13,7 +13,8 @@ "sftp_transfer": true, "opentera_transfer": false, "server_base_folder": "Watch", - "send_logs_only": false + "send_logs_only": false, + "minimal_dataset_duration": 10 }, "BedServer": { "hostname": "0.0.0.0", @@ -29,10 +30,10 @@ "sensor_ID": "Sensor_0" }, "SFTP": { - "hostname": "telesante.cdrv.ca", + "hostname": "", "port": 40091, "username": "dev", - "password": "tr3cr100" + "password": "" }, "OpenTera": { "hostname": "localhost", diff --git a/libs/servers/WatchServer.py b/libs/servers/WatchServer.py index 7a8625b..8613111 100644 --- a/libs/servers/WatchServer.py +++ b/libs/servers/WatchServer.py @@ -26,9 +26,13 @@ def __init__(self, server_config: dict, sftp_config: dict): self.sftp_transfer = server_config['sftp_transfer'] self.opentera_transfer = server_config['opentera_transfer'] self.send_logs_only = server_config['send_logs_only'] + self.minimal_dataset_duration = server_config['minimal_dataset_duration'] self.synching_files = False + # Set file synching after a few seconds without receiving any data + self.file_syncher_timer = threading.Timer(20, self.sync_files) + def run(self): logging.info('Apple Watch Server starting...') @@ -59,7 +63,7 @@ def sync_files(self, check_internet: bool = True): logging.info("WatchServer: Synchronizing files with server...") if self.synching_files: - logging_info("*** WatchServer: Already synching files. Will wait for next time.") + logging.info("*** WatchServer: Already synching files. Will wait for next time.") return self.synching_files = True @@ -70,13 +74,39 @@ def sync_files(self, check_internet: bool = True): file_folders = [] for (dp, dn, f) in os.walk(base_folder): if f: + dp = dp.replace('/', os.sep) if self.send_logs_only: # Filter list of files to keep only log files folder_files = [file for file in f if file.lower().endswith("txt") or file.lower().endswith("oimi")] else: + if self.minimal_dataset_duration > 0: + # Filter dataset that are too small (<10 seconds) + if 'watch_logs.txt' in f: + import csv + try: + with open(os.path.join(dp, 'watch_logs.txt'), newline='') as csvfile: + log_reader = csv.reader(csvfile, delimiter='\t') + first_timestamp = None + for row in log_reader: + if len(row) == 0: + continue + if not first_timestamp: + first_timestamp = row[0] + last_timestamp = row[0] + duration = float(last_timestamp) - float(first_timestamp) + if duration <= self.minimal_dataset_duration: + # Must reject! Too short! + self.move_files([os.path.join(dp, file) for file in f], 'Rejected') + logging.info('Rejected folder ' + dp + ': dataset too small.') + continue # Move to next folder + except IOError: + pass # Ignore error and move on! + except AssertionError: + pass + folder_files = f files.extend(folder_files) - full_files.extend([os.path.join(dp.replace('/', os.sep), file) for file in folder_files]) + full_files.extend([os.path.join(dp, file) for file in folder_files]) file_folder = dp.replace(base_folder, '') file_folders.extend("/" + self.server_base_folder + "/" + file_folder.replace(os.sep, '/') for _ in folder_files) @@ -95,8 +125,9 @@ def sync_files(self, check_internet: bool = True): file_transferred_callback=self.file_was_processed, check_internet=check_internet) - # Set files as processed - self.move_processed_files() + # Set files as processed + if success: + self.move_processed_files() # for file in full_files: # WatchServer.file_was_processed(file) else: @@ -110,10 +141,10 @@ def file_was_processed(self, full_filepath: str): # Mark file as processed - will be moved later on to prevent conflicts self.processed_files.append(full_filepath) - def move_processed_files(self): - for full_filepath in self.processed_files: - # Move file to the "Processed" folder - target_file = full_filepath.replace(os.sep + 'ToProcess' + os.sep, os.sep + 'Processed' + os.sep) + def move_files(self, source_files, target_folder): + for full_filepath in source_files: + # Move file from "ToProcess" to the target folder + target_file = full_filepath.replace(os.sep + 'ToProcess' + os.sep, os.sep + target_folder + os.sep) # Create directory, if needed target_dir = os.path.dirname(target_file) @@ -121,15 +152,18 @@ def move_processed_files(self): os.makedirs(name=target_dir, exist_ok=True) except OSError as exc: logging.error('Error creating ' + target_dir + ': ' + exc.strerror) - raise + continue + # raise try: os.replace(full_filepath, target_file) except (OSError, IOError) as exc: logging.error('Error moving ' + full_filepath + ' to ' + target_file + ': ' + exc.strerror) - raise - # logging.info("Processed file: " + full_filepath) + continue + # raise + def move_processed_files(self): + self.move_files(self.processed_files, 'Processed') self.processed_files.clear() @staticmethod @@ -166,7 +200,7 @@ def do_GET(self): self.send_response(202) self.send_header('Content-type', 'cdrv-cmd/Disconnect') self.end_headers() - self.base_server.sync_files() + # self.base_server.sync_files() return self.send_response(200) @@ -196,6 +230,11 @@ def do_POST(self): self.end_headers() return + # Stop timer to send data, since we received new data + if self.base_server.file_syncher_timer.is_alive(): + self.base_server.file_syncher_timer.cancel() + self.base_server.file_syncher_timer = None + destination_dir = (self.base_server.data_path + '/ToProcess/' + device_name + '/' + file_path + '/')\ .replace('//', '/').replace('/', os.sep) destination_path = destination_dir + file_name @@ -281,19 +320,11 @@ def do_POST(self): # All is good! logging.info(device_name + " - " + file_name + ": transfer complete.") - # # Need to transfer using SFTP? - # if self.base_server.sftp_transfer: - # - # # Check if we need to transfer only log files and if it's a log file - # if not self.base_server.send_logs_only or \ - # (self.base_server.send_logs_only and file_type.lower() in ['txt', 'oimi']): - # file_name = Path(file_name).absolute() # Get full path - # file_server_location = "/" + self.base_server.server_base_folder + "/" + device_name + "/" + file_path - # sftp = threading.Thread(target=SFTPUploader.sftp_send, args=(self.base_server.sftp_config, - # file_server_location, file_name)) - # sftp.start() - self.send_response(200) self.send_header('Content-type', 'file-transfer/ack') self.end_headers() + # Start timer to sync data, if no other transfer occurs until timeout + self.base_server.file_syncher_timer = threading.Timer(20, self.base_server.sync_files) + self.base_server.file_syncher_timer.start() + diff --git a/libs/uploaders/SFTPUploader.py b/libs/uploaders/SFTPUploader.py index dbd3177..9ec6fce 100644 --- a/libs/uploaders/SFTPUploader.py +++ b/libs/uploaders/SFTPUploader.py @@ -40,6 +40,23 @@ def sftp_send(sftp_config: dict, files_directory_on_server: [str], files_to_tran if not (s.isdir(file_server_location)): s.mkdir(file_server_location) with s.cd(file_server_location): + file_name = os.path.basename(file_to_transfer) + # Query file size from server and compare to local file size + try: + remote_attr = s.stat(remotepath=file_server_location + '/' + file_name) + remote_size = remote_attr.st_size + local_attr = os.stat(file_to_transfer) + local_size = local_attr.st_size + if local_size == remote_size: + # Same size on server as local file, skip! + logging.info('Skipping ' + file_to_transfer + ': already present on server.') + if file_transferred_callback: + file_transferred_callback(file_to_transfer) + continue + except IOError: + # File not on server = ok, continue! + pass + # TODO: Use pysftp.Connection.stat() to find file size and only send if is different s.put(localpath=file_to_transfer, preserve_mtime=True, callback=lambda current, total: SFTPUploader.file_upload_progress(current, total, file_to_transfer, diff --git a/piHub.py b/piHub.py index bf4a8cb..7b4ad43 100644 --- a/piHub.py +++ b/piHub.py @@ -86,7 +86,8 @@ def handle_exception(exc_type, exc_value, exc_traceback): folderWatcher_server.start() servers.append(folderWatcher_server) - logging.info("PiHub started.") + logging.info("PiHub " + version_string + " started.") + try: # Main loop on main thread while True: