diff --git a/libs/servers/WatchServerBase.py b/libs/servers/WatchServerBase.py index bf08492..d0e8ec1 100644 --- a/libs/servers/WatchServerBase.py +++ b/libs/servers/WatchServerBase.py @@ -70,9 +70,14 @@ def move_files(source_files, target_folder): def move_folder(source_folder, target_folder): import shutil try: + if os.path.exists(target_folder): + shutil.rmtree(target_folder) shutil.move(source_folder, target_folder) except shutil.Error as exc: - logging.critical('Error moving ' + source_folder + ' to ' + target_folder + ': ' + exc.strerror) + error = exc.strerror + if not error: + error = 'Unknown error' + logging.critical('Error moving ' + source_folder + ' to ' + target_folder + ': ' + error) def file_was_processed(self, full_filepath: str): # Mark file as processed - will be moved later on to prevent conflicts diff --git a/libs/servers/WatchServerOpenTera.py b/libs/servers/WatchServerOpenTera.py index 24aff28..7f2d8f0 100644 --- a/libs/servers/WatchServerOpenTera.py +++ b/libs/servers/WatchServerOpenTera.py @@ -6,7 +6,7 @@ import opentera_libraries.device.DeviceAPI as DeviceAPI from cryptography.fernet import Fernet -from pathlib import Path +from threading import Lock import logging import os @@ -14,11 +14,14 @@ import json import datetime +opentera_lock = Lock() + class WatchServerOpenTera(WatchServerBase): _device_tokens = {} # Mapping of devices names and tokens _device_timeouts = {} # Timers of devices, since a watch can "disappear" and not send a "disconnect" command + file_syncher_timer = None def __init__(self, server_config: dict, opentera_config: dict): @@ -105,153 +108,164 @@ def sync_files(self): logging.info("All done!") def initiate_opentera_transfer(self, device_name: str): - logging.info("WatchServerOpenTera: Initiating data transfer for " + device_name + "...") - if device_name in self._device_timeouts: - # Stop timer if needed - self._device_timeouts[device_name].cancel() - del self._device_timeouts[device_name] - - # Get base folder path - base_folder = os.path.join(self.data_path, 'ToProcess', device_name) - if not os.path.isdir(base_folder): - logging.error('Unable to locate data folder ' + base_folder) - return - - # Create OpenTera com module - device_com = DeviceComManager(self.opentera_config['hostname'], self.opentera_config['port'], - self.allow_insecure_server) - if device_name not in self._device_tokens: - logging.error('No OpenTera token for ' + device_name + ' - aborting transfer.') - return - - device_com.token = self._device_tokens[device_name] - - # Do device login - response = device_com.do_get(DeviceAPI.ENDPOINT_DEVICE_LOGIN) - if response.status_code != 200: - logging.error('OpenTera: Unable to login device ' + device_name + ': ' + str(response.status_code) + - ' - ' + response.text.strip()) - return - - device_infos = response.json()['device_info'] - participants_infos = response.json()['participants_info'] - session_types_infos = response.json()['session_types_info'] - - if len(participants_infos) == 0: - logging.error('No participant assigned to this device - will not transfer until this is fixed.') - return - - # Find correct session type to use - possible_session_types_ids = [st['id_session_type'] for st in session_types_infos - if st['session_type_category'] == SessionCategoryEnum.DATACOLLECT.value] - if len(possible_session_types_ids) == 0: - logging.error('No "Data Collect" session types available to this device - will not transfer until this is ' - 'fixed.') - return - - id_session_type = self.opentera_config['default_session_type_id'] - if id_session_type not in possible_session_types_ids: - logging.warning('Default session type ID not in available session types - will use the first one.') - id_session_type = possible_session_types_ids[0] - - # Browse all data folders - for (dir_path, dir_name, files) in os.walk(base_folder): - if dir_path == base_folder: - continue - logging.info('WatchServerOpenTera: Processing ' + dir_path) - # Read session.oimi file - session_file = os.path.join(dir_path, 'session.oimi') - session_file = session_file.replace('/', os.sep) - if not os.path.isfile(session_file): - logging.error('No session file in ' + dir_path) - continue - - with open(session_file) as f: - session_data = f.read() - - session_data_json = json.loads(session_data) - - # Read watch_logs.txt file - log_file = os.path.join(dir_path, 'watch_logs.txt') - log_file = log_file.replace('/', os.sep) - if not os.path.isfile(log_file): - logging.error('No watch logs file in ' + dir_path) - continue + # Only one thread can transfer at a time - this prevent file conflicts + with opentera_lock: + logging.info("WatchServerOpenTera: Initiating data transfer for " + device_name + "...") + + if device_name in self._device_timeouts: + # Stop timer if needed + self._device_timeouts[device_name].cancel() + + # Get base folder path + base_folder = os.path.join(self.data_path, 'ToProcess', device_name) + if not os.path.isdir(base_folder): + logging.error('Unable to locate data folder ' + base_folder) + return + + # Create OpenTera com module + device_com = DeviceComManager(self.opentera_config['hostname'], self.opentera_config['port'], + self.allow_insecure_server) + if device_name not in self._device_tokens: + logging.error('No OpenTera token for ' + device_name + ' - aborting transfer.') + return + + device_com.token = self._device_tokens[device_name] + + # Do device login + response = device_com.do_get(DeviceAPI.ENDPOINT_DEVICE_LOGIN) + if response.status_code != 200: + logging.error('OpenTera: Unable to login device ' + device_name + ': ' + str(response.status_code) + + ' - ' + response.text.strip()) + return + + device_infos = response.json()['device_info'] + participants_infos = response.json()['participants_info'] + session_types_infos = response.json()['session_types_info'] + + if len(participants_infos) == 0: + logging.error('No participant assigned to this device - will not transfer until this is fixed.') + return + + # Find correct session type to use + possible_session_types_ids = [st['id_session_type'] for st in session_types_infos + if st['session_type_category'] == SessionCategoryEnum.DATACOLLECT.value] + if len(possible_session_types_ids) == 0: + logging.error('No "Data Collect" session types available to this device - will not transfer until this ' + 'is fixed.') + return + + id_session_type = self.opentera_config['default_session_type_id'] + if id_session_type not in possible_session_types_ids: + logging.warning('Default session type ID not in available session types - will use the first one.') + id_session_type = possible_session_types_ids[0] + + # Browse all data folders + for (dir_path, dir_name, files) in os.walk(base_folder): + if dir_path == base_folder: + continue + logging.info('WatchServerOpenTera: Processing ' + dir_path) + # Read session.oimi file + session_file = os.path.join(dir_path, 'session.oimi') + session_file = session_file.replace('/', os.sep) + if not os.path.isfile(session_file): + logging.error('No session file in ' + dir_path) + continue - with open(log_file) as f: - logs_data = f.read().splitlines() + with open(session_file) as f: + session_data = f.read() - if len(logs_data) < 2: - logging.info('Empty log file - ignoring...') - self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected')) - continue + session_data_json = json.loads(session_data) - # Compute duration - first_timestamp = logs_data[0].split('\t')[0] - last_timestamp = logs_data[-1].split('\t')[0] - duration = float(last_timestamp) - float(first_timestamp) - if duration <= self.minimal_dataset_duration: - logging.info('Rejected folder ' + dir_path + ': dataset too small.') - self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected')) - continue + # Read watch_logs.txt file + log_file = os.path.join(dir_path, 'watch_logs.txt') + log_file = log_file.replace('/', os.sep) + if not os.path.isfile(log_file): + logging.error('No watch logs file in ' + dir_path) + continue - # Clean session parameters - session_params = session_data_json['description'].split('Settings:')[-1] - session_params = session_params.replace('\n', '').replace('\t', '').replace(',,', ',').replace('{,', '{'). \ - replace(' ', '').replace(',}', '}') + with open(log_file) as f: + logs_data = f.read().splitlines() - session_comments = 'Created by ' + device_name + ', SensorLogger v' + session_data_json['appVersion'] + if len(logs_data) < 2: + logging.info('Empty log file - ignoring...') + self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected')) + continue - # Create session - if 'timestamp' in session_data_json: - session_starttime = datetime.datetime.fromisoformat(session_data_json['timestamp'].replace('_', ' ')) - else: - logging.warning('No session timestamp found - using current time') - session_starttime = datetime.datetime.now() + # Compute duration + first_timestamp = logs_data[0].split('\t')[0] + last_timestamp = logs_data[-1].split('\t')[0] + duration = float(last_timestamp) - float(first_timestamp) + if duration <= self.minimal_dataset_duration: + logging.info('Rejected folder ' + dir_path + ': dataset too small.') + self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected')) + continue - session_name = device_name + ' (PiHub) ' + session_starttime.strftime("%Y-%m-%d") + # Clean session parameters + session_params = session_data_json['description'].split('Settings:')[-1] + session_params = session_params.replace('\n', '').replace('\t', '').replace(',,', ',').replace('{,', '{'). \ + replace(' ', '').replace(',}', '}') - session_info = {'id_session': 0, 'session_name': session_name, - 'session_start_datetime': session_starttime.isoformat(), - 'session_duration': int(duration), 'session_status': SessionStatus.STATUS_COMPLETED.value, - 'session_parameters': session_params, 'session_comments': session_comments, - 'id_session_type': id_session_type, - 'session_participants': [part['participant_uuid'] for part in participants_infos]} + session_comments = 'Created by ' + device_name + ', SensorLogger v' + session_data_json['appVersion'] - response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSIONS, {'session': session_info}) - if response.status_code != 200: - logging.error('OpenTera: Unable to create session - skipping: ' + str(response.status_code) + - ' - ' + response.text.strip()) - continue + # Create session + if 'timestamp' in session_data_json: + session_starttime = datetime.datetime.fromisoformat(session_data_json['timestamp'].replace('_', ' ')) + else: + logging.warning('No session timestamp found - using current time') + session_starttime = datetime.datetime.now() - id_session = response.json()['id_session'] + session_name = device_name + ' (PiHub) ' + session_starttime.strftime("%Y-%m-%d") - # Create session events - session_events = self.watch_logs_to_events(logs_data) - for event in session_events: - event['id_session'] = id_session - event['id_session_event'] = 0 - response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSION_EVENTS, {'session_event': event}) - if response.status_code != 200: - logging.error('OpenTera: Unable to create session event - skipping: ' + str(response.status_code) + - ' - ' + response.text.strip()) - continue + session_info = {'id_session': 0, 'session_name': session_name, + 'session_start_datetime': session_starttime.isoformat(), + 'session_duration': int(duration), 'session_status': SessionStatus.STATUS_COMPLETED.value, + 'session_parameters': session_params, 'session_comments': session_comments, + 'id_session_type': id_session_type, + 'session_participants': [part['participant_uuid'] for part in participants_infos]} - # Upload all files to FileTransfer service - for data_file in files: - full_path = str(os.path.join(dir_path, data_file)) - logging.info('Uploading ' + full_path + '...') - response = device_com.upload_file(id_session=id_session, asset_name=data_file, file_path=full_path) + response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSIONS, {'session': session_info}) if response.status_code != 200: - logging.error('OpenTera: Unable to upload file - skipping: ' + str(response.status_code) + + logging.error('OpenTera: Unable to create session - skipping: ' + str(response.status_code) + ' - ' + response.text.strip()) continue - # All done for that folder - move to processed folder after a small delay to allow uploaded files to - # properly close - threading.Timer(interval=1, function=self.move_folder, - kwargs={'source_folder': dir_path, - 'target_folder': dir_path.replace('ToProcess', 'Rejected')}).start() + id_session = response.json()['id_session'] + + # Create session events + session_events = self.watch_logs_to_events(logs_data) + for event in session_events: + event['id_session'] = id_session + event['id_session_event'] = 0 + response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSION_EVENTS, {'session_event': event}) + if response.status_code != 200: + logging.error('OpenTera: Unable to create session event - skipping: ' + str(response.status_code) + + ' - ' + response.text.strip()) + continue + + # Upload all files to FileTransfer service + for data_file in files: + full_path = str(os.path.join(dir_path, data_file)) + logging.info('Uploading ' + full_path + '...') + response = device_com.upload_file(id_session=id_session, asset_name=data_file, file_path=full_path) + if response.status_code != 200: + logging.error('OpenTera: Unable to upload file - skipping: ' + str(response.status_code) + + ' - ' + response.text.strip()) + continue + + logging.info('WatchServerOpenTera: Done processing ' + dir_path) + self.processed_files.append(dir_path) + + # All done for that folder - move to processed folder after a small delay to allow uploaded files to + # properly close + # threading.Timer(interval=1, function=self.move_folder, + # kwargs={'source_folder': dir_path, + # 'target_folder': dir_path.replace('ToProcess', 'Processed')}).start() + + for dir_path in self.processed_files: + logging.info('Moving ' + dir_path + '...') + self.move_folder(dir_path, dir_path.replace('ToProcess', 'Processed')) + self.processed_files.clear() + logging.info('WatchServerOpenTera: Data transfer for ' + device_name + ' completed') @staticmethod def watch_logs_to_events(logs: list) -> list: diff --git a/requirements.txt b/requirements.txt index ff7a2d7..c412d30 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -fabric == 3.2.2 -paramiko == 3.4.0 +fabric ~= 3.2.2 +paramiko ~= 3.4.0 opentera-libraries @ git+https://github.com/introlab/opentera-libraries@dev -cryptography ~= 42.0.0 +cryptography ~= 42.0.0 \ No newline at end of file