@ -21,6 +21,10 @@ class Plane:
self . last_longitude = None
self . last_contact = None
self . landing_plausible = False
self . last_squawk = None
self . squawk = None
self . nav_modes = None
self . last_nav_modes = None
def getICAO ( self ) :
return self . icao
def run_OPENS ( self , ac_dict ) :
@ -41,20 +45,16 @@ class Plane:
else :
self . feeding = True
self . run_check ( )
def run_ADSBX ( self , ac_dict ) :
#Parse ADBSX Vector
def run_ADSBXv1 ( self , ac_dict ) :
#Parse ADBSX V1 V ector
from colorama import Fore , Back , Style
self . printheader ( " head " )
#print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL)
try :
#postime is divided by 1000 to get seconds from milliseconds, from timestamp expects secs.
self . __dict__ . update ( { ' icao ' : ac_dict [ ' icao ' ] , ' callsign ' : ac_dict [ ' call ' ] , ' reg ' : ac_dict [ ' reg ' ] , ' latitude ' : float ( ac_dict [ ' lat ' ] ) , ' longitude ' : float ( ac_dict [ ' lon ' ] ) , ' geo_alt_ft ' : int ( ac_dict [ ' galt ' ] ) , ' on_ground ' : bool ( int ( ac_dict [ " gnd " ] ) ) , ' last_contact ' : round ( float ( ac_dict [ " postime " ] ) / 1000 ) } )
self . __dict__ . update ( { ' icao ' : ac_dict [ ' icao ' ] , ' callsign ' : ac_dict [ ' call ' ] , ' reg ' : ac_dict [ ' reg ' ] , ' squawk ' : ac_dict [ ' sqk ' ] , ' latitude' : float ( ac_dict [ ' lat ' ] ) , ' longitude ' : float ( ac_dict [ ' lon ' ] ) , ' geo_alt_ft ' : int ( ac_dict [ ' galt ' ] ) , ' on_ground ' : bool ( int ( ac_dict [ " gnd " ] ) ) , ' last_contact ' : round ( float ( ac_dict [ " postime " ] ) / 1000 ) } )
if self . on_ground :
self . geo_alt_ft = 0
if " to " in ac_dict . keys ( ) :
self . to_location = ac_dict [ " to " ]
if " from " in ac_dict . keys ( ) :
self . from_location = ac_dict [ " from " ]
except ValueError as e :
print ( " Got data but some data is invalid! " )
@ -64,6 +64,46 @@ class Plane:
else :
self . feeding = True
self . run_check ( )
def run_ADSBXv2 ( self , ac_dict ) :
#Parse ADBSX V2 Vector
from colorama import Fore , Back , Style
self . printheader ( " head " )
#print (Fore.YELLOW +"ADSBX Sourced Data: ", ac_dict, Style.RESET_ALL)
try :
self . __dict__ . update ( { ' icao ' : ac_dict [ ' hex ' ] . upper ( ) , ' reg ' : ac_dict [ ' r ' ] , ' squawk ' : ac_dict [ ' squawk ' ] , ' latitude ' : float ( ac_dict [ ' lat ' ] ) , ' longitude ' : float ( ac_dict [ ' lon ' ] ) } )
if ' alt_geom ' in ac_dict :
self . geo_alt_ft = int ( ac_dict [ ' alt_geom ' ] )
elif ' alt_geom ' not in ac_dict and ' alt_baro ' in ac_dict :
self . geo_alt_ft = int ( ac_dict [ ' alt_baro ' ] )
if ' flight ' in ac_dict :
self . callsign = ac_dict [ ' flight ' ]
if ac_dict [ ' alt_baro ' ] == " ground " :
self . geo_alt_ft = 0
self . on_ground = True
elif ac_dict [ ' alt_baro ' ] != " ground " :
self . on_ground = False
if ' nav_modes ' in ac_dict :
self . nav_modes = ac_dict [ ' nav_modes ' ]
for idx , mode in enumerate ( self . nav_modes ) :
if mode == ( ' tcas ' or ' lnav ' or ' vnav ' ) :
self . nav_modes [ idx ] = self . nav_modes [ idx ] . upper ( )
else :
self . nav_modes [ idx ] = self . nav_modes [ idx ] . capitalize ( )
from datetime import datetime , timedelta
#Create last seen timestamp from how long ago in secs a pos was rec
now = datetime . now ( )
last_pos_datetime = now - timedelta ( seconds = ac_dict [ " seen_pos " ] )
self . last_contact = datetime . timestamp ( last_pos_datetime )
except ( ValueError , KeyError ) as e :
print ( " Got data but some data is invalid! " )
print ( e )
print ( Fore . YELLOW + " ADSBX Sourced Data: " , ac_dict , Style . RESET_ALL )
self . printheader ( " foot " )
else :
self . feeding = True
self . run_check ( )
def printheader ( self , type ) :
from colorama import Fore , Back , Style
if type == " head " :
@ -137,15 +177,15 @@ class Plane:
time_since_contact = self . get_time_since ( self . last_contact )
output = [
[ ( Fore . CYAN + " ICAO " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + self . icao + Style . RESET_ALL ) ] ,
[ ( Fore . CYAN + " Callsign " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + self . callsign + Style . RESET_ALL ) ] ,
[ ( Fore . CYAN + " Callsign " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + self . callsign + Style . RESET_ALL ) ] if self . callsign != None else None ,
[ ( Fore . CYAN + " Reg " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + self . reg + Style . RESET_ALL ) ] if " reg " in self . __dict__ else None ,
[ ( Fore . CYAN + " From " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + self . from_location + Style . RESET_ALL ) ] if " from_location " in self . __dict__ else None ,
[ ( Fore . CYAN + " To " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + self . to_location + Style . RESET_ALL ) ] if " to_location " in self . __dict__ else None ,
[ ( Fore . CYAN + " Squawk " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + self . squawk + Style . RESET_ALL ) ] if " squawk " in self . __dict__ else None ,
[ ( Fore . CYAN + " Latitude " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + str ( self . latitude ) + Style . RESET_ALL ) ] ,
[ ( Fore . CYAN + " Longitude " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + str ( self . longitude ) + Style . RESET_ALL ) ] ,
[ ( Fore . CYAN + " Last Contact " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + str ( time_since_contact ) . split ( " . " ) [ 0 ] + Style . RESET_ALL ) ] ,
[ ( Fore . CYAN + " On Ground " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + str ( self . on_ground ) + Style . RESET_ALL ) ] ,
[ ( Fore . CYAN + " GEO Alitude " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + str ( " {:,} ft " . format ( self . geo_alt_ft ) ) + Style . RESET_ALL ) ]
[ ( Fore . CYAN + " GEO Alitude " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + str ( " {:,} ft " . format ( self . geo_alt_ft ) ) + Style . RESET_ALL ) ] ,
[ ( Fore . CYAN + " Nav Modes " + Style . RESET_ALL ) , ( Fore . LIGHTGREEN_EX + ' , ' . join ( self . nav_modes ) + Style . RESET_ALL ) ] if " nav_modes " in self . __dict__ and self . nav_modes != None else None ,
]
output = list ( filter ( None , output ) )
print ( tabulate ( output , [ ] , ' fancy_grid ' ) )
@ -161,11 +201,11 @@ class Plane:
if self . last_on_ground :
self . tookoff = True
self . trigger_type = " no longer on ground "
self . tookoff _header = " Took off from "
self . type _header = " Took off from "
elif self . last_feeding is False and self . feeding and self . landing_plausible == False :
self . tookoff = True
self . trigger_type = " data acquisition "
self . tookoff _header = " Took off near "
self . type _header = " Took off near "
else :
self . tookoff = False
else :
@ -179,7 +219,7 @@ class Plane:
if self . on_ground and self . last_on_ground is False and self . last_below_desired_ft :
self . landed = True
self . trigger_type = " now on ground "
self . landed _header = " Landed in "
self . type _header = " Landed in "
self . landing_plausible = False
#Set status for landing plausible
elif self . last_below_desired_ft and self . last_feeding and self . feeding is False and self . last_on_ground is False :
@ -190,7 +230,7 @@ class Plane:
self . landing_plausible = False
self . landed = True
self . trigger_type = " data loss "
self . landed _header = " Landed near "
self . type _header = " Landed near "
else :
self . landed = False
@ -263,59 +303,26 @@ class Plane:
#Set Twitter Title
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
self . twitter_title = self . icao if self . config . get ( ' TWITTER ' , ' TITLE ' ) == " icao " else self . callsign if self . config . get ( ' TWITTER ' , ' TITLE ' ) == " callsign " else self . config . get ( ' TWITTER ' , ' TITLE ' )
#Takeoff Notifcation and Landed
if self . tookoff :
if invalid_Location is False :
tookoff_message = ( self . tookoff_header + aera_hierarchy + " , " + state + " , " + country_code + " . " )
else :
tookoff_message = ( " Took off " )
print ( tookoff_message )
#Google Map or tar1090 screenshot
if self . config . get ( ' MAP ' , ' OPTION ' ) == " GOOGLESTATICMAP " :
getMap ( ( aera_hierarchy + " , " + state + " , " + country_code ) , self . icao )
elif self . config . get ( ' MAP ' , ' OPTION ' ) == " ADSBX " :
getSS ( self . icao , self . overlays )
if nearest_airport_dict != None :
append_airport ( self . map_file_name , nearest_airport_dict [ ' icao ' ] , nearest_airport_dict [ ' name ' ] , nearest_airport_dict [ ' distance ' ] )
airport_string = nearest_airport_dict [ ' icao ' ] + " , " + nearest_airport_dict [ " name " ]
else :
airport_string = " "
else :
raise Exception ( " Map option not set correctly in this planes conf " )
#Discord
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
dis_message = ( self . dis_title + " " + tookoff_message + airport_string ) . strip ( )
sendDis ( dis_message , self . map_file_name , self . config )
#PushBullet
if self . config . getboolean ( ' PUSHBULLET ' , ' ENABLE ' ) :
with open ( self . map_file_name , " rb " ) as pic :
map_data = self . pb . upload_file ( pic , " Tookoff IMG " )
self . pb_channel . push_note ( self . config . get ( ' PUSHBULLET ' , ' TITLE ' ) , tookoff_message )
self . pb_channel . push_file ( * * map_data )
#Twitter
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
twitter_media_map_obj = self . tweet_api . media_upload ( self . map_file_name )
alt_text = " Call: " + self . callsign + " On Ground: " + str ( self . on_ground ) + " Alt: " + str ( self . geo_alt_ft ) + " Last Contact: " + str ( time_since_contact ) + " Trigger: " + self . trigger_type
self . tweet_api . create_media_metadata ( media_id = twitter_media_map_obj . media_id , alt_text = alt_text )
self . tweet_api . update_status ( status = ( ( self . twitter_title + " " + tookoff_message ) . strip ( ) ) , media_ids = [ twitter_media_map_obj . media_id ] )
#self.tweet_api.update_with_media(self.map_file_name, status = (self.twitter_title + " " + tookoff_message).strip())
self . takeoff_time = time . time ( )
os . remove ( self . map_file_name )
if self . landed :
self . landed_time_msg = " "
if self . takeoff_time != None :
#Takeoff and Land Notification
if self . tookoff or self . landed :
if self . tookoff :
self . takeoff_time = time . time ( )
self . landed_time_msg = None
elif self . landed and self . takeoff_time != None :
self . landed_time = time . time ( ) - self . takeoff_time
if platform . system ( ) == " Linux " :
self . landed_time_msg = time . strftime ( " Apx. flt. time % -H Hours : % -M Mins. " , time . gmtime ( self . landed_time ) )
elif platform . system ( ) == " Windows " :
self . landed_time_msg = time . strftime ( " Apx. flt. time % #H Hours : % #M Mins. " , time . gmtime ( self . landed_time ) )
self . takeoff_time = None
self . landed_time = None
elif self . landed :
self . landed_time_msg = None
if invalid_Location is False :
landed_message = ( self . landed_header + aera_hierarchy + " , " + state + " , " + country_code + " . " + self . landed_time_msg )
message = ( self . type_header + aera_hierarchy + " , " + state + " , " + country_code + " . " ) + ( ( " " + self . landed_time_msg ) if self . landed_time_msg != None else " " )
else :
landed_message = ( " Landed " + " , " + self . landed_time_msg )
print ( landed_ message)
message = ( " Landed " + ( ( " " + self . landed_time_msg ) if self . landed_time_msg != None else " " ) if self . landed else " Tookoff " if self . tookoff else " " )
print ( message )
#Google Map or tar1090 screenshot
if self . config . get ( ' MAP ' , ' OPTION ' ) == " GOOGLESTATICMAP " :
getMap ( ( aera_hierarchy + " , " + state + " , " + country_code ) , self . icao )
@ -326,31 +333,58 @@ class Plane:
airport_string = nearest_airport_dict [ ' icao ' ] + " , " + nearest_airport_dict [ " name " ]
else :
airport_string = " "
else :
raise Exception ( " Map option not set correctly in this planes conf " )
#Discord
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
dis_message = ( self . dis_title + " " + landed_ message + " " + airport_string ) . strip ( )
dis_message = ( self . dis_title + " " + message + " " + airport_string ) . strip ( )
sendDis ( dis_message , self . map_file_name , self . config )
#PushBullet
if self . config . getboolean ( ' PUSHBULLET ' , ' ENABLE ' ) :
with open ( self . map_file_name , " rb " ) as pic :
map_data = self . pb . upload_file ( pic , " Landed IMG" )
self . pb_channel . push_note ( self . config . get ( ' PUSHBULLET ' , ' TITLE ' ) , landed_ message)
map_data = self . pb . upload_file ( pic , " Tookoff IMG" )
self . pb_channel . push_note ( self . config . get ( ' PUSHBULLET ' , ' TITLE ' ) , message )
self . pb_channel . push_file ( * * map_data )
#Twitter
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
twitter_media_map_obj = self . tweet_api . media_upload ( self . map_file_name )
alt_text = " Call: " + self . callsign + " On Ground: " + str ( self . on_ground ) + " Alt: " + str ( self . geo_alt_ft ) + " Last Contact: " + str ( time_since_contact ) + " Trigger: " + self . trigger_type
self . tweet_api . create_media_metadata ( media_id = twitter_media_map_obj . media_id , alt_text = alt_text )
self . tweet_api . update_status ( status = ( ( self . twitter_title + " " + landed_message ) . strip ( ) ) , media_ids = [ twitter_media_map_obj . media_id ] )
#self.tweet_api.update_with_media(self.map_file_name, status = (self.twitter_title + " " + landed_message).strip())
self . takeoff_time = None
self . landed_time = None
self . time_since_tk = None
self . tweet_api . update_status ( status = ( ( self . twitter_title + " " + message ) . strip ( ) ) , media_ids = [ twitter_media_map_obj . media_id ] )
#self.tweet_api.update_with_media(self.map_file_name, status = (self.twitter_title + " " + tookoff_message).strip())
os . remove ( self . map_file_name )
self . last_contact = None
#Squawks
squawks = [ ( " 7500 " , " Hijacking " ) , ( " 7600 " , " Radio Failure " ) , ( " 7700 " , " Emergency " ) ]
if self . feeding :
for squawk in squawks :
if self . squawk == squawk [ 0 ] and self . squawk != self . last_squawk :
squawk_message = ( " Squawking " + squawk [ 0 ] + " , " + squawk [ 1 ] )
print ( squawk_message )
#Google Map or tar1090 screenshot
# if self.config.get('MAP', 'OPTION') == "GOOGLESTATICMAP":
# getMap((aera_hierarchy + ", " + state + ", " + country_code), self.icao)
if self . config . get ( ' MAP ' , ' OPTION ' ) == " ADSBX " :
getSS ( self . icao , self . overlays )
#Discord
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
dis_message = ( self . dis_title + " " + squawk_message )
sendDis ( dis_message , self . map_file_name , self . config )
os . remove ( self . map_file_name )
#Nav Modes Notifications
if self . nav_modes != None and self . last_nav_modes != None :
new_modes = [ ]
for mode in self . nav_modes :
if mode not in self . last_nav_modes :
new_modes . append ( mode )
#Discord
if new_modes != [ ] :
modes_string = " , " . join ( new_modes )
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
dis_message = ( self . dis_title + " " + modes_string + " mode enabled. " ) . strip ( )
getSS ( self . icao , self . overlays )
sendDis ( dis_message , self . map_file_name , self . config )
#Set Variables to compare to next check
self . last_feeding = self . feeding
@ -359,6 +393,8 @@ class Plane:
self . last_below_desired_ft = self . below_desired_ft
self . last_longitude = self . longitude
self . last_latitude = self . latitude
self . last_squawk = self . squawk
self . last_nav_modes = self . nav_modes
if self . takeoff_time != None :