Skip to content

Commit

Permalink
Merge pull request #3 from CDRV/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
SBriere authored Sep 6, 2022
2 parents 5c59399 + 3d704d9 commit 36842ca
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 16 deletions.
14 changes: 11 additions & 3 deletions config/PiHub.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"General": {
"enable_logging": true,
"enable_bed_server": true,
"enable_watch_server": true,
"enable_logging": false,
"enable_bed_server": false,
"enable_watch_server": false,
"enable_folderWatcher_server": true,
"logs_path": "logs"
},
"WatchServer": {
Expand All @@ -20,6 +21,13 @@
"data_path": "data/bed",
"server_base_folder": "Bed"
},
"FolderWatcher": {
"hostname": "0.0.0.0",
"port": 30001,
"data_path": "data/folderData",
"server_base_folder": "FolderData",
"sensor_ID": "Sensor_0"
},
"SFTP": {
"hostname": "telesante.cdrv.ca",
"port": 40091,
Expand Down
5 changes: 5 additions & 0 deletions libs/config/ConfigManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self):
self.opentera_config = {}
self.bed_server_config = {}
self.watch_server_config = {}
self.folderWatcher_server_config = {}

def load_config(self, filename) -> bool:
try:
Expand Down Expand Up @@ -48,6 +49,10 @@ def load_config(self, filename) -> bool:
'server_base_folder']):
self.bed_server_config = config_json["BedServer"]

if self.validate_config("FolderWatcher", config_json['FolderWatcher'], ['data_path',
'server_base_folder', 'sensor_ID']):
self.folderWatcher_server_config = config_json["FolderWatcher"]

return True

@staticmethod
Expand Down
17 changes: 13 additions & 4 deletions libs/servers/WatchServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def __init__(self, server_config: dict, sftp_config: dict):
self.opentera_transfer = server_config['opentera_transfer']
self.send_logs_only = server_config['send_logs_only']

self.synching_files = False

def run(self):
logging.info('Apple Watch Server starting...')

Expand Down Expand Up @@ -55,6 +57,12 @@ def stop(self):

def sync_files(self, check_internet: bool = True):
logging.info("WatchServer: Synchronizing files with server...")

if self.synching_files:
logging_info("*** WatchServer: Already synching files. Will wait for next time.")
return

self.synching_files = True
# Build list of files to transfer
base_folder = self.data_path + '/ToProcess/'
files = []
Expand Down Expand Up @@ -82,10 +90,10 @@ def sync_files(self, check_internet: bool = True):
if self.sftp_transfer:
# Send files using sftp
# Sending files
SFTPUploader.sftp_send(sftp_config=self.sftp_config, files_to_transfer=full_files,
files_directory_on_server=file_folders,
file_transferred_callback=self.file_was_processed,
check_internet=check_internet)
success = SFTPUploader.sftp_send(sftp_config=self.sftp_config, files_to_transfer=full_files,
files_directory_on_server=file_folders,
file_transferred_callback=self.file_was_processed,
check_internet=check_internet)

# Set files as processed
self.move_processed_files()
Expand All @@ -96,6 +104,7 @@ def sync_files(self, check_internet: bool = True):
# Clean up empty folders
WatchServer.remove_empty_folders(Path(base_folder).absolute())
logging.info("WatchServer: Synchronization done.")
self.synching_files = False

def file_was_processed(self, full_filepath: str):
# Mark file as processed - will be moved later on to prevent conflicts
Expand Down
100 changes: 100 additions & 0 deletions libs/servers/folderWatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
##################################################
# PiHub main folder Watcher communication server
##################################################
# Authors: Antoine Guillerand
##################################################
from libs.servers.BaseServer import BaseServer
from libs.uploaders.SFTPUploader import SFTPUploader

import logging
from os import makedirs
from pathlib import Path

import threading
import os
import time


class FolderWatcher(BaseServer):
server = None

def __init__(self, server_config: dict, sftp_config: dict):
super().__init__(server_config=server_config)
self.sftp_config = sftp_config
self.server_base_folder = server_config['server_base_folder']
self.sensor_ID = server_config['sensor_ID']

def sync_files(self):
logging.info("FolderWatcher: Synchronizing files with server...")
full_path = Path(self.data_path)

# Sync local files with the ones on the server
try:
SFTPUploader.sftp_sync_last(sftp_config=self.sftp_config, local_base_path=str(full_path.absolute()),
remote_base_path=self.server_base_folder, check_internet=False)
# SFTPUploader.sftp_sync(sftp_config=self.sftp_config, local_base_path=str(full_path.absolute()),
# remote_base_path=self.server_base_folder)
except Exception as e:
logging.error("FolderWatcher: Unable to synchronize files - " + str(e))
logging.info("FolderWatcher: Synchronization done.")

def run(self):
logging.info('folderWatcher starting...')
path_to_watch = self.data_path + '/local_only/' + self.sensor_ID
logging.info("FolderWatcher: Path to watch: " + path_to_watch)
# Check if all files are on sync on the server, do it on a thread so the logger still start
if os.path.isdir(path_to_watch):
thread_sync = threading.Thread(target=self.sync_files)
thread_sync.start()
else:
logging.info("FolderWatcher: No Data Folder to sync at start")
os.makedirs(path_to_watch)
before = dict([(f, None) for f in os.listdir(path_to_watch)])

logging.info("folderWatcher started")
try:
# Watch the filepath every 1s and if number of file changes transfer files.
while 1:
time.sleep(1)
# Add this because we remove the folder if everything is transferred:
if not (os.path.isdir(path_to_watch)):
os.makedirs(path_to_watch)
after = dict([(f, None) for f in os.listdir(path_to_watch)])
added = [f for f in after if not f in before]
removed = [f for f in before if not f in after]
# Check if a file was added or removed
if added:
logging.info("FolderWatcher: Local File(s) Added: " + ", ".join(added))
for i in range(0, len(added)):
filename = self.data_path + '/local_only/' + self.sensor_ID + "/" + added[i]
# Send file using SFTP
file_server_directory = "/" + self.server_base_folder + "/" + self.sensor_ID
file_server_path = file_server_directory + "/" + added[i] #
temp_file = self.data_path + "/local_only/" + self.sensor_ID + "/tempData.txt"
file_transferred_directory = self.data_path + "/transferred/" + self.sensor_ID
logging.info("Try to create " + file_transferred_directory + ", dir exists = " +
str(not os.path.isdir(file_transferred_directory)))
if not os.path.isdir(file_transferred_directory):
makedirs(file_transferred_directory)
file_transferred_location = file_transferred_directory + "/" + added[i]
# Add a file merge before transfer
SFTPUploader.sftp_merge_and_send(sftp_config=self.sftp_config,
file_path_on_server=file_server_path,
file_server_location=file_server_directory,
temporary_file=temp_file,
file_transferred_location=file_transferred_location,
file_to_transfer=filename,
check_internet=False)
if removed:
logging.info("FolderWatcher: Local file(s) Removed: " + ", ".join(removed))
before = after

except OSError as e:
logging.critical(e.strerror)
return
except OverflowError as e:
logging.critical(str(e))
return

finally:
logging.info("folderWatcher stopped.")
16 changes: 10 additions & 6 deletions libs/uploaders/SFTPUploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def sftp_send(sftp_config: dict, files_directory_on_server: [str], files_to_tran
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
logging.info('About to send files to server at ' + sftp_config["hostname"] + ':' + str(sftp_config["port"]))
file_to_transfer = None
try:
with pysftp.Connection(host=sftp_config["hostname"], username=sftp_config["username"],
password=sftp_config["password"], port=sftp_config["port"],
Expand All @@ -49,7 +50,10 @@ def sftp_send(sftp_config: dict, files_directory_on_server: [str], files_to_tran
except (pysftp.exceptions.ConnectionException, pysftp.CredentialException,
pysftp.AuthenticationException, pysftp.HostKeysException,
paramiko.SSHException, paramiko.PasswordRequiredException) as exc:
logging.error('Error occurred transferring ' + file_to_transfer + ': ' + str(exc))
if file_to_transfer:
logging.error('Error occurred transferring ' + file_to_transfer + ': ' + str(exc))
else:
logging.error('Error occured while trying to transfer: ' + str(exc))
return False

logging.info('Files transfer complete!')
Expand Down Expand Up @@ -139,17 +143,17 @@ def sftp_sync_last(sftp_config: dict, remote_base_path: str, local_base_path: st
logging.info('Sensors complete folder list' + str(folders))
# Wait for internet connection
# PiHubHardware.wait_for_internet_infinite ()
logging.info("BedServer: Testing the internet connection...")
logging.info("SyncServer: Testing the internet connection...")
while not(Network.is_internet_connected()):
logging.info("BedServer: Connection failed, retry sync in 10min...")
logging.info("SyncServer: Connection failed, retry sync in 10min...")
time.sleep(600)
logging.info("BedServer: Pass, syncing files to server...")
logging.info("SyncServer: Pass, syncing files to server...")
# It is called sync_last but has been modified to sync all files!
for i in range(0, len(folders)):
filenames = next(walk(folders[i]), (None, None, []))[2] # [] if no file
for j in range(0, len(filenames)):
file_server_directory = remote_base_path + "/" + only_folders[i]
filename_2_transfer = folders[i] + "/" + filenames[j] # Only the last file in directory is 0 (change to
# something more robust)
filename_2_transfer = folders[i] + "/" + filenames[j] # all files are synced
file_server_path = file_server_directory + "/" + filenames[j]
file_transferred_directory = local_base_path + "/transferred/" + only_folders[i]
if not os.path.isdir(file_transferred_directory):
Expand Down
17 changes: 14 additions & 3 deletions piHub.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from libs.config.ConfigManager import ConfigManager
from libs.servers.BedServer import BedServer
from libs.servers.WatchServer import WatchServer
from libs.servers.folderWatcher import FolderWatcher
from libs.hardware.PiHubHardware import PiHubHardware

from Globals import version_string
Expand All @@ -26,11 +27,11 @@ def handle_exception(exc_type, exc_value, exc_traceback):

sys.excepthook = handle_exception


if __name__ == '__main__':
# Logging module
################
from libs.logging.Logger import init_global_logger

init_global_logger()

# Init globals
Expand All @@ -46,6 +47,7 @@ def handle_exception(exc_type, exc_value, exc_traceback):
# Set logger parameters
if config_man.general_config["enable_logging"]:
from libs.logging.Logger import init_file_logger

init_file_logger(config_man.general_config["logs_path"])

# Initializing...
Expand Down Expand Up @@ -74,6 +76,16 @@ def handle_exception(exc_type, exc_value, exc_traceback):
watch_server.start()
servers.append(watch_server)

# Folder Watcher server
if config_man.general_config["enable_folderWatcher_server"]:
# Start Apple Watch server
folderWatcher_server = FolderWatcher(server_config=config_man.folderWatcher_server_config,
sftp_config=config_man.sftp_config)

# Start server
folderWatcher_server.start()
servers.append(folderWatcher_server)

logging.info("PiHub started.")
try:
# Main loop on main thread
Expand All @@ -83,11 +95,10 @@ def handle_exception(exc_type, exc_value, exc_traceback):
time.sleep(600)
# Check if we have Internet access or not
PiHubHardware.ensure_internet_is_available()

except (KeyboardInterrupt, SystemExit):
for server in servers:
server.stop()
logging.info("PiHub stopped by user.")
exit(0)
logging.info("PiHub stopped.")

0 comments on commit 36842ca

Please sign in to comment.