diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..3d8de43 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +plane-notify diff --git a/ADSBX.py b/ADSBX.py new file mode 100644 index 0000000..629fbfb --- /dev/null +++ b/ADSBX.py @@ -0,0 +1,107 @@ +import requests +import json +import configparser +from datetime import datetime +from http.client import IncompleteRead +import urllib3 +import socket + +from config import MAIN_CONFIG + +API_VERSION = int(MAIN_CONFIG.get("ADSBX", "API_VERSION")) +API_KEY = MAIN_CONFIG.get("ADSBX", "API_KEY") + + +def pull(url, headers): + try: + response = requests.get(url, headers=headers, timeout=30) + print("HTTP Status Code:", response.status_code) + response.raise_for_status() + except ( + requests.HTTPError, + ConnectionError, + requests.Timeout, + urllib3.exceptions.ConnectionError, + ) as error_message: + print("Basic Connection Error") + print(error_message) + response = None + except ( + requests.RequestException, + IncompleteRead, + ValueError, + socket.timeout, + socket.gaierror, + ) as error_message: + print("Connection Error") + print(error_message) + response = None + except Exception as error_message: + print("Connection Error uncaught, basic exception for all") + print(error_message) + response = None + return response + + +def pull_adsbx(planes): + if API_VERSION not in [1, 2]: + raise ValueError("Bad ADSBX API Version") + + if MAIN_CONFIG.getboolean("ADSBX", "ENABLE_PROXY") is False: + if API_VERSION == 1: + if len(planes) > 1: + url = "https://adsbexchange.com/api/aircraft/json/" + + elif len(planes) == 1: + url = ( + f"https://adsbexchange.com/api/aircraft/icao/{next(planes.keys())}/" + ) + + elif API_VERSION == 2: + url = "https://adsbexchange.com/api/aircraft/v2/all" + + else: + if MAIN_CONFIG.has_option("ADSBX", "PROXY_HOST"): + url = MAIN_CONFIG.get("ADSBX", "PROXY_HOST") + if API_VERSION == 1: + url += "/api/aircraft/json/all" + if API_VERSION == 2: + url += "/api/aircraft/v2/all" + else: + raise ValueError("Proxy enabled but no host") + headers = {"api-auth": API_KEY, "Accept-Encoding": "gzip"} + response = pull(url, headers) + if response is not None: + try: + data = json.loads(response.text) + except (json.decoder.JSONDecodeError, ValueError) as error_message: + print("Error with JSON") + print(error_message) + data = None + except TypeError as error_message: + print("Type Error", error_message) + data = None + else: + if "msg" in data.keys() and data["msg"] != "No error": + raise ValueError("Error from ADSBX: msg = ", data["msg"]) + if "ctime" in data.keys(): + data_ctime = float(data["ctime"]) / 1000 + print("Data ctime:", datetime.utcfromtimestamp(data_ctime)) + if "now" in data.keys(): + data_now = float(data["now"]) / 1000 + print("Data now time:", datetime.utcfromtimestamp(data_now)) + print("Current UTC:", datetime.utcnow()) + else: + data = None + return data + + +def pull_date_ras(date): + url = f"https://globe.adsbexchange.com/globe_history/{date}/acas/acas.json" + headers = {"Accept-Encoding": "gzip"} + response = pull(url, headers) + if response is not None: + data = response.text.splitlines() + else: + data = None + return data diff --git a/__main__.py b/__main__.py index 232480a..86a9515 100644 --- a/__main__.py +++ b/__main__.py @@ -3,57 +3,84 @@ from colorama import Fore, Back, Style import platform import traceback -if platform.system() == "Windows": - from colorama import init - init(convert=True) -from planeClass import Plane + from datetime import datetime import pytz import os import signal +import sys +import requests +from zipfile import ZipFile + +from plane import Plane +from discord_utils import send_discord_message +from config import MAIN_CONFIG +from ADSBX import pull_date_ras + +if platform.system() == "Windows": + from colorama import init + + init(convert=True) + + abspath = os.path.abspath(__file__) dname = os.path.dirname(abspath) os.chdir(dname) -import sys + sys.path.extend([os.getcwd()]) -#Dependency Handling +# Dependency Handling if not os.path.isdir("./dependencies/"): os.mkdir("./dependencies/") -required_files = [("Roboto-Regular.ttf", 'https://github.com/googlefonts/roboto/blob/main/src/hinted/Roboto-Regular.ttf?raw=true'), ('airports.csv', 'https://ourairports.com/data/airports.csv'), ('regions.csv', 'https://ourairports.com/data/regions.csv'), ('ADSBX_Logo.png', "https://www.adsbexchange.com/wp-content/uploads/cropped-Stealth.png"), ('Mictronics_db.zip', "https://www.mictronics.de/aircraft-database/indexedDB.php")] -for file in required_files: - file_name = file[0] - url = file[1] - if not os.path.isfile("./dependencies/" + file_name): - print(file_name, "does not exist downloading now") - try: - import requests - file_content = requests.get(url) - - open(("./dependencies/" + file_name), 'wb').write(file_content.content) - except Exception as e: - raise e("Error getting", file_name, "from", url) - else: - print("Successfully got", file_name) - else: - print("Already have", file_name, "continuing") -if os.path.isfile("./dependencies/" + required_files[4][0]) and not os.path.isfile("./dependencies/aircrafts.json"): + +REQUIRED_FILES = [ + ( + "Roboto-Regular.ttf", + "https://github.com/googlefonts/roboto/blob/main/src/hinted/Roboto-Regular.ttf?raw=true", + ), + ("airports.csv", "https://ourairports.com/data/airports.csv"), + ("regions.csv", "https://ourairports.com/data/regions.csv"), + ( + "ADSBX_Logo.png", + "https://www.adsbexchange.com/wp-content/uploads/cropped-Stealth.png", + ), + ("Mictronics_db.zip", "https://www.mictronics.de/aircraft-database/indexedDB.php"), +] + +for file in REQUIRED_FILES: + file_name, url = file + if not os.path.isfile("./dependencies/" + file_name): + print(file_name, "does not exist downloading now") + try: + file_content = requests.get(url) + + with open(f"./dependencies/{file_name}", "wb") as f: + f.write(file_content.content) + + except Exception as e: + raise e(f"Error getting{file_name} from {url}") + else: + print(f"Successfully got {file_name}") + else: + print(f"Already have {file_name} continuing") + +if os.path.isfile("./dependencies/" + REQUIRED_FILES[4][0]) and not os.path.isfile( + "./dependencies/aircrafts.json" +): print("Extracting Mictronics DB") - from zipfile import ZipFile - with ZipFile("./dependencies/" + required_files[4][0], 'r') as mictronics_db: + with ZipFile(f"./dependencies/{REQUIRED_FILES[4][0]}", "r") as mictronics_db: mictronics_db.extractall("./dependencies/") -main_config = configparser.ConfigParser() -print(os.getcwd()) -main_config.read('./configs/mainconf.ini') -source = main_config.get('DATA', 'SOURCE') -if main_config.getboolean('DISCORD', 'ENABLE'): - from defDiscord import sendDis - sendDis("Started", main_config) +source = MAIN_CONFIG.get("DATA", "SOURCE") +if MAIN_CONFIG.getboolean("DISCORD", "ENABLE"): + send_discord_message("Started", MAIN_CONFIG) + + def service_exit(signum, frame): - if main_config.getboolean('DISCORD', 'ENABLE'): - from defDiscord import sendDis - sendDis("Service Stop", main_config) + if MAIN_CONFIG.getboolean("DISCORD", "ENABLE"): + send_discord_message("Service Stop", MAIN_CONFIG) raise SystemExit("Service Stop") + + signal.signal(signal.SIGTERM, service_exit) if os.path.isfile("lookup_route.py"): print("Route lookup is enabled") @@ -62,45 +89,54 @@ def service_exit(signum, frame): try: print("Source is set to", source) - import sys - #Setup plane objects from plane configs + # Setup plane objects from plane configs planes = {} print("Found the following configs") for dirpath, dirname, filename in os.walk("./configs"): - for filename in [f for f in filename if f.endswith(".ini") and f != "mainconf.ini"]: - if not "disabled" in dirpath: - print(os.path.join(dirpath, filename)) - plane_config = configparser.ConfigParser() - plane_config.read((os.path.join(dirpath, filename))) - #Creates a Key labeled the ICAO of the plane, with the value being a plane object - planes[plane_config.get('DATA', 'ICAO').upper()] = Plane(plane_config.get('DATA', 'ICAO'), os.path.join(dirpath, filename), plane_config) - - running_Count = 0 + for filename in [ + f for f in filename if f.endswith(".ini") and f != "mainconf.ini" + ]: + if not "disabled" in dirpath: + print(os.path.join(dirpath, filename)) + plane_config = configparser.ConfigParser() + plane_config.read((os.path.join(dirpath, filename))) + # Creates a Key labeled the ICAO of the plane, with the value being a plane object + planes[plane_config.get("DATA", "ICAO").upper()] = Plane( + plane_config.get("DATA", "ICAO"), + os.path.join(dirpath, filename), + plane_config, + ) + + running_count = 0 failed_count = 0 try: - tz = pytz.timezone(main_config.get('DATA', 'TZ')) + tz = pytz.timezone(MAIN_CONFIG.get("DATA", "TZ")) except pytz.exceptions.UnknownTimeZoneError: tz = pytz.UTC last_ra_count = None while True: datetime_tz = datetime.now(tz) if datetime_tz.hour == 0 and datetime_tz.minute == 0: - running_Count = 0 - running_Count +=1 + running_count = 0 + running_count += 1 start_time = time.time() - header = ("-------- " + str(running_Count) + " -------- " + str(datetime_tz.strftime("%I:%M:%S %p")) + " ---------------------------------------------------------------------------") - print (Back.GREEN + Fore.BLACK + header[0:100] + Style.RESET_ALL) + header = ( + "-------- " + + str(running_count) + + " -------- " + + str(datetime_tz.strftime("%I:%M:%S %p")) + + " ---------------------------------------------------------------------------" + ) + print(Back.GREEN + Fore.BLACK + header[0:100] + Style.RESET_ALL) if source == "ADSBX": - #ACAS data - from defADSBX import pull_date_ras - import ast + # ACAS data today = datetime.utcnow() date = today.strftime("%Y/%m/%d") ras = pull_date_ras(date) sorted_ras = {} if ras is not None: - #Testing RAs - #if last_ra_count is not None: + # Testing RAs + # if last_ra_count is not None: # with open('./testing/acastest.json') as f: # data = f.readlines() # ras += data @@ -109,33 +145,34 @@ def service_exit(signum, frame): print(abs(ra_count - last_ra_count), "new Resolution Advisories") for ra_num, ra in enumerate(ras[last_ra_count:]): ra = ast.literal_eval(ra) - if ra['hex'].upper() in planes.keys(): - if ra['hex'].upper() not in sorted_ras.keys(): - sorted_ras[ra['hex'].upper()] = [ra] + if ra["hex"].upper() in planes.keys(): + if ra["hex"].upper() not in sorted_ras.keys(): + sorted_ras[ra["hex"].upper()] = [ra] else: - sorted_ras[ra['hex'].upper()].append(ra) + sorted_ras[ra["hex"].upper()].append(ra) else: print("No new Resolution Advisories") last_ra_count = ra_count for key, obj in planes.items(): if sorted_ras != {} and key in sorted_ras.keys(): - print(key, "has", len(sorted_ras[key]), "RAs") - obj.check_new_ras(sorted_ras[key]) + print(key, "has", len(sorted_ras[key]), "RAs") + obj.check_new_ras(sorted_ras[key]) obj.expire_ra_types() - #Normal API data - api_version = int(main_config.get('ADSBX', 'API_VERSION')) + # Normal API data + api_version = int(MAIN_CONFIG.get("ADSBX", "API_VERSION")) if api_version == 2: - icao_key = 'hex' + icao_key = "hex" elif api_version == 1: - icao_key = 'icao' + icao_key = "icao" else: raise ValueError("Invalid API Version") - from defADSBX import pull_adsbx + from ADSBX import pull_adsbx + data = pull_adsbx(planes) if data is not None: - if data['ac'] is not None: + if data["ac"] is not None: data_indexed = {} - for planeData in data['ac']: + for planeData in data["ac"]: data_indexed[planeData[icao_key].upper()] = planeData for key, obj in planes.items(): try: @@ -151,7 +188,8 @@ def service_exit(signum, frame): else: failed_count += 1 elif source == "OPENS": - from defOpenSky import pull_opensky + from opensky_utils import pull_opensky + planeData, failed = pull_opensky(planes) if failed == False: if planeData != None and planeData.states != []: @@ -170,46 +208,62 @@ def service_exit(signum, frame): obj.run_empty() elif failed: failed_count += 1 - if failed_count >= 10 and main_config.getboolean('DATA', 'FAILOVER'): + if failed_count >= 10 and MAIN_CONFIG.getboolean("DATA", "FAILOVER"): if source == "OPENS": source = "ADSBX" elif source == "ADSBX": source = "OPENS" failed_count = 0 - if main_config.getboolean('DISCORD', 'ENABLE'): - from defDiscord import sendDis - sendDis(str("Failed over to " + source), main_config) + if MAIN_CONFIG.getboolean("DISCORD", "ENABLE"): + send_discord_message(str("Failed over to " + source), MAIN_CONFIG) elapsed_calc_time = time.time() - start_time datetime_tz = datetime.now(tz) - footer = "-------- " + str(running_Count) + " -------- " + str(datetime_tz.strftime("%I:%M:%S %p")) + " ------------------------Elapsed Time- " + str(round(elapsed_calc_time, 3)) + " -------------------------------------" - print (Back.GREEN + Fore.BLACK + footer[0:100] + Style.RESET_ALL) + footer = ( + "-------- " + + str(running_count) + + " -------- " + + str(datetime_tz.strftime("%I:%M:%S %p")) + + " ------------------------Elapsed Time- " + + str(round(elapsed_calc_time, 3)) + + " -------------------------------------" + ) + print(Back.GREEN + Fore.BLACK + footer[0:100] + Style.RESET_ALL) sleep_sec = 30 - for i in range(sleep_sec,0,-1): + for i in range(sleep_sec, 0, -1): if i < 10: i = " " + str(i) sys.stdout.write("\r") sys.stdout.write(Back.RED + "Sleep {00000000}".format(i) + Style.RESET_ALL) sys.stdout.flush() time.sleep(1) - sys.stdout.write(Back.RED + ('\x1b[1K\r' +"Slept for " +str(sleep_sec)) + Style.RESET_ALL) + sys.stdout.write( + Back.RED + ("\x1b[1K\r" + "Slept for " + str(sleep_sec)) + Style.RESET_ALL + ) print() except KeyboardInterrupt as e: print(e) - if main_config.getboolean('DISCORD', 'ENABLE'): - from defDiscord import sendDis - sendDis(str("Manual Exit: " + str(e)), main_config) + if MAIN_CONFIG.getboolean("DISCORD", "ENABLE"): + send_discord_message(str("Manual Exit: " + str(e)), MAIN_CONFIG) except Exception as e: - if main_config.getboolean('DISCORD', 'ENABLE'): + if MAIN_CONFIG.getboolean("DISCORD", "ENABLE"): try: - os.remove('crash_latest.log') + os.remove("crash_latest.log") except OSError: pass import logging - logging.basicConfig(filename='crash_latest.log', filemode='w', format='%(asctime)s - %(message)s') + + logging.basicConfig( + filename="crash_latest.log", + filemode="w", + format="%(asctime)s - %(message)s", + ) logging.Formatter.converter = time.gmtime logging.error(e) logging.error(str(traceback.format_exc())) - from defDiscord import sendDis - sendDis(str("Error Exiting: " + str(e) + "Failed on " + key), main_config, "crash_latest.log") - raise e \ No newline at end of file + send_discord_message( + str("Error Exiting: " + str(e) + "Failed on " + key), + MAIN_CONFIG, + "crash_latest.log", + ) + raise e diff --git a/airport_utils.py b/airport_utils.py new file mode 100644 index 0000000..7a9d72e --- /dev/null +++ b/airport_utils.py @@ -0,0 +1,54 @@ +import csv +from geopy.distance import geodesic + + +def add_airport_region(airport_dict): + # Get full region/state name from iso region name + with open("./dependencies/regions.csv", "r", encoding="utf-8") as regions_csv: + regions_csv = csv.DictReader(filter(lambda row: row[0] != "#", regions_csv)) + for region in regions_csv: + if region["code"] == airport_dict["iso_region"]: + airport_dict["region"] = region["name"] + return airport_dict + + +def get_closest_airport(latitude, longitude, allowed_types): + plane = latitude, longitude + with open("./dependencies/airports.csv", "r", encoding="utf-8") as airport_csv: + airport_csv_reader = csv.DictReader( + filter(lambda row: row[0] != "#", airport_csv) + ) + for airport in airport_csv_reader: + if airport["type"] in allowed_types: + airport_coord = ( + float(airport["latitude_deg"]), + float(airport["longitude_deg"]), + ) + airport_dist = float((geodesic(plane, airport_coord).mi)) + if "closest_airport_dict" not in locals(): + closest_airport_dict = airport + closest_airport_dist = airport_dist + elif airport_dist < closest_airport_dist: + closest_airport_dict = airport + closest_airport_dist = airport_dist + closest_airport_dict["distance_mi"] = closest_airport_dist + # Convert indent key to icao key as its labeled icao in other places not ident + closest_airport_dict["icao"] = closest_airport_dict.pop("gps_code") + closest_airport_dict = add_airport_region(closest_airport_dict) + return closest_airport_dict + + +def get_airport_by_icao(icao): + with open("./dependencies/airports.csv", "r", encoding="utf-8") as airport_csv: + airport_csv_reader = csv.DictReader( + filter(lambda row: row[0] != "#", airport_csv) + ) + for airport in airport_csv_reader: + if airport["gps_code"] == icao: + matching_airport = airport + # Convert indent key to icao key as its labeled icao in other places not ident + matching_airport["icao"] = matching_airport.pop("gps_code") + break + matching_airport = add_airport_region(matching_airport) + return matching_airport + diff --git a/calculate_headings.py b/calculate_headings.py index 20e5697..b4cb273 100644 --- a/calculate_headings.py +++ b/calculate_headings.py @@ -1,26 +1,51 @@ +from math import atan2, cos, radians, sin, degrees + +DIRECTIONS = [ + "N", + "NNE", + "NE", + "ENE", + "E", + "ESE", + "SE", + "SSE", + "S", + "SSW", + "SW", + "WSW", + "W", + "WNW", + "NW", + "NNW", +] + + def calculate_from_bearing(frm, to): """Calculate inital bearing from one coordinate to next (two tuples of coordinates(lat/lng) in degrees in, returns single bearing)""" - #https://gis.stackexchange.com/questions/228656/finding-compass-direction-between-two-distant-gps-points - from math import atan2, cos, radians, sin, degrees + # https://gis.stackexchange.com/questions/228656/finding-compass-direction-between-two-distant-gps-points + frm = (radians(frm[0]), radians(frm[1])) to = (radians(to[0]), radians(to[1])) - y = sin(to[1]- frm[1]) * cos(to[0]) - x = cos(frm[0]) * sin(to[0]) - sin(frm[0]) * cos(to[0]) * cos(to[1]-frm[1]) + y = sin(to[1] - frm[1]) * cos(to[0]) + x = cos(frm[0]) * sin(to[0]) - sin(frm[0]) * cos(to[0]) * cos(to[1] - frm[1]) from_bearing = degrees(atan2(y, x)) if from_bearing < 0: from_bearing += 360 return from_bearing + + def calculate_cardinal(d): """Finds cardinal direction from bearing degree""" - dirs = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW'] - ix = int(round(d / (360. / len(dirs)))) - card = dirs[ix % len(dirs)] + ix = int(round(d / (360 / len(DIRECTIONS)))) + card = DIRECTIONS[ix % len(DIRECTIONS)] print(card) return card + + def calculate_deg_change(new_heading, original_heading): """Calculates change between two headings, returns negative degree if change is left, positive if right""" - normal = abs(original_heading-new_heading) - across_inital = 360 - abs(original_heading-new_heading) + normal = abs(original_heading - new_heading) + across_inital = 360 - abs(original_heading - new_heading) if across_inital < normal: direction = "left" if original_heading < new_heading else "right" track_change = across_inital diff --git a/config.py b/config.py new file mode 100644 index 0000000..ac717ba --- /dev/null +++ b/config.py @@ -0,0 +1,4 @@ +import configparser + +MAIN_CONFIG = configparser.ConfigParser() +MAIN_CONFIG.read("./configs/mainconf.ini") diff --git a/defADSBX.py b/defADSBX.py deleted file mode 100644 index 763bf7f..0000000 --- a/defADSBX.py +++ /dev/null @@ -1,91 +0,0 @@ -import requests -import json -import configparser -from datetime import datetime -from http.client import IncompleteRead -import http.client as http -import urllib3 -import socket -main_config = configparser.ConfigParser() -main_config.read('./configs/mainconf.ini') -api_version = main_config.get('ADSBX', 'API_VERSION') - -def pull(url, headers): - try: - response = requests.get(url, headers = headers, timeout=30) - print ("HTTP Status Code:", response.status_code) - response.raise_for_status() - except (requests.HTTPError, ConnectionError, requests.Timeout, urllib3.exceptions.ConnectionError) as error_message: - print("Basic Connection Error") - print(error_message) - response = None - except (requests.RequestException, IncompleteRead, ValueError, socket.timeout, socket.gaierror) as error_message: - print("Connection Error") - print(error_message) - response = None - except Exception as error_message: - print("Connection Error uncaught, basic exception for all") - print(error_message) - response = None - return response - -def pull_adsbx(planes): - api_version = int(main_config.get('ADSBX', 'API_VERSION')) - if api_version not in [1, 2]: - raise ValueError("Bad ADSBX API Version") - if main_config.getboolean('ADSBX', 'ENABLE_PROXY') is False: - if api_version == 1: - if len(planes) > 1: - url = "https://adsbexchange.com/api/aircraft/json/" - elif len(planes) == 1: - url = "https://adsbexchange.com/api/aircraft/icao/" + str(list(planes.keys())[0]) + "/" - elif api_version == 2: - url = "https://adsbexchange.com/api/aircraft/v2/all" - else: - if main_config.has_option('ADSBX', 'PROXY_HOST'): - if api_version == 1: - url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/json/all" - if api_version == 2: - url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/v2/all" - else: - raise ValueError("Proxy enabled but no host") - headers = { - 'api-auth': main_config.get('ADSBX', 'API_KEY'), - 'Accept-Encoding': 'gzip' - } - response = pull(url, headers) - if response is not None: - try: - data = json.loads(response.text) - except (json.decoder.JSONDecodeError, ValueError) as error_message: - print("Error with JSON") - print(error_message) - data = None - except TypeError as error_message: - print("Type Error", error_message) - data = None - else: - if "msg" in data.keys() and data['msg'] != "No error": - raise ValueError("Error from ADSBX: msg = ", data['msg']) - if "ctime" in data.keys(): - data_ctime = float(data['ctime']) / 1000.0 - print("Data ctime:",datetime.utcfromtimestamp(data_ctime)) - if "now" in data.keys(): - data_now = float(data['now']) / 1000.0 - print("Data now time:",datetime.utcfromtimestamp(data_now)) - print("Current UTC:", datetime.utcnow()) - else: - data = None - return data - -def pull_date_ras(date): - url = f"https://globe.adsbexchange.com/globe_history/{date}/acas/acas.json" - headers = { - 'Accept-Encoding': 'gzip' - } - response = pull(url, headers) - if response is not None: - data = response.text.splitlines() - else: - data = None - return data \ No newline at end of file diff --git a/defAirport.py b/defAirport.py deleted file mode 100644 index ed64da8..0000000 --- a/defAirport.py +++ /dev/null @@ -1,41 +0,0 @@ -import csv -import math -def add_airport_region(airport_dict): - #Get full region/state name from iso region name - with open('./dependencies/regions.csv', 'r', encoding='utf-8') as regions_csv: - regions_csv = csv.DictReader(filter(lambda row: row[0]!='#', regions_csv)) - for region in regions_csv: - if region['code'] == airport_dict['iso_region']: - airport_dict['region'] = region['name'] - return airport_dict -def getClosestAirport(latitude, longitude, allowed_types): - from geopy.distance import geodesic - plane = (latitude, longitude) - with open('./dependencies/airports.csv', 'r', encoding='utf-8') as airport_csv: - airport_csv_reader = csv.DictReader(filter(lambda row: row[0]!='#', airport_csv)) - for airport in airport_csv_reader: - if airport['type'] in allowed_types: - airport_coord = float(airport['latitude_deg']), float(airport['longitude_deg']) - airport_dist = float((geodesic(plane, airport_coord).mi)) - if "closest_airport_dict" not in locals(): - closest_airport_dict = airport - closest_airport_dist = airport_dist - elif airport_dist < closest_airport_dist: - closest_airport_dict = airport - closest_airport_dist = airport_dist - closest_airport_dict['distance_mi'] = closest_airport_dist - #Convert indent key to icao key as its labeled icao in other places not ident - closest_airport_dict['icao'] = closest_airport_dict.pop('gps_code') - closest_airport_dict = add_airport_region(closest_airport_dict) - return closest_airport_dict -def get_airport_by_icao(icao): - with open('./dependencies/airports.csv', 'r', encoding='utf-8') as airport_csv: - airport_csv_reader = csv.DictReader(filter(lambda row: row[0]!='#', airport_csv)) - for airport in airport_csv_reader: - if airport['gps_code'] == icao: - matching_airport = airport - #Convert indent key to icao key as its labeled icao in other places not ident - matching_airport['icao'] = matching_airport.pop('gps_code') - break - matching_airport = add_airport_region(matching_airport) - return matching_airport \ No newline at end of file diff --git a/defDiscord.py b/defDiscord.py deleted file mode 100644 index eb6e71c..0000000 --- a/defDiscord.py +++ /dev/null @@ -1,13 +0,0 @@ -def sendDis(message, config, file_name = None, role_id = None): - import requests - from discord_webhook import DiscordWebhook - if role_id != None: - message += f" <@&{role_id}>" - webhook = DiscordWebhook(url=config.get('DISCORD', 'URL'), content=message[0:1999], username=config.get('DISCORD', 'USERNAME')) - if file_name != None: - with open(file_name, "rb") as f: - webhook.add_file(file=f.read(), filename=file_name) - try: - webhook.execute() - except requests.exceptions.RequestException: - pass \ No newline at end of file diff --git a/defMap.py b/defMap.py deleted file mode 100644 index 607f7c1..0000000 --- a/defMap.py +++ /dev/null @@ -1,25 +0,0 @@ -def getMap(mapLocation, file_name): - import requests - import configparser - config = configparser.ConfigParser() - config.read('./configs/mainconf.ini') - api_key = config.get('GOOGLE', 'API_KEY') - url = "https://maps.googleapis.com/maps/api/staticmap?" - - center = str(mapLocation) - zoom = 9 - - r = requests.get(url + "center=" + center + "&zoom=" + - str(zoom) + "&size=800x800 &key=" + - api_key + "&sensor=false") - - # wb mode is stand for write binary mode - f = open(file_name, 'wb') - - # r.content gives content, - # in this case gives image - f.write(r.content) - - # close method of file object - # save and close the file - f.close() \ No newline at end of file diff --git a/defOpenSky.py b/defOpenSky.py deleted file mode 100644 index 7b84e8b..0000000 --- a/defOpenSky.py +++ /dev/null @@ -1,17 +0,0 @@ -def pull_opensky(planes): - import configparser - main_config = configparser.ConfigParser() - main_config.read('./configs/mainconf.ini') - from opensky_api import OpenSkyApi - planeData = None - opens_api = OpenSkyApi(username= None if main_config.get('OPENSKY', 'USERNAME').upper() == "NONE" else main_config.get('OPENSKY', 'USERNAME'), password= None if main_config.get('OPENSKY', 'PASSWORD').upper() == "NONE" else main_config.get('OPENSKY', 'PASSWORD').upper()) - failed = False - icao_array = [] - for key in planes.keys(): - icao_array.append(key.lower()) - try: - planeData = opens_api.get_states(time_secs=0, icao24=icao_array) - except Exception as e: - print ("OpenSky Error", e) - failed = True - return planeData, failed \ No newline at end of file diff --git a/defSS.py b/defSS.py deleted file mode 100644 index d29d607..0000000 --- a/defSS.py +++ /dev/null @@ -1,79 +0,0 @@ -from selenium import webdriver -from webdriver_manager.chrome import ChromeDriverManager -import time -from selenium.webdriver.support.ui import WebDriverWait -from selenium.webdriver.common.by import By -def get_adsbx_screenshot(file_path, url_params, enable_labels=False, enable_track_labels=False): - chrome_options = webdriver.ChromeOptions() - chrome_options.headless = True - chrome_options.add_argument('window-size=800,800') - chrome_options.add_argument('ignore-certificate-errors') - chrome_options.add_argument("--enable-logging --v=1") - import os - import platform - if platform.system() == "Linux" and os.geteuid()==0: - chrome_options.add_argument('--no-sandbox') # required when running as root user. otherwise you would get no sandbox errors. - browser = webdriver.Chrome(ChromeDriverManager().install(), options=chrome_options) - url = f"https://globe.adsbexchange.com/?{url_params}" - browser.set_page_load_timeout(80) - browser.get(url) - remove_id_elements = ["show_trace", "credits", 'infoblock_close', 'selected_photo_link', "history_collapse"] - for element in remove_id_elements: - try: - element = browser.find_element_by_id(element) - browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element) - except: - print("issue removing", element, "from map") - #Remove watermark on data - try: - browser.execute_script("document.getElementById('selected_infoblock').className = 'none';") - except: - print("Couldn't remove watermark from map") - #Disable slidebar - try: - browser.execute_script("$('#infoblock-container').css('overflow', 'hidden');") - except: - print("Couldn't disable sidebar on map") - #Remove share - try: - element = browser.find_element_by_xpath("//*[contains(text(), 'Share')]") - browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element) - except: - print("Couldn't remove share button from map") - #browser.execute_script("toggleFollow()") - if enable_labels: - browser.find_element_by_tag_name('body').send_keys('l') - if enable_track_labels: - browser.find_element_by_tag_name('body').send_keys('k') - WebDriverWait(browser, 40).until(lambda d: d.execute_script("return jQuery.active == 0")) - try: - photo_box = browser.find_element_by_id("silhouette") - except: - pass - else: - import requests, json - photo_list = json.loads(requests.get("https://raw.githubusercontent.com/Jxck-S/aircraft-photos/main/photo-list.json").text) - if "icao" in url_params: - import re - - icao = re.search('icao=(.+?)&', url_params).group(1).lower() - print(icao) - if icao in photo_list.keys(): - browser.execute_script("arguments[0].id = 'airplanePhoto';", photo_box) - browser.execute_script(f"arguments[0].src = 'https://raw.githubusercontent.com/Jxck-S/aircraft-photos/main/images/{photo_list[icao]['reg']}.jpg';", photo_box) - copyright = browser.find_element_by_id("copyrightInfo") - browser.execute_script("arguments[0].id = 'copyrightInfoFreeze';", copyright) - browser.execute_script("$('#copyrightInfoFreeze').css('font-size', '12px');") - browser.execute_script(f"arguments[0].appendChild(document.createTextNode('Image © {photo_list[icao]['photographer']}'))", copyright) - - time.sleep(5) - browser.save_screenshot(file_path) - browser.quit() -def generate_adsbx_screenshot_time_params(timestamp): - from datetime import datetime - from datetime import timedelta - timestamp_dt = datetime.utcfromtimestamp(timestamp) - print(timestamp_dt) - start_time = timestamp_dt - timedelta(minutes=1) - time_params = "&showTrace=" + timestamp_dt.strftime("%Y-%m-%d") + "&startTime=" + start_time.strftime("%H:%M:%S") + "&endTime=" + timestamp_dt.strftime("%H:%M:%S") - return time_params \ No newline at end of file diff --git a/defTweet.py b/defTweet.py deleted file mode 100644 index a6dd749..0000000 --- a/defTweet.py +++ /dev/null @@ -1,9 +0,0 @@ -# Authenticate to Twitter -def tweepysetup(config): - import tweepy - #DOCU - #https://realpython.com/twitter-bot-python-tweepy/ - auth = tweepy.OAuthHandler(config.get('TWITTER', 'CONSUMER_KEY'), config.get('TWITTER', 'CONSUMER_SECRET')) - auth.set_access_token(config.get('TWITTER', 'ACCESS_TOKEN'), config.get('TWITTER', 'ACCESS_TOKEN_SECRET')) - tweet_api = tweepy.API(auth, wait_on_rate_limit=True) - return tweet_api \ No newline at end of file diff --git a/discord_utils.py b/discord_utils.py new file mode 100644 index 0000000..c5f8c39 --- /dev/null +++ b/discord_utils.py @@ -0,0 +1,20 @@ +import requests +from discord_webhook import DiscordWebhook + + +def send_discord_message(message, config, file_name=None, role_id=None): + if role_id is not None: + message += f" <@&{role_id}>" + + webhook = DiscordWebhook( + url=config.get("DISCORD", "URL"), + content=message[0:1999], + username=config.get("DISCORD", "USERNAME"), + ) + if file_name is not None: + with open(file_name, "rb") as f: + webhook.add_file(file=f.read(), filename=file_name) + try: + webhook.execute() + except requests.exceptions.RequestException: + pass diff --git a/map_utils.py b/map_utils.py new file mode 100644 index 0000000..4c5662f --- /dev/null +++ b/map_utils.py @@ -0,0 +1,19 @@ +import requests +import configparser + +ZOOM = 9 +URL = "https://maps.googleapis.com/maps/api/staticmap?" + +CONFIG = configparser.ConfigParser() +CONFIG.read("./configs/mainconf.ini") + +API_KEY = CONFIG.get("GOOGLE", "API_KEY") + + +def get_map(map_location, file_name): + r = requests.get( + f"{URL}center={map_location}&zoom={ZOOM}&size=800x800 &key={API_KEY}&sensor=false" + ) + + with open(file_name, "wb") as f: + f.write(r.content) diff --git a/opensky_utils.py b/opensky_utils.py new file mode 100644 index 0000000..7e1e4fb --- /dev/null +++ b/opensky_utils.py @@ -0,0 +1,22 @@ +from config import MAIN_CONFIG + +from opensky_api import OpenSkyApi + +USERNAME = MAIN_CONFIG.get("OPENSKY", "USERNAME") +PASSWORD = MAIN_CONFIG.get("OPENSKY", "PASSWORD") + + +def pull_opensky(planes): + plane_data = None + opens_api = OpenSkyApi( + username=None if USERNAME.upper() == "NONE" else USERNAME, + password=None if PASSWORD.upper() == "NONE" else PASSWORD.upper(), + ) + failed = False + icao_array = list(map(str.lower, planes.keys())) + try: + plane_data = opens_api.get_states(time_secs=0, icao24=icao_array) + except Exception as e: + print("OpenSky Error", e) + failed = True + return plane_data, failed diff --git a/plane.py b/plane.py new file mode 100644 index 0000000..0e373ad --- /dev/null +++ b/plane.py @@ -0,0 +1,970 @@ +import tempfile +from datetime import datetime, timedelta + +from config import MAIN_CONFIG + +from tweet import tweepysetup +from pushbullet import Pushbullet +from colorama import Fore, Style, Back + +from mictronics_parse import get_aircraft_reg_by_icao, get_type_code_by_icao +from airport_utils import get_airport_by_icao, get_closest_airport +from map_utils import get_map +from ss import get_adsbx_screenshot +from modify_image import append_airport +from calculate_headings import ( + calculate_deg_change, + calculate_from_bearing, + calculate_cardinal, +) +from discord_utils import send_discord_message + +from tabulate import tabulate +from shapely.geometry import MultiPoint +from geopy.distance import geodesic + + +class Plane: + def __init__(self, icao, config_path, config): + """Initializes a plane object from its config file and given icao.""" + self.icao = icao.upper() + self.callsign = None + self.reg = None + self.config = config + self.conf_file_path = config_path + self.alt_ft = None + self.below_desired_ft = None + self.last_below_desired_ft = None + self.feeding = None + self.last_feeding = None + self.last_on_ground = None + self.on_ground = None + self.longitude = None + self.latitude = None + self.takeoff_time = None + + self.map_file_name = f"{tempfile.gettempdir()}/{icao.upper()}_map.png" + self.last_latitude = None + self.last_longitude = None + self.last_pos_datetime = None + self.landing_plausible = False + self.nav_modes = None + self.last_nav_modes = None + self.speed = None + self.recent_ra_types = {} + self.db_flags = None + self.sel_nav_alt = None + self.last_sel_alt = None + self.squawk = None + self.emergency_already_triggered = None + self.last_emergency = None + self.recheck_route_time = None + self.known_to_airport = None + self.track = None + self.last_track = None + self.circle_history = None + if self.config.has_option("DATA", "DATA_LOSS_MINS"): + self.data_loss_mins = self.config.getint("DATA", "DATA_LOSS_MINS") + else: + self.data_loss_mins = MAIN_CONFIG.getint("DATA", "DATA_LOSS_MINS") + # Setup Tweepy + if self.config.getboolean("TWITTER", "ENABLE"): + self.tweet_api = tweepysetup(self.config) + # Setup PushBullet + if self.config.getboolean("PUSHBULLET", "ENABLE"): + self.pb = Pushbullet(self.config["PUSHBULLET"]["API_KEY"]) + self.pb_channel = self.pb.get_channel( + self.config.get("PUSHBULLET", "CHANNEL_TAG") + ) + + def run_opens(self, ac_dict): + # Parse OpenSky Vector + + self.printheader("head") + # print (Fore.YELLOW + "OpenSky Sourced Data: ", ac_dict) + try: + self.icao = ac_dict.icao24.upper() + self.callsign = ac_dict.callsign + self.latitude = ac_dict.latitude + self.longitude = ac_dict.longitude + self.on_ground = bool(ac_dict.on_ground) + self.squawk = ac_dict.squawk + self.track = ac_dict.heading + + if ac_dict.baro_altitude is not None: + self.alt_ft = round(float(ac_dict.baro_altitude) * 3.281) + elif self.on_ground: + self.alt_ft = 0 + + self.reg = get_aircraft_reg_by_icao(self.icao) + self.type = get_type_code_by_icao(self.icao) + self.last_pos_datetime = datetime.fromtimestamp(ac_dict.time_position) + except ValueError as e: + print("Got data but some data is invalid!") + print(e) + self.printheader("foot") + else: + self.feeding = True + self.run_check() + + def run_adsbx_v1(self, ac_dict): + # Parse ADBSX V1 Vector + + self.printheader("head") + # print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL) + try: + # postime is divided by 1000 to get seconds from milliseconds, from timestamp expects secs. + self.icao = ac_dict["icao"].upper() + self.callsign = ac_dict["call"] + self.reg = ac_dict["reg"] + self.latitude = float(ac_dict["lat"]) + self.longitude = float(ac_dict["lon"]) + self.alt_ft = int(ac_dict["alt"]) + self.on_ground = bool(int(ac_dict["gnd"])) + self.squawk = ac_dict["sqk"] + self.track = float(ac_dict["trak"]) + + if self.on_ground: + self.alt_ft = 0 + self.last_pos_datetime = datetime.fromtimestamp( + int(ac_dict["postime"]) / 1000 + ) + except ValueError as e: + + print("Got data but some data is invalid!") + print(e) + print(Fore.YELLOW + "ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL) + self.printheader("foot") + else: + self.feeding = True + self.run_check() + + def run_adsbx_v2(self, ac_dict): + # Parse ADBSX V2 Vector + + self.printheader("head") + print(ac_dict) + try: + self.icao = ac_dict["hex"].upper() + self.latitude = float(ac_dict["lat"]) + self.longitude = float(ac_dict["lon"]) + self.speed = ac_dict["gs"] + + if "r" in ac_dict: + self.reg = ac_dict["r"] + if "t" in ac_dict: + self.type = ac_dict["t"] + if ac_dict["alt_baro"] != "ground": + self.alt_ft = int(ac_dict["alt_baro"]) + self.on_ground = False + elif ac_dict["alt_baro"] == "ground": + self.alt_ft = 0 + self.on_ground = True + if ac_dict.get("flight") is not None: + self.callsign = ac_dict.get("flight").strip() + if ac_dict.get("dbFlags") is not None: + self.db_flags = ac_dict["dbFlags"] + if "nav_modes" in ac_dict: + self.nav_modes = ac_dict["nav_modes"] + for idx, mode in enumerate(self.nav_modes): + if mode.upper() in ["TCAS", "LNAV", "VNAV"]: + self.nav_modes[idx] = self.nav_modes[idx].upper() + else: + self.nav_modes[idx] = self.nav_modes[idx].capitalize() + self.squawk = ac_dict.get("squawk") + if "track" in ac_dict: + self.track = ac_dict["track"] + if "nav_altitude_fms" in ac_dict: + self.sel_nav_alt = ac_dict["nav_altitude_fms"] + elif "nav_altitude_mcp" in ac_dict: + self.sel_nav_alt = ac_dict["nav_altitude_mcp"] + else: + self.sel_nav_alt = None + + # Create last seen timestamp from how long ago in secs a pos was rec + self.last_pos_datetime = datetime.now() - timedelta( + seconds=ac_dict["seen_pos"] + ) + except (ValueError, KeyError) as e: + + print("Got data but some data is invalid!") + print(e) + print(Fore.YELLOW + "ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL) + self.printheader("foot") + else: + # Error Handling for bad data, sometimes it would seem to be ADSB Decode error + if (not self.on_ground) and self.speed <= 10: + print("Not running check, appears to be bad ADSB Decode") + else: + self.feeding = True + self.run_check() + + def __str__(self): + if self.last_pos_datetime is not None: + time_since_contact = self.get_time_since(self.last_pos_datetime) + output = [ + [ + (Fore.CYAN + "ICAO" + Style.RESET_ALL), + (Fore.LIGHTGREEN_EX + self.icao + Style.RESET_ALL), + ], + [ + (Fore.CYAN + "Callsign" + Style.RESET_ALL), + (Fore.LIGHTGREEN_EX + self.callsign + Style.RESET_ALL), + ] + if self.callsign is not None + else None, + [ + (Fore.CYAN + "Reg" + Style.RESET_ALL), + (Fore.LIGHTGREEN_EX + self.reg + Style.RESET_ALL), + ] + if self.reg is not None + else None, + [ + (Fore.CYAN + "Squawk" + Style.RESET_ALL), + (Fore.LIGHTGREEN_EX + self.squawk + Style.RESET_ALL), + ] + if self.squawk is not None + else None, + [ + (Fore.CYAN + "Coordinates" + Style.RESET_ALL), + ( + Fore.LIGHTGREEN_EX + + str(self.latitude) + + ", " + + str(self.longitude) + + Style.RESET_ALL + ), + ] + if self.latitude is not None and self.longitude is not None + else None, + [ + (Fore.CYAN + "Last Contact" + Style.RESET_ALL), + ( + Fore.LIGHTGREEN_EX + + str(time_since_contact).split(".")[0] + + Style.RESET_ALL + ), + ] + if self.last_pos_datetime is not None + else None, + [ + (Fore.CYAN + "On Ground" + Style.RESET_ALL), + (Fore.LIGHTGREEN_EX + str(self.on_ground) + Style.RESET_ALL), + ] + if self.on_ground is not None + else None, + [ + (Fore.CYAN + "Baro Altitude" + Style.RESET_ALL), + ( + Fore.LIGHTGREEN_EX + + str("{:,} ft".format(self.alt_ft)) + + Style.RESET_ALL + ), + ] + if self.alt_ft is not None + else None, + [ + (Fore.CYAN + "Nav Modes" + Style.RESET_ALL), + (Fore.LIGHTGREEN_EX + ", ".join(self.nav_modes) + Style.RESET_ALL), + ] + if "nav_modes" in self.__dict__ and self.nav_modes != None + else None, + [ + (Fore.CYAN + "Sel Alt Ft" + Style.RESET_ALL), + ( + Fore.LIGHTGREEN_EX + + str("{:,} ft".format(self.sel_nav_alt)) + + Style.RESET_ALL + ), + ] + if "sel_nav_alt" in self.__dict__ and self.sel_nav_alt is not None + else None, + ] + output = list(filter(None, output)) + return tabulate(output, [], "fancy_grid") + + def printheader(self, type): + if type == "head": + header = str( + "--------- " + + self.conf_file_path + + " ---------------------------- ICAO: " + + self.icao + + " ---------------------------------------" + ) + elif type == "foot": + header = "----------------------------------------------------------------------------------------------------" + print(Back.MAGENTA + header[0:100] + Style.RESET_ALL) + + def get_time_since(self, datetime_obj): + if datetime_obj is not None: + time_since = datetime.now() - datetime_obj + else: + time_since = None + return time_since + + def get_adsbx_map_overlays(self): + if self.config.has_option("MAP", "OVERLAYS"): + overlays = self.config.get("MAP", "OVERLAYS") + else: + overlays = "" + return overlays + + def route_info(self): + from lookup_route import lookup_route, clean_data + + def route_format(extra_route_info, type): + to_airport = get_airport_by_icao(self.known_to_airport) + code = ( + to_airport["iata_code"] + if to_airport["iata_code"] != "" + else to_airport["icao"] + ) + airport_text = f"{code}, {to_airport['name']}" + if "time_to" in extra_route_info.keys() and type != "divert": + arrival_rel = "in ~" + extra_route_info["time_to"] + else: + arrival_rel = None + if self.known_to_airport != self.nearest_from_airport: + if type == "inital": + header = "Going to" + elif type == "change": + header = "Now going to" + elif type == "divert": + header = "Now diverting to" + area = f"{to_airport['municipality']}, {to_airport['region']}, {to_airport['iso_country']}" + route_to = f"{header} {area} ({airport_text})" + ( + f" arriving {arrival_rel}" if arrival_rel is not None else "" + ) + else: + if type == "inital": + header = "Will be returning to" + elif type == "change": + header = "Now returning to" + elif type == "divert": + header = "Now diverting back to" + route_to = f"{header} {airport_text}" + ( + f" {arrival_rel}" if arrival_rel is not None else "" + ) + return route_to + + if hasattr(self, "type"): + extra_route_info = clean_data( + lookup_route( + self.reg, (self.latitude, self.longitude), self.type, self.alt_ft + ) + ) + else: + extra_route_info = None + route_to = None + if extra_route_info is None: + pass + elif extra_route_info is not None: + # Diversion + if "divert_icao" in extra_route_info.keys(): + if self.known_to_airport != extra_route_info["divert_icao"]: + self.known_to_airport = extra_route_info["divert_icao"] + route_to = route_format(extra_route_info, "divert") + # Destination + elif "dest_icao" in extra_route_info.keys(): + # Inital Destination Found + if self.known_to_airport is None: + self.known_to_airport = extra_route_info["dest_icao"] + route_to = route_format(extra_route_info, "inital") + # Destination Change + elif self.known_to_airport != extra_route_info["dest_icao"]: + self.known_to_airport = extra_route_info["dest_icao"] + route_to = route_format(extra_route_info, "change") + + return route_to + + def run_empty(self): + self.printheader("head") + self.feeding = False + self.run_check() + + def run_check(self): + """Runs a check of a plane module to see if its landed or takenoff using plane data, and takes action if so.""" + print(self) + # Ability to Remove old Map + import os + from colorama import Fore, Style + + # Proprietary Route Lookup + if os.path.isfile("lookup_route.py") and ( + self.db_flags is None or not self.db_flags & 1 + ): + from lookup_route import lookup_route + + ENABLE_ROUTE_LOOKUP = True + else: + ENABLE_ROUTE_LOOKUP = False + if self.config.getboolean("DISCORD", "ENABLE"): + from discord_utils import send_discord_message + if self.last_pos_datetime is not None: + time_since_contact = self.get_time_since(self.last_pos_datetime) + # Check if below desire ft + desired_ft = 15000 + if self.alt_ft is None or self.alt_ft > desired_ft: + self.below_desired_ft = False + elif self.alt_ft < desired_ft: + self.below_desired_ft = True + # Check if tookoff + if self.below_desired_ft and self.on_ground is False: + if self.last_on_ground: + self.tookoff = True + trigger_type = "no longer on ground" + type_header = "Took off from" + elif ( + self.last_feeding is False + and self.feeding + and self.landing_plausible == False + ): + nearest_airport_dict = get_closest_airport( + self.latitude, self.longitude, self.config.get("AIRPORT", "TYPES") + ) + if nearest_airport_dict["elevation_ft"] != "": + alt_above_airport = self.alt_ft - int( + nearest_airport_dict["elevation_ft"] + ) + print(f"AGL nearest airport: {alt_above_airport}") + else: + alt_above_airport = None + if ( + alt_above_airport != None and alt_above_airport <= 10000 + ) or self.alt_ft <= 15000: + self.tookoff = True + trigger_type = "data acquisition" + type_header = "Took off near" + else: + self.tookoff = False + else: + self.tookoff = False + + # Check if Landed + if ( + self.on_ground + and self.last_on_ground is False + and self.last_below_desired_ft + ): + self.landed = True + trigger_type = "now on ground" + type_header = "Landed in" + self.landing_plausible = False + # Set status for landing plausible + elif ( + self.below_desired_ft + and self.last_feeding + and self.feeding is False + and self.last_on_ground is False + ): + self.landing_plausible = True + print( + "Near landing conditions, if contiuned data loss for configured time, and if under 10k AGL landing true" + ) + + elif ( + self.landing_plausible + and self.feeding is False + and time_since_contact.total_seconds() >= (self.data_loss_mins * 60) + ): + from airport_utils import getClosestAirport + + nearest_airport_dict = get_closest_airport( + self.latitude, self.longitude, self.config.get("AIRPORT", "TYPES") + ) + if nearest_airport_dict["elevation_ft"] != "": + alt_above_airport = self.alt_ft - int( + nearest_airport_dict["elevation_ft"] + ) + print(f"AGL nearest airport: {alt_above_airport}") + else: + alt_above_airport = None + if ( + alt_above_airport != None and alt_above_airport <= 10000 + ) or self.alt_ft <= 15000: + self.landing_plausible = False + self.on_ground = None + self.landed = True + trigger_type = "data loss" + type_header = "Landed near" + else: + print("Alt greater then 10k AGL") + self.landing_plausible = False + self.on_ground = None + else: + self.landed = False + + if self.landed: + print("Landed by", trigger_type) + if self.tookoff: + print("Tookoff by", trigger_type) + # Find nearest airport, and location + if self.landed or self.tookoff: + if "nearest_airport_dict" in globals(): + pass # Airport already set + elif trigger_type in ["now on ground", "data acquisition", "data loss"]: + nearest_airport_dict = get_closest_airport( + self.latitude, self.longitude, self.config.get("AIRPORT", "TYPES") + ) + elif trigger_type == "no longer on ground": + nearest_airport_dict = get_closest_airport( + self.last_latitude, + self.last_longitude, + self.config.get("AIRPORT", "TYPES"), + ) + # Convert dictionary keys to sep variables + country_code = nearest_airport_dict["iso_country"] + state = nearest_airport_dict["region"].strip() + municipality = nearest_airport_dict["municipality"].strip() + if municipality == "" or state == "" or municipality == state: + if municipality != "": + area = municipality + elif state != "": + area = state + else: + area = "" + else: + area = f"{municipality}, {state}" + location_string = f"{area}, {country_code}" + print( + Fore.GREEN + "Country Code:", + country_code, + "State:", + state, + "Municipality:", + municipality + Style.RESET_ALL, + ) + title_switch = {"reg": self.reg, "callsign": self.callsign, "icao": self.icao} + # Set Discord Title + if self.config.getboolean("DISCORD", "ENABLE"): + self.dis_title = ( + (title_switch.get(self.config.get("DISCORD", "TITLE")) or "NA").strip() + if self.config.get("DISCORD", "TITLE") in title_switch.keys() + else self.config.get("DISCORD", "TITLE") + ) + # Set Twitter Title + if self.config.getboolean("TWITTER", "ENABLE"): + self.twitter_title = ( + (title_switch.get(self.config.get("TWITTER", "TITLE")) or "NA") + if self.config.get("TWITTER", "TITLE") in title_switch.keys() + else self.config.get("TWITTER", "TITLE") + ) + # Takeoff and Land Notification + if self.tookoff or self.landed: + route_to = None + if self.tookoff: + self.takeoff_time = datetime.utcnow() + landed_time_msg = None + # Proprietary Route Lookup + if ENABLE_ROUTE_LOOKUP: + self.nearest_from_airport = nearest_airport_dict["icao"] + route_to = self.route_info() + if route_to is None: + self.recheck_route_time = 1 + else: + self.recheck_route_time = 10 + elif self.landed and self.takeoff_time != None: + landed_time = datetime.utcnow() - self.takeoff_time + if trigger_type == "data loss": + landed_time -= timedelta(seconds=time_since_contact.total_seconds()) + hours, remainder = divmod(landed_time.total_seconds(), 3600) + minutes, seconds = divmod(remainder, 60) + min_syntax = "Mins" if minutes > 1 else "Min" + if hours > 0: + hour_syntax = "Hours" if hours > 1 else "Hour" + landed_time_msg = f"Apx. flt. time {int(hours)} {hour_syntax}" + ( + f" : {int(minutes)} {min_syntax}. " if minutes > 0 else "." + ) + else: + landed_time_msg = f"Apx. flt. time {int(minutes)} {min_syntax}." + self.takeoff_time = None + elif self.landed: + landed_time_msg = None + message = ( + (f"{type_header} {location_string}.") + + ("" if route_to is None else f" {route_to}.") + + ((f" {landed_time_msg}") if landed_time_msg is not None else "") + ) + print(message) + # Google Map or tar1090 screenshot + if self.config.get("MAP", "OPTION") == "GOOGLESTATICMAP": + get_map( + (municipality + ", " + state + ", " + country_code), + self.map_file_name, + ) + elif self.config.get("MAP", "OPTION") == "ADSBX": + url_params = ( + f"icao={self.icao}&zoom=9&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays=" + + self.get_adsbx_map_overlays() + ) + get_adsbx_screenshot(self.map_file_name, url_params) + + append_airport(self.map_file_name, nearest_airport_dict) + else: + raise ValueError("Map option not set correctly in this planes conf") + # Discord + if self.config.getboolean("DISCORD", "ENABLE"): + dis_message = f"{self.dis_title} {message}".strip() + role_id = ( + self.config.get("DISCORD", "ROLE_ID") + if self.config.has_option("DISCORD", "ROLE_ID") + else None + ) + send_discord_message( + dis_message, self.config, self.map_file_name, role_id=role_id + ) + # PushBullet + if self.config.getboolean("PUSHBULLET", "ENABLE"): + with open(self.map_file_name, "rb") as pic: + map_data = self.pb.upload_file( + pic, "Tookoff IMG" if self.tookoff else "Landed IMG" + ) + self.pb_channel.push_note( + self.config.get("PUSHBULLET", "TITLE"), message + ) + self.pb_channel.push_file(**map_data) + # Twitter + if self.config.getboolean("TWITTER", "ENABLE"): + twitter_media_map_obj = self.tweet_api.media_upload(self.map_file_name) + alt_text = f"Reg: {self.reg} On Ground: {str(self.on_ground)} Alt: {str(self.alt_ft)} Last Contact: {str(time_since_contact)} Trigger: {trigger_type}" + self.tweet_api.create_media_metadata( + media_id=twitter_media_map_obj.media_id, alt_text=alt_text + ) + self.latest_tweet_id = self.tweet_api.update_status( + status=((self.twitter_title + " " + message).strip()), + media_ids=[twitter_media_map_obj.media_id], + ).id + os.remove(self.map_file_name) + if self.landed: + self.latest_tweet_id = None + self.recheck_route_time = None + self.known_to_airport = None + self.nearest_from_airport = None + # Recheck Proprietary Route Info. + if ( + self.takeoff_time is not None + and self.recheck_route_time is not None + and (datetime.utcnow() - self.takeoff_time).total_seconds() + > 60 * self.recheck_route_time + ): + self.recheck_route_time += 10 + route_to = self.route_info() + if route_to is not None: + print(route_to) + # Discord + if self.config.getboolean("DISCORD", "ENABLE"): + dis_message = f"{self.dis_title} {route_to}".strip() + role_id = ( + self.config.get("DISCORD", "ROLE_ID") + if self.config.has_option("DISCORD", "ROLE_ID") + else None + ) + send_discord_message(dis_message, self.config, role_id=role_id) + # Twitter + if self.config.getboolean("TWITTER", "ENABLE"): + # tweet = self.tweet_api.user_timeline(count = 1)[0] + self.latest_tweet_id = self.tweet_api.update_status( + status=f"{self.twitter_title} {route_to}".strip(), + in_reply_to_status_id=self.latest_tweet_id, + ).id + + if self.circle_history is not None: + # Expires traces for circles + if self.circle_history["traces"] != []: + for trace in self.circle_history["traces"]: + if ( + datetime.now() - datetime.fromtimestamp(trace[0]) + ).total_seconds() >= 20 * 60: + print("Trace Expire, removed") + self.circle_history["traces"].remove(trace) + # Expire touchngo + if ( + "touchngo" in self.circle_history.keys() + and ( + datetime.now() + - datetime.fromtimestamp(self.circle_history["touchngo"]) + ).total_seconds() + >= 10 * 60 + ): + self.circle_history.pop("touchngo") + if self.feeding: + # Squawks + emergency_squawks = { + "7500": "Hijacking", + "7600": "Radio Failure", + "7700": "General Emergency", + } + seen = datetime.now() - self.last_pos_datetime + # Only run check if emergency data previously set + if self.last_emergency is not None and not self.emergency_already_triggered: + time_since_org_emer = datetime.now() - self.last_emergency[0] + # Checks times to see x time and still same squawk + if ( + time_since_org_emer.total_seconds() >= 60 + and self.last_emergency[1] == self.squawk + and seen.total_seconds() <= 60 + ): + self.emergency_already_triggered = True + squawk_message = ( + f"{self.dis_title} Squawking {self.last_emergency[1]} {emergency_squawks[self.squawk]}" + ).strip() + print(squawk_message) + # Google Map or tar1090 screenshot + if self.config.get("MAP", "OPTION") == "GOOGLESTATICMAP": + get_map( + (municipality + ", " + state + ", " + country_code), + self.map_file_name, + ) + if self.config.get("MAP", "OPTION") == "ADSBX": + from ss import get_adsbx_screenshot + + url_params = ( + f"icao={self.icao}&zoom=9&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays=" + + self.get_adsbx_map_overlays() + ) + get_adsbx_screenshot(self.map_file_name, url_params) + if self.config.getboolean("DISCORD", "ENABLE"): + dis_message = self.dis_title + " " + squawk_message + send_discord_message( + dis_message, self.config, self.map_file_name + ) + os.remove(self.map_file_name) + # Realizes first time seeing emergency, stores time and type + elif ( + self.squawk in emergency_squawks.keys() + and not self.emergency_already_triggered + and not self.on_ground + ): + print( + "Emergency", + self.squawk, + "detected storing code and time and waiting to trigger", + ) + self.last_emergency = (self.last_pos_datetime, self.squawk) + elif ( + self.squawk not in emergency_squawks.keys() + and self.emergency_already_triggered + ): + self.emergency_already_triggered = None + + # Nav Modes Notifications + if self.nav_modes is not None and self.last_nav_modes is not None: + for mode in self.nav_modes: + if mode not in self.last_nav_modes: + # Discord + print(mode, "enabled") + if self.config.getboolean("DISCORD", "ENABLE"): + dis_message = self.dis_title + " " + mode + " mode enabled." + if mode == "Approach": + url_params = f"icao={self.icao}&zoom=9&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays={self.get_adsbx_map_overlays()}" + get_adsbx_screenshot(self.map_file_name, url_params) + send_discord_message( + dis_message, self.config, self.map_file_name + ) + # elif mode in ["Althold", "VNAV", "LNAV"] and self.sel_nav_alt != None: + # sendDis((dis_message + ", Sel Alt. " + str(self.sel_nav_alt) + ", Current Alt. " + str(self.alt_ft)), self.config) + else: + send_discord_message(dis_message, self.config) + # Selected Altitude + if ( + self.sel_nav_alt is not None + and self.last_sel_alt is not None + and self.last_sel_alt != self.sel_nav_alt + ): + # Discord + print("Nav altitude is now", self.sel_nav_alt) + if self.config.getboolean("DISCORD", "ENABLE"): + dis_message = ( + self.dis_title + + " Sel. alt. " + + str("{:,} ft".format(self.sel_nav_alt)) + ) + send_discord_message(dis_message, self.config) + # Circling + if self.last_track is not None: + import time + + if self.circle_history is None: + self.circle_history = {"traces": [], "triggered": False} + # Add touchngo + if self.on_ground or self.alt_ft <= 500: + self.circle_history["touchngo"] = time.time() + # Add a Trace + if self.on_ground is False: + track_change = calculate_deg_change(self.track, self.last_track) + track_change = round(track_change, 3) + self.circle_history["traces"].append( + (time.time(), self.latitude, self.longitude, track_change) + ) + + total_change = 0 + coords = [] + for trace in self.circle_history["traces"]: + total_change += float(trace[3]) + coords.append((float(trace[1]), float(trace[2]))) + + print("Total Bearing Change", round(total_change, 3)) + # Check Centroid when Bearing change meets req + if ( + abs(total_change) >= 720 + and self.circle_history["triggered"] is False + ): + print("Circling Bearing Change Met") + + aircraft_coords = (self.latitude, self.longitude) + points = MultiPoint(coords) + cent = ( + points.centroid + ) # True centroid, not necessarily an existing point + # rp = (points.representative_point()) #A represenative point, not centroid, + print(cent) + # print(rp) + distance_to_centroid = geodesic(aircraft_coords, cent.coords).mi + print( + f"Distance to centroid of circling coordinates {distance_to_centroid} miles" + ) + if distance_to_centroid <= 15: + print("Within 15 miles of centroid, CIRCLING") + nearest_airport_dict = get_closest_airport( + self.latitude, + self.longitude, + ["small_airport", "medium_airport", "large_airport"], + ) + + from_bearing = calculate_from_bearing( + ( + float(nearest_airport_dict["latitude_deg"]), + float(nearest_airport_dict["longitude_deg"]), + ), + (self.latitude, self.longitude), + ) + cardinal = calculate_cardinal(from_bearing) + + url_params = f"icao={self.icao}&zoom=10&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays={self.get_adsbx_map_overlays()}" + get_adsbx_screenshot(self.map_file_name, url_params) + if nearest_airport_dict["distance_mi"] < 3: + if "touchngo" in self.circle_history.keys(): + message = f"Doing touch and goes at {nearest_airport_dict['icao']}" + else: + message = f"Circling over {nearest_airport_dict['icao']} at {self.alt_ft}ft" + else: + message = f"Circling {round(nearest_airport_dict['distance_mi'], 2)}mi {cardinal} of {nearest_airport_dict['icao']}, {nearest_airport_dict['name']} at {self.alt_ft}ft" + print(message) + if self.config.getboolean("DISCORD", "ENABLE"): + role_id = ( + self.config.get("DISCORD", "ROLE_ID") + if self.config.has_option("DISCORD", "ROLE_ID") + else None + ) + send_discord_message( + message, self.config, self.map_file_name, role_id + ) + if self.config.getboolean("TWITTER", "ENABLE"): + twitter_media_map_obj = self.tweet_api.media_upload( + self.map_file_name + ) + alt_text = f"Distance to centroid: {distance_to_centroid}, Total change: {total_change}" + self.tweet_api.create_media_metadata( + media_id=twitter_media_map_obj.media_id, + alt_text=alt_text, + ) + self.latest_tweet_id = self.tweet_api.update_status( + status=f"{self.twitter_title} {message}".strip(), + in_reply_to_status_id=self.latest_tweet_id, + media_ids=[twitter_media_map_obj.media_id], + ).id + + self.circle_history["triggered"] = True + elif abs(total_change) <= 360 and self.circle_history["triggered"]: + print("No Longer Circling, trigger cleared") + self.circle_history["triggered"] = False + # #Power Up + # if self.last_feeding == False and self.speed == 0 and self.on_ground: + # if self.config.getboolean('DISCORD', 'ENABLE'): + # dis_message = (self.dis_title + "Powered Up").strip() + # sendDis(dis_message, self.config) + + # Set Variables to compare to next check + self.last_track = self.track + self.last_feeding = self.feeding + self.last_on_ground = self.on_ground + self.last_below_desired_ft = self.below_desired_ft + self.last_longitude = self.longitude + self.last_latitude = self.latitude + self.last_nav_modes = self.nav_modes + self.last_sel_alt = self.sel_nav_alt + + if self.takeoff_time != None: + elapsed_time = datetime.utcnow() - self.takeoff_time + hours, remainder = divmod(elapsed_time.total_seconds(), 3600) + minutes, seconds = divmod(remainder, 60) + print( + ( + f"Time Since Take off {int(hours)} Hours : {int(minutes)} Mins : {int(seconds)} Secs" + ) + ) + self.printheader("foot") + + def check_new_ras(self, ras): + for ra in ras: + if ( + self.recent_ra_types == {} + or ra["acas_ra"]["advisory"] not in self.recent_ra_types.keys() + ): + self.recent_ra_types[ra["acas_ra"]["advisory"]] = ra["acas_ra"][ + "unix_timestamp" + ] + ra_message = f"TCAS Resolution Advisory: {ra['acas_ra']['advisory']}" + if ra["acas_ra"]["advisory_complement"] != "": + ra_message += f", {ra['acas_ra']['advisory_complement']}" + if bool(int(ra["acas_ra"]["MTE"])): + ra_message += ", Multi threat" + from ss import ( + get_adsbx_screenshot, + generate_adsbx_screenshot_time_params, + ) + + url_params = f"&lat={ra['lat']}&lon={ra['lon']}&zoom=11&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays={self.get_adsbx_map_overlays()}" + if "threat_id_hex" in ra["acas_ra"].keys(): + threat_reg = get_aircraft_reg_by_icao( + ra["acas_ra"]["threat_id_hex"] + ) + threat_id = ( + threat_reg + if threat_reg is not None + else "ICAO: " + ra["acas_ra"]["threat_id_hex"] + ) + ra_message += f", invader: {threat_id}" + url_params += ( + generate_adsbx_screenshot_time_params( + ra["acas_ra"]["unix_timestamp"] + ) + + f"&icao={ra['acas_ra']['threat_id_hex']},{self.icao.lower()}×tamp={ra['acas_ra']['unix_timestamp']}" + ) + else: + url_params += f"&icao={self.icao.lower()}&noIsolation" + print(url_params) + get_adsbx_screenshot(self.map_file_name, url_params, True, True) + + if self.config.getboolean("DISCORD", "ENABLE"): + dis_message = f"{self.dis_title} {ra_message}" + role_id = ( + self.config.get("DISCORD", "ROLE_ID") + if self.config.has_option("DISCORD", "ROLE_ID") + else None + ) + send_discord_message( + dis_message, self.config, self.map_file_name, role_id=role_id + ) + # if twitter + + def expire_ra_types(self): + if self.recent_ra_types != {}: + for ra_type, postime in self.recent_ra_types.copy().items(): + timestamp = datetime.fromtimestamp(postime) + time_since_ra = datetime.now() - timestamp + print(time_since_ra) + if time_since_ra.seconds >= 600: + print(ra_type) + self.recent_ra_types.pop(ra_type) diff --git a/planeClass.py b/planeClass.py deleted file mode 100644 index 83bb786..0000000 --- a/planeClass.py +++ /dev/null @@ -1,640 +0,0 @@ -from datetime import datetime, timedelta -class Plane: - import configparser - main_config = configparser.ConfigParser() - main_config.read('./configs/mainconf.ini') - def __init__(self, icao, config_path, config): - """Initializes a plane object from its config file and given icao.""" - self.icao = icao.upper() - self.callsign = None - self.reg = None - self.config = config - self.conf_file_path = config_path - self.alt_ft = None - self.below_desired_ft = None - self.last_below_desired_ft = None - self.feeding = None - self.last_feeding = None - self.last_on_ground = None - self.on_ground = None - self.longitude = None - self.latitude = None - self.takeoff_time = None - import tempfile - self.map_file_name = f"{tempfile.gettempdir()}/{icao.upper()}_map.png" - self.last_latitude = None - self.last_longitude = None - self.last_pos_datetime = None - self.landing_plausible = False - self.nav_modes = None - self.last_nav_modes = None - self.speed = None - self.recent_ra_types = {} - self.db_flags = None - self.sel_nav_alt = None - self.last_sel_alt = None - self.squawk = None - self.emergency_already_triggered = None - self.last_emergency = None - self.recheck_route_time = None - self.known_to_airport = None - self.track = None - self.last_track = None - self.circle_history = None - if self.config.has_option('DATA', 'DATA_LOSS_MINS'): - self.data_loss_mins = self.config.getint('DATA', 'DATA_LOSS_MINS') - else: - self.data_loss_mins = Plane.main_config.getint('DATA', 'DATA_LOSS_MINS') - #Setup Tweepy - if self.config.getboolean('TWITTER', 'ENABLE'): - from defTweet import tweepysetup - self.tweet_api = tweepysetup(self.config) - #Setup PushBullet - if self.config.getboolean('PUSHBULLET', 'ENABLE'): - from pushbullet import Pushbullet - self.pb = Pushbullet(self.config['PUSHBULLET']['API_KEY']) - self.pb_channel = self.pb.get_channel(self.config.get('PUSHBULLET', 'CHANNEL_TAG')) - def run_opens(self, ac_dict): - #Parse OpenSky Vector - from colorama import Fore, Back, Style - self.printheader("head") - #print (Fore.YELLOW + "OpenSky Sourced Data: ", ac_dict) - try: - self.__dict__.update({'icao' : ac_dict.icao24.upper(), 'callsign' : ac_dict.callsign, 'latitude' : ac_dict.latitude, 'longitude' : ac_dict.longitude, 'on_ground' : bool(ac_dict.on_ground), 'squawk' : ac_dict.squawk, 'track' : float(ac_dict.heading)}) - if ac_dict.baro_altitude != None: - self.alt_ft = round(float(ac_dict.baro_altitude) * 3.281) - elif self.on_ground: - self.alt_ft = 0 - from mictronics_parse import get_aircraft_reg_by_icao, get_type_code_by_icao - self.reg = get_aircraft_reg_by_icao(self.icao) - self.type = get_type_code_by_icao(self.icao) - self.last_pos_datetime = datetime.fromtimestamp(ac_dict.time_position) - except ValueError as e: - print("Got data but some data is invalid!") - print(e) - self.printheader("foot") - else: - self.feeding = True - self.run_check() - def run_adsbx_v1(self, ac_dict): - #Parse ADBSX V1 Vector - from colorama import Fore, Back, Style - self.printheader("head") - #print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL) - try: - #postime is divided by 1000 to get seconds from milliseconds, from timestamp expects secs. - self.__dict__.update({'icao' : ac_dict['icao'].upper(), 'callsign' : ac_dict['call'], 'reg' : ac_dict['reg'], 'latitude' : float(ac_dict['lat']), 'longitude' : float(ac_dict['lon']), 'alt_ft' : int(ac_dict['alt']), 'on_ground' : bool(int(ac_dict["gnd"])), 'squawk' : ac_dict['sqk'], 'track' : float(ac_dict["trak"])}) - if self.on_ground: - self.alt_ft = 0 - self.last_pos_datetime = datetime.fromtimestamp(int(ac_dict['postime'])/1000) - except ValueError as e: - - print("Got data but some data is invalid!") - print(e) - print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL) - self.printheader("foot") - else: - self.feeding = True - self.run_check() - - def run_adsbx_v2(self, ac_dict): - #Parse ADBSX V2 Vector - from colorama import Fore, Back, Style - self.printheader("head") - print(ac_dict) - try: - self.__dict__.update({'icao' : ac_dict['hex'].upper(), 'latitude' : float(ac_dict['lat']), 'longitude' : float(ac_dict['lon']), 'speed': ac_dict['gs']}) - if "r" in ac_dict: - self.reg = ac_dict['r'] - if "t" in ac_dict: - self.type = ac_dict['t'] - if ac_dict['alt_baro'] != "ground": - self.alt_ft = int(ac_dict['alt_baro']) - self.on_ground = False - elif ac_dict['alt_baro'] == "ground": - self.alt_ft = 0 - self.on_ground = True - if ac_dict.get('flight') is not None: - self.callsign = ac_dict.get('flight').strip() - if ac_dict.get('dbFlags') is not None: - self.db_flags = ac_dict['dbFlags'] - if 'nav_modes' in ac_dict: - self.nav_modes = ac_dict['nav_modes'] - for idx, mode in enumerate(self.nav_modes): - if mode.upper() in ['TCAS', 'LNAV', 'VNAV']: - self.nav_modes[idx] = self.nav_modes[idx].upper() - else: - self.nav_modes[idx] = self.nav_modes[idx].capitalize() - self.squawk = ac_dict.get('squawk') - if "track" in ac_dict: - self.track = ac_dict['track'] - if "nav_altitude_fms" in ac_dict: - self.sel_nav_alt = ac_dict['nav_altitude_fms'] - elif "nav_altitude_mcp" in ac_dict: - self.sel_nav_alt = ac_dict['nav_altitude_mcp'] - else: - self.sel_nav_alt = None - - #Create last seen timestamp from how long ago in secs a pos was rec - self.last_pos_datetime = datetime.now() - timedelta(seconds= ac_dict["seen_pos"]) - except (ValueError, KeyError) as e: - - print("Got data but some data is invalid!") - print(e) - print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL) - self.printheader("foot") - else: - #Error Handling for bad data, sometimes it would seem to be ADSB Decode error - if (not self.on_ground) and self.speed <= 10: - print("Not running check, appears to be bad ADSB Decode") - else: - self.feeding = True - self.run_check() - def __str__(self): - from colorama import Fore, Back, Style - from tabulate import tabulate - if self.last_pos_datetime is not None: - time_since_contact = self.get_time_since(self.last_pos_datetime) - output = [ - [(Fore.CYAN + "ICAO" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + self.icao + Style.RESET_ALL)], - [(Fore.CYAN + "Callsign" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + self.callsign + Style.RESET_ALL)] if self.callsign is not None else None, - [(Fore.CYAN + "Reg" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + self.reg + Style.RESET_ALL)] if self.reg is not None else None, - [(Fore.CYAN + "Squawk" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + self.squawk + Style.RESET_ALL)] if self.squawk is not None else None, - [(Fore.CYAN + "Coordinates" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + str(self.latitude) + ", " + str(self.longitude) + Style.RESET_ALL)] if self.latitude is not None and self.longitude is not None else None, - [(Fore.CYAN + "Last Contact" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + str(time_since_contact).split(".")[0]+ Style.RESET_ALL)] if self.last_pos_datetime is not None else None, - [(Fore.CYAN + "On Ground" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + str(self.on_ground) + Style.RESET_ALL)] if self.on_ground is not None else None, - [(Fore.CYAN + "Baro Altitude" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + str("{:,} ft".format(self.alt_ft)) + Style.RESET_ALL)] if self.alt_ft is not None else None, - [(Fore.CYAN + "Nav Modes" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + ', '.join(self.nav_modes) + Style.RESET_ALL)] if "nav_modes" in self.__dict__ and self.nav_modes != None else None, - [(Fore.CYAN + "Sel Alt Ft" + Style.RESET_ALL), (Fore.LIGHTGREEN_EX + str("{:,} ft".format(self.sel_nav_alt)) + Style.RESET_ALL)] if "sel_nav_alt" in self.__dict__ and self.sel_nav_alt is not None else None - ] - output = list(filter(None, output)) - return tabulate(output, [], 'fancy_grid') - def printheader(self, type): - from colorama import Fore, Back, Style - if type == "head": - header = str("--------- " + self.conf_file_path + " ---------------------------- ICAO: " + self.icao + " ---------------------------------------") - elif type == "foot": - header = "----------------------------------------------------------------------------------------------------" - print(Back.MAGENTA + header[0:100] + Style.RESET_ALL) - def get_time_since(self, datetime_obj): - if datetime_obj != None: - time_since = datetime.now() - datetime_obj - else: - time_since = None - return time_since - def get_adsbx_map_overlays(self): - if self.config.has_option('MAP', 'OVERLAYS'): - overlays = self.config.get('MAP', 'OVERLAYS') - else: - overlays = "" - return overlays - def route_info(self): - from lookup_route import lookup_route, clean_data - def route_format(extra_route_info, type): - from defAirport import get_airport_by_icao - to_airport = get_airport_by_icao(self.known_to_airport) - code = to_airport['iata_code'] if to_airport['iata_code'] != "" else to_airport['icao'] - airport_text = f"{code}, {to_airport['name']}" - if 'time_to' in extra_route_info.keys() and type != "divert": - arrival_rel = "in ~" + extra_route_info['time_to'] - else: - arrival_rel = None - if self.known_to_airport != self.nearest_from_airport: - if type == "inital": - header = "Going to" - elif type == "change": - header = "Now going to" - elif type == "divert": - header = "Now diverting to" - area = f"{to_airport['municipality']}, {to_airport['region']}, {to_airport['iso_country']}" - route_to = f"{header} {area} ({airport_text})" + (f" arriving {arrival_rel}" if arrival_rel is not None else "") - else: - if type == "inital": - header = "Will be returning to" - elif type == "change": - header = "Now returning to" - elif type == "divert": - header = "Now diverting back to" - route_to = f"{header} {airport_text}" + (f" {arrival_rel}" if arrival_rel is not None else "") - return route_to - if hasattr(self, "type"): - extra_route_info = clean_data(lookup_route(self.reg, (self.latitude, self.longitude), self.type, self.alt_ft)) - else: - extra_route_info = None - route_to = None - if extra_route_info is None: - pass - elif extra_route_info is not None: - #Diversion - if "divert_icao" in extra_route_info.keys(): - if self.known_to_airport != extra_route_info["divert_icao"]: - self.known_to_airport = extra_route_info['divert_icao'] - route_to = route_format(extra_route_info, "divert") - #Destination - elif "dest_icao" in extra_route_info.keys(): - #Inital Destination Found - if self.known_to_airport is None: - self.known_to_airport = extra_route_info['dest_icao'] - route_to = route_format(extra_route_info, "inital") - #Destination Change - elif self.known_to_airport != extra_route_info["dest_icao"]: - self.known_to_airport = extra_route_info['dest_icao'] - route_to = route_format(extra_route_info, "change") - - return route_to - def run_empty(self): - self.printheader("head") - self.feeding = False - self.run_check() - def run_check(self): - """Runs a check of a plane module to see if its landed or takenoff using plane data, and takes action if so.""" - print(self) - #Ability to Remove old Map - import os - from colorama import Fore, Style - from tabulate import tabulate - #Proprietary Route Lookup - if os.path.isfile("lookup_route.py") and (self.db_flags is None or not self.db_flags & 1): - from lookup_route import lookup_route - ENABLE_ROUTE_LOOKUP = True - else: - ENABLE_ROUTE_LOOKUP = False - if self.config.getboolean('DISCORD', 'ENABLE'): - from defDiscord import sendDis - if self.last_pos_datetime is not None: - time_since_contact = self.get_time_since(self.last_pos_datetime) -#Check if below desire ft - desired_ft = 15000 - if self.alt_ft is None or self.alt_ft > desired_ft: - self.below_desired_ft = False - elif self.alt_ft < desired_ft: - self.below_desired_ft = True -#Check if tookoff - if self.below_desired_ft and self.on_ground is False: - if self.last_on_ground: - self.tookoff = True - trigger_type = "no longer on ground" - type_header = "Took off from" - elif self.last_feeding is False and self.feeding and self.landing_plausible == False: - from defAirport import getClosestAirport - nearest_airport_dict = getClosestAirport(self.latitude, self.longitude, self.config.get("AIRPORT", "TYPES")) - if nearest_airport_dict['elevation_ft'] != "": - alt_above_airport = (self.alt_ft - int(nearest_airport_dict['elevation_ft'])) - print(f"AGL nearest airport: {alt_above_airport}") - else: - alt_above_airport = None - if (alt_above_airport != None and alt_above_airport <= 10000) or self.alt_ft <= 15000: - self.tookoff = True - trigger_type = "data acquisition" - type_header = "Took off near" - else: - self.tookoff = False - else: - self.tookoff = False - -#Check if Landed - if self.on_ground and self.last_on_ground is False and self.last_below_desired_ft: - self.landed = True - trigger_type = "now on ground" - type_header = "Landed in" - self.landing_plausible = False - #Set status for landing plausible - elif self.below_desired_ft and self.last_feeding and self.feeding is False and self.last_on_ground is False: - self.landing_plausible = True - print("Near landing conditions, if contiuned data loss for configured time, and if under 10k AGL landing true") - - elif self.landing_plausible and self.feeding is False and time_since_contact.total_seconds() >= (self.data_loss_mins * 60): - from defAirport import getClosestAirport - nearest_airport_dict = getClosestAirport(self.latitude, self.longitude, self.config.get("AIRPORT", "TYPES")) - if nearest_airport_dict['elevation_ft'] != "": - alt_above_airport = (self.alt_ft - int(nearest_airport_dict['elevation_ft'])) - print(f"AGL nearest airport: {alt_above_airport}") - else: - alt_above_airport = None - if (alt_above_airport != None and alt_above_airport <= 10000) or self.alt_ft <= 15000: - self.landing_plausible = False - self.on_ground = None - self.landed = True - trigger_type = "data loss" - type_header = "Landed near" - else: - print("Alt greater then 10k AGL") - self.landing_plausible = False - self.on_ground = None - else: - self.landed = False - - if self.landed: - print ("Landed by", trigger_type) - if self.tookoff: - print("Tookoff by", trigger_type) - #Find nearest airport, and location - if self.landed or self.tookoff: - from defAirport import getClosestAirport - if "nearest_airport_dict" in globals(): - pass #Airport already set - elif trigger_type in ["now on ground", "data acquisition", "data loss"]: - nearest_airport_dict = getClosestAirport(self.latitude, self.longitude, self.config.get("AIRPORT", "TYPES")) - elif trigger_type == "no longer on ground": - nearest_airport_dict = getClosestAirport(self.last_latitude, self.last_longitude, self.config.get("AIRPORT", "TYPES")) - #Convert dictionary keys to sep variables - country_code = nearest_airport_dict['iso_country'] - state = nearest_airport_dict['region'].strip() - municipality = nearest_airport_dict['municipality'].strip() - if municipality == "" or state == "" or municipality == state: - if municipality != "": - area = municipality - elif state != "": - area = state - else: - area = "" - else: - area = f"{municipality}, {state}" - location_string = (f"{area}, {country_code}") - print (Fore.GREEN + "Country Code:", country_code, "State:", state, "Municipality:", municipality + Style.RESET_ALL) - title_switch = { - "reg": self.reg, - "callsign": self.callsign, - "icao": self.icao, - } - #Set Discord Title - if self.config.getboolean('DISCORD', 'ENABLE'): - self.dis_title = (title_switch.get(self.config.get('DISCORD', 'TITLE')) or "NA").strip() if self.config.get('DISCORD', 'TITLE') in title_switch.keys() else self.config.get('DISCORD', 'TITLE') - #Set Twitter Title - if self.config.getboolean('TWITTER', 'ENABLE'): - self.twitter_title = (title_switch.get(self.config.get('TWITTER', 'TITLE')) or "NA") if self.config.get('TWITTER', 'TITLE') in title_switch.keys() else self.config.get('TWITTER', 'TITLE') - #Takeoff and Land Notification - if self.tookoff or self.landed: - route_to = None - if self.tookoff: - self.takeoff_time = datetime.utcnow() - landed_time_msg = None - #Proprietary Route Lookup - if ENABLE_ROUTE_LOOKUP: - self.nearest_from_airport = nearest_airport_dict['icao'] - route_to = self.route_info() - if route_to is None: - self.recheck_route_time = 1 - else: - self.recheck_route_time = 10 - elif self.landed and self.takeoff_time != None: - landed_time = datetime.utcnow() - self.takeoff_time - if trigger_type == "data loss": - landed_time -= timedelta(seconds=time_since_contact.total_seconds()) - hours, remainder = divmod(landed_time.total_seconds(), 3600) - minutes, seconds = divmod(remainder, 60) - min_syntax = "Mins" if minutes > 1 else "Min" - if hours > 0: - hour_syntax = "Hours" if hours > 1 else "Hour" - landed_time_msg = (f"Apx. flt. time {int(hours)} {hour_syntax}" + (f" : {int(minutes)} {min_syntax}. " if minutes > 0 else ".")) - else: - landed_time_msg = (f"Apx. flt. time {int(minutes)} {min_syntax}.") - self.takeoff_time = None - elif self.landed: - landed_time_msg = None - message = (f"{type_header} {location_string}.") + ("" if route_to is None else f" {route_to}.") + ((f" {landed_time_msg}") if landed_time_msg != None else "") - print (message) - #Google Map or tar1090 screenshot - if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP": - from defMap import getMap - getMap((municipality + ", " + state + ", " + country_code), self.map_file_name) - elif self.config.get('MAP', 'OPTION') == "ADSBX": - from defSS import get_adsbx_screenshot - - url_params = f"icao={self.icao}&zoom=9&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays=" + self.get_adsbx_map_overlays() - get_adsbx_screenshot(self.map_file_name, url_params) - from modify_image import append_airport - append_airport(self.map_file_name, nearest_airport_dict) - else: - raise ValueError("Map option not set correctly in this planes conf") - #Discord - if self.config.getboolean('DISCORD', 'ENABLE'): - dis_message = f"{self.dis_title} {message}".strip() - role_id = self.config.get('DISCORD', 'ROLE_ID') if self.config.has_option('DISCORD', 'ROLE_ID') else None - sendDis(dis_message, self.config, self.map_file_name, role_id = role_id) - #PushBullet - if self.config.getboolean('PUSHBULLET', 'ENABLE'): - with open(self.map_file_name, "rb") as pic: - map_data = self.pb.upload_file(pic, "Tookoff IMG" if self.tookoff else "Landed IMG") - self.pb_channel.push_note(self.config.get('PUSHBULLET', 'TITLE'), message) - self.pb_channel.push_file(**map_data) - #Twitter - if self.config.getboolean('TWITTER', 'ENABLE'): - twitter_media_map_obj = self.tweet_api.media_upload(self.map_file_name) - alt_text = f"Reg: {self.reg} On Ground: {str(self.on_ground)} Alt: {str(self.alt_ft)} Last Contact: {str(time_since_contact)} Trigger: {trigger_type}" - self.tweet_api.create_media_metadata(media_id= twitter_media_map_obj.media_id, alt_text= alt_text) - self.latest_tweet_id = self.tweet_api.update_status(status = ((self.twitter_title + " " + message).strip()), media_ids=[twitter_media_map_obj.media_id]).id - os.remove(self.map_file_name) - if self.landed: - self.latest_tweet_id = None - self.recheck_route_time = None - self.known_to_airport = None - self.nearest_from_airport = None - #Recheck Proprietary Route Info. - if self.takeoff_time is not None and self.recheck_route_time is not None and (datetime.utcnow() - self.takeoff_time).total_seconds() > 60 * self.recheck_route_time: - self.recheck_route_time += 10 - route_to = self.route_info() - if route_to != None: - print(route_to) - #Discord - if self.config.getboolean('DISCORD', 'ENABLE'): - dis_message = f"{self.dis_title} {route_to}".strip() - role_id = self.config.get('DISCORD', 'ROLE_ID') if self.config.has_option('DISCORD', 'ROLE_ID') else None - sendDis(dis_message, self.config, role_id = role_id) - #Twitter - if self.config.getboolean('TWITTER', 'ENABLE'): - #tweet = self.tweet_api.user_timeline(count = 1)[0] - self.latest_tweet_id = self.tweet_api.update_status(status = f"{self.twitter_title} {route_to}".strip(), in_reply_to_status_id = self.latest_tweet_id).id - - if self.circle_history is not None: - #Expires traces for circles - if self.circle_history["traces"] != []: - for trace in self.circle_history["traces"]: - if (datetime.now() - datetime.fromtimestamp(trace[0])).total_seconds() >= 20*60: - print("Trace Expire, removed") - self.circle_history["traces"].remove(trace) - #Expire touchngo - if "touchngo" in self.circle_history.keys() and (datetime.now() - datetime.fromtimestamp(self.circle_history['touchngo'])).total_seconds() >= 10*60: - self.circle_history.pop("touchngo") - if self.feeding: - #Squawks - emergency_squawks ={"7500" : "Hijacking", "7600" :"Radio Failure", "7700" : "General Emergency"} - seen = datetime.now() - self.last_pos_datetime - #Only run check if emergency data previously set - if self.last_emergency is not None and not self.emergency_already_triggered: - time_since_org_emer = datetime.now() - self.last_emergency[0] - #Checks times to see x time and still same squawk - if time_since_org_emer.total_seconds() >= 60 and self.last_emergency[1] == self.squawk and seen.total_seconds() <= 60: - self.emergency_already_triggered = True - squawk_message = (f"{self.dis_title} Squawking {self.last_emergency[1]} {emergency_squawks[self.squawk]}").strip() - print(squawk_message) - #Google Map or tar1090 screenshot - if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP": - getMap((municipality + ", " + state + ", " + country_code), self.map_file_name) - if self.config.get('MAP', 'OPTION') == "ADSBX": - from defSS import get_adsbx_screenshot - url_params = f"icao={self.icao}&zoom=9&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays=" + self.get_adsbx_map_overlays() - get_adsbx_screenshot(self.map_file_name, url_params) - if self.config.getboolean('DISCORD', 'ENABLE'): - dis_message = (self.dis_title + " " + squawk_message) - sendDis(dis_message, self.config, self.map_file_name) - os.remove(self.map_file_name) - #Realizes first time seeing emergency, stores time and type - elif self.squawk in emergency_squawks.keys() and not self.emergency_already_triggered and not self.on_ground: - print("Emergency", self.squawk, "detected storing code and time and waiting to trigger") - self.last_emergency = (self.last_pos_datetime, self.squawk) - elif self.squawk not in emergency_squawks.keys() and self.emergency_already_triggered: - self.emergency_already_triggered = None - - #Nav Modes Notifications - if self.nav_modes != None and self.last_nav_modes != None: - for mode in self.nav_modes: - if mode not in self.last_nav_modes: - #Discord - print(mode, "enabled") - if self.config.getboolean('DISCORD', 'ENABLE'): - dis_message = (self.dis_title + " " + mode + " mode enabled.") - if mode == "Approach": - from defSS import get_adsbx_screenshot - url_params = f"icao={self.icao}&zoom=9&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays={self.get_adsbx_map_overlays()}" - get_adsbx_screenshot(self.map_file_name, url_params) - sendDis(dis_message, self.config, self.map_file_name) - #elif mode in ["Althold", "VNAV", "LNAV"] and self.sel_nav_alt != None: - # sendDis((dis_message + ", Sel Alt. " + str(self.sel_nav_alt) + ", Current Alt. " + str(self.alt_ft)), self.config) - else: - sendDis(dis_message, self.config) - #Selected Altitude - if self.sel_nav_alt is not None and self.last_sel_alt is not None and self.last_sel_alt != self.sel_nav_alt: - #Discord - print("Nav altitude is now", self.sel_nav_alt) - if self.config.getboolean('DISCORD', 'ENABLE'): - dis_message = (self.dis_title + " Sel. alt. " + str("{:,} ft".format(self.sel_nav_alt))) - sendDis(dis_message,self.config) - #Circling - if self.last_track is not None: - import time - if self.circle_history is None: - self.circle_history = {"traces" : [], "triggered" : False} - #Add touchngo - if self.on_ground or self.alt_ft <= 500: - self.circle_history["touchngo"] = time.time() - #Add a Trace - if self.on_ground is False: - from calculate_headings import calculate_deg_change - track_change = calculate_deg_change(self.track, self.last_track) - track_change = round(track_change, 3) - self.circle_history["traces"].append((time.time(), self.latitude, self.longitude, track_change)) - - total_change = 0 - coords = [] - for trace in self.circle_history["traces"]: - total_change += float(trace[3]) - coords.append((float(trace[1]), float(trace[2]))) - - print("Total Bearing Change", round(total_change, 3)) - #Check Centroid when Bearing change meets req - if abs(total_change) >= 720 and self.circle_history['triggered'] is False: - print("Circling Bearing Change Met") - from shapely.geometry import MultiPoint - from geopy.distance import geodesic - aircraft_coords = (self.latitude, self.longitude) - points = MultiPoint(coords) - cent = (points.centroid) #True centroid, not necessarily an existing point - #rp = (points.representative_point()) #A represenative point, not centroid, - print(cent) - #print(rp) - distance_to_centroid = geodesic(aircraft_coords, cent.coords).mi - print(f"Distance to centroid of circling coordinates {distance_to_centroid} miles") - if distance_to_centroid <= 15: - print("Within 15 miles of centroid, CIRCLING") - from defAirport import getClosestAirport - nearest_airport_dict = getClosestAirport(self.latitude, self.longitude, ["small_airport", "medium_airport", "large_airport"]) - from calculate_headings import calculate_from_bearing, calculate_cardinal - from_bearing = calculate_from_bearing((float(nearest_airport_dict['latitude_deg']), float(nearest_airport_dict['longitude_deg'])), (self.latitude, self.longitude)) - cardinal = calculate_cardinal(from_bearing) - from defSS import get_adsbx_screenshot - url_params = f"icao={self.icao}&zoom=10&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays={self.get_adsbx_map_overlays()}" - get_adsbx_screenshot(self.map_file_name, url_params) - if nearest_airport_dict['distance_mi'] < 3: - if "touchngo" in self.circle_history.keys(): - message = f"Doing touch and goes at {nearest_airport_dict['icao']}" - else: - message = f"Circling over {nearest_airport_dict['icao']} at {self.alt_ft}ft" - else: - message = f"Circling {round(nearest_airport_dict['distance_mi'], 2)}mi {cardinal} of {nearest_airport_dict['icao']}, {nearest_airport_dict['name']} at {self.alt_ft}ft" - print(message) - if self.config.getboolean('DISCORD', 'ENABLE'): - role_id = self.config.get('DISCORD', 'ROLE_ID') if self.config.has_option('DISCORD', 'ROLE_ID') else None - sendDis(message, self.config, self.map_file_name, role_id) - if self.config.getboolean('TWITTER', 'ENABLE'): - twitter_media_map_obj = self.tweet_api.media_upload(self.map_file_name) - alt_text = f"Distance to centroid: {distance_to_centroid}, Total change: {total_change}" - self.tweet_api.create_media_metadata(media_id= twitter_media_map_obj.media_id, alt_text= alt_text) - tweet = self.tweet_api.user_timeline(count = 1)[0] - self.latest_tweet_id = self.tweet_api.update_status(status = f"{self.twitter_title} {message}".strip(), in_reply_to_status_id = self.latest_tweet_id, media_ids=[twitter_media_map_obj.media_id]).id - - self.circle_history['triggered'] = True - elif abs(total_change) <= 360 and self.circle_history["triggered"]: - print("No Longer Circling, trigger cleared") - self.circle_history['triggered'] = False - # #Power Up - # if self.last_feeding == False and self.speed == 0 and self.on_ground: - # if self.config.getboolean('DISCORD', 'ENABLE'): - # dis_message = (self.dis_title + "Powered Up").strip() - # sendDis(dis_message, self.config) - - -#Set Variables to compare to next check - self.last_track = self.track - self.last_feeding = self.feeding - self.last_on_ground = self.on_ground - self.last_below_desired_ft = self.below_desired_ft - self.last_longitude = self.longitude - self.last_latitude = self.latitude - self.last_nav_modes = self.nav_modes - self.last_sel_alt = self.sel_nav_alt - - - if self.takeoff_time != None: - elapsed_time = datetime.utcnow() - self.takeoff_time - hours, remainder = divmod(elapsed_time.total_seconds(), 3600) - minutes, seconds = divmod(remainder, 60) - print((f"Time Since Take off {int(hours)} Hours : {int(minutes)} Mins : {int(seconds)} Secs")) - self.printheader("foot") - def check_new_ras(self, ras): - for ra in ras: - if self.recent_ra_types == {} or ra['acas_ra']['advisory'] not in self.recent_ra_types.keys(): - self.recent_ra_types[ra['acas_ra']['advisory']] = ra['acas_ra']['unix_timestamp'] - ra_message = f"TCAS Resolution Advisory: {ra['acas_ra']['advisory']}" - if ra['acas_ra']['advisory_complement'] != "": - ra_message += f", {ra['acas_ra']['advisory_complement']}" - if bool(int(ra['acas_ra']['MTE'])): - ra_message += ", Multi threat" - from defSS import get_adsbx_screenshot, generate_adsbx_screenshot_time_params - url_params = f"&lat={ra['lat']}&lon={ra['lon']}&zoom=11&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays={self.get_adsbx_map_overlays()}" - if "threat_id_hex" in ra['acas_ra'].keys(): - from mictronics_parse import get_aircraft_reg_by_icao - threat_reg = get_aircraft_reg_by_icao(ra['acas_ra']['threat_id_hex']) - threat_id = threat_reg if threat_reg is not None else "ICAO: " + ra['acas_ra']['threat_id_hex'] - ra_message += f", invader: {threat_id}" - url_params += generate_adsbx_screenshot_time_params(ra['acas_ra']['unix_timestamp']) + f"&icao={ra['acas_ra']['threat_id_hex']},{self.icao.lower()}×tamp={ra['acas_ra']['unix_timestamp']}" - else: - url_params += f"&icao={self.icao.lower()}&noIsolation" - print(url_params) - get_adsbx_screenshot(self.map_file_name, url_params, True, True) - - if self.config.getboolean('DISCORD', 'ENABLE'): - from defDiscord import sendDis - dis_message = f"{self.dis_title} {ra_message}" - role_id = self.config.get('DISCORD', 'ROLE_ID') if self.config.has_option('DISCORD', 'ROLE_ID') else None - sendDis(dis_message, self.config, self.map_file_name, role_id = role_id) - #if twitter - def expire_ra_types(self): - if self.recent_ra_types != {}: - for ra_type, postime in self.recent_ra_types.copy().items(): - timestamp = datetime.fromtimestamp(postime) - time_since_ra = datetime.now() - timestamp - print(time_since_ra) - if time_since_ra.seconds >= 600: - print(ra_type) - self.recent_ra_types.pop(ra_type) \ No newline at end of file diff --git a/ss.py b/ss.py new file mode 100644 index 0000000..7d68def --- /dev/null +++ b/ss.py @@ -0,0 +1,140 @@ +from selenium import webdriver +from webdriver_manager.chrome import ChromeDriverManager +from selenium.webdriver.support.ui import WebDriverWait + +import time +import os +import platform +import requests +import json +import re + +from datetime import datetime, timedelta + +CHROME_OPTIONS = webdriver.ChromeOptions() +CHROME_OPTIONS.headless = True +CHROME_OPTIONS.add_argument("window-size=800,800") +CHROME_OPTIONS.add_argument("ignore-certificate-errors") +CHROME_OPTIONS.add_argument("--enable-logging --v=1") + +BROWSER = webdriver.Chrome(ChromeDriverManager().install(), options=CHROME_OPTIONS) +BROWSER.set_page_load_timeout(80) + +REMOVE_ID_ELEMENTS = [ + "show_trace", + "credits", + "infoblock_close", + "selected_photo_link", + "history_collapse", +] + +if platform.system() == "Linux" and os.getuid() == 0: + CHROME_OPTIONS.add_argument( + "--no-sandbox" + ) # required when running as root user. otherwise you would get no sandbox errors. + + +def get_adsbx_screenshot( + file_path, url_params, enable_labels=False, enable_track_labels=False +): + url = f"https://globe.adsbexchange.com/?{url_params}" + BROWSER.get(url) + + for element in REMOVE_ID_ELEMENTS: + try: + element = BROWSER.find_element_by_id(element) + BROWSER.execute_script( + """ + var element = arguments[0]; + element.parentNode.removeChild(element); + """, + element, + ) + except: + print("issue removing", element, "from map") + + # Remove watermark on data + try: + BROWSER.execute_script( + "document.getElementById('selected_infoblock').className = 'none';" + ) + except: + print("Couldn't remove watermark from map") + + # Disable slidebar + try: + BROWSER.execute_script("$('#infoblock-container').css('overflow', 'hidden');") + except: + print("Couldn't disable sidebar on map") + + # Remove share + try: + element = BROWSER.find_element_by_xpath("//*[contains(text(), 'Share')]") + BROWSER.execute_script( + """ + var element = arguments[0]; + element.parentNode.removeChild(element); + """, + element, + ) + except: + print("Couldn't remove share button from map") + + # browser.execute_script("toggleFollow()") + if enable_labels is True: + BROWSER.find_element_by_tag_name("body").send_keys("l") + + if enable_track_labels is True: + BROWSER.find_element_by_tag_name("body").send_keys("k") + + WebDriverWait(BROWSER, 40).until( + lambda d: d.execute_script("return jQuery.active == 0") + ) + + try: + photo_box = BROWSER.find_element_by_id("silhouette") + except: + pass + else: + photo_list = json.loads( + requests.get( + "https://raw.githubusercontent.com/Jxck-S/aircraft-photos/main/photo-list.json" + ).text + ) + if "icao" in url_params: + icao = re.search("icao=(.+?)&", url_params).group(1).lower() + print(icao) + + if icao in photo_list.keys(): + BROWSER.execute_script("arguments[0].id = 'airplanePhoto';", photo_box) + BROWSER.execute_script( + f"arguments[0].src = 'https://raw.githubusercontent.com/Jxck-S/aircraft-photos/main/images/{photo_list[icao]['reg']}.jpg';", + photo_box, + ) + copyright = BROWSER.find_element_by_id("copyrightInfo") + BROWSER.execute_script( + "arguments[0].id = 'copyrightInfoFreeze';", copyright + ) + BROWSER.execute_script( + "$('#copyrightInfoFreeze').css('font-size', '12px');" + ) + BROWSER.execute_script( + f"arguments[0].appendChild(document.createTextNode('Image © {photo_list[icao]['photographer']}'))", + copyright, + ) + + time.sleep(5) + BROWSER.save_screenshot(file_path) + + +def generate_adsbx_screenshot_time_params(timestamp): + + timestamp_dt = datetime.utcfromtimestamp(timestamp) + print(timestamp_dt) + start_time = timestamp_dt - timedelta(minutes=1) + time_params = ( + f"&showTrace={timestamp_dt.strftime('%Y-%m-%d')}" + f"&startTime={start_time.strftime('%H:%M:%S')}" + f"&endTime={timestamp_dt.strftime('%H:%M:%S')}" + ) + return time_params diff --git a/tweet.py b/tweet.py new file mode 100644 index 0000000..2c2c679 --- /dev/null +++ b/tweet.py @@ -0,0 +1,15 @@ +import tweepy + +# Authenticate to Twitter +def tweepysetup(config): + # DOCU + # https://realpython.com/twitter-bot-python-tweepy/ + auth = tweepy.OAuthHandler( + config.get("TWITTER", "CONSUMER_KEY"), config.get("TWITTER", "CONSUMER_SECRET") + ) + auth.set_access_token( + config.get("TWITTER", "ACCESS_TOKEN"), + config.get("TWITTER", "ACCESS_TOKEN_SECRET"), + ) + tweet_api = tweepy.API(auth, wait_on_rate_limit=True) + return tweet_api