Browse Source

Relative ETA, Auto toggle route lookup, Crash file to Discord

-Auto route lookup toggle
-Simplify failure, removed failed var rely on data to None instead
-ADSBX data matching efficiency improved, index then loop rather than nested loops
-Send crash log as file to discord
-Deduplicate requst code for adsbx requests
-Discord Tag Role ability
-Print out OpenSky error when one exsist
-Ignore missing elements when screenshoting ADSBX
-Relative ETA to destination added
-Fix issue with RA screenshot, add lat/lon param just incase map breaks
pull/3/head
Jxck-S 4 years ago
parent
commit
d53be29961
  1. 54
      __main__.py
  2. 6
      configs/plane1.ini
  3. 121
      defADSBX.py
  4. 10
      defDiscord.py
  5. 4
      defOpenSky.py
  6. 26
      defSS.py
  7. 83
      planeClass.py

54
__main__.py

@ -24,10 +24,11 @@ import signal
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)
if not os.path.isdir("./dependencies/"):
os.mkdir("./dependencies/")
import sys
sys.path.extend([os.getcwd()])
#Dependency Handling
if not os.path.isdir("./dependencies/"):
os.mkdir("./dependencies/")
required_files = [("Roboto-Regular.ttf", 'https://github.com/googlefonts/roboto/blob/main/src/hinted/Roboto-Regular.ttf?raw=true'), ('airports.csv', 'https://ourairports.com/data/airports.csv'), ('regions.csv', 'https://ourairports.com/data/regions.csv'), ('ADSBX_Logo.png', "https://www.adsbexchange.com/wp-content/uploads/cropped-Stealth.png"), ('Mictronics_db.zip', "https://www.mictronics.de/aircraft-database/indexedDB.php")]
for file in required_files:
file_name = file[0]
@ -50,6 +51,7 @@ if os.path.isfile("./dependencies/" + required_files[4][0]) and not os.path.isfi
from zipfile import ZipFile
with ZipFile("./dependencies/" + required_files[4][0], 'r') as mictronics_db:
mictronics_db.extractall("./dependencies/")
main_config = configparser.ConfigParser()
print(os.getcwd())
main_config.read('./configs/mainconf.ini')
@ -64,6 +66,10 @@ def service_exit(signum, frame):
os.remove(pid_file_path)
raise SystemExit("Service Stop")
signal.signal(signal.SIGTERM, service_exit)
if os.path.isfile("lookup_route.py"):
print("Route lookup is enabled")
else:
print("Route lookup is disabled")
try:
print("Source is set to", source)
@ -101,9 +107,9 @@ try:
import ast
today = datetime.utcnow()
date = today.strftime("%Y/%m/%d")
ras, failed = pull_date_ras(date)
ras = pull_date_ras(date)
sorted_ras = {}
if failed is False and ras != None:
if ras is not None:
#Testing RAs
#if last_ra_count is not None:
# with open('./testing/acastest.json') as f:
@ -136,25 +142,24 @@ try:
else:
raise ValueError("Invalid API Version")
from defADSBX import pull_adsbx
data, failed = pull_adsbx(planes)
if failed == False:
if data['ac'] != None:
data = pull_adsbx(planes)
if data is not None:
if data['ac'] is not None:
data_indexed = {}
for planeData in data['ac']:
data_indexed[planeData[icao_key].upper()] = planeData
for key, obj in planes.items():
has_data = False
for planeData in data['ac']:
if planeData[icao_key].upper() == key:
if api_version == 1:
obj.run_adsbx_v1(planeData)
elif api_version == 2:
obj.run_adsbx_v2(planeData)
has_data = True
break
if has_data is False:
try:
if api_version == 1:
obj.run_adsbx_v1(data_indexed[key.upper()])
elif api_version == 2:
obj.run_adsbx_v2(data_indexed[key.upper()])
except KeyError:
obj.run_empty()
else:
for obj in planes.values():
obj.run_empty()
elif failed:
else:
failed_count += 1
elif source == "OPENS":
from defOpenSky import pull_opensky
@ -190,7 +195,6 @@ try:
footer = "-------- " + str(running_Count) + " -------- " + str(datetime_tz.strftime("%I:%M:%S %p")) + " ------------------------Elapsed Time- " + str(round(elapsed_calc_time, 3)) + " -------------------------------------"
print (Back.GREEN + Fore.BLACK + footer[0:100] + Style.RESET_ALL)
sleep_sec = 30
for i in range(sleep_sec,0,-1):
if i < 10:
@ -208,11 +212,17 @@ except KeyboardInterrupt as e:
sendDis(str("Manual Exit: " + str(e)), main_config)
except Exception as e:
if main_config.getboolean('DISCORD', 'ENABLE'):
from defDiscord import sendDis
sendDis(str("Error Exiting: " + str(traceback.format_exc())), main_config)
try:
os.remove('crash_latest.log')
except OSError:
pass
import logging
logging.basicConfig(filename='crash.log', filemode='a', format='%(asctime)s - %(message)s')
logging.basicConfig(filename='crash_latest.log', filemode='w', format='%(asctime)s - %(message)s')
logging.Formatter.converter = time.gmtime
logging.error(e)
logging.error(str(traceback.format_exc()))
from defDiscord import sendDis
sendDis(str("Error Exiting: " + str(e) + "Failed on " + key), main_config, "crash_latest.log")
raise e
finally:
if platform.system() == "Linux":

6
configs/plane1.ini

@ -30,8 +30,10 @@ API_KEY = apikey
CHANNEL_TAG = channeltag
[DISCORD]
ENABLE = TRUE
ENABLE = FALSE
#WEBHOOK URL https://support.discord.com/hc/en-us/articles/228383668-Intro-to-Webhooks
URL = webhookurl
Title =
#Role to tag optional, the role ID
ROLE_ID =
Title =
USERNAME = plane-notify

121
defADSBX.py

@ -9,6 +9,27 @@ import socket
main_config = configparser.ConfigParser()
main_config.read('./configs/mainconf.ini')
api_version = main_config.get('ADSBX', 'API_VERSION')
def pull(url, headers):
try:
response = requests.get(url, headers = headers)
response.raise_for_status()
except (requests.HTTPError, ConnectionError, requests.Timeout, urllib3.exceptions.ConnectionError) as error_message:
print("Basic Connection Error")
print(error_message)
response = None
except (requests.RequestException, IncompleteRead, ValueError, socket.timeout, socket.gaierror) as error_message:
print("Connection Error")
print(error_message)
response = None
except Exception as error_message:
print("Connection Error uncaught, basic exception for all")
print(error_message)
response = None
if "response" in locals():
print ("HTTP Status Code:", response.status_code)
return response
def pull_adsbx(planes):
api_version = int(main_config.get('ADSBX', 'API_VERSION'))
if api_version not in [1, 2]:
@ -29,59 +50,24 @@ def pull_adsbx(planes):
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/v2/all"
else:
raise ValueError("Proxy enabled but no host")
return pull(url)
def pull(url):
headers = {
'api-auth': main_config.get('ADSBX', 'API_KEY'),
'Accept-Encoding': 'gzip'
'api-auth': main_config.get('ADSBX', 'API_KEY'),
'Accept-Encoding': 'gzip'
}
try:
response = requests.get(url, headers = headers)
response.raise_for_status()
except (requests.HTTPError, ConnectionError, requests.Timeout, urllib3.exceptions.ConnectionError) as error_message:
print("Basic Connection Error")
print(error_message)
failed = True
data = None
except (requests.RequestException, IncompleteRead, ValueError, socket.timeout, socket.gaierror) as error_message:
print("Connection Error")
print(error_message)
failed = True
data = None
except Exception as error_message:
print("Connection Error uncaught, basic exception for all")
print(error_message)
failed = True
data = None
else:
if "response" in locals() and response.status_code == 200:
try:
data = json.loads(response.text)
except (json.decoder.JSONDecodeError, ValueError) as error_message:
print("Error with JSON")
if 'data' in locals() and data != None:
print (json.dumps(data, indent = 2))
print(error_message)
failed = True
data = None
except TypeError as error_message:
print("Type Error", error_message)
failed = True
data = None
else:
failed = False
else:
failed = True
data = None
if "response" in locals():
print ("HTTP Status Code:", response.status_code)
if failed is False:
response = pull(url, headers)
if response is not None:
try:
if data['msg'] != "No error":
data = json.loads(response.text)
except (json.decoder.JSONDecodeError, ValueError) as error_message:
print("Error with JSON")
print(error_message)
data = None
except TypeError as error_message:
print("Type Error", error_message)
data = None
else:
if "msg" in data.keys() and data['msg'] != "No error":
raise ValueError("Error from ADSBX: msg = ", data['msg'])
except KeyError:
pass
if "ctime" in data.keys():
data_ctime = float(data['ctime']) / 1000.0
print("Data ctime:",datetime.utcfromtimestamp(data_ctime))
@ -89,37 +75,18 @@ def pull(url):
data_now = float(data['now']) / 1000.0
print("Data now time:",datetime.utcfromtimestamp(data_now))
print("Current UTC:", datetime.utcnow())
return data, failed
else:
data = None
return data
def pull_date_ras(date):
url = f"https://globe.adsbexchange.com/globe_history/{date}/acas/acas.json"
headers = {
'Accept-Encoding': 'gzip'
}
try:
response = requests.get(url, headers = headers)
response.raise_for_status()
except (requests.HTTPError, ConnectionError, requests.Timeout, urllib3.exceptions.ConnectionError) as error_message:
print("Basic Connection Error")
print(error_message)
failed = True
data = None
except (requests.RequestException, IncompleteRead, ValueError, socket.timeout, socket.gaierror) as error_message:
print("Connection Error")
print(error_message)
failed = True
data = None
except Exception as error_message:
print("Connection Error uncaught, basic exception for all")
print(error_message)
failed = True
data = None
response = pull(url, headers)
if response is not None:
data = response.text.splitlines()
else:
if "response" in locals() and response.status_code == 200:
failed = False
data = response.text.splitlines()
else:
failed = True
data = None
if "response" in locals():
print ("HTTP Status Code:", response.status_code)
return data, failed
data = None
return data

10
defDiscord.py

@ -1,10 +1,12 @@
def sendDis(message, config, image_name = None):
def sendDis(message, config, file_name = None, role_id = None):
import requests
from discord_webhook import DiscordWebhook
if role_id != None:
message += f" <@&{role_id}>"
webhook = DiscordWebhook(url=config.get('DISCORD', 'URL'), content=message[0:1999], username=config.get('DISCORD', 'USERNAME'))
if image_name != None:
with open(image_name, "rb") as f:
webhook.add_file(file=f.read(), filename='map.png')
if file_name != None:
with open(file_name, "rb") as f:
webhook.add_file(file=f.read(), filename=file_name)
try:
webhook.execute()
except requests.Exceptions:

4
defOpenSky.py

@ -11,7 +11,7 @@ def pull_opensky(planes):
icao_array.append(key.lower())
try:
planeData = opens_api.get_states(time_secs=0, icao24=icao_array)
except:
print ("OpenSky Error")
except Exception as e:
print ("OpenSky Error", e)
failed = True
return planeData, failed

26
defSS.py

@ -19,17 +19,27 @@ def get_adsbx_screenshot(file_path, url_params, enable_labels=False, enable_trac
browser.get(url)
remove_id_elements = ["show_trace", "credits", 'infoblock_close', 'selected_photo_link', "history_collapse"]
for element in remove_id_elements:
element = browser.find_element_by_id(element)
browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element)
element = browser.find_elements_by_class_name("infoHeading")
browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element[19])
try:
element = browser.find_element_by_id(element)
browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element)
except:
print("issue removing", element, "from map")
#Remove watermark on data
browser.execute_script("document.getElementById('selected_infoblock').className = 'none';")
try:
browser.execute_script("document.getElementById('selected_infoblock').className = 'none';")
except:
print("Couldn't remove watermark from map")
#Disable slidebar
browser.execute_script("$('#infoblock-container').css('overflow', 'hidden');")
try:
browser.execute_script("$('#infoblock-container').css('overflow', 'hidden');")
except:
print("Couldn't disable sidebar on map")
#Remove share
element = browser.find_element_by_xpath("//*[contains(text(), 'Share')]")
browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element)
try:
element = browser.find_element_by_xpath("//*[contains(text(), 'Share')]")
browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element)
except:
print("Couldn't remove share button from map")
#browser.execute_script("toggleFollow()")
if enable_labels:
browser.find_element_by_tag_name('body').send_keys('l')

83
planeClass.py

@ -104,7 +104,8 @@ class Plane:
elif ac_dict['alt_baro'] == "ground":
self.alt_ft = 0
self.on_ground = True
self.callsign = ac_dict.get('flight')
if ac_dict.get('flight') is not None:
self.callsign = ac_dict.get('flight').strip()
if 'nav_modes' in ac_dict:
self.nav_modes = ac_dict['nav_modes']
for idx, mode in enumerate(self.nav_modes):
@ -158,28 +159,23 @@ class Plane:
self.run_check()
def run_check(self):
"""Runs a check of a plane module to see if its landed or takenoff using plane data, and takes action if so."""
#Import Modules
#Ability to Remove old Map
import os
from colorama import Fore, Style
#Platform for determining OS for strftime
import platform
from tabulate import tabulate
from modify_image import append_airport
from defAirport import getClosestAirport
#Propritary
ENABLE_ROUTE_LOOKUP = False
if ENABLE_ROUTE_LOOKUP:
#Proprietary Route Lookup
if os.path.isfile("lookup_route.py"):
from lookup_route import lookup_route
ENABLE_ROUTE_LOOKUP = True
else:
#Dead Place function
def lookup_route(*args):
return None
ENABLE_ROUTE_LOOKUP = False
if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP":
from defMap import getMap
elif self.config.get('MAP', 'OPTION') == "ADSBX":
from defSS import get_adsbx_screenshot, generate_adsbx_overlay_param
from defSS import get_adsbx_screenshot
if self.config.has_option('MAP', 'OVERLAYS'):
self.overlays = self.config.get('MAP', 'OVERLAYS')
else:
@ -298,11 +294,7 @@ class Plane:
else:
area = f"{municipality}, {state}"
location_string = (f"{area}, {country_code}")
print (Fore.GREEN)
print ("Country Code: ", country_code)
print ("State: ", state)
print ("Municipality: ", municipality)
print(Style.RESET_ALL)
print (Fore.GREEN + "Country Code:", country_code, "State:", state, "Municipality:", municipality + Style.RESET_ALL)
title_switch = {
"reg": self.reg,
"callsign": self.callsign,
@ -320,7 +312,7 @@ class Plane:
if self.tookoff:
self.takeoff_time = datetime.utcnow()
landed_time_msg = None
#Route Lookup | Proprietary
#Proprietary Route Lookup
if ENABLE_ROUTE_LOOKUP:
extra_route_info = lookup_route(self.reg, (self.latitude, self.longitude), self.type, self.alt_ft)
if extra_route_info == None:
@ -328,29 +320,23 @@ class Plane:
self.nearest_takeoff_airport = nearest_airport_dict
else:
from defAirport import get_airport_by_icao
to_airport = get_airport_by_icao(extra_route_info[11])
to_airport = get_airport_by_icao(extra_route_info['apdstic'])
code = to_airport['iata_code'] if to_airport['iata_code'] != "" else to_airport['icao']
if extra_route_info[11] != nearest_airport_dict['icao']:
route_to = "Going to"
airport_text = f" {code}, {to_airport['name']}"
if extra_route_info['apdstic'] != nearest_airport_dict['icao']:
route_to = "Going to" + airport_text + " arriving " + extra_route_info['arrivalRelative']
else:
route_to = "Will be returning to"
route_to += f" {code}, {to_airport['name']}"
route_to = f"Will be returning to {airport_text} {extra_route_info['arrivalRelative']}"
elif self.landed and self.takeoff_time != None:
landed_time = datetime.utcnow() - self.takeoff_time
if platform.system() == "Linux":
strftime_splitter = "-"
elif platform.system() == "Windows":
strftime_splitter = "#"
hours, remainder = divmod(landed_time.total_seconds(), 3600)
minutes, seconds = divmod(remainder, 60)
min_syntax = "Mins" if minutes > 1 else "Min"
if hours > 0:
hour_syntax = "Hours" if hours > 1 else "Hour"
landed_time_msg = (f"Apx. flt. time {int(hours)} {hour_syntax}: {int(minutes)} {min_syntax}. ")
landed_time_msg = (f"Apx. flt. time {int(hours)} {hour_syntax}" + (f" : {int(minutes)} {min_syntax}. " if minutes > 0 else "."))
else:
landed_time_msg = (f"Apx. flt. time {int(minutes)} {min_syntax}. ")
# landed_time_msg = time.strftime(f"Apx. flt. time %{strftime_splitter}H Hours : %{strftime_splitter}M Mins. ", time.gmtime(landed_time))
# landed_time_msg = landed_time_msg.replace("0 Hours : ", "")
landed_time_msg = (f"Apx. flt. time {int(minutes)} {min_syntax}.")
self.takeoff_time = None
elif self.landed:
landed_time_msg = None
@ -363,13 +349,13 @@ class Plane:
url_params = f"icao={self.icao}&zoom=9&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays=" + self.overlays
get_adsbx_screenshot(self.map_file_name, url_params)
append_airport(self.map_file_name, nearest_airport_dict)
#airport_string = nearest_airport_dict['icao'] + ", " + nearest_airport_dict["name"]
else:
raise ValueError("Map option not set correctly in this planes conf")
#Discord
if self.config.getboolean('DISCORD', 'ENABLE'):
dis_message = f"{self.dis_title} {message}".strip()
sendDis(dis_message, self.config, self.map_file_name)
role_id = self.config.get('DISCORD', 'ROLE_ID') if self.config.has_option('DISCORD', 'ROLE_ID') else None
sendDis(dis_message, self.config, self.map_file_name, role_id = role_id)
#PushBullet
if self.config.getboolean('PUSHBULLET', 'ENABLE'):
with open(self.map_file_name, "rb") as pic:
@ -383,32 +369,35 @@ class Plane:
self.tweet_api.create_media_metadata(media_id= twitter_media_map_obj.media_id, alt_text= alt_text)
self.tweet_api.update_status(status = ((self.twitter_title + " " + message).strip()), media_ids=[twitter_media_map_obj.media_id])
os.remove(self.map_file_name)
#To Location
if self.recheck_to and (datetime.utcnow() - self.takeoff_time).total_seconds() > 60:
#Recheck Proprietary Route Lookup a minute later if infomation was not available on takeoff.
if self.recheck_to and self.takeoff_time is not None and (datetime.utcnow() - self.takeoff_time).total_seconds() > 60:
self.recheck_to = False
extra_route_info = lookup_route(self.reg, (self.latitude, self.longitude), self.type, self.alt_ft)
nearest_airport_dict = self.nearest_takeoff_airport
self.nearest_takeoff_airport = None
if extra_route_info != None:
from defAirport import get_airport_by_icao
to_airport = get_airport_by_icao(extra_route_info[11])
to_airport = get_airport_by_icao(extra_route_info['apdstic'])
code = to_airport['iata_code'] if to_airport['iata_code'] != "" else to_airport['icao']
if extra_route_info[11] != nearest_airport_dict['icao']:
route_to = "Going to"
airport_text = f" {code}, {to_airport['name']}"
if extra_route_info['apdstic'] != nearest_airport_dict['icao']:
route_to = "Going to" + airport_text + " arriving " + extra_route_info['arrivalRelative']
else:
route_to = "Will be returning to"
route_to += f" {code}, {to_airport['name']}"
route_to = f"Will be returning to {airport_text} {extra_route_info['arrivalRelative']}"
print(route_to)
#Discord
if self.config.getboolean('DISCORD', 'ENABLE'):
dis_message = (self.dis_title + route_to).strip()
sendDis(dis_message, self.config)
dis_message = f"{self.dis_title} {route_to}".strip()
role_id = self.config.get('DISCORD', 'ROLE_ID') if self.config.has_option('DISCORD', 'ROLE_ID') else None
sendDis(dis_message, self.config, role_id = role_id)
#Twitter
if self.config.getboolean('TWITTER', 'ENABLE'):
tweet = self.tweet_api.user_timeline(count = 1)[0]
self.tweet_api.update_status(status = f"{self.twitter_title} {route_to}".strip(), in_reply_to_status_id = tweet.id)
#Squawks
squawks =[("7500", "Hijacking"), ("7600", "Radio Failure"), ("7700", "Emergency")]
if self.feeding:
#Squawks
squawks =[("7500", "Hijacking"), ("7600", "Radio Failure"), ("7700", "Emergency")]
for squawk in squawks:
if all(v == squawk[0] for v in (self.squawks[0:2])) and self.squawks[2] != self.squawks[3] and None not in self.squawks:
squawk_message = ("Squawking " + squawk[0] + ", " + squawk[1])
@ -473,21 +462,23 @@ class Plane:
if bool(int(ra['acas_ra']['MTE'])):
ra_message += ", Multi threat"
from defSS import get_adsbx_screenshot, generate_adsbx_screenshot_time_params, generate_adsbx_overlay_param
url_params = generate_adsbx_screenshot_time_params(ra['acas_ra']['unix_timestamp']) + f"&zoom=11&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays={self.overlays}"
url_params = generate_adsbx_screenshot_time_params(ra['acas_ra']['unix_timestamp']) + f"&lat={ra['lat']}&lon={ra['lon']}&zoom=11&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays={self.overlays}&timestamp={ra['acas_ra']['unix_timestamp']}"
if "threat_id_hex" in ra['acas_ra'].keys():
from mictronics_parse import get_aircraft_by_icao
threat_reg = get_aircraft_by_icao(ra['acas_ra']['threat_id_hex'])[0]
threat_id = threat_reg if threat_reg is not None else "ICAO: " + ra['acas_ra']['threat_id_hex']
ra_message += f", invader: {threat_id}"
url_params += f"&icao={self.icao.lower()},{ra['acas_ra']['threat_id_hex']}"
url_params += f"&icao={ra['acas_ra']['threat_id_hex']},{self.icao.lower()}"
else:
url_params += f"&icao={self.icao.lower()}&noIsolation"
print(url_params)
get_adsbx_screenshot(self.map_file_name, url_params, True, True)
if self.config.getboolean('DISCORD', 'ENABLE'):
from defDiscord import sendDis
dis_message = f"{self.dis_title} {ra_message}"
sendDis(dis_message, self.config, self.map_file_name)
role_id = self.config.get('DISCORD', 'ROLE_ID') if self.config.has_option('DISCORD', 'ROLE_ID') else None
sendDis(dis_message, self.config, self.map_file_name, role_id = role_id)
#if twitter
def expire_ra_types(self):
if self.recent_ra_types != {}:

Loading…
Cancel
Save