@ -25,6 +25,18 @@ class Plane:
self . squawks = [ None , None , None , None ]
self . nav_modes = None
self . last_nav_modes = None
self . recheck_to = None
self . speed = None
self . nearest_airport_dict = None
#Setup Tweepy
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
from defTweet import tweepysetup
self . tweet_api = tweepysetup ( self . config )
#Setup PushBullet
if self . config . getboolean ( ' PUSHBULLET ' , ' ENABLE ' ) :
from pushbullet import Pushbullet
self . pb = Pushbullet ( self . config [ ' PUSHBULLET ' ] [ ' API_KEY ' ] )
self . pb_channel = self . pb . get_channel ( self . config . get ( ' PUSHBULLET ' , ' CHANNEL_TAG ' ) )
def getICAO ( self ) :
return self . icao
def run_OPENS ( self , ac_dict ) :
@ -81,7 +93,7 @@ class Plane:
self . printheader ( " head " )
print ( Fore . YELLOW + " ADSBX Sourced Data: " , ac_dict , Style . RESET_ALL )
try :
self . __dict__ . update ( { ' icao ' : ac_dict [ ' hex ' ] . upper ( ) , ' reg ' : ac_dict [ ' r ' ] , ' latitude ' : float ( ac_dict [ ' lat ' ] ) , ' longitude ' : float ( ac_dict [ ' lon ' ] ) } )
self . __dict__ . update ( { ' icao ' : ac_dict [ ' hex ' ] . upper ( ) , ' reg ' : ac_dict [ ' r ' ] , ' latitude ' : float ( ac_dict [ ' lat ' ] ) , ' longitude ' : float ( ac_dict [ ' lon ' ] ) , ' type ' : ac_dict [ ' t ' ] , ' speed ' : ac_dict [ ' gs ' ] } )
if ac_dict [ ' alt_baro ' ] != " ground " :
self . alt_ft = int ( ac_dict [ ' alt_baro ' ] )
self . on_ground = False
@ -129,7 +141,6 @@ class Plane:
elif type == " foot " :
header = " ---------------------------------------------------------------------------------------------------- "
print ( Back . MAGENTA + header [ 0 : 100 ] + Style . RESET_ALL )
def get_time_since ( self , last_contact ) :
from datetime import datetime
if last_contact != None :
@ -138,6 +149,10 @@ class Plane:
else :
time_since_contact = None
return time_since_contact
def time_since ( self , start_time ) :
import time
elapsed_time = time . time ( ) - start_time
return elapsed_time
def run_empty ( self ) :
self . printheader ( " head " )
self . feeding = False
@ -154,6 +169,15 @@ class Plane:
from tabulate import tabulate
from AppendAirport import append_airport
from defAirport import getClosestAirport
#Propritary
ENABLE_ROUTE_LOOKUP = False
if ENABLE_ROUTE_LOOKUP :
from lookup_route import lookup_route
else :
#Dead Place function
def lookup_route ( * args ) :
return None
if self . config . get ( ' MAP ' , ' OPTION ' ) == " GOOGLESTATICMAP " :
from defMap import getMap
elif self . config . get ( ' MAP ' , ' OPTION ' ) == " ADSBX " :
@ -166,16 +190,6 @@ class Plane:
raise ValueError ( " Map option not set correctly in this planes conf " )
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
from defDiscord import sendDis
#Setup Tweepy
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
from defTweet import tweepysetup
self . tweet_api = tweepysetup ( self . config )
#Setup PushBullet
if self . config . getboolean ( ' PUSHBULLET ' , ' ENABLE ' ) :
from pushbullet import Pushbullet
self . pb = Pushbullet ( self . config [ ' PUSHBULLET ' ] [ ' API_KEY ' ] )
self . pb_channel = self . pb . get_channel ( self . config . get ( ' PUSHBULLET ' , ' CHANNEL_TAG ' ) )
if self . feeding == False :
time_since_contact = self . get_time_since ( self . last_contact )
output = [
@ -204,7 +218,7 @@ class Plane:
print ( tabulate ( output , [ ] , ' fancy_grid ' ) )
#Check if below desire ft
desired_ft = 10 000
desired_ft = 15 000
if self . alt_ft is None or self . alt_ft > desired_ft :
self . below_desired_ft = False
elif self . alt_ft < desired_ft :
@ -214,71 +228,77 @@ class Plane:
if self . last_on_ground :
self . tookoff = True
self . trigger_type = " no longer on ground "
self . type_header = " Took off from "
type_header = " Took off from "
elif self . last_feeding is False and self . feeding and self . landing_plausible == False :
self . tookoff = True
self . trigger_type = " data acquisition "
self . type_header = " Took off near "
nearest_airport_dict = getClosestAirport ( self . latitude , self . longitude , self . config . get ( " AIRPORT " , " TYPES " ) )
alt_above_airport = ( self . alt_ft - int ( nearest_airport_dict [ ' elevation_ft ' ] ) )
print ( f " AGL nearest airport: { alt_above_airport } " )
if alt_above_airport < = 10000 :
self . nearest_airport_dict = nearest_airport_dict
self . tookoff = True
self . trigger_type = " data acquisition "
type_header = " Took off near "
else :
self . tookoff = False
else :
self . tookoff = False
#self.tookoff = bool(self.below_desired_ft and self.on_ground is False and ((self.last_feeding is False and self.feeding) or (self.last_on_ground)))
#print ("Tookoff Just Now:", self.tookoff)
#Check if Landed
if self . on_ground and self . last_on_ground is False and self . last_below_desired_ft :
self . landed = True
self . trigger_type = " now on ground "
self . type_header = " Landed in "
type_header = " Landed in "
self . landing_plausible = False
#Set status for landing plausible
elif self . last_below_desired_ft and self . last_feeding and self . feeding is False and self . last_on_ground is False :
print ( " Near landing conditions, if contiuned data loss for 5 mins, landing true " )
self . landing_plausible = True
elif self . below_desired_ft and self . last_feeding and self . feeding is False and self . last_on_ground is False :
nearest_airport_dict = getClosestAirport ( self . latitude , self . longitude , self . config . get ( " AIRPORT " , " TYPES " ) )
alt_above_airport = ( self . alt_ft - int ( nearest_airport_dict [ ' elevation_ft ' ] ) )
print ( f " AGL nearest airport: { alt_above_airport } " )
if alt_above_airport < = 10000 :
self . landing_plausible = True
self . nearest_airport_dict = nearest_airport_dict
print ( " Near landing conditions, if contiuned data loss for 5 mins, landing true " )
elif self . landing_plausible and self . feeding is False and time_since_contact . seconds > = 300 :
self . landing_plausible = False
self . landed = True
self . trigger_type = " data loss "
self . type_header = " Landed near "
type_header = " Landed near "
else :
self . landed = False
#self.landed = bool(self.last_below_desired_ft and ((self.last_feeding and self.feeding is False and self.last_on_ground is False) or (self.on_ground and self.last_on_ground is False)))
#print ("Landed Just Now:", self.landed)
if self . landed :
print ( " Landed by " , self . trigger_type )
if self . tookoff :
print ( " Tookoff by " , self . trigger_type )
#Find nearest airport, and location
if self . landed or self . tookoff :
if self . trigger_type == " now on ground " or " data acquisition " and self . longitude != None and self . latitude != None :
if self . nearest_airport_dict != None :
nearest_airport_dict = self . nearest_airport_dict
self . nearest_airport_dict = None
elif self . trigger_type in [ " now on ground " , " data acquisition " , " data loss " ] :
nearest_airport_dict = getClosestAirport ( self . latitude , self . longitude , self . config . get ( " AIRPORT " , " TYPES " ) )
has_coords = True
elif self . trigger_type == " data loss " or " no longer on ground " and self . last_longitude != None and self . last_latitude != None :
elif self . trigger_type == " no longer on ground " :
nearest_airport_dict = getClosestAirport ( self . last_latitude , self . last_longitude , self . config . get ( " AIRPORT " , " TYPES " ) )
has_coords = True
else :
print ( Fore . RED + ' No Location, No coordinates ' )
has_coords = False
print ( Style . RESET_ALL )
if has_coords :
#Convert dictionary keys to sep variables
country_code = nearest_airport_dict [ ' iso_country ' ]
state = nearest_airport_dict [ ' region ' ] . strip ( )
municipality = nearest_airport_dict [ ' municipality ' ] . strip ( )
print ( Fore . GREEN )
print ( " Country Code: " , country_code )
print ( " State: " , state )
print ( " Municipality: " , municipality )
print ( Style . RESET_ALL )
#Convert dictionary keys to sep variables
country_code = nearest_airport_dict [ ' iso_country ' ]
state = nearest_airport_dict [ ' region ' ] . strip ( )
municipality = nearest_airport_dict [ ' municipality ' ] . strip ( )
if municipality == " " or state == " " or municipality == state :
if municipality != " " :
area = municipality
elif state != " " :
area = state
else :
area = " "
else :
print ( Fore . RED )
print ( " Invalid Location " )
print ( Style . RESET_ALL )
area = f " { municipality } , { state } , "
location_string = ( area + country_code )
print ( Fore . GREEN )
print ( " Country Code: " , country_code )
print ( " State: " , state )
print ( " Municipality: " , municipality )
print ( Style . RESET_ALL )
title_switch = {
" reg " : self . reg ,
" callsign " : self . callsign ,
@ -286,46 +306,46 @@ class Plane:
}
#Set Discord Title
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
self . dis_title = ( title_switch . get ( self . config . get ( ' DISCORD ' , ' TITLE ' ) ) or " NA " ) if self . config . get ( ' DISCORD ' , ' TITLE ' ) in title_switch . keys ( ) else self . config . get ( ' DISCORD ' , ' TITLE ' )
self . dis_title = ( title_switch . get ( self . config . get ( ' DISCORD ' , ' TITLE ' ) ) or " NA " ) . strip ( ) if self . config . get ( ' DISCORD ' , ' TITLE ' ) in title_switch . keys ( ) else self . config . get ( ' DISCORD ' , ' TITLE ' )
#Set Twitter Title
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
self . twitter_title = ( title_switch . get ( self . config . get ( ' TWITTER ' , ' TITLE ' ) ) or " NA " ) if self . config . get ( ' TWITTER ' , ' TITLE ' ) in title_switch . keys ( ) else self . config . get ( ' TWITTER ' , ' TITLE ' )
#Takeoff and Land Notification
if self . tookoff or self . landed :
route_to = None
if self . tookoff :
self . takeoff_time = time . time ( )
self . landed_time_msg = None
landed_time_msg = None
#Route Lookup | Proprietary
if ENABLE_ROUTE_LOOKUP :
route_to = lookup_route ( self . reg , ( self . latitude , self . longitude ) , self . type , self . alt_ft )
if route_to == None :
self . recheck_to = True
elif self . landed and self . takeoff_time != None :
self . landed_time = time . time ( ) - self . takeoff_time
landed_time = time . time ( ) - self . takeoff_time
if platform . system ( ) == " Linux " :
self . landed_time_msg = time . strftime ( " Apx. flt. time % -H Hours : % -M Mins. " , time . gmtime ( self . landed_time ) )
strftime_splitter = " - "
elif platform . system ( ) == " Windows " :
self . landed_time_msg = time . strftime ( " Apx. flt. time % #H Hours : % #M Mins. " , time . gmtime ( self . landed_time ) )
self . landed_time_msg = self . landed_time_msg . replace ( " 0 Hours : " , " " )
strftime_splitter = " # "
landed_time_msg = time . strftime ( f " Apx. flt. time % { strftime_splitter } H Hours : % { strftime_splitter } M Mins. " , time . gmtime ( landed_time ) )
landed_time_msg = landed_time_msg . replace ( " 0 Hours : " , " " )
self . takeoff_time = None
self . landed_time = None
elif self . landed :
self . landed_time_msg = None
if has_coords :
message = ( self . type_header + ( ( ( municipality + " , " + state ) if municipality != " " else " " ) if municipality != state else state ) + " , " + country_code + " . " ) + ( ( " " + self . landed_time_msg ) if self . landed_time_msg != None else " " )
else :
message = ( " Landed " + ( ( " " + self . landed_time_msg ) if self . landed_time_msg != None else " " ) if self . landed else " Tookoff " if self . tookoff else " " )
landed_time_msg = None
message = ( f " { type_header } { location_string } . " ) + ( " " if route_to is None else f " { route_to } . " ) + ( ( f " { landed_time_msg } " ) if landed_time_msg != None else " " )
print ( message )
#Google Map or tar1090 screenshot
if self . config . get ( ' MAP ' , ' OPTION ' ) == " GOOGLESTATICMAP " :
getMap ( ( municipality + " , " + state + " , " + country_code ) , self . map_file_name )
elif self . config . get ( ' MAP ' , ' OPTION ' ) == " ADSBX " :
getSS ( self . icao , self . overlays )
if nearest_airport_dict != None :
append_airport ( self . map_file_name , nearest_airport_dict [ ' icao ' ] , nearest_airport_dict [ ' name ' ] , nearest_airport_dict [ ' distance_mi ' ] )
airport_string = nearest_airport_dict [ ' icao ' ] + " , " + nearest_airport_dict [ " name " ]
else :
airport_string = " "
append_airport ( self . map_file_name , nearest_airport_dict )
#airport_string = nearest_airport_dict['icao'] + ", " + nearest_airport_dict["name"]
else :
raise Exception ( " Map option not set correctly in this planes conf " )
raise ValueError ( " Map option not set correctly in this planes conf " )
#Discord
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
dis_message = ( self . dis_title + " " + message + " " + airport_string ) . strip ( )
dis_message = f " { self . dis_title } { message } " . strip ( )
sendDis ( dis_message , self . config , self . map_file_name )
#PushBullet
if self . config . getboolean ( ' PUSHBULLET ' , ' ENABLE ' ) :
@ -336,11 +356,22 @@ class Plane:
#Twitter
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
twitter_media_map_obj = self . tweet_api . media_upload ( self . map_file_name )
alt_text = " Call: " + ( self . callsign or " NA " ) + " On Ground: " + str ( self . on_ground ) + " Alt: " + str ( self . alt_ft ) + " Last Contact: " + str ( time_since_contact ) + " Trigger: " + self . trigger_type
alt_text = f " Reg: { self . reg } On Ground: { str ( self . on_ground ) } Alt: { str ( self . alt_ft ) } Last Contact: { str ( time_since_contact ) } Trigger: { self . trigger_type } "
self . tweet_api . create_media_metadata ( media_id = twitter_media_map_obj . media_id , alt_text = alt_text )
self . tweet_api . update_status ( status = ( ( self . twitter_title + " " + message ) . strip ( ) ) , media_ids = [ twitter_media_map_obj . media_id ] )
#self.tweet_api.update_with_media(self.map_file_name, status = (self.twitter_title + " " + tookoff_message).strip())
os . remove ( self . map_file_name )
#To Location
if self . recheck_to and self . time_since ( self . takeoff_time ) > 60 :
self . recheck_to = False
route_to = lookup_route ( self . reg , ( self . latitude , self . longitude ) , self . type , self . alt_ft )
if route_to != None :
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
dis_message = ( self . dis_title + route_to ) . strip ( )
sendDis ( dis_message , self . config )
#Twitter
if self . config . getboolean ( ' TWITTER ' , ' ENABLE ' ) :
tweet = self . tweet_api . user_timeline ( count = 1 ) [ 0 ]
self . tweet_api . update_status ( status = f " { self . twitter_title } { route_to } " . strip ( ) , in_reply_to_status_id = tweet . id )
#Squawks
squawks = [ ( " 7500 " , " Hijacking " ) , ( " 7600 " , " Radio Failure " ) , ( " 7700 " , " Emergency " ) ]
@ -374,6 +405,11 @@ class Plane:
sendDis ( ( dis_message + " , Sel Alt. " + str ( self . nav_altitude ) + " , Current Alt. " + str ( self . alt_ft ) ) , self . config )
else :
sendDis ( dis_message , self . config )
#Power Up
if self . last_feeding == False and self . speed == 0 and self . on_ground :
if self . config . getboolean ( ' DISCORD ' , ' ENABLE ' ) :
dis_message = ( self . dis_title + " Powered Up " ) . strip ( )
sendDis ( dis_message , self . config )
#Set Variables to compare to next check
@ -387,7 +423,7 @@ class Plane:
if self . takeoff_time != None :
self . elapsed_time = time . time ( ) - self . takeoff_time
self . time_since_tk = time . strftime ( " Time Since Take off % H Hours : % M Mins : % S Secs " , time . gmtime ( self . elapsed_time ) )
print ( self . time_since_tk )
elapsed_time = self . time_since ( self . takeoff_time )
time_since_tk = time . strftime ( " Time Since Take off % H Hours : % M Mins : % S Secs " , time . gmtime ( elapsed_time ) )
print ( time_since_tk )
self . printheader ( " foot " )