Skip to content

Commit

Permalink
Refs #14. Tested and adjusted OpenTera integration with multiple watc…
Browse files Browse the repository at this point in the history
…hes.
  • Loading branch information
SBriere committed Jan 30, 2024
1 parent f21d8d1 commit e6c67f9
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 136 deletions.
7 changes: 6 additions & 1 deletion libs/servers/WatchServerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,14 @@ def move_files(source_files, target_folder):
def move_folder(source_folder, target_folder):
import shutil
try:
if os.path.exists(target_folder):
shutil.rmtree(target_folder)
shutil.move(source_folder, target_folder)
except shutil.Error as exc:
logging.critical('Error moving ' + source_folder + ' to ' + target_folder + ': ' + exc.strerror)
error = exc.strerror
if not error:
error = 'Unknown error'
logging.critical('Error moving ' + source_folder + ' to ' + target_folder + ': ' + error)

def file_was_processed(self, full_filepath: str):
# Mark file as processed - will be moved later on to prevent conflicts
Expand Down
278 changes: 146 additions & 132 deletions libs/servers/WatchServerOpenTera.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@

import opentera_libraries.device.DeviceAPI as DeviceAPI
from cryptography.fernet import Fernet
from pathlib import Path
from threading import Lock

import logging
import os
import threading
import json
import datetime

opentera_lock = Lock()


class WatchServerOpenTera(WatchServerBase):

_device_tokens = {} # Mapping of devices names and tokens
_device_timeouts = {} # Timers of devices, since a watch can "disappear" and not send a "disconnect" command

file_syncher_timer = None

def __init__(self, server_config: dict, opentera_config: dict):
Expand Down Expand Up @@ -105,153 +108,164 @@ def sync_files(self):
logging.info("All done!")

def initiate_opentera_transfer(self, device_name: str):
logging.info("WatchServerOpenTera: Initiating data transfer for " + device_name + "...")
if device_name in self._device_timeouts:
# Stop timer if needed
self._device_timeouts[device_name].cancel()
del self._device_timeouts[device_name]

# Get base folder path
base_folder = os.path.join(self.data_path, 'ToProcess', device_name)
if not os.path.isdir(base_folder):
logging.error('Unable to locate data folder ' + base_folder)
return

# Create OpenTera com module
device_com = DeviceComManager(self.opentera_config['hostname'], self.opentera_config['port'],
self.allow_insecure_server)
if device_name not in self._device_tokens:
logging.error('No OpenTera token for ' + device_name + ' - aborting transfer.')
return

device_com.token = self._device_tokens[device_name]

# Do device login
response = device_com.do_get(DeviceAPI.ENDPOINT_DEVICE_LOGIN)
if response.status_code != 200:
logging.error('OpenTera: Unable to login device ' + device_name + ': ' + str(response.status_code) +
' - ' + response.text.strip())
return

device_infos = response.json()['device_info']
participants_infos = response.json()['participants_info']
session_types_infos = response.json()['session_types_info']

if len(participants_infos) == 0:
logging.error('No participant assigned to this device - will not transfer until this is fixed.')
return

# Find correct session type to use
possible_session_types_ids = [st['id_session_type'] for st in session_types_infos
if st['session_type_category'] == SessionCategoryEnum.DATACOLLECT.value]
if len(possible_session_types_ids) == 0:
logging.error('No "Data Collect" session types available to this device - will not transfer until this is '
'fixed.')
return

id_session_type = self.opentera_config['default_session_type_id']
if id_session_type not in possible_session_types_ids:
logging.warning('Default session type ID not in available session types - will use the first one.')
id_session_type = possible_session_types_ids[0]

# Browse all data folders
for (dir_path, dir_name, files) in os.walk(base_folder):
if dir_path == base_folder:
continue
logging.info('WatchServerOpenTera: Processing ' + dir_path)
# Read session.oimi file
session_file = os.path.join(dir_path, 'session.oimi')
session_file = session_file.replace('/', os.sep)
if not os.path.isfile(session_file):
logging.error('No session file in ' + dir_path)
continue

with open(session_file) as f:
session_data = f.read()

session_data_json = json.loads(session_data)

# Read watch_logs.txt file
log_file = os.path.join(dir_path, 'watch_logs.txt')
log_file = log_file.replace('/', os.sep)
if not os.path.isfile(log_file):
logging.error('No watch logs file in ' + dir_path)
continue
# Only one thread can transfer at a time - this prevent file conflicts
with opentera_lock:
logging.info("WatchServerOpenTera: Initiating data transfer for " + device_name + "...")

if device_name in self._device_timeouts:
# Stop timer if needed
self._device_timeouts[device_name].cancel()

# Get base folder path
base_folder = os.path.join(self.data_path, 'ToProcess', device_name)
if not os.path.isdir(base_folder):
logging.error('Unable to locate data folder ' + base_folder)
return

# Create OpenTera com module
device_com = DeviceComManager(self.opentera_config['hostname'], self.opentera_config['port'],
self.allow_insecure_server)
if device_name not in self._device_tokens:
logging.error('No OpenTera token for ' + device_name + ' - aborting transfer.')
return

device_com.token = self._device_tokens[device_name]

# Do device login
response = device_com.do_get(DeviceAPI.ENDPOINT_DEVICE_LOGIN)
if response.status_code != 200:
logging.error('OpenTera: Unable to login device ' + device_name + ': ' + str(response.status_code) +
' - ' + response.text.strip())
return

device_infos = response.json()['device_info']
participants_infos = response.json()['participants_info']
session_types_infos = response.json()['session_types_info']

if len(participants_infos) == 0:
logging.error('No participant assigned to this device - will not transfer until this is fixed.')
return

# Find correct session type to use
possible_session_types_ids = [st['id_session_type'] for st in session_types_infos
if st['session_type_category'] == SessionCategoryEnum.DATACOLLECT.value]
if len(possible_session_types_ids) == 0:
logging.error('No "Data Collect" session types available to this device - will not transfer until this '
'is fixed.')
return

id_session_type = self.opentera_config['default_session_type_id']
if id_session_type not in possible_session_types_ids:
logging.warning('Default session type ID not in available session types - will use the first one.')
id_session_type = possible_session_types_ids[0]

# Browse all data folders
for (dir_path, dir_name, files) in os.walk(base_folder):
if dir_path == base_folder:
continue
logging.info('WatchServerOpenTera: Processing ' + dir_path)
# Read session.oimi file
session_file = os.path.join(dir_path, 'session.oimi')
session_file = session_file.replace('/', os.sep)
if not os.path.isfile(session_file):
logging.error('No session file in ' + dir_path)
continue

with open(log_file) as f:
logs_data = f.read().splitlines()
with open(session_file) as f:
session_data = f.read()

if len(logs_data) < 2:
logging.info('Empty log file - ignoring...')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected'))
continue
session_data_json = json.loads(session_data)

# Compute duration
first_timestamp = logs_data[0].split('\t')[0]
last_timestamp = logs_data[-1].split('\t')[0]
duration = float(last_timestamp) - float(first_timestamp)
if duration <= self.minimal_dataset_duration:
logging.info('Rejected folder ' + dir_path + ': dataset too small.')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected'))
continue
# Read watch_logs.txt file
log_file = os.path.join(dir_path, 'watch_logs.txt')
log_file = log_file.replace('/', os.sep)
if not os.path.isfile(log_file):
logging.error('No watch logs file in ' + dir_path)
continue

# Clean session parameters
session_params = session_data_json['description'].split('Settings:')[-1]
session_params = session_params.replace('\n', '').replace('\t', '').replace(',,', ',').replace('{,', '{'). \
replace(' ', '').replace(',}', '}')
with open(log_file) as f:
logs_data = f.read().splitlines()

session_comments = 'Created by ' + device_name + ', SensorLogger v' + session_data_json['appVersion']
if len(logs_data) < 2:
logging.info('Empty log file - ignoring...')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected'))
continue

# Create session
if 'timestamp' in session_data_json:
session_starttime = datetime.datetime.fromisoformat(session_data_json['timestamp'].replace('_', ' '))
else:
logging.warning('No session timestamp found - using current time')
session_starttime = datetime.datetime.now()
# Compute duration
first_timestamp = logs_data[0].split('\t')[0]
last_timestamp = logs_data[-1].split('\t')[0]
duration = float(last_timestamp) - float(first_timestamp)
if duration <= self.minimal_dataset_duration:
logging.info('Rejected folder ' + dir_path + ': dataset too small.')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected'))
continue

session_name = device_name + ' (PiHub) ' + session_starttime.strftime("%Y-%m-%d")
# Clean session parameters
session_params = session_data_json['description'].split('Settings:')[-1]
session_params = session_params.replace('\n', '').replace('\t', '').replace(',,', ',').replace('{,', '{'). \
replace(' ', '').replace(',}', '}')

session_info = {'id_session': 0, 'session_name': session_name,
'session_start_datetime': session_starttime.isoformat(),
'session_duration': int(duration), 'session_status': SessionStatus.STATUS_COMPLETED.value,
'session_parameters': session_params, 'session_comments': session_comments,
'id_session_type': id_session_type,
'session_participants': [part['participant_uuid'] for part in participants_infos]}
session_comments = 'Created by ' + device_name + ', SensorLogger v' + session_data_json['appVersion']

response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSIONS, {'session': session_info})
if response.status_code != 200:
logging.error('OpenTera: Unable to create session - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
continue
# Create session
if 'timestamp' in session_data_json:
session_starttime = datetime.datetime.fromisoformat(session_data_json['timestamp'].replace('_', ' '))
else:
logging.warning('No session timestamp found - using current time')
session_starttime = datetime.datetime.now()

id_session = response.json()['id_session']
session_name = device_name + ' (PiHub) ' + session_starttime.strftime("%Y-%m-%d")

# Create session events
session_events = self.watch_logs_to_events(logs_data)
for event in session_events:
event['id_session'] = id_session
event['id_session_event'] = 0
response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSION_EVENTS, {'session_event': event})
if response.status_code != 200:
logging.error('OpenTera: Unable to create session event - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
continue
session_info = {'id_session': 0, 'session_name': session_name,
'session_start_datetime': session_starttime.isoformat(),
'session_duration': int(duration), 'session_status': SessionStatus.STATUS_COMPLETED.value,
'session_parameters': session_params, 'session_comments': session_comments,
'id_session_type': id_session_type,
'session_participants': [part['participant_uuid'] for part in participants_infos]}

# Upload all files to FileTransfer service
for data_file in files:
full_path = str(os.path.join(dir_path, data_file))
logging.info('Uploading ' + full_path + '...')
response = device_com.upload_file(id_session=id_session, asset_name=data_file, file_path=full_path)
response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSIONS, {'session': session_info})
if response.status_code != 200:
logging.error('OpenTera: Unable to upload file - skipping: ' + str(response.status_code) +
logging.error('OpenTera: Unable to create session - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
continue

# All done for that folder - move to processed folder after a small delay to allow uploaded files to
# properly close
threading.Timer(interval=1, function=self.move_folder,
kwargs={'source_folder': dir_path,
'target_folder': dir_path.replace('ToProcess', 'Rejected')}).start()
id_session = response.json()['id_session']

# Create session events
session_events = self.watch_logs_to_events(logs_data)
for event in session_events:
event['id_session'] = id_session
event['id_session_event'] = 0
response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSION_EVENTS, {'session_event': event})
if response.status_code != 200:
logging.error('OpenTera: Unable to create session event - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
continue

# Upload all files to FileTransfer service
for data_file in files:
full_path = str(os.path.join(dir_path, data_file))
logging.info('Uploading ' + full_path + '...')
response = device_com.upload_file(id_session=id_session, asset_name=data_file, file_path=full_path)
if response.status_code != 200:
logging.error('OpenTera: Unable to upload file - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
continue

logging.info('WatchServerOpenTera: Done processing ' + dir_path)
self.processed_files.append(dir_path)

# All done for that folder - move to processed folder after a small delay to allow uploaded files to
# properly close
# threading.Timer(interval=1, function=self.move_folder,
# kwargs={'source_folder': dir_path,
# 'target_folder': dir_path.replace('ToProcess', 'Processed')}).start()

for dir_path in self.processed_files:
logging.info('Moving ' + dir_path + '...')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Processed'))
self.processed_files.clear()
logging.info('WatchServerOpenTera: Data transfer for ' + device_name + ' completed')

@staticmethod
def watch_logs_to_events(logs: list) -> list:
Expand Down
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
fabric == 3.2.2
paramiko == 3.4.0
fabric ~= 3.2.2
paramiko ~= 3.4.0
opentera-libraries @ git+https://github.com/introlab/opentera-libraries@dev
cryptography ~= 42.0.0
cryptography ~= 42.0.0

0 comments on commit e6c67f9

Please sign in to comment.