@ -28,9 +28,7 @@ class Plane:
self . landing_plausible = False
self . nav_modes = None
self . last_nav_modes = None
self . recheck_to = None
self . speed = None
self . nearest_airport_dict = None
self . recent_ra_types = { }
self . db_flags = None
self . sel_nav_alt = None
@ -38,6 +36,11 @@ class Plane:
self . squawk = None
self . emergency_already_triggered = None
self . last_emergency = None
self . recheck_route_time = None
self . known_to_airport = None
self . track = None
self . last_track = None
self . circle_history = None
if self . config . has_option ( ' DATA ' , ' DATA_LOSS_MINS ' ) :
self . data_loss_mins = self . config . getint ( ' DATA ' , ' DATA_LOSS_MINS ' )
else :
@ -57,7 +60,7 @@ class Plane:
self . printheader ( " head " )
#print (Fore.YELLOW + "OpenSky Sourced Data: ", ac_dict)
try :
self . __dict__ . update ( { ' icao ' : ac_dict . icao24 . upper ( ) , ' callsign ' : ac_dict . callsign , ' latitude ' : ac_dict . latitude , ' longitude ' : ac_dict . longitude , ' on_ground ' : bool ( ac_dict . on_ground ) , ' squawk ' : ac_dict . squawk } )
self . __dict__ . update ( { ' icao ' : ac_dict . icao24 . upper ( ) , ' callsign ' : ac_dict . callsign , ' latitude ' : ac_dict . latitude , ' longitude ' : ac_dict . longitude , ' on_ground ' : bool ( ac_dict . on_ground ) , ' squawk ' : ac_dict . squawk , ' track ' : float ( ac_dict . heading ) } )
if ac_dict . baro_altitude != None :
self . alt_ft = round ( float ( ac_dict . baro_altitude ) * 3.281 )
elif self . on_ground :
@ -77,7 +80,7 @@ class Plane:
#print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL)
try :
#postime is divided by 1000 to get seconds from milliseconds, from timestamp expects secs.
self . __dict__ . update ( { ' icao ' : ac_dict [ ' icao ' ] . upper ( ) , ' callsign ' : ac_dict [ ' call ' ] , ' reg ' : ac_dict [ ' reg ' ] , ' latitude ' : float ( ac_dict [ ' lat ' ] ) , ' longitude ' : float ( ac_dict [ ' lon ' ] ) , ' alt_ft ' : int ( ac_dict [ ' alt ' ] ) , ' on_ground ' : bool ( int ( ac_dict [ " gnd " ] ) ) , ' squawk ' : ac_dict [ ' sqk ' ] } )
self . __dict__ . update ( { ' icao ' : ac_dict [ ' icao ' ] . upper ( ) , ' callsign ' : ac_dict [ ' call ' ] , ' reg ' : ac_dict [ ' reg ' ] , ' latitude ' : float ( ac_dict [ ' lat ' ] ) , ' longitude ' : float ( ac_dict [ ' lon ' ] ) , ' alt_ft ' : int ( ac_dict [ ' alt ' ] ) , ' on_ground ' : bool ( int ( ac_dict [ " gnd " ] ) ) , ' squawk ' : ac_dict [ ' sqk ' ] , ' track ' : float ( ac_dict [ " trak " ] ) } )
if self . on_ground :
self . alt_ft = 0
self . last_pos_datetime = datetime . fromtimestamp ( int ( ac_dict [ ' postime ' ] ) / 1000 )
@ -120,6 +123,8 @@ class Plane:
else :
self . nav_modes [ idx ] = self . nav_modes [ idx ] . capitalize ( )
self . squawk = ac_dict . get ( ' squawk ' )
if " track " in ac_dict :
self . track = ac_dict [ ' track ' ]
if " nav_altitude_fms " in ac_dict :
self . sel_nav_alt = ac_dict [ ' nav_altitude_fms ' ]
elif " nav_altitude_mcp " in ac_dict :
@ -181,27 +186,55 @@ class Plane:
overlays = " "
return overlays
def route_info ( self ) :
from lookup_route import lookup_route
extra_route_info = lookup_route ( self . reg , ( self . latitude , self . longitude ) , self . type , self . alt_ft )
if extra_route_info is None and not self . recheck_to :
self . recheck_to = True
route_to = None
elif extra_route_info is not None :
from lookup_route import lookup_route , clean_data
def route_format ( extra_route_info , type ) :
from defAirport import get_airport_by_icao
to_airport = get_airport_by_icao ( extra_route_info [ ' apdstic ' ] )
to_airport = get_airport_by_icao ( self . known_to_airport )
code = to_airport [ ' iata_code ' ] if to_airport [ ' iata_code ' ] != " " else to_airport [ ' icao ' ]
airport_text = f " { code } , { to_airport [ ' name ' ] } "
if ' arrivalRelative ' in extra_route_info . keys ( ) and " In " in extra_route_info [ ' arrivalRelative ' ] :
arrival_rel = " in ~ " + extra_route_info [ ' arrivalRelative ' ] . strip ( " In " )
if ' time_to ' in extra_route_info . keys ( ) and type != " divert " :
arrival_rel = " in ~ " + extra_route_info [ ' time_to ' ]
else :
arrival_rel = None
if extra_route_info [ ' apdstic ' ] != self . nearest_airport_dict [ ' icao ' ] :
if self . known_to_airport != self . nearest_from_airport :
if type == " inital " :
header = " Going to "
elif type == " change " :
header = " Now going to "
elif type == " divert " :
header = " Now diverting to "
area = f " { to_airport [ ' municipality ' ] } , { to_airport [ ' region ' ] } , { to_airport [ ' iso_country ' ] } "
route_to = f " Going to { area } ( { airport_text } ) " + f " arriving { arrival_rel } " if arrival_rel is not None else " "
route_to = f " { header } { area } ( { airport_text } ) " + ( f " arriving { arrival_rel } " if arrival_rel is not None else " " )
else :
route_to = f " Will be returning to { airport_text } " + f " { arrival_rel } " if arrival_rel is not None else " "
else :
route_to = None
if type == " inital " :
header = " Will be returning to "
elif type == " change " :
header = " Now returning to "
elif type == " divert " :
header = " Now diverting back to "
route_to = f " { header } { airport_text } " + ( f " { arrival_rel } " if arrival_rel is not None else " " )
return route_to
extra_route_info = clean_data ( lookup_route ( self . reg , ( self . latitude , self . longitude ) , self . type , self . alt_ft ) )
route_to = None
if extra_route_info is None :
pass
elif extra_route_info is not None :
#Diversion
if " divert_icao " in extra_route_info . keys ( ) :
if self . known_to_airport != extra_route_info [ " divert_icao " ] :
self . known_to_airport = extra_route_info [ ' divert_icao ' ]
route_to = route_format ( extra_route_info , " divert " )
#Destination
elif " dest_icao " in extra_route_info . keys ( ) :
#Inital Destination Found
if self . known_to_airport is None :
self . known_to_airport = extra_route_info [ ' dest_icao ' ]
route_to = route_format ( extra_route_info , " inital " )
#Destination Change
elif self . known_to_airport != extra_route_info [ " dest_icao " ] :
self . known_to_airport = extra_route_info [ ' dest_icao ' ]
route_to = route_format ( extra_route_info , " change " )
return route_to
def run_empty ( self ) :
self . printheader ( " head " )
@ -332,8 +365,12 @@ class Plane:
landed_time_msg = None
#Proprietary Route Lookup
if ENABLE_ROUTE_LOOKUP :
self . nearest_airport_dic t = nearest_airport_dict
self . nearest_from_ airport = nearest_airport_dict [ ' icao ' ]
route_to = self . route_info ( )
if route_to is None :
self . recheck_route_time = 1
else :
self . recheck_route_time = 10
elif self . landed and self . takeoff_time != None :
landed_time = datetime . utcnow ( ) - self . takeoff_time
if trigger_type == " data loss " :
@ -382,11 +419,14 @@ class Plane:
self . tweet_api . create_media_metadata ( media_id = twitter_media_map_obj . media_id , alt_text = alt_text )
self . tweet_api . update_status ( status = ( ( self . twitter_title + " " + message ) . strip ( ) ) , media_ids = [ twitter_media_map_obj . media_id ] )
os . remove ( self . map_file_name )
#Recheck Proprietary Route Lookup a minute later if infomation was not available on takeoff.
if self . recheck_to and self . takeoff_time is not None and ( datetime . utcnow ( ) - self . takeoff_time ) . total_seconds ( ) > 60 :
if self . landed :
self . recheck_route_time = None
self . known_to_airport = None
self . nearest_from_airport = None
#Recheck Proprietary Route Info.
if self . takeoff_time is not None and self . recheck_route_time is not None and ( datetime . utcnow ( ) - self . takeoff_time ) . total_seconds ( ) > 60 * self . recheck_route_time :
self . recheck_route_time + = 10
route_to = self . route_info ( )
self . recheck_to = False
self . nearest_takeoff_airport = None
if route_to != None :
print ( route_to )
#Discord
@ -399,6 +439,16 @@ class Plane:
tweet = self . tweet_api . user_timeline ( count = 1 ) [ 0 ]
self . tweet_api . update_status ( status = f " { self . twitter_title } { route_to } " . strip ( ) , in_reply_to_status_id = tweet . id )
if self . circle_history is not None :
#Expires traces for circles
if self . circle_history [ " traces " ] != [ ] :
for trace in self . circle_history [ " traces " ] :
if ( datetime . now ( ) - datetime . fromtimestamp ( trace [ 0 ] ) ) . total_seconds ( ) > = 20 * 60 :
print ( " Trace Expire, removed " )
self . circle_history [ " traces " ] . remove ( trace )
#Expire touchngo
if " touchngo " in self . circle_history . keys ( ) and ( datetime . now ( ) - datetime . fromtimestamp ( self . circle_history [ ' touchngo ' ] ) ) . total_seconds ( ) > = 10 * 60 :
self . circle_history . pop ( " touchngo " )
if self . feeding :
#Squawks
emergency_squawks = { " 7500 " : " Hijacking " , " 7600 " : " Radio Failure " , " 7700 " : " General Emergency " }
@ -453,6 +503,73 @@ class Plane:
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
dis_message = ( self . dis_title + " Sel. alt. " + str ( " {:,} ft " . format ( self . sel_nav_alt ) ) )
sendDis ( dis_message , self . config )
#Circling
if self . last_track is not None :
import time
if self . circle_history is None :
self . circle_history = { " traces " : [ ] , " triggered " : False }
#Add touchngo
if self . on_ground or self . alt_ft < = 400 :
self . circle_history [ " touchngo " ] = time . time ( )
#Add a Trace
if self . on_ground is False :
from calculate_headings import calculate_deg_change
track_change = calculate_deg_change ( self . track , self . last_track )
track_change = round ( track_change , 3 )
self . circle_history [ " traces " ] . append ( ( time . time ( ) , self . latitude , self . longitude , track_change ) )
total_change = 0
coords = [ ]
for trace in self . circle_history [ " traces " ] :
total_change + = float ( trace [ 3 ] )
coords . append ( ( float ( trace [ 1 ] ) , float ( trace [ 2 ] ) ) )
print ( " Total Bearing Change " , round ( total_change , 3 ) )
#Check Centroid when Bearing change meets req
if abs ( total_change ) > = 720 and self . circle_history [ ' triggered ' ] is False :
print ( " Circling Bearing Change Met " )
from shapely . geometry import MultiPoint
from geopy . distance import geodesic
aircraft_coords = ( self . latitude , self . longitude )
points = MultiPoint ( coords )
cent = ( points . centroid ) #True centroid, not necessarily an existing point
#rp = (points.representative_point()) #A represenative point, not centroid,
print ( cent )
#print(rp)
distance_to_centroid = geodesic ( aircraft_coords , cent . coords ) . mi
print ( f " Distance to centroid of circling coordinates { distance_to_centroid } miles " )
if distance_to_centroid < = 15 :
print ( " Within 15 miles of centroid, CIRCLING " )
from defAirport import getClosestAirport
nearest_airport_dict = getClosestAirport ( self . latitude , self . longitude , [ " medium_airport " , " large_airport " ] )
from calculate_headings import calculate_from_bearing , calculate_cardinal
from_bearing = calculate_from_bearing ( ( float ( nearest_airport_dict [ ' latitude_deg ' ] ) , float ( nearest_airport_dict [ ' longitude_deg ' ] ) ) , ( self . latitude , self . longitude ) )
cardinal = calculate_cardinal ( from_bearing )
from defSS import get_adsbx_screenshot
url_params = f " icao= { self . icao } &zoom=10&largeMode=2&hideButtons&hideSidebar&mapDim=0&overlays= { self . get_adsbx_map_overlays ( ) } "
get_adsbx_screenshot ( self . map_file_name , url_params )
if nearest_airport_dict [ ' distance_mi ' ] < 3 :
if " touchngo " in self . circle_history . keys ( ) :
message = f " Doing touch and goes at { nearest_airport_dict [ ' icao ' ] } "
else :
message = f " Circling over { nearest_airport_dict [ ' icao ' ] } at { self . alt_ft } ft "
else :
message = f " Circling { round ( nearest_airport_dict [ ' distance_mi ' ] , 2 ) } mi { cardinal } of { nearest_airport_dict [ ' icao ' ] } , { nearest_airport_dict [ ' name ' ] } at { self . alt_ft } ft "
print ( message )
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
role_id = self . config . get ( ' DISCORD ' , ' ROLE_ID ' ) if self . config . has_option ( ' DISCORD ' , ' ROLE_ID ' ) else None
sendDis ( message , self . config , self . map_file_name , role_id )
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
twitter_media_map_obj = self . tweet_api . media_upload ( self . map_file_name )
alt_text = f " Distance to centroid: { distance_to_centroid } , Total change: { total_change } "
self . tweet_api . create_media_metadata ( media_id = twitter_media_map_obj . media_id , alt_text = alt_text )
tweet = self . tweet_api . user_timeline ( count = 1 ) [ 0 ]
self . tweet_api . update_status ( status = f " { self . twitter_title } { message } " . strip ( ) , in_reply_to_status_id = tweet . id , media_ids = [ twitter_media_map_obj . media_id ] )
self . circle_history [ ' triggered ' ] = True
elif abs ( total_change ) < = 360 and self . circle_history [ " triggered " ] :
print ( " No Longer Circling, trigger cleared " )
self . circle_history [ ' triggered ' ] = False
# #Power Up
# if self.last_feeding == False and self.speed == 0 and self.on_ground:
# if self.config.getboolean('DISCORD', 'ENABLE'):
@ -461,6 +578,7 @@ class Plane:
#Set Variables to compare to next check
self . last_track = self . track
self . last_feeding = self . feeding
self . last_on_ground = self . on_ground
self . last_below_desired_ft = self . below_desired_ft