Skip to content

Commit

Permalink
Refs #14. Restructured and added routing for "/api/device" calls, inc…
Browse files Browse the repository at this point in the history
…luding device registration. Work in progress to integrate with OpenTera.
  • Loading branch information
SBriere committed Jan 25, 2024
1 parent def8946 commit 6646383
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 13 deletions.
3 changes: 2 additions & 1 deletion config/PiHub_Defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
},
"OpenTera": {
"hostname": "127.0.0.1",
"port": 40075
"port": 40075,
"device_register_key": "1234567890"
}
}
3 changes: 2 additions & 1 deletion libs/config/ConfigManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def load_config(self, filename) -> bool:
if self.validate_config("SFTP", config_json['SFTP'], ['hostname', 'port', 'username', 'password']):
self.sftp_config = config_json["SFTP"]

if self.validate_config("OpenTera", config_json['OpenTera'], ['hostname', 'port']):
if self.validate_config("OpenTera", config_json['OpenTera'], ['hostname', 'port',
'device_register_key']):
self.opentera_config = config_json["OpenTera"]

if self.validate_config("WatchServer", config_json['WatchServer'], ['hostname', 'port', 'data_path',
Expand Down
15 changes: 13 additions & 2 deletions libs/servers/WatchServerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ def stop(self):
self.server.shutdown()
self.server.server_close()

def new_file_received(self, filename: str):
logging.error(self.__class__.__name__ + ' - unhandled new file received')
def new_file_received(self, device_name: str, filename: str):
logging.debug(self.__class__.__name__ + ' - unhandled new file received')

def device_disconnected(self, device_name: str):
logging.debug(self.__class__.__name__ + ' - unhandled device disconnected')

@staticmethod
def move_files(source_files, target_folder):
Expand All @@ -63,6 +66,14 @@ def move_files(source_files, target_folder):
continue
# raise

@staticmethod
def move_folder(source_folder, target_folder):
import shutil
try:
shutil.move(source_folder, target_folder)
except shutil.Error as exc:
logging.critical('Error moving ' + source_folder + ' to ' + target_folder + ': ' + exc.strerror)

def file_was_processed(self, full_filepath: str):
# Mark file as processed - will be moved later on to prevent conflicts
self.processed_files.append(full_filepath)
Expand Down
90 changes: 88 additions & 2 deletions libs/servers/WatchServerOpenTera.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,106 @@
import logging
import os
import threading
import json
import datetime


class WatchServerOpenTera(WatchServerBase):

_device_tokens = {} # Mapping of devices names and tokens
_device_timeouts = {} # Timers of devices, since a watch can "disappear" and not send a "disconnect" command

def __init__(self, server_config: dict, opentera_config: dict):

# Setup request handler
request_handler = OpenTeraAppleWatchRequestHandler

super().__init__(server_config=server_config, request_handler=request_handler)
self.opentera_config = opentera_config
self.opentera_server_url = ('https://' + self.opentera_config['hostname'] + ':' +
str(self.opentera_config['port']))
self.allow_insecure_server = (self.opentera_config['hostname'] == 'localhost' or
self.opentera_config['hostname'] == '127.0.0.1')

def run(self):
super().run()

def new_file_received(self, filename: str):
logging.error(self.__class__.__name__ + ' - TODO - Handle file!')
def update_device_token(self, device_name: str, token: str):
self._device_tokens[device_name] = token

def new_file_received(self, device_name: str, filename: str):
# Start timeout timer in case device doesn't properly disconnect
if device_name in self._device_timeouts:
# Stop previous timer
self._device_timeouts[device_name].cancel()

# self._device_timeouts[device_name] = threading.Timer(300, self.initiate_opentera_transfer, device_name)
# self._device_timeouts[device_name].start()

def initiate_opentera_transfer(self, device_name: str):
logging.info("WatchServerOpenTera: Initiating data transfer for " + device_name + "...")
if device_name in self._device_timeouts:
# Stop timer if needed
self._device_timeouts[device_name].cancel()
del self._device_timeouts[device_name]

# Get base folder path
base_folder = self.data_path + '/ToProcess/' + device_name
base_folder = base_folder.replace('/', os.sep)
if not os.path.isdir(base_folder):
logging.error('Unable to locate data folder ' + base_folder)
return

for (dir_path, dir_name, files) in os.walk(base_folder):

# Read session.oimi file
session_file = os.path.join(dir_path, 'session.oimi')
session_file = session_file.replace('/', os.sep)
if not os.path.isfile(session_file):
logging.error('No session file in ' + dir_path)
continue

with open(session_file) as f:
session_data = f.read()

session_data_json = json.loads(session_data)

# Read watch_logs.txt file
log_file = os.path.join(dir_path, '/watch_logs.txt')
log_file = log_file.replace('/', os.sep)
if not os.path.isfile(log_file):
logging.error('No watch logs file in ' + dir_path)
continue

with open(session_file) as f:
logs_data = f.read().splitlines()

if len(logs_data) < 2:
logging.info('Empty log file - ignoring...')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected'))
continue

# Compute duration
first_timestamp = logs_data[0].split('\t')[0]
last_timestamp = logs_data[-1].split('\t')[0]
duration = float(last_timestamp) - float(first_timestamp)
if duration <= self.minimal_dataset_duration:
logging.info('Rejected folder ' + dir_path + ': dataset too small.')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected'))
continue

# Create session info structure
if 'timestamp' in session_data_json:
session_name = session_data_json['timestamp']
session_starttime = datetime.datetime.fromisoformat(session_data_json['timestamp'].replace('_', ' '))
else:
logging.warning('No session timestamp found - using current time')
session_name = device_name
session_starttime = datetime.datetime.isoformat()

session_info = {'id_session': 0, 'session_name': session_name, 'session_start_datetime': session_starttime,
'session_duration': duration}

# Create session events

# Upload all files to FileTransfer service
2 changes: 1 addition & 1 deletion libs/servers/WatchServerSFTP.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def sync_files(self, check_internet: bool = True):
logging.info("WatchServerSFTP: Synchronization done.")
self.synching_files = False

def new_file_received(self, filename: str):
def new_file_received(self, device_name: str, filename: str):
# Start timer to batch transfer files in 20 seconds
self.file_syncher_timer = threading.Timer(20, self.sync_files)
self.file_syncher_timer.start()
4 changes: 2 additions & 2 deletions libs/servers/handlers/BaseAppleWatchRequestHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def do_GET(self):
self.send_response(202)
self.send_header('Content-type', 'cdrv-cmd/Disconnect')
self.end_headers()
# self.base_server.sync_files()
self.base_server.device_disconnected(self.headers['Device-Name'])
return

self.send_response(200)
Expand Down Expand Up @@ -162,4 +162,4 @@ def do_POST(self):

# Signal base server that we got new files
if self.base_server:
self.base_server.new_file_received(file_name)
self.base_server.new_file_received(device_name, file_name)
59 changes: 56 additions & 3 deletions libs/servers/handlers/OpenTeraAppleWatchRequestHandler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from libs.servers.handlers.BaseAppleWatchRequestHandler import BaseAppleWatchRequestHandler
from opentera_libraries.device.DeviceComManager import DeviceComManager
import urllib.parse

import logging
import requests


class OpenTeraAppleWatchRequestHandler(BaseAppleWatchRequestHandler):
Expand All @@ -9,10 +13,59 @@ def setup(self):

def do_GET(self):
if self.path.startswith('/api/device'):
if self.path.endswith('register'):
# Do a device register
pass
logging.debug('Routing request to /api/device to OpenTera...')
# Parse parameters from query
query_params = urllib.parse.urlparse(self.path).query
query_path = urllib.parse.urlparse(self.path).path
params = urllib.parse.parse_qs(query_params)
opentera_config = self.base_server.opentera_config

if query_path.endswith('register'):
device_com = DeviceComManager(server_url=opentera_config['hostname'],
server_port=opentera_config['port'],
allow_insecure=self.base_server.allow_insecure_server)
if 'name' not in params or 'type_key' not in params:
self.send_error(400, 'Bad parameters')
return
subtype_name = None
device_name = params['name'][0]
logging.info('Registering device "' + device_name + '"...')
if 'subtype_name' in params:
subtype_name = params['subtype_name'][0]
response = device_com.register_device(server_key=opentera_config['device_register_key'],
device_name=device_name,
device_type_key=params['type_key'][0],
device_subtype_name=subtype_name)
if response.status_code == 200:
logging.info("Registered!")
# Save token for future use!
self.base_server.update_device_token(device_name, response.json()['device_token'])
else:
logging.warning("Register failed.")
self.forward_opentera_response(response)
else:
response = requests.get(url=self.base_server.opentera_server_url + query_path, params=query_params,
headers=self.headers, verify=not self.base_server.allow_insecure_server)
# Copy response from OpenTera
self.forward_opentera_response(response)
return

# Catch token if available
content_type = self.headers['Content-Type']
if content_type == 'cdrv-cmd/Connect':
if 'Device-Name' in self.headers and 'Device-Token' in self.headers:
device_name = self.headers['Device-Name']
device_token = self.headers['Device-Token']
self.base_server.update_device_token(device_name, device_token)

super().do_GET()

def do_POST(self):
super().do_POST()

def forward_opentera_response(self, response: requests.Response):
self.send_response_only(response.status_code)
for header in response.headers:
self.send_header(header, response.headers[header])
self.end_headers()
self.wfile.write(bytes(response.text, 'utf-8'))
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
fabric == 3.2.2
paramiko == 3.4.0
paramiko == 3.4.0
opentera-libraries @ git+https://github.com/introlab/opentera-libraries@dev

0 comments on commit 6646383

Please sign in to comment.