Mirror of https://github.com/Jxck-S/plane-notify
This is the backend to ElonsJet and other bots
acasads-badsbadsbexchangeadsbxairportemergency-squawklandopenskyopensky-apiopensky-networkplaneresolutionadviosrysquawktakeofftcas
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
91 lines
3.4 KiB
91 lines
3.4 KiB
import requests |
|
import json |
|
import configparser |
|
from datetime import datetime |
|
from http.client import IncompleteRead |
|
import http.client as http |
|
import urllib3 |
|
import socket |
|
main_config = configparser.ConfigParser() |
|
main_config.read('./configs/mainconf.ini') |
|
api_version = main_config.get('ADSBX', 'API_VERSION') |
|
|
|
def pull(url, headers): |
|
try: |
|
response = requests.get(url, headers = headers) |
|
print ("HTTP Status Code:", response.status_code) |
|
response.raise_for_status() |
|
except (requests.HTTPError, ConnectionError, requests.Timeout, urllib3.exceptions.ConnectionError) as error_message: |
|
print("Basic Connection Error") |
|
print(error_message) |
|
response = None |
|
except (requests.RequestException, IncompleteRead, ValueError, socket.timeout, socket.gaierror) as error_message: |
|
print("Connection Error") |
|
print(error_message) |
|
response = None |
|
except Exception as error_message: |
|
print("Connection Error uncaught, basic exception for all") |
|
print(error_message) |
|
response = None |
|
return response |
|
|
|
def pull_adsbx(planes): |
|
api_version = int(main_config.get('ADSBX', 'API_VERSION')) |
|
if api_version not in [1, 2]: |
|
raise ValueError("Bad ADSBX API Version") |
|
if main_config.getboolean('ADSBX', 'ENABLE_PROXY') is False: |
|
if api_version == 1: |
|
if len(planes) > 1: |
|
url = "https://adsbexchange.com/api/aircraft/json/" |
|
elif len(planes) == 1: |
|
url = "https://adsbexchange.com/api/aircraft/icao/" + str(list(planes.keys())[0]) + "/" |
|
elif api_version == 2: |
|
url = "https://adsbexchange.com/api/aircraft/v2/all" |
|
else: |
|
if main_config.has_option('ADSBX', 'PROXY_HOST'): |
|
if api_version == 1: |
|
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/json/all" |
|
if api_version == 2: |
|
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/v2/all" |
|
else: |
|
raise ValueError("Proxy enabled but no host") |
|
headers = { |
|
'api-auth': main_config.get('ADSBX', 'API_KEY'), |
|
'Accept-Encoding': 'gzip' |
|
} |
|
response = pull(url, headers) |
|
if response is not None: |
|
try: |
|
data = json.loads(response.text) |
|
except (json.decoder.JSONDecodeError, ValueError) as error_message: |
|
print("Error with JSON") |
|
print(error_message) |
|
data = None |
|
except TypeError as error_message: |
|
print("Type Error", error_message) |
|
data = None |
|
else: |
|
if "msg" in data.keys() and data['msg'] != "No error": |
|
raise ValueError("Error from ADSBX: msg = ", data['msg']) |
|
if "ctime" in data.keys(): |
|
data_ctime = float(data['ctime']) / 1000.0 |
|
print("Data ctime:",datetime.utcfromtimestamp(data_ctime)) |
|
if "now" in data.keys(): |
|
data_now = float(data['now']) / 1000.0 |
|
print("Data now time:",datetime.utcfromtimestamp(data_now)) |
|
print("Current UTC:", datetime.utcnow()) |
|
else: |
|
data = None |
|
return data |
|
|
|
def pull_date_ras(date): |
|
url = f"https://globe.adsbexchange.com/globe_history/{date}/acas/acas.json" |
|
headers = { |
|
'Accept-Encoding': 'gzip' |
|
} |
|
response = pull(url, headers) |
|
if response is not None: |
|
data = response.text.splitlines() |
|
else: |
|
data = None |
|
return data |