Browse Source

Merge branch 'rapidapi' of https://github.com/makrsmark/plane-notify into multi

pull/79/head
Jack Sweeney 2 years ago
parent
commit
3eb6f51225
  1. 76
      __main__.py
  2. 13
      configs/mainconf.ini.example
  3. 2
      configs/plane1.ini.example
  4. 46
      defRpdADSBX.py

76
__main__.py

@ -1,5 +1,4 @@
import configparser import configparser
from logging import DEBUG
import time import time
from colorama import Fore, Back, Style from colorama import Fore, Back, Style
import platform import platform
@ -91,7 +90,6 @@ try:
except pytz.exceptions.UnknownTimeZoneError: except pytz.exceptions.UnknownTimeZoneError:
tz = pytz.UTC tz = pytz.UTC
last_ra_count = None last_ra_count = None
print(len(planes), "Planes configured")
while True: while True:
datetime_tz = datetime.now(tz) datetime_tz = datetime.now(tz)
if datetime_tz.hour == 0 and datetime_tz.minute == 0: if datetime_tz.hour == 0 and datetime_tz.minute == 0:
@ -148,12 +146,75 @@ try:
for planeData in data['ac']: for planeData in data['ac']:
data_indexed[planeData[icao_key].upper()] = planeData data_indexed[planeData[icao_key].upper()] = planeData
for key, obj in planes.items(): for key, obj in planes.items():
if key in data_indexed.keys(): try:
if api_version == 1: if api_version == 1:
obj.run_adsbx_v1(data_indexed[key.upper()]) obj.run_adsbx_v1(data_indexed[key.upper()])
elif api_version == 2: elif api_version == 2:
obj.run_adsbx_v2(data_indexed[key.upper()]) obj.run_adsbx_v2(data_indexed[key.upper()])
else: except KeyError:
obj.run_empty()
else:
for obj in planes.values():
obj.run_empty()
else:
failed_count += 1
elif source == "RpdADSBX":
#ACAS data
from defADSBX import pull_date_ras
import ast
today = datetime.utcnow()
date = today.strftime("%Y/%m/%d")
ras = pull_date_ras(date)
sorted_ras = {}
if ras is not None:
#Testing RAs
#if last_ra_count is not None:
# with open('./testing/acastest.json') as f:
# data = f.readlines()
# ras += data
ra_count = len(ras)
if last_ra_count is not None and ra_count != last_ra_count:
print(abs(ra_count - last_ra_count), "new Resolution Advisories")
for ra_num, ra in enumerate(ras[last_ra_count:]):
ra = ast.literal_eval(ra)
if ra['hex'].upper() in planes.keys():
if ra['hex'].upper() not in sorted_ras.keys():
sorted_ras[ra['hex'].upper()] = [ra]
else:
sorted_ras[ra['hex'].upper()].append(ra)
else:
print("No new Resolution Advisories")
last_ra_count = ra_count
for key, obj in planes.items():
if sorted_ras != {} and key in sorted_ras.keys():
print(key, "has", len(sorted_ras[key]), "RAs")
obj.check_new_ras(sorted_ras[key])
obj.expire_ra_types()
icao_key = 'hex'
from defRpdADSBX import pull_rpdadsbx
# print("Planes list: \n"+str((list(planes.keys()))))
# print("\nLen planes:\n"+str(len(planes)))
p = 0
data = dict.fromkeys(['ac'])
# print(data)
while p < len(planes):
planeInfo = pull_rpdadsbx(str(list(planes.keys())[p]))
if p == 0:
data['ac'] = (planeInfo)['ac']
else:
data['ac'].extend((planeInfo)['ac'])
# print("p = "+str(p))
# print(str(list(planes.keys())[p]) + ": " + str(data))
p += 1
if data is not None:
if data['ac'] is not None:
data_indexed = {}
for planeData in data['ac']:
data_indexed[planeData[icao_key].upper()] = planeData
for key, obj in planes.items():
try:
obj.run_adsbx_v2(data_indexed[key.upper()])
except KeyError:
obj.run_empty() obj.run_empty()
else: else:
for obj in planes.values(): for obj in planes.values():
@ -194,7 +255,10 @@ try:
footer = "-------- " + str(running_Count) + " -------- " + str(datetime_tz.strftime("%I:%M:%S %p")) + " ------------------------Elapsed Time- " + str(round(elapsed_calc_time, 3)) + " -------------------------------------" footer = "-------- " + str(running_Count) + " -------- " + str(datetime_tz.strftime("%I:%M:%S %p")) + " ------------------------Elapsed Time- " + str(round(elapsed_calc_time, 3)) + " -------------------------------------"
print (Back.GREEN + Fore.BLACK + footer[0:100] + Style.RESET_ALL) print (Back.GREEN + Fore.BLACK + footer[0:100] + Style.RESET_ALL)
sleep_sec = 30 if main_config.has_section('SLEEP'):
sleep_sec = int(main_config.get('SLEEP', 'SLEEPSEC'))
else:
sleep_sec = 30
for i in range(sleep_sec,0,-1): for i in range(sleep_sec,0,-1):
if i < 10: if i < 10:
i = " " + str(i) i = " " + str(i)
@ -216,7 +280,7 @@ except Exception as e:
except OSError: except OSError:
pass pass
import logging import logging
logging.basicConfig(filename='crash_latest.log', filemode='w', format='%(asctime)s - %(message)s',level=logging.DEBUG) logging.basicConfig(filename='crash_latest.log', filemode='w', format='%(asctime)s - %(message)s')
logging.Formatter.converter = time.gmtime logging.Formatter.converter = time.gmtime
logging.error(e) logging.error(e)
logging.error(str(traceback.format_exc())) logging.error(str(traceback.format_exc()))

13
configs/mainconf.ini.example

@ -2,8 +2,8 @@
#Source to pull data from #Source to pull data from
#SHOULD BE ADSBX which is ADS-B Exchange or OPENS which is OpenSky #SHOULD BE ADSBX which is ADS-B Exchange or OPENS which is OpenSky
#By default configured with OpenSky which anyone can use without a login #By default configured with OpenSky which anyone can use without a login
#ADS-B Exchange has better data but is not avalible unless you pay (see: https://www.adsbexchange.com/data/ ) #ADS-B Exchange has better data but is not avalible unless you feed their network or pay.
SOURCE = OPENS SOURCE = RpdADSBX
#Default amount of time after data loss to trigger a landing when under 10k ft #Default amount of time after data loss to trigger a landing when under 10k ft
DATA_LOSS_MINS = 5 DATA_LOSS_MINS = 5
#Failover from one source to the other, only enable if you have both sources setup. #Failover from one source to the other, only enable if you have both sources setup.
@ -28,6 +28,15 @@ PROXY_HOST =
USERNAME = None USERNAME = None
PASSWORD = None PASSWORD = None
#ADS-B Exchange on RapidAPI https://rapidapi.com/adsbx/api/adsbexchange-com1/
[RpdADSBX]
API_KEY = none
API_VERSION = 2
#Define the delay interval in seconds between each data request. This is usefull if you have limited requests in the API.
[SLEEP]
SLEEPSEC = 60
[GOOGLE] [GOOGLE]
#API KEY for Google Static Maps only if you using this on any of the planes. #API KEY for Google Static Maps only if you using this on any of the planes.
API_KEY = googleapikey API_KEY = googleapikey

2
configs/plane1.ini.example

@ -55,4 +55,4 @@ ACCESS_TOKEN =
ENABLE = FALSE ENABLE = FALSE
TITLE = Title Of Telegram message TITLE = Title Of Telegram message
ROOM_ID = -100xxxxxxxxxx ROOM_ID = -100xxxxxxxxxx
BOT_TOKEN = xxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx BOT_TOKEN = xxxxxxxxxx:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

46
defRpdADSBX.py

@ -0,0 +1,46 @@
import requests
import json
import configparser
from datetime import datetime
main_config = configparser.ConfigParser()
main_config.read('./configs/mainconf.ini')
api_version = main_config.get('RpdADSBX', 'API_VERSION')
def pull_rpdadsbx(planes):
api_version = int(main_config.get('RpdADSBX', 'API_VERSION'))
if api_version != 2:
raise ValueError("Bad RapidAPI ADSBX API Version")
url = "https://adsbexchange-com1.p.rapidapi.com/v2/icao/" + planes + "/"
headers = {
"X-RapidAPI-Host": "adsbexchange-com1.p.rapidapi.com",
"X-RapidAPI-Key": main_config.get('RpdADSBX', 'API_KEY')
}
try:
response = requests.get(url, headers = headers, timeout=30)
except Exception as error:
print('err.args:' + str(error.args))
response = None
if response is not None:
try:
data = json.loads(response.text)
except (json.decoder.JSONDecodeError, ValueError) as error_message:
print("Error with JSON")
print(error_message)
data = None
except TypeError as error_message:
print("Type Error", error_message)
data = None
else:
if "msg" in data.keys() and data['msg'] != "No error":
raise ValueError("Error from ADSBX: msg = ", data['msg'])
if "ctime" in data.keys():
data_ctime = float(data['ctime']) / 1000.0
print("Data ctime:",datetime.utcfromtimestamp(data_ctime))
if "now" in data.keys():
data_now = float(data['now']) / 1000.0
print("Data now time:",datetime.utcfromtimestamp(data_now))
print("Current UTC:", datetime.utcnow())
else:
data = None
return data
Loading…
Cancel
Save