Skip to content

Commit

Permalink
Merge pull request #18 from CDRV/dev
Browse files Browse the repository at this point in the history
Main merge for 1.2.2 release
  • Loading branch information
SBriere authored May 16, 2024
2 parents 94159ef + 3881990 commit ea590cf
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

config_man = ConfigManager()

version_string = '1.2.1'
version_string = '1.2.2'
13 changes: 11 additions & 2 deletions libs/servers/WatchServerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class WatchServerBase(BaseServer):
server: ThreadingHTTPServer | None = None
_request_handler = None
processed_files = []
_connected_devices = []

def __init__(self, server_config: dict, request_handler):
super().__init__(server_config=server_config)
Expand Down Expand Up @@ -39,10 +40,18 @@ def stop(self):
self.server.server_close()

def new_file_received(self, device_name: str, filename: str):
logging.debug(self.__class__.__name__ + ' - unhandled new file received')
# logging.debug(self.__class__.__name__ + ' - unhandled new file received')
if device_name not in self._connected_devices:
self._connected_devices.append(device_name)

def device_disconnected(self, device_name: str):
logging.debug(self.__class__.__name__ + ' - unhandled device disconnected')
# logging.debug(self.__class__.__name__ + ' - unhandled device disconnected')
if device_name in self._connected_devices:
self._connected_devices.remove(device_name)

def device_connected(self, device_name: str):
if device_name not in self._connected_devices:
self._connected_devices.append(device_name)

@staticmethod
def move_files(source_files, target_folder):
Expand Down
140 changes: 116 additions & 24 deletions libs/servers/WatchServerOpenTera.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import threading
import json
import datetime
import struct

opentera_lock = Lock()

Expand All @@ -22,6 +23,8 @@ class WatchServerOpenTera(WatchServerBase):
_device_tokens = {} # Mapping of devices names and tokens
_device_timeouts = {} # Timers of devices, since a watch can "disappear" and not send a "disconnect" command

_device_retries = {} # Mapping of device names and number of retries, to automatically try to resend data

file_syncher_timer = None

def __init__(self, server_config: dict, opentera_config: dict):
Expand Down Expand Up @@ -52,7 +55,7 @@ def __init__(self, server_config: dict, opentera_config: dict):

def run(self):
# Check if all files are on sync on the server (after the main server has started)
self.file_syncher_timer = threading.Timer(1, self.sync_files)
self.file_syncher_timer = threading.Timer(30, self.sync_files)
self.file_syncher_timer.start()

super().run()
Expand Down Expand Up @@ -85,28 +88,50 @@ def update_device_token(self, device_name: str, token: str):
self.save_tokens()

def new_file_received(self, device_name: str, filename: str):
super().new_file_received(device_name, filename)
# Start timeout timer in case device doesn't properly disconnect
if device_name in self._device_timeouts:
# Stop previous timer
self._device_timeouts[device_name].cancel()

# Starts a timeout timer in case the device doesn't properly disconnect (and thus trigger the transfer)
self._device_timeouts[device_name] = threading.Timer(300, self.initiate_opentera_transfer,
self._device_timeouts[device_name] = threading.Timer(1200, self.device_disconnected,
kwargs={'device_name': device_name})
self._device_timeouts[device_name].start()

# Cancel sync timer on new file
if self.file_syncher_timer:
self.file_syncher_timer.cancel()
self.file_syncher_timer = None

def device_disconnected(self, device_name: str):
self.initiate_opentera_transfer(device_name)
super().device_disconnected(device_name)
# self.initiate_opentera_transfer(device_name)
# Wait 30 seconds after the last disconnected device to start transfer
self.file_syncher_timer = threading.Timer(30, self.sync_files)
self.file_syncher_timer.start()

def device_connected(self, device_name: str):
super().device_connected(device_name)
if self.file_syncher_timer:
self.file_syncher_timer.cancel()
self.file_syncher_timer = None

def sync_files(self):
self.file_syncher_timer = None
logging.info("WatchServerOpenTera: Checking if any pending transfers...")
if self._connected_devices:
logging.info("WatchServerOpenTera: Devices still connected: " + ', '.join(self._connected_devices) +
' - will transfer later.')
return

# Get base folder path
base_folder = os.path.join(self.data_path, 'ToProcess')
if os.path.isdir(base_folder):
for device_name in os.listdir(base_folder):
self.initiate_opentera_transfer(device_name)

logging.info("All done!")
# logging.info("All done!")

def initiate_opentera_transfer(self, device_name: str):
# Only one thread can transfer at a time - this prevent file conflicts
Expand Down Expand Up @@ -137,6 +162,7 @@ def initiate_opentera_transfer(self, device_name: str):
if response.status_code != 200:
logging.error('OpenTera: Unable to login device ' + device_name + ': ' + str(response.status_code) +
' - ' + response.text.strip())
self.plan_upload_retry(device_name)
return

device_infos = response.json()['device_info']
Expand All @@ -161,6 +187,7 @@ def initiate_opentera_transfer(self, device_name: str):
id_session_type = possible_session_types_ids[0]

# Browse all data folders
erronous_paths = []
for (dir_path, dir_name, files) in os.walk(base_folder):
if dir_path == base_folder:
continue
Expand All @@ -177,6 +204,17 @@ def initiate_opentera_transfer(self, device_name: str):

session_data_json = json.loads(session_data)

# Check if we have all the required files for that session
if 'files' in session_data_json:
required_files = session_data_json['files']
current_files = [f for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f))]
missing_files = set(required_files).difference(current_files)
if missing_files:
# Missing files
logging.error('Missing files in dataset: ' + ', '.join(missing_files) + ' - ignoring dataset '
'for now')
continue

# Read watch_logs.txt file
log_file = os.path.join(dir_path, 'watch_logs.txt')
log_file = log_file.replace('/', os.sep)
Expand All @@ -196,6 +234,20 @@ def initiate_opentera_transfer(self, device_name: str):
first_timestamp = logs_data[0].split('\t')[0]
last_timestamp = logs_data[-1].split('\t')[0]
duration = float(last_timestamp) - float(first_timestamp)

# Update duration from "battery" file, if present, since "watch_logs" duration can be under-evaluated
# if watch battery was depleted or a new day started
battery_file = os.path.join(dir_path, 'watch_Battery.data')
battery_file = battery_file.replace('/', os.sep)
if os.path.isfile(battery_file):
with open(battery_file, mode='rb') as f:
f.seek(-10, os.SEEK_END)
batt_data = f.read(8) # Read the last timestamp of the file
if len(batt_data) == 8:
batt_last_timestamp = struct.unpack("<Q", batt_data)[0] / 1000
if batt_last_timestamp and batt_last_timestamp > float(last_timestamp):
duration = float(batt_last_timestamp) - float(first_timestamp)

if duration <= self.minimal_dataset_duration:
logging.info('Rejected folder ' + dir_path + ': dataset too small.')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Rejected'))
Expand Down Expand Up @@ -235,46 +287,86 @@ def initiate_opentera_transfer(self, device_name: str):

id_session = response.json()['id_session']

# Get list of assets already present for that session
response = device_com.do_get(DeviceAPI.ENDPOINT_DEVICE_ASSETS, {'id_session': id_session})
if response.status_code != 200:
logging.error('OpenTera: Unable to query assets for session: ' + str(response.status_code) +
' - ' + response.text.strip())
continue
session_file_names = [asset['asset_name'] for asset in response.json()]

# Create session events
session_events = self.watch_logs_to_events(logs_data)
for index, event in enumerate(session_events):
if index >= 100:
logging.warning('OpenTera: More than 100 session events for session ' + session_name +
' - ignoring the rest...')
break
event['id_session'] = id_session
event['id_session_event'] = 0
response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSION_EVENTS, {'session_event': event})
if response.status_code != 200:
logging.error('OpenTera: Unable to create session event - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
continue
if not session_file_names: # No files uploaded - create events
session_events = self.watch_logs_to_events(logs_data)
for index, event in enumerate(session_events):
if index >= 100:
logging.warning('OpenTera: More than 100 session events for session ' + session_name +
' - ignoring the rest...')
break
event['id_session'] = id_session
event['id_session_event'] = 0
response = device_com.do_post(DeviceAPI.ENDPOINT_DEVICE_SESSION_EVENTS,
{'session_event': event})
if response.status_code != 200:
logging.error(
'OpenTera: Unable to create session event - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
continue

# Upload all files to FileTransfer service
upload_errors = False
for data_file in files:
full_path = str(os.path.join(dir_path, data_file))
if data_file in session_file_names:
logging.warning('File ' + data_file + ' already in session - ignoring.')
continue
logging.info('Uploading ' + full_path + '...')
response = device_com.upload_file(id_session=id_session, asset_name=data_file, file_path=full_path)
if response.status_code != 200:
logging.error('OpenTera: Unable to upload file - skipping: ' + str(response.status_code) +
' - ' + response.text.strip())
upload_errors = True
continue

logging.info('WatchServerOpenTera: Done processing ' + dir_path)
self.processed_files.append(dir_path)

# All done for that folder - move to processed folder after a small delay to allow uploaded files to
# properly close
# threading.Timer(interval=1, function=self.move_folder,
# kwargs={'source_folder': dir_path,
# 'target_folder': dir_path.replace('ToProcess', 'Processed')}).start()
if not upload_errors:
self.processed_files.append(dir_path)
else:
erronous_paths.append(dir_path)

for dir_path in self.processed_files:
logging.info('Moving ' + dir_path + '...')
self.move_folder(dir_path, dir_path.replace('ToProcess', 'Processed'))
self.processed_files.clear()

logging.info('WatchServerOpenTera: Data transfer for ' + device_name + ' completed')

if erronous_paths:
self.plan_upload_retry(device_name)
else:
if device_name in self._device_retries:
del self._device_retries[device_name]

def plan_upload_retry(self, device_name):
if device_name not in self._device_retries.keys():
self._device_retries[device_name] = 1
else:
self._device_retries[device_name] = self._device_retries[device_name] + 1
if self._device_retries[device_name] > 5:
logging.warning('Too many retries for device ' + device_name + ' - abandonning automatic '
'transfer resuming')
del self._device_retries[device_name]
return
# Plan next retry timer
logging.warning('Errors occured in transfer for ' + device_name + ' - will retry later!')
retry_timer = threading.Timer(120, self.initiate_opentera_transfer,
kwargs={'device_name': device_name})
retry_timer.start()

# def file_upload_callback(self, monitor):
# pc = (monitor.bytes_read / monitor.len) * 100
# print(pc)

@staticmethod
def watch_logs_to_events(logs: list) -> list:
# Parse the watch logs to TeraSession events - {event_type, event_datetime, event_text, event_context}
Expand Down
44 changes: 21 additions & 23 deletions libs/servers/handlers/BaseAppleWatchRequestHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def do_GET(self):
self.send_response(202)
self.send_header('Content-type', 'cdrv-cmd/Connect')
self.end_headers()
if self.base_server:
self.base_server.device_connected(self.headers['Device-Name'])
return

if content_type == 'cdrv-cmd/Disconnect':
Expand All @@ -29,15 +31,15 @@ def do_GET(self):
self.send_response(202)
self.send_header('Content-type', 'cdrv-cmd/Disconnect')
self.end_headers()
self.base_server.device_disconnected(self.headers['Device-Name'])
if self.base_server:
self.base_server.device_disconnected(self.headers['Device-Name'])
return

self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()

def do_POST(self):

# Unpack metadata
content_type = self.headers['Content-Type']
content_length = int(self.headers['Content-Length'])
Expand Down Expand Up @@ -88,14 +90,14 @@ def do_POST(self):
# Supported file type?
if file_type.lower() in ['data', 'dat', 'csv', 'txt', 'oimi']:

if file_type.lower() in ['data', 'dat']:
# Binary file
fh = open(destination_path, 'wb')
text_format = False
else:
# Text file
fh = open(destination_path, 'w')
text_format = True
# if file_type.lower() in ['data', 'dat']:
# Binary file
fh = open(destination_path, 'wb')
# text_format = False
# else:
# Text file
# fh = open(destination_path, 'w')
# text_format = True

while content_size_remaining > 0:
if buffer_size > content_size_remaining:
Expand All @@ -108,21 +110,15 @@ def do_POST(self):
err_desc = err.args[0]
logging.error(device_name + " - Error occured while transferring " + file_name + ": " +
str(err_desc))
fh.close()
os.remove(destination_path)
return

if text_format:
fh.write(data.decode(errors="ignore")) # Ignore unknown characters and errors
else:
fh.write(data)
# if text_format:
# fh.write(data.decode(errors="ignore")) # Ignore unknown characters and errors
# else:
fh.write(data)
content_size_remaining -= buffer_size
# content_received = (content_length - content_size_remaining)
# pc = math.floor((content_received / content_length) * 100)
# if pc != last_pc:
# self.streamer.update_progress.emit(file_name, " (" + str(content_received) + "/ " +
# str(content_length) + ")", (content_length -
# content_size_remaining),
# content_length)
# last_pc = pc
fh.close()
else:
# self.streamer.add_log.emit(device_name + ": " + file_name + " - Type de fichier non-supporté: " +
Expand All @@ -143,6 +139,7 @@ def do_POST(self):
self.send_response(400)
self.send_header('Content-type', 'file-transfer/error')
self.end_headers()
os.remove(destination_path)
return

if content_length == 0 or (file_infos.st_size == 0 and content_length != 0):
Expand All @@ -151,10 +148,11 @@ def do_POST(self):
self.send_response(400)
self.send_header('Content-type', 'file-transfer/error')
self.end_headers()
os.remove(destination_path)
return

# All is good!
logging.info(device_name + " - " + file_name + ": transfer complete.")
logging.info("Completed: " + device_name + " - " + file_name)

self.send_response(200)
self.send_header('Content-type', 'file-transfer/ack')
Expand Down

0 comments on commit ea590cf

Please sign in to comment.