Skip to content

Commit

Permalink
Refs #14. Work in progress to manage device sessions in OpenTera
Browse files Browse the repository at this point in the history
  • Loading branch information
SBriere committed Jan 26, 2024
1 parent 6646383 commit 4b2a79e
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ data
/__pycache__/*.pyc
*.pyc
/config/PiHub.json
/config/secure_opentera
3 changes: 2 additions & 1 deletion config/PiHub_Defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"OpenTera": {
"hostname": "127.0.0.1",
"port": 40075,
"device_register_key": "1234567890"
"device_register_key": "1234567890",
"default_session_type_id": 1
}
}
4 changes: 2 additions & 2 deletions libs/config/ConfigManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def load_config(self, filename) -> bool:
if self.validate_config("SFTP", config_json['SFTP'], ['hostname', 'port', 'username', 'password']):
self.sftp_config = config_json["SFTP"]

if self.validate_config("OpenTera", config_json['OpenTera'], ['hostname', 'port',
'device_register_key']):
if self.validate_config("OpenTera", config_json['OpenTera'],
['hostname', 'port', 'device_register_key', 'default_session_type_id']):
self.opentera_config = config_json["OpenTera"]

if self.validate_config("WatchServer", config_json['WatchServer'], ['hostname', 'port', 'data_path',
Expand Down
136 changes: 125 additions & 11 deletions libs/servers/WatchServerOpenTera.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from libs.servers.WatchServerBase import WatchServerBase
from libs.servers.handlers.OpenTeraAppleWatchRequestHandler import OpenTeraAppleWatchRequestHandler

from opentera_libraries.device.DeviceComManager import DeviceComManager
import opentera_libraries.device.DeviceAPI as DeviceAPI
from cryptography.fernet import Fernet

import logging
import os
import threading
Expand All @@ -12,6 +16,7 @@ class WatchServerOpenTera(WatchServerBase):

_device_tokens = {} # Mapping of devices names and tokens
_device_timeouts = {} # Timers of devices, since a watch can "disappear" and not send a "disconnect" command
file_syncher_timer = None

def __init__(self, server_config: dict, opentera_config: dict):

Expand All @@ -25,20 +30,76 @@ def __init__(self, server_config: dict, opentera_config: dict):
self.allow_insecure_server = (self.opentera_config['hostname'] == 'localhost' or
self.opentera_config['hostname'] == '127.0.0.1')

# Load cryptographic key (to encrypt tokens, for example)
if not os.path.isfile('config/secure_opentera'):
# Create key
logging.info('WatchServerOpenTera: Generating encryption key...')
self.secure_key = Fernet.generate_key()
with open('config/secure_opentera', 'wb') as f:
f.write(self.secure_key)
else:
with open('config/secure_opentera', 'rb') as f:
self.secure_key = f.read()

# Open token file and decrypt tokens
self.load_tokens()

def run(self):
# Check if all files are on sync on the server (after the main server has started)
self.file_syncher_timer = threading.Timer(1, self.sync_files)
self.file_syncher_timer.start()

super().run()

def load_tokens(self):
tokens_file = os.path.join(self.data_path, 'tokens')
if os.path.isfile(tokens_file):
# Load tokens from file
with open(tokens_file, 'rb') as f:
tokens = f.read()

# Decrypt tokens
fernet = Fernet(self.secure_key)
device_tokens = fernet.decrypt(tokens).decode()
self._device_tokens = json.loads(device_tokens.replace('\'', '"'))

def save_tokens(self):
tokens_file = os.path.join(self.data_path, 'tokens')
# Encrypt tokens
fernet = Fernet(self.secure_key)
with open(tokens_file, 'wb') as f:
f.write(fernet.encrypt(str(self._device_tokens).encode()))

def update_device_token(self, device_name: str, token: str):
self._device_tokens[device_name] = token
update_required = device_name not in self._device_tokens
if device_name in self._device_tokens and self._device_tokens[device_name] != token:
update_required = True
if update_required:
self._device_tokens[device_name] = token
self.save_tokens()

def new_file_received(self, device_name: str, filename: str):
# Start timeout timer in case device doesn't properly disconnect
if device_name in self._device_timeouts:
# Stop previous timer
self._device_timeouts[device_name].cancel()

# self._device_timeouts[device_name] = threading.Timer(300, self.initiate_opentera_transfer, device_name)
# self._device_timeouts[device_name].start()
# Starts a timeout timer in case the device doesn't properly disconnect (and thus trigger the transfer)
self._device_timeouts[device_name] = threading.Timer(300, self.initiate_opentera_transfer,
kwargs={'device_name': device_name})
self._device_timeouts[device_name].start()

def device_disconnected(self, device_name: str):
self.initiate_opentera_transfer(device_name)

def sync_files(self):
logging.info("WatchServerOpenTera: Checking if any pending transfers...")
# Get base folder path
base_folder = os.path.join(self.data_path, 'ToProcess')
for device_name in os.listdir(base_folder):
self.initiate_opentera_transfer(device_name)

logging.info("All done!")

def initiate_opentera_transfer(self, device_name: str):
logging.info("WatchServerOpenTera: Initiating data transfer for " + device_name + "...")
Expand All @@ -48,14 +109,52 @@ def initiate_opentera_transfer(self, device_name: str):
del self._device_timeouts[device_name]

# Get base folder path
base_folder = self.data_path + '/ToProcess/' + device_name
base_folder = base_folder.replace('/', os.sep)
base_folder = os.path.join(self.data_path, 'ToProcess', device_name)
if not os.path.isdir(base_folder):
logging.error('Unable to locate data folder ' + base_folder)
return

for (dir_path, dir_name, files) in os.walk(base_folder):
# Create OpenTera com module
device_com = DeviceComManager(self.opentera_config['hostname'], self.opentera_config['port'],
self.allow_insecure_server)
if device_name not in self._device_tokens:
logging.error('No OpenTera token for ' + device_name + ' - aborting transfer.')
return

device_com.token = self._device_tokens[device_name]

# Do device login
response = device_com.do_get(DeviceAPI.ENDPOINT_DEVICE_LOGIN)
if response.status_code != 200:
logging.error('OpenTera: Unable to login device ' + device_name + ': ' + str(response.status_code) +
' - ' + response.text.strip())
return

device_infos = response.json()['device_info']
participants_infos = response.json()['participants_info']
session_types_infos = response.json()['session_types_info']

if len(participants_infos) == 0:
logging.error('No participant assigned to this device - will not transfer until this is fixed.')
return

# Find correct session type to use
possible_session_types_ids = [st['id_session_type'] for st in session_types_infos
if st['session_type_category'] == 2] # Filter data collect session types
if len(possible_session_types_ids) == 0:
logging.error('No session types available to this device - will not transfer until this is fixed.')
return

id_session_type = self.opentera_config['default_session_type_id']
if id_session_type not in possible_session_types_ids:
logging.warning('Default session type ID not in available session types - will use the first one.')
id_session_type = possible_session_types_ids[0]

# Browse all data folders
for (dir_path, dir_name, files) in os.walk(base_folder):
if dir_path == base_folder:
continue
logging.info('WatchServerOpenTera: Processing ' + dir_path)
# Read session.oimi file
session_file = os.path.join(dir_path, 'session.oimi')
session_file = session_file.replace('/', os.sep)
Expand All @@ -69,13 +168,13 @@ def initiate_opentera_transfer(self, device_name: str):
session_data_json = json.loads(session_data)

# Read watch_logs.txt file
log_file = os.path.join(dir_path, '/watch_logs.txt')
log_file = os.path.join(dir_path, 'watch_logs.txt')
log_file = log_file.replace('/', os.sep)
if not os.path.isfile(log_file):
logging.error('No watch logs file in ' + dir_path)
continue

with open(session_file) as f:
with open(log_file) as f:
logs_data = f.read().splitlines()

if len(logs_data) < 2:
Expand All @@ -92,17 +191,32 @@ def initiate_opentera_transfer(self, device_name: str):
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected'))
continue

# Create session info structure
# Clean session parameters
session_params = session_data_json['description'].split('Settings:')[-1]
session_params = session_params.replace('\n', '').replace('\t', '').replace(',,', ',').replace('{,', '{'). \
replace(' ', '').replace(',}', '}')

session_comments = 'Created by ' + device_name + ', v' + session_data_json['appVersion']

# Create session
if 'timestamp' in session_data_json:
session_name = session_data_json['timestamp']
session_starttime = datetime.datetime.fromisoformat(session_data_json['timestamp'].replace('_', ' '))
session_starttime = datetime.datetime.fromisoformat(session_data_json['timestamp'].replace('_', ' ')).isoformat()
else:
logging.warning('No session timestamp found - using current time')
session_name = device_name
session_starttime = datetime.datetime.isoformat()

session_info = {'id_session': 0, 'session_name': session_name, 'session_start_datetime': session_starttime,
'session_duration': duration}
'session_duration': int(duration), 'session_status': 2, # Completed
'session_parameters': session_params, 'session_comments': session_comments,
'id_session_type': id_session_type}

response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSIONS, {'session': session_info})
if response.status_code != 200:
logging.error('OpenTera: Unable to create session - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
continue

# Create session events

Expand Down
3 changes: 2 additions & 1 deletion libs/servers/WatchServerSFTP.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
class WatchServerSFTP(WatchServerBase):

server = None
file_syncher_timer = None

def __init__(self, server_config: dict, sftp_config: dict):

Expand All @@ -24,7 +25,7 @@ def __init__(self, server_config: dict, sftp_config: dict):
self.synching_files = False

# Set file synching after a few seconds without receiving any data
self.file_syncher_timer = threading.Timer(20, self.sync_files)
# self.file_syncher_timer = threading.Timer(20, self.sync_files)

def run(self):
# Check if all files are on sync on the server (after the main server has started)
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
fabric == 3.2.2
paramiko == 3.4.0
opentera-libraries @ git+https://github.com/introlab/opentera-libraries@dev
opentera-libraries @ git+https://github.com/introlab/opentera-libraries@dev
cryptography ~= 42.0.0

0 comments on commit 4b2a79e

Please sign in to comment.