Mirror of https://github.com/Jxck-S/plane-notify This is the backend to ElonsJet and other bots
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

91 lines
3.5 KiB

import requests
import json
import configparser
from datetime import datetime
from http.client import IncompleteRead
import http.client as http
import urllib3
import socket
main_config = configparser.ConfigParser()
main_config.read('./configs/mainconf.ini')
api_version = main_config.get('ADSBX', 'API_VERSION')
def pull(url, headers):
try:
response = requests.get(url, headers = headers, timeout=30)
print ("HTTP Status Code:", response.status_code)
response.raise_for_status()
except (requests.HTTPError, ConnectionError, requests.Timeout, urllib3.exceptions.ConnectionError) as error_message:
print("Basic Connection Error")
print(error_message)
response = None
except (requests.RequestException, IncompleteRead, ValueError, socket.timeout, socket.gaierror) as error_message:
print("Connection Error")
print(error_message)
response = None
except Exception as error_message:
print("Connection Error uncaught, basic exception for all")
print(error_message)
response = None
return response
def pull_adsbx(planes):
api_version = int(main_config.get('ADSBX', 'API_VERSION'))
if api_version not in [1, 2]:
raise ValueError("Bad ADSBX API Version")
if main_config.getboolean('ADSBX', 'ENABLE_PROXY') is False:
if api_version == 1:
if len(planes) > 1:
url = "https://adsbexchange.com/api/aircraft/json/"
elif len(planes) == 1:
url = "https://adsbexchange.com/api/aircraft/icao/" + str(list(planes.keys())[0]) + "/"
elif api_version == 2:
url = "https://adsbexchange.com/api/aircraft/v2/all"
else:
if main_config.has_option('ADSBX', 'PROXY_HOST'):
if api_version == 1:
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/json/all"
if api_version == 2:
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/v2/all"
else:
raise ValueError("Proxy enabled but no host")
headers = {
'api-auth': main_config.get('ADSBX', 'API_KEY'),
'Accept-Encoding': 'gzip'
}
response = pull(url, headers)
if response is not None:
try:
data = json.loads(response.text)
except (json.decoder.JSONDecodeError, ValueError) as error_message:
print("Error with JSON")
print(error_message)
data = None
except TypeError as error_message:
print("Type Error", error_message)
data = None
else:
if "msg" in data.keys() and data['msg'] != "No error":
raise ValueError("Error from ADSBX: msg = ", data['msg'])
if "ctime" in data.keys():
data_ctime = float(data['ctime']) / 1000.0
print("Data ctime:",datetime.utcfromtimestamp(data_ctime))
if "now" in data.keys():
data_now = float(data['now']) / 1000.0
print("Data now time:",datetime.utcfromtimestamp(data_now))
print("Current UTC:", datetime.utcnow())
else:
data = None
return data
def pull_date_ras(date):
url = f"https://globe.adsbexchange.com/globe_history/{date}/acas/acas.json"
headers = {
'Accept-Encoding': 'gzip'
}
response = pull(url, headers)
if response is not None:
data = response.text.splitlines()
else:
data = None
return data