Browse Source

Dependecies dir, screenshot in temp dir, rename functions

-New dependencies directory, font, airports, regions
-Screenshots put in OS temp directory now
-Fix some function names to be proper
-Testing with resolution advisories
-Adjust nearest airport box
-AppendAirport.py > modify_image.py
pull/2/head
Jxck-S 4 years ago
parent
commit
e09061289b
  1. 6
      .gitignore
  2. 28
      __main__.py
  3. 6
      configs/mainconf.ini
  4. 29
      defADSBX.py
  5. 6
      defAirport.py
  6. 2
      defOpenSky.py
  7. 11
      defSS.py
  8. 33
      modify_image.py
  9. 77
      planeClass.py

6
.gitignore vendored

@ -1,6 +1,6 @@
.vscode/settings.json
pythonenv3.8/
Roboto-Regular.ttf
airports.dat
__pycache__
airports.csv
dependencies
testing
lookup_route.py

28
__main__.py

@ -12,24 +12,26 @@ import pytz
import os
if 'plane-notify' not in os.getcwd():
os.chdir('./plane-notify')
if not os.path.isdir("./dependencies/"):
os.mkdir("./dependencies/")
import sys
sys.path.extend([os.getcwd()])
required_files = [("Roboto-Regular.ttf", 'https://github.com/google/fonts/raw/master/apache/roboto/static/Roboto-Regular.ttf'), ('airports.csv', 'https://ourairports.com/data/airports.csv'), ('regions.csv', 'https://ourairports.com/data/regions.csv')]
required_files = [("Roboto-Regular.ttf", 'https://github.com/googlefonts/roboto/blob/main/src/hinted/Roboto-Regular.ttf?raw=true'), ('airports.csv', 'https://ourairports.com/data/airports.csv'), ('regions.csv', 'https://ourairports.com/data/regions.csv'), ('ADSBX_Logo.png', "https://www.adsbexchange.com/wp-content/uploads/cropped-Stealth.png")]
for file in required_files:
file_name = file[0]
url = file[1]
if not os.path.isfile(file_name):
if not os.path.isfile("./dependencies/" + file_name):
print(file_name, "does not exist downloading now")
try:
import requests
file_content = requests.get(url)
open(file_name, 'wb').write(file_content.content)
except:
raise("Error getting", file_name, "from", url)
open(("./dependencies/" + file_name), 'wb').write(file_content.content)
except Exception as e:
raise e("Error getting", file_name, "from", url)
else:
print("Successfully got", file_name)
elif os.path.isfile(file_name):
else:
print("Already have", file_name, "continuing")
main_config = configparser.ConfigParser()
main_config.read('./configs/mainconf.ini')
@ -75,8 +77,8 @@ try:
icao_key = 'icao'
else:
raise ValueError("Invalid API Version")
from defADSBX import pullADSBX
data, failed = pullADSBX(planes)
from defADSBX import pull_adsbx
data, failed = pull_adsbx(planes)
if failed == False:
if data['ac'] != None:
for key, obj in planes.items():
@ -84,9 +86,9 @@ try:
for planeData in data['ac']:
if planeData[icao_key].upper() == key:
if api_version == 1:
obj.run_ADSBXv1(planeData)
obj.run_adsbx_v1(planeData)
elif api_version == 2:
obj.run_ADSBXv2(planeData)
obj.run_adsbx_v2(planeData)
has_data = True
break
if has_data is False:
@ -97,8 +99,8 @@ try:
elif failed:
failed_count += 1
elif source == "OPENS":
from defOpenSky import pullOpenSky
planeData, failed = pullOpenSky(planes)
from defOpenSky import pull_opensky
planeData, failed = pull_opensky(planes)
if failed == False:
if planeData.states != []:
# print(planeData.time)
@ -106,7 +108,7 @@ try:
has_data = False
for dataState in planeData.states:
if (dataState.icao24).upper() == key:
obj.run_OPENS(dataState)
obj.run_opens(dataState)
has_data = True
break
if has_data is False:

6
configs/mainconf.ini

@ -13,9 +13,11 @@ TZ = UTC
[ADSBX]
API_KEY = apikey
API_VERSION = 1
#ADSBX API Proxy V2, https://gitlab.com/jjwiseman/adsbx-api-proxy
#ADSBX API Proxy, https://gitlab.com/jjwiseman/adsbx-api-proxy, v2 input, v1 or v2 output from proxy
ENABLE_PROXY = FALSE
PROXY_HOST =
#Full URL http://host:port
PROXY_HOST =
#OpenSky https://opensky-network.org/apidoc/index.html
#When using without your own login user and pass should be None

29
defADSBX.py

@ -7,9 +7,12 @@ import http.client as http
import urllib3
main_config = configparser.ConfigParser()
main_config.read('./configs/mainconf.ini')
def pullADSBX(planes):
api_version = main_config.get('ADSBX', 'API_VERSION')
def pull_adsbx(planes):
api_version = int(main_config.get('ADSBX', 'API_VERSION'))
if api_version not in [1, 2]:
raise ValueError("Bad ADSBX API Version")
if main_config.getboolean('ADSBX', 'ENABLE_PROXY') is False:
api_version = int(main_config.get('ADSBX', 'API_VERSION'))
if api_version == 1:
if len(planes) > 1:
url = "https://adsbexchange.com/api/aircraft/json/"
@ -17,14 +20,17 @@ def pullADSBX(planes):
url = "https://adsbexchange.com/api/aircraft/icao/" + str(list(planes.keys())[0]) + "/"
elif api_version == 2:
url = "https://adsbexchange.com/api/aircraft/v2/all"
else:
raise ValueError("No API Version set")
else:
if main_config.has_option('ADSBX', 'PROXY_HOST'):
url = "http://" + main_config.get('ADSBX', 'PROXY_HOST') + ":8000/api/aircraft/v2/all"
if api_version == 1:
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/json/all"
if api_version == 2:
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/v2/all"
else:
raise ValueError("Proxy enabled but no host")
return pull(url)
def pull(url):
headers = {
'api-auth': main_config.get('ADSBX', 'API_KEY'),
'Accept-Encoding': 'gzip'
@ -32,12 +38,12 @@ def pullADSBX(planes):
try:
response = requests.get(url, headers = headers)
response.raise_for_status()
except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as error_message:
except (requests.HTTPError, ConnectionError, requests.Timeout, urllib3.exceptions.ConnectionError) as error_message:
print("Basic Connection Error")
print(error_message)
failed = True
data = None
except (IncompleteRead, ConnectionResetError, urllib3.Exceptions, ValueError) as error_message:
except (requests.RequestException, IncompleteRead, ValueError, socket.timeout, socket.gaierror) as error_message:
print("Connection Error")
print(error_message)
failed = True
@ -73,10 +79,13 @@ def pullADSBX(planes):
try:
if data['msg'] != "No error":
raise ValueError("Error from ADSBX: msg = ", data['msg'])
failed = True
except KeyError:
pass
data_ctime = float(data['ctime']) / 1000.0
print("UTC of Data:",datetime.utcfromtimestamp(data_ctime))
if "ctime" in data.keys():
data_ctime = float(data['ctime']) / 1000.0
print("Data ctime:",datetime.utcfromtimestamp(data_ctime))
if "now" in data.keys():
data_now = float(data['now']) / 1000.0
print("Data now time:",datetime.utcfromtimestamp(data_now))
print("Current UTC:", datetime.utcnow())
return data, failed

6
defAirport.py

@ -2,7 +2,7 @@ def getClosestAirport(latitude, longitude, allowed_types):
import csv
from geopy.distance import geodesic
plane = (latitude, longitude)
with open('airports.csv', 'r', encoding='utf-8') as airport_csv:
with open('./dependencies/airports.csv', 'r', encoding='utf-8') as airport_csv:
airport_csv_reader = csv.DictReader(filter(lambda row: row[0]!='#', airport_csv))
for airport in airport_csv_reader:
if airport['type'] in allowed_types:
@ -18,7 +18,7 @@ def getClosestAirport(latitude, longitude, allowed_types):
#Convert indent key to icao key as its labeled icao in other places not ident
closest_airport_dict['icao'] = closest_airport_dict.pop('gps_code')
#Get full region/state name from iso region name
with open('regions.csv', 'r', encoding='utf-8') as regions_csv:
with open('./dependencies/regions.csv', 'r', encoding='utf-8') as regions_csv:
regions_csv = csv.DictReader(filter(lambda row: row[0]!='#', regions_csv))
for region in regions_csv:
if region['code'] == closest_airport_dict['iso_region']:
@ -26,7 +26,7 @@ def getClosestAirport(latitude, longitude, allowed_types):
return closest_airport_dict
def get_airport_by_icao(icao):
import csv
with open('airports.csv', 'r', encoding='utf-8') as airport_csv:
with open('./dependencies/airports.csv', 'r', encoding='utf-8') as airport_csv:
airport_csv_reader = csv.DictReader(filter(lambda row: row[0]!='#', airport_csv))
for airport in airport_csv_reader:
if airport['gps_code'] == icao:

2
defOpenSky.py

@ -1,4 +1,4 @@
def pullOpenSky(planes):
def pull_opensky(planes):
import configparser
main_config = configparser.ConfigParser()
main_config.read('./configs/mainconf.ini')

11
defSS.py

@ -3,7 +3,7 @@ from webdriver_manager.chrome import ChromeDriverManager
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
def getSS(icao, overlays):
def getSS(icao, file_path, overlays):
chrome_options = webdriver.ChromeOptions()
chrome_options.headless = True
chrome_options.add_argument('window-size=800,800')
@ -19,10 +19,12 @@ def getSS(icao, overlays):
browser.get(url)
WebDriverWait(browser, 40).until(lambda d: d.execute_script("return jQuery.active == 0"))
time.sleep(5)
remove_elements = ["show_trace", "credits", 'infoblock_close', 'selected_photo_link', "history_collapse"]
for element in remove_elements:
remove_id_elements = ["show_trace", "credits", 'infoblock_close', 'selected_photo_link', "history_collapse"]
for element in remove_id_elements:
element = browser.find_element_by_id(element)
browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element)
element = browser.find_elements_by_class_name("infoHeading")
browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element[19])
#Remove watermark on data
browser.execute_script("document.getElementById('selected_infoblock').className = 'none';")
#Disable slidebar
@ -31,6 +33,5 @@ def getSS(icao, overlays):
element = browser.find_element_by_xpath("//*[contains(text(), 'Share')]")
browser.execute_script("""var element = arguments[0]; element.parentNode.removeChild(element); """, element)
#browser.execute_script("toggleFollow()")
file_name = icao + "_map.png"
browser.save_screenshot(file_name)
browser.save_screenshot(file_path)
browser.quit()

33
AppendAirport.py → modify_image.py

@ -12,7 +12,7 @@ def append_airport(filename, airport):
draw = ImageDraw.Draw(image)
#Setup fonts
fontfile = "Roboto-Regular.ttf"
fontfile = "./dependencies/Roboto-Regular.ttf"
font = ImageFont.truetype(fontfile, 14)
mini_font = ImageFont.truetype(fontfile, 12)
head_font = ImageFont.truetype(fontfile, 16)
@ -23,13 +23,12 @@ def append_airport(filename, airport):
navish = 'rgb(0, 63, 75)'
whitish = 'rgb(248, 248, 248)'
#Info Box
draw.rectangle(((316, 760), (605, 800)), fill= white, outline=black)
draw.rectangle(((325, 760), (624, 800)), fill= white, outline=black)
#Header Box
draw.rectangle(((387, 738), (535, 760)), fill= navish)
draw.rectangle(((401, 738), (549, 760)), fill= navish)
#ADSBX Logo
draw.rectangle(((658, 760), (800, 780)), fill= white)
import requests
adsbx = Image.open(requests.get("https://www.adsbexchange.com/wp-content/uploads/cropped-Stealth-48px.png", stream=True).raw)
draw.rectangle(((658, 762), (800, 782)), fill= white)
adsbx = Image.open("./dependencies/ADSBX_Logo.png")
adsbx = adsbx.resize((25, 25), Image.ANTIALIAS)
image.paste(adsbx, (632, 757), adsbx)
#Create Text
@ -38,20 +37,32 @@ def append_airport(filename, airport):
text = "adsbexchange.com"
draw.text((x, y), text, fill=black, font=head_font)
#Nearest Airport Header
(x, y) = (408, 740)
(x, y) = (422, 740)
text = "Nearest Airport"
draw.text((x, y), text, fill=white, font=head_font)
#ICAO | IATA
(x, y) = (320, 765)
(x, y) = (330, 765)
text = iata + " / " + icao
draw.text((x, y), text, fill=black, font=font)
#Distance
(x, y) = (432, 765)
(x, y) = (460, 765)
text = str(round(distance_mi, 2)) + "mi / " + str(round(distance_km, 2)) + "km away"
draw.text((x, y), text, fill=black, font=font)
#Full name
(x, y) = (320, 783)
text = airport['name'][0:56]
(x, y) = (330, 783)
MAX_WIDTH = 325
if font.getsize(airport['name'])[0] <= MAX_WIDTH:
text = airport['name']
else:
text = ""
for char in airport['name']:
if font.getsize(text)[0] >= (MAX_WIDTH - 10):
text += "..."
break
else:
text += char
draw.text((x, y), text, fill=black, font=mini_font)
image.show()
# save the edited image

77
planeClass.py

@ -17,7 +17,8 @@ class Plane:
self.longitude = None
self.latitude = None
self.takeoff_time = None
self.map_file_name = icao.upper() + "_map.png"
import tempfile
self.map_file_name = f"{tempfile.gettempdir()}\\{icao.upper()}_map.png"
self.last_latitude = None
self.last_longitude = None
self.last_contact = None
@ -28,6 +29,8 @@ class Plane:
self.recheck_to = None
self.speed = None
self.nearest_airport_dict = None
self.acas_ra = None
self.last_acas_ra = None
#Setup Tweepy
if self.config.getboolean('TWITTER', 'ENABLE'):
from defTweet import tweepysetup
@ -37,9 +40,7 @@ class Plane:
from pushbullet import Pushbullet
self.pb = Pushbullet(self.config['PUSHBULLET']['API_KEY'])
self.pb_channel = self.pb.get_channel(self.config.get('PUSHBULLET', 'CHANNEL_TAG'))
def getICAO(self):
return self.icao
def run_OPENS(self, ac_dict):
def run_opens(self, ac_dict):
#Parse OpenSky Vector
from colorama import Fore, Back, Style
self.printheader("head")
@ -62,7 +63,7 @@ class Plane:
else:
self.feeding = True
self.run_check()
def run_ADSBXv1(self, ac_dict):
def run_adsbx_v1(self, ac_dict):
#Parse ADBSX V1 Vector
from colorama import Fore, Back, Style
self.printheader("head")
@ -87,13 +88,17 @@ class Plane:
self.feeding = True
self.run_check()
def run_ADSBXv2(self, ac_dict):
def run_adsbx_v2(self, ac_dict):
#Parse ADBSX V2 Vector
from colorama import Fore, Back, Style
self.printheader("head")
print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL)
try:
self.__dict__.update({'icao' : ac_dict['hex'].upper(), 'reg' : ac_dict['r'], 'latitude' : float(ac_dict['lat']), 'longitude' : float(ac_dict['lon']), 'type' : ac_dict['t'], 'speed': ac_dict['gs']})
self.__dict__.update({'icao' : ac_dict['hex'].upper(), 'latitude' : float(ac_dict['lat']), 'longitude' : float(ac_dict['lon']), 'speed': ac_dict['gs']})
if "r" in ac_dict:
self.reg = ac_dict['r']
if "t" in ac_dict:
self.type = ac_dict['t']
if ac_dict['alt_baro'] != "ground":
self.alt_ft = int(ac_dict['alt_baro'])
self.on_ground = False
@ -101,13 +106,17 @@ class Plane:
self.alt_ft = 0
self.on_ground = True
self.callsign = ac_dict.get('flight')
if'nav_modes' in ac_dict:
if 'nav_modes' in ac_dict:
self.nav_modes = ac_dict['nav_modes']
for idx, mode in enumerate(self.nav_modes):
if mode.upper() in ['TCAS', 'LNAV', 'VNAV']:
self.nav_modes[idx] = self.nav_modes[idx].upper()
else:
self.nav_modes[idx] = self.nav_modes[idx].capitalize()
if 'acas_ra_csvline' in ac_dict:
self.acas_ra = ac_dict['acas_ra_csvline']
else:
self.acas_ra = None
#Insert newest sqwauk at 0, sqwuak length should be 4 long 0-3
self.squawks.insert(0, ac_dict.get('squawk'))
#Removes oldest sqwauk index 4 5th sqwauk
@ -167,11 +176,11 @@ class Plane:
#Platform for determining OS for strftime
import platform
from tabulate import tabulate
from AppendAirport import append_airport
from modify_image import append_airport
from defAirport import getClosestAirport
#Propritary
ENABLE_ROUTE_LOOKUP = False
ENABLE_ROUTE_LOOKUP = True
if ENABLE_ROUTE_LOOKUP:
from lookup_route import lookup_route
else:
@ -234,7 +243,6 @@ class Plane:
alt_above_airport = (self.alt_ft - int(nearest_airport_dict['elevation_ft']))
print(f"AGL nearest airport: {alt_above_airport}")
if alt_above_airport <= 10000:
self.nearest_airport_dict = nearest_airport_dict
self.tookoff = True
self.trigger_type = "data acquisition"
type_header = "Took off near"
@ -251,19 +259,21 @@ class Plane:
self.landing_plausible = False
#Set status for landing plausible
elif self.below_desired_ft and self.last_feeding and self.feeding is False and self.last_on_ground is False:
self.landing_plausible = True
print("Near landing conditions, if contiuned data loss for 5 mins, and if under 10k AGL landing true")
elif self.landing_plausible and self.feeding is False and time_since_contact.seconds >= 300:
nearest_airport_dict = getClosestAirport(self.latitude, self.longitude, self.config.get("AIRPORT", "TYPES"))
alt_above_airport = (self.alt_ft - int(nearest_airport_dict['elevation_ft']))
print(f"AGL nearest airport: {alt_above_airport}")
if alt_above_airport <= 10000:
self.landing_plausible = True
self.nearest_airport_dict = nearest_airport_dict
print("Near landing conditions, if contiuned data loss for 5 mins, landing true")
elif self.landing_plausible and self.feeding is False and time_since_contact.seconds >= 300:
self.landing_plausible = False
self.landed = True
self.trigger_type = "data loss"
type_header = "Landed near"
self.landing_plausible = False
self.landed = True
self.trigger_type = "data loss"
type_header = "Landed near"
else:
print("Alt greater then 10k AGL")
self.landing_plausible = False
else:
self.landed = False
@ -273,9 +283,8 @@ class Plane:
print("Tookoff by", self.trigger_type)
#Find nearest airport, and location
if self.landed or self.tookoff:
if self.nearest_airport_dict != None:
nearest_airport_dict = self.nearest_airport_dict
self.nearest_airport_dict = None
if "nearest_airport_dict" in globals():
pass #Airport already set
elif self.trigger_type in ["now on ground", "data acquisition", "data loss"]:
nearest_airport_dict = getClosestAirport(self.latitude, self.longitude, self.config.get("AIRPORT", "TYPES"))
elif self.trigger_type == "no longer on ground":
@ -292,8 +301,8 @@ class Plane:
else:
area = ""
else:
area = f"{municipality}, {state}, "
location_string = (area + country_code)
area = f"{municipality}, {state}"
location_string = (f"{area}, {country_code}")
print (Fore.GREEN)
print ("Country Code: ", country_code)
print ("State: ", state)
@ -338,7 +347,7 @@ class Plane:
if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP":
getMap((municipality + ", " + state + ", " + country_code), self.map_file_name)
elif self.config.get('MAP', 'OPTION') == "ADSBX":
getSS(self.icao, self.overlays)
getSS(self.icao, self.map_file_name, self.overlays)
append_airport(self.map_file_name, nearest_airport_dict)
#airport_string = nearest_airport_dict['icao'] + ", " + nearest_airport_dict["name"]
else:
@ -384,7 +393,7 @@ class Plane:
if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP":
getMap((municipality + ", " + state + ", " + country_code), self.map_file_name)
if self.config.get('MAP', 'OPTION') == "ADSBX":
getSS(self.icao, self.overlays)
getSS(self.icao, self.map_file_name, self.overlays)
#Discord
if self.config.getboolean('DISCORD', 'ENABLE'):
dis_message = (self.dis_title + " " + squawk_message)
@ -399,20 +408,26 @@ class Plane:
if self.config.getboolean('DISCORD', 'ENABLE'):
dis_message = (self.dis_title + " " + mode + " mode enabled.")
if mode == "Approach":
getSS(self.icao, self.overlays)
getSS(self.icao, self.map_file_name, self.overlays)
sendDis(dis_message, self.config, self.map_file_name)
elif mode == "Althold" and self.nav_altitude != None:
sendDis((dis_message + ", Sel Alt. " + str(self.nav_altitude) + ", Current Alt. " + str(self.alt_ft)), self.config)
else:
sendDis(dis_message, self.config)
#Power Up
if self.last_feeding == False and self.speed == 0 and self.on_ground:
# #Power Up
# if self.last_feeding == False and self.speed == 0 and self.on_ground:
# if self.config.getboolean('DISCORD', 'ENABLE'):
# dis_message = (self.dis_title + "Powered Up").strip()
# sendDis(dis_message, self.config)
#TCAS/ACAS
if self.acas_ra != None and self.last_acas_ra != self.acas_ra:
if self.config.getboolean('DISCORD', 'ENABLE'):
dis_message = (self.dis_title + "Powered Up").strip()
dis_message = f"{self.dis_title} {self.acas_ra}"
sendDis(dis_message, self.config)
#Set Variables to compare to next check
self.last_acas_ra = self.acas_ra
self.last_feeding = self.feeding
self.last_alt_ft = self.alt_ft
self.last_on_ground = self.on_ground

Loading…
Cancel
Save