Browse Source

Move config files/reduce repase, intital airport lookup

pull/2/head
Jxck-S 4 years ago
parent
commit
a2a3e44476
  1. 6
      NotifyBotMulti.py
  2. 4
      configs/mainconf.ini
  3. 20
      configs/plane1.ini
  4. 46
      defADSBX.py
  5. 27
      defAirport.py
  6. 5
      defDiscord.py
  7. 7
      defMap.py
  8. 2
      defOpenSky.py
  9. 5
      defTweet.py
  10. 111
      planeClass.py

6
NotifyBotMulti.py

@ -13,14 +13,14 @@ from planeClass import Plane
from datetime import datetime
import pytz
main_config = configparser.ConfigParser()
main_config.read('mainconf.ini')
main_config.read('./configs/mainconf.ini')
import os
#Setup Plane Objects off of Plane configs
planes = {}
for filename in os.listdir(os. getcwd()):
for filename in os.listdir("./configs"):
if filename.endswith(".ini") and filename != "mainconf.ini":
plane_config = configparser.ConfigParser()
plane_config.read(filename)
plane_config.read(("./configs/" + filename))
planes[plane_config.get('DATA', 'ICAO').upper()] = Plane(plane_config.get('DATA', 'ICAO'), filename)
running_Count = 0

4
mainconf.ini → configs/mainconf.ini

@ -19,3 +19,7 @@ API_KEY = apikey
USERNAME = None
PASSWORD = None
[GOOGLE]
#API KEY for Google Static Maps only if you using this on any of the planes.
API_KEY = googleapikey

20
plane1.ini → configs/plane1.ini

@ -1,20 +1,22 @@
[DATA]
#Plane to track, based of ICAO or ICAO24 which is the unique transponder address of a plane.
ICAO = icao
ICAO = icaohere
[MAP]
#Map to create from Google Static Maps or screenshot global tar1090 from globe.adsbexchange.com
#Enter GOOGLESTATICMAP or ADSBX
OPTION = ADSBX
#TITLE for Twitter, PB and Discord are Just text added to the front of each message/tweet sent
[TWITTER]
ENABLE = FALSE
TITLE =
CONSUMER_KEY = ckhere
CONSUMER_SECRET = cshere
ACCESS_TOKEN = athere
ACCESS_TOKEN_SECRET = atshere
[GOOGLE]
#API KEYS
#If static map disabled will load up tar1090 ads-b exchange and take screenshot instead.
STATICMAP_ENABLE = FALSE
STATICMAPKEY = googleapikey
[PUSHBULLET]
ENABLE = FALSE
TITLE = Title Of Pushbullet message
@ -22,8 +24,8 @@ API_KEY = apikey
CHANNEL_TAG = channeltag
[DISCORD]
ENABLE = FALSE
ENABLE = TRUE
#WEBHOOK URL https://support.discord.com/hc/en-us/articles/228383668-Intro-to-Webhooks
URL = webhookurl
Title = title
Title =
USERNAME = plane-notify

46
defADSBX.py

@ -4,8 +4,10 @@ import configparser
import time
from datetime import datetime
from http.client import IncompleteRead
import http.client as http
import urllib3
main_config = configparser.ConfigParser()
main_config.read('mainconf.ini')
main_config.read('./configs/mainconf.ini')
def pullADSBX(planes):
if len(planes) > 1:
url = "https://adsbexchange.com/api/aircraft/json/"
@ -14,22 +16,46 @@ def pullADSBX(planes):
headers = {
'api-auth': main_config.get('ADSBX', 'API_KEY'),
'Content-Encoding': 'gzip'
'Accept-Encoding': 'gzip'
}
try:
response = requests.get(url, headers = headers)
data = response.text
data = json.loads(data)
failed = False
except (requests.HTTPError, requests.Timeout, IncompleteRead, ConnectionError, ConnectionResetError) as error_message:
print("ADSBX Connection Error")
response.raise_for_status()
except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException):
print("Basic Connection Error")
print(error_message)
failed = True
except json.decoder.JSONDecodeError as error_message:
print("Error with JSON")
print (json.dumps(data, indent = 2))
data = None
except (urllib3.exceptions.RemoteDisconected, IncompleteRead, http.IncompleteRead, ConnectionResetError, requests.ChunkEncodingError, urllib3.exceptions.ProtocolError, ValueError) as error_message:
print("Connection Error")
print(error_message)
failed = True
data = None
except Exception as error_message:
print("Connection Error uncaught, basic exception for all")
print(error_message)
failed = True
data = None
else:
if response.status_code == 200:
try:
data = json.loads(response.text)
except (json.decoder.JSONDecodeError, ValueError) as error_message:
print("Error with JSON")
print (json.dumps(data, indent = 2))
print(error_message)
failed = True
data = None
except TypeError as error_message:
print("Type Error", error_message)
failed = True
data = None
else:
failed = False
else:
failed = True
data = None
print ("HTTP Status Code:", response.status_code)
if failed is False:
data_ctime = float(data['ctime']) / 1000.0

27
defAirport.py

@ -0,0 +1,27 @@
#https://www.geeksforgeeks.org/python-calculate-distance-between-two-places-using-geopy/
#https://openflights.org/data.html
def getAirport(latitude, longitude):
import json
import csv
from geopy.distance import geodesic
plane = (latitude, longitude)
header = ["id", "name", "city", "country", "iata", "icao", "lat", "lng", "alt", "tz", "dst", "tz_db", "type", "source"]
airports = []
first_run = True
with open('airports.dat', encoding='utf-8') as csvf:
reader = csv.DictReader( csvf, header)
#for row in reader:
# airports.append(row)
for row in reader:
airport = row
airport_coord = float(airport['lat']), float(airport['lng'])
airport_dist = float((geodesic(plane, airport_coord).mi))
if first_run:
closest_airport_dict = airport
closest_airport_dist = airport_dist
first_run = False
elif airport_dist < closest_airport_dist:
closest_airport_dict = airport
closest_airport_dist = airport_dist
print("Closest Airport:", closest_airport_dict['icao'], closest_airport_dict['name'], closest_airport_dist, "Miles Away")
return closest_airport_dict

5
defDiscord.py

@ -1,9 +1,6 @@
def sendDis(message, map_file_name, conf_file):
def sendDis(message, map_file_name, config):
from discord_webhook import DiscordWebhook
import configparser
config = configparser.ConfigParser()
config.read(conf_file)
webhook = DiscordWebhook(url=config.get('DISCORD', 'URL'), content=message, username=config.get('DISCORD', 'USERNAME'))
with open(map_file_name, "rb") as f:

7
defMap.py

@ -1,9 +1,9 @@
def getMap(mapLocation):
def getMap(mapLocation, icao):
import requests
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
api_key = config.get('GOOGLE', 'STATICMAPKEY')
config.read('./configs/mainconf.ini')
api_key = config.get('GOOGLE', 'API_KEY')
url = "https://maps.googleapis.com/maps/api/staticmap?"
center = str(mapLocation)
@ -14,6 +14,7 @@ def getMap(mapLocation):
api_key + "&sensor=false")
# wb mode is stand for write binary mode
file_name = icao + "_map.png"
f = open('map.png', 'wb')
# r.content gives content,

2
defOpenSky.py

@ -1,7 +1,7 @@
def pullOpenSky(planes):
import configparser
main_config = configparser.ConfigParser()
main_config.read('mainconf.ini')
main_config.read('./configs/mainconf.ini')
from opensky_api import OpenSkyApi
planeData = None
opens_api = OpenSkyApi(username= None if main_config.get('OPENSKY', 'USERNAME').upper() == "NONE" else main_config.get('OPENSKY', 'USERNAME'), password= None if main_config.get('OPENSKY', 'PASSWORD').upper() == "NONE" else main_config.get('OPENSKY', 'PASSWORD').upper())

5
defTweet.py

@ -1,9 +1,6 @@
# Authenticate to Twitter
def tweepysetup(conf_file):
import configparser
config = configparser.ConfigParser()
config.read(conf_file)
def tweepysetup(config):
import tweepy
#DOCU
#https://realpython.com/twitter-bot-python-tweepy/

111
planeClass.py

@ -38,24 +38,26 @@ class Plane:
#Setup Config File
import configparser
self.config = configparser.ConfigParser()
self.config.read(self.conf_file)
self.config.read(("./configs/"+ self.conf_file))
main_config = configparser.ConfigParser()
main_config.read('mainconf.ini')
main_config.read('./configs/mainconf.ini')
#Platform for determining OS for strftime
import platform
if self.config.getboolean('GOOGLE', 'STATICMAP_ENABLE'):
from defAirport import getAirport
if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP":
from defMap import getMap
else:
elif self.config.get('MAP', 'OPTION') == "ADSBX":
from defSS import getSS
else:
raise Exception("Map option not set correctly in this planes conf")
if self.config.getboolean('DISCORD', 'ENABLE'):
from defDiscord import sendDis
#Setup Tweepy
if self.config.getboolean('TWITTER', 'ENABLE'):
from defTweet import tweepysetup
self.tweet_api = tweepysetup(self.conf_file)
self.tweet_api = tweepysetup(self.config)
#Setup PushBullet
if self.config.getboolean('PUSHBULLET', 'ENABLE'):
from pushbullet import Pushbullet
@ -70,13 +72,14 @@ class Plane:
self.latitude = None
self.on_ground = None
self.has_location = None
#Get States from ADSBX or OPENS Vector
#Parse OpenSky Vector
self.plane_Dict = None
if main_config.get('DATA', 'SOURCE') == "OPENS":
self.val_error = False
if ac_dict != None:
#print (Fore.YELLOW + "OpenSky Sourced Data: ", ac_dict)
try:
self.plane_Dict ={'icao' : ac_dict.icao24, 'callsign' : ac_dict.callsign, 'latitude' : ac_dict.latitude, 'longitude' : ac_dict.longitude, 'on_ground' : bool(ac_dict.on_ground)}
self.plane_Dict ={'icao' : ac_dict.icao24.upper(), 'callsign' : ac_dict.callsign, 'latitude' : ac_dict.latitude, 'longitude' : ac_dict.longitude, 'on_ground' : bool(ac_dict.on_ground)}
if ac_dict.geo_altitude != None:
self.plane_Dict['geo_alt_ft'] = float(ac_dict.geo_altitude) * 3.281
elif self.plane_Dict['on_ground']:
@ -89,13 +92,11 @@ class Plane:
else:
self.plane_Dict = None
print (Fore.YELLOW)
print ("OpenSky Sourced Data: ", self.plane_Dict)
print(Style.RESET_ALL)
#Parse ADBSX Vector
elif main_config.get('DATA', 'SOURCE') == "ADSBX":
self.val_error = False
if ac_dict != None:
#print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict + Style.RESET_ALL)
try:
self.plane_Dict = {'icao' : ac_dict['icao'], 'callsign' : ac_dict['call'], 'reg' : ac_dict['reg'], 'latitude' : float(ac_dict['lat']), 'longitude' : float(ac_dict['lon']), 'geo_alt_ft' : int(ac_dict['galt']), 'on_ground' : bool(int(ac_dict["gnd"]))}
if self.plane_Dict['on_ground']:
@ -105,32 +106,32 @@ class Plane:
self.val_error = True
print("Got data but some data is invalid!")
print(e)
if "to" in ac_dict.keys():
self.plane_Dict['to_location'] = ac_dict["to"]
if "from" in ac_dict.keys():
self.plane_Dict['from_location'] = ac_dict["from"]
else:
self.plane_Dict = None
print (Fore.YELLOW)
print ("ADSBX Sourced Data: ", self.plane_Dict)
print(Style.RESET_ALL)
print (Fore.CYAN)
print ("ICAO:", self.icao)
print(Style.RESET_ALL)
print (Fore.CYAN + "ICAO:", self.icao + Style.RESET_ALL)
#Print out data, and convert to locals
if self.val_error is False:
if self.plane_Dict == None:
self.feeding = False
print("No Data")
elif self.plane_Dict != None:
self.feeding = True
self.__dict__.update(self.plane_Dict)
print (Fore.CYAN)
if main_config.get('DATA', 'SOURCE') == "ADSBX":
print("Registration: ", self.reg)
print ("Callsign: ", self.callsign)
if "reg" in self.plane_Dict.keys():
print(Fore.CYAN + "Registration: ", self.reg)
if "from_location" in self.plane_Dict.keys():
print("From: ", self.from_location)
if "to_location" in self.plane_Dict.keys():
print("To: ", self.to_location)
print (Fore.CYAN + "Callsign: ", self.callsign)
print ("On Ground: ", self.on_ground)
print ("Latitude: ", self.latitude)
print ("Longitude: ", self.longitude)
print ("GEO Alitude Ft: ", self.geo_alt_ft)
print(Style.RESET_ALL)
print ("GEO Alitude Ft: ", self.geo_alt_ft, Style.RESET_ALL)
#Set Check for inconsistancy in data
if not self.last_recheck_needed:
#Recheck needed if feeding state changes
@ -146,9 +147,9 @@ class Plane:
#Run a Check compares new data to last flagged(check) data
if self.last_recheck_needed:
if self.recheck_feeding == self.feeding:
print("Data Feeding change Consistent")
print("Data Feeding change consistent")
elif self.recheck_feeding != self.feeding:
print("Data Feeding change was Inconsistent last data ignored")
print("Data Feeding change was inconsistent last data ignored")
self.recheck_feeding = self.feeding
self.last_recheck_needed = self.recheck_needed
@ -176,7 +177,7 @@ class Plane:
self.tookoff = False
#self.tookoff = bool(self.below_desired_ft and self.on_ground is False and ((self.last_feeding is False and self.feeding) or (self.last_on_ground)))
print ("Tookoff Just Now:", self.tookoff)
#print ("Tookoff Just Now:", self.tookoff)
#Check if Landed
@ -193,12 +194,12 @@ class Plane:
self.landed = False
else:
self.landed = False
#self.landed = bool(self.last_below_desired_ft and ((self.last_feeding and self.feeding is False and self.last_on_ground is False) or (self.on_ground and self.last_on_ground is False)))
print ("Landed Just Now:", self.landed)
if self.landed or self.tookoff:
print ("Trigger Type:", self.trigger_type)
#print ("Landed Just Now:", self.landed)
if self.landed:
print ("Landed by", self.trigger_type)
if self.tookoff:
print("Tookoff by", self.trigger_type)
#Lookup Location of coordinates
if self.landed or self.tookoff:
if self.landed and self.last_longitude != None and self.last_latitude != None:
@ -273,23 +274,28 @@ class Plane:
#Set Discord Title
if self.config.getboolean('DISCORD', 'ENABLE'):
self.dis_title = self.icao if self.config.get('DISCORD', 'TITLE') == "icao" else self.callsign if self.config.get('DISCORD', 'TITLE') == "callsign" else self.config.get('DISCORD', 'TITLE')
#Set Twitter Title
if self.config.getboolean('TWITTER', 'ENABLE'):
self.twitter_title = self.icao if self.config.get('TWITTER', 'TITLE') == "icao" else self.callsign if self.config.get('TWITTER', 'TITLE') == "callsign" else self.config.get('TWITTER', 'TITLE')
#Takeoff Notifcation and Landed
if self.tookoff:
if self.invalid_Location is False:
self.tookoff_message = (self.tookoff_header + self.aera_hierarchy + ", " + self.state + ", " + self.country_code)
self.tookoff_message = (self.tookoff_header + self.aera_hierarchy + ", " + self.state + ", " + self.country_code + ". ")
else:
self.tookoff_message = ("Took off")
print (self.tookoff_message)
#Google Map or tar1090 screenshot
if self.config.getboolean('GOOGLE', 'STATICMAP_ENABLE'):
getMap(self.aera_hierarchy + ", " + self.state + ", " + self.country_code)
else:
if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP":
getMap((self.aera_hierarchy + ", " + self.state + ", " + self.country_code), self.icao)
elif self.config.get('MAP', 'OPTION') == "ADSBX":
getSS(self.icao)
else:
raise Exception("Map option not set correctly in this planes conf")
#Discord
if self.config.getboolean('DISCORD', 'ENABLE'):
self.dis_message = self.dis_title + " " + self.tookoff_message
sendDis(self.dis_message, self.map_file_name, self.conf_file)
nearest = getAirport(self.latitude, self.longitude)
self.dis_message = (self.dis_title + " " + self.tookoff_message + nearest['icao'] + ", " + nearest["name"]).strip()
sendDis(self.dis_message, self.map_file_name, self.config)
#PushBullet
if self.config.getboolean('PUSHBULLET', 'ENABLE'):
with open(self.map_file_name, "rb") as pic:
@ -298,7 +304,7 @@ class Plane:
push = self.pb_channel.push_file(**map_data)
#Twitter
if self.config.getboolean('TWITTER', 'ENABLE'):
self.tweet_api.update_with_media(self.map_file_name, status = self.tookoff_message)
self.tweet_api.update_with_media(self.map_file_name, status = (self.twitter_title + " " + self.tookoff_message).strip())
self.takeoff_time = time.time()
os.remove(self.map_file_name)
@ -308,23 +314,26 @@ class Plane:
if self.takeoff_time != None:
self.landed_time = time.time() - self.takeoff_time
if platform.system() == "Linux":
self.landed_time_msg = time.strftime("Apx. flt. time %-H Hours : %-M Mins ", time.gmtime(self.landed_time))
self.landed_time_msg = time.strftime("Apx. flt. time %-H Hours : %-M Mins. ", time.gmtime(self.landed_time))
elif platform.system() == "Windows":
self.landed_time_msg = time.strftime("Apx. flt. time %#H Hours : %#M Mins ", time.gmtime(self.landed_time))
self.landed_time_msg = time.strftime("Apx. flt. time %#H Hours : %#M Mins. ", time.gmtime(self.landed_time))
if self.invalid_Location is False:
self.landed_message = (self.landed_header + self.aera_hierarchy + ", " + self.state + ", " + self.country_code + ". " + self.landed_time_msg)
else:
self.landed_message = ("Landed", self.landed_time_msg)
print (self.landed_message)
#Google Map or tar1090 screenshot
if self.config.getboolean('GOOGLE', 'STATICMAP_ENABLE'):
getMap(self.aera_hierarchy + ", " + self.state + ", " + self.country_code)
else:
if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP":
getMap((self.aera_hierarchy + ", " + self.state + ", " + self.country_code), self.icao)
elif self.config.get('MAP', 'OPTION') == "ADSBX":
getSS(self.icao)
else:
raise Exception("Map option not set correctly in this planes conf")
#Discord
if self.config.getboolean('DISCORD', 'ENABLE'):
self.dis_message = self.dis_title + " " + self.landed_message
sendDis(self.dis_message, self.map_file_name, self.conf_file)
nearest = getAirport(self.last_latitude, self.last_longitude)
self.dis_message = (self.dis_title + " " +self.landed_message + nearest['icao'] + ", " + nearest["name"]).strip()
sendDis(self.dis_message, self.map_file_name, self.config)
#PushBullet
if self.config.getboolean('PUSHBULLET', 'ENABLE'):
with open(self.map_file_name, "rb") as pic:
@ -333,7 +342,7 @@ class Plane:
push = self.pb_channel.push_file(**map_data)
#Twitter
if self.config.getboolean('TWITTER', 'ENABLE'):
self.tweet_api.update_with_media(self.map_file_name, status = self.landed_message)
self.tweet_api.update_with_media(self.map_file_name, status = (self.twitter_title + " " + self.landed_message).strip())
self.takeoff_time = None
self.landed_time = None
self.time_since_tk = None

Loading…
Cancel
Save