#!/usr/bin/python
# Tests p2p_connect
# Will try to connect to another peer
# and form a group
######### MAY NEED TO RUN AS SUDO #############

import getopt
import os
import sys
import time

import dbus
import gobject
from dbus.mainloop.glib import DBusGMainLoop


def usage():
    """Print the command line help text to stdout."""
    # "\\ " keeps the literal backslash-space of the original help text
    # without relying on an invalid escape sequence.
    print("Usage:")
    print(" %s -i <interface_name> -m <wps_method> \\ "
          % sys.argv[0])
    print(" -a <addr> [-p <pin>] [-g <go_intent>] \\ ")
    print(" [-w <wpas_dbus_interface>]")
    print("Options:")
    print(" -i = interface name")
    print(" -m = wps method")
    print(" -a = peer address")
    print(" -p = pin number (8 digits)")
    print(" -g = group owner intent")
    print(" -w = wpas dbus interface = fi.w1.wpa_supplicant1")
    print("Example:")
    print(" %s -i wlan0 -a 0015008352c0 -m display -p 12345670"
          % sys.argv[0])


def _die(message):
    """Print *message* followed by the usage text, then exit."""
    print(message)
    usage()
    sys.exit()


# Required Signals
def GONegotiationSuccess(status):
    """Signal handler: report that GO negotiation succeeded."""
    print("Go Negotiation Success")


def GONegotiationFailure(status):
    """Signal handler: report the failure status and terminate."""
    print('Go Negotiation Failed. Status:')
    print(format(status))
    os._exit(0)


def GroupStarted(properties):
    """Signal handler: report the new group object and terminate.

    The group path is only printed when *properties* carries a
    "group_object" entry; the process exits either way.
    """
    # `in` works on both Python 2 and 3; dict.has_key() is Python 2 only.
    if "group_object" in properties:
        print('Group Formation Complete %s'
              % properties["group_object"])
    os._exit(0)


def WpsFailure(status, etc):
    """Signal handler: report a WPS failure and terminate."""
    # The original format string had no placeholder, so status was dropped.
    print("WPS Authentication Failure: {}".format(status))
    print(etc)
    os._exit(0)


class P2P_Connect():
    """Issue a P2PDevice.Connect call through the wpa_supplicant D-Bus API.

    Instance attributes set by __init__:
      ifname, wpas_dbus_interface, addr, pin, wps_method, go_intent
          -- the constructor arguments, stored unchanged
      wpas_dbus_opath, wpas_dbus_interfaces_opath,
      wpas_dbus_interfaces_interface, wpas_dbus_interfaces_p2pdevice
          -- object path / interface names derived from wpas_dbus_interface
      bus, wpas_object, wpas, path, interface_object, p2p_interface
          -- D-Bus connection, proxies and the interface object path
    Set by constructArguements():
      p2p_connect_arguements -- dictionary passed to Connect()
    """

    # Constructor
    def __init__(self, ifname, wpas_dbus_interface, addr,
                 pin, wps_method, go_intent):
        """Connect to the system bus and register the P2P signal handlers.

        Looks up the wpa_supplicant object path for *ifname*, asking
        wpa_supplicant to create the interface when it is unknown.

        Raises:
            dbus.DBusException: for any D-Bus error other than the
                InterfaceUnknown / InterfaceExists cases handled here.
        """
        # Initializes variables and threads
        self.ifname = ifname
        self.wpas_dbus_interface = wpas_dbus_interface
        self.wps_method = wps_method
        self.go_intent = go_intent
        self.addr = addr
        self.pin = pin

        # Generating interface/object paths
        self.wpas_dbus_opath = \
            "/" + self.wpas_dbus_interface.replace(".", "/")
        self.wpas_dbus_interfaces_opath = \
            self.wpas_dbus_opath + "/Interfaces"
        self.wpas_dbus_interfaces_interface = \
            self.wpas_dbus_interface + ".Interface"
        self.wpas_dbus_interfaces_p2pdevice = \
            self.wpas_dbus_interfaces_interface + ".P2PDevice"

        # Getting interfaces and objects
        # The GLib main loop must be the default before the bus is created
        # so that signal delivery works.
        DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SystemBus()
        self.wpas_object = self.bus.get_object(
            self.wpas_dbus_interface,
            self.wpas_dbus_opath)
        self.wpas = dbus.Interface(
            self.wpas_object, self.wpas_dbus_interface)

        # See if wpa_supplicant already knows about this interface
        self.path = None
        try:
            self.path = self.wpas.GetInterface(ifname)
        except dbus.DBusException as exc:
            if not str(exc).startswith(
                    self.wpas_dbus_interface + ".InterfaceUnknown:"):
                raise
            try:
                # Store the result on the instance; a local variable here
                # would leave self.path as None for the lookups below.
                self.path = self.wpas.CreateInterface(
                    {'Ifname': ifname, 'Driver': 'test'})
                time.sleep(1)
            except dbus.DBusException as create_exc:
                if not str(create_exc).startswith(
                        self.wpas_dbus_interface + ".InterfaceExists:"):
                    raise
                # The interface appeared in the meantime: fetch its path.
                self.path = self.wpas.GetInterface(ifname)

        # Get Interface and objects
        self.interface_object = self.bus.get_object(
            self.wpas_dbus_interface, self.path)
        self.p2p_interface = dbus.Interface(
            self.interface_object,
            self.wpas_dbus_interfaces_p2pdevice)

        # Add signals
        self.bus.add_signal_receiver(
            GONegotiationSuccess,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="GONegotiationSuccess")
        self.bus.add_signal_receiver(
            GONegotiationFailure,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="GONegotiationFailure")
        self.bus.add_signal_receiver(
            GroupStarted,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="GroupStarted")
        self.bus.add_signal_receiver(
            WpsFailure,
            dbus_interface=self.wpas_dbus_interfaces_p2pdevice,
            signal_name="WpsFailed")

    # Constructing all the arguements needed to connect
    def constructArguements(self):
        """Build self.p2p_connect_arguements from the stored options.

        Prints an error plus the usage text and exits when the pin /
        go_intent combination is not valid for the chosen wps_method.
        """
        # Adding required arguements
        self.p2p_connect_arguements = {
            'wps_method': self.wps_method,
            'peer': dbus.ObjectPath(self.path + '/Peers/' + self.addr),
        }

        # Display requires a pin, and a go intent of 15
        if self.wps_method == 'display':
            if self.pin is None:
                _die("Error:\n Pin required for wps_method=display")
            self.p2p_connect_arguements.update({'pin': self.pin})

            if self.go_intent is not None and int(self.go_intent) != 15:
                print("go_intent overwritten to 15")
                self.go_intent = '15'

        # Keypad requires a pin, and a go intent of less than 15
        elif self.wps_method == 'keypad':
            if self.pin is None:
                _die("Error:\n Pin required for wps_method=keypad")
            self.p2p_connect_arguements.update({'pin': self.pin})

            if self.go_intent is not None and int(self.go_intent) == 15:
                _die("Error :\n Group Owner intent cannot be" +
                     " 15 for wps_method=keypad")

        # Doesn't require pin
        # for ./wpa_cli, p2p_connect [mac] [pin#], wps_method=keypad
        elif self.wps_method == 'pin':
            if self.pin is not None:
                print("pin ignored")

        # No pin is required for pbc so it is ignored
        elif self.wps_method == 'pbc':
            if self.pin is not None:
                print("pin ignored")

        else:
            _die("Error:\n wps_method not supported or does not exist")

        # Go_intent is optional for all arguements
        if self.go_intent is not None:
            self.p2p_connect_arguements.update(
                {'go_intent': dbus.Int32(int(self.go_intent))})

    # Running p2p_connect
    def run(self):
        """Call Connect() and then run the GLib main loop.

        The main loop only ends when one of the signal handlers calls
        os._exit(). D-Bus errors from Connect() propagate to the caller.
        """
        result_pin = self.p2p_interface.Connect(
            self.p2p_connect_arguements)

        if (self.wps_method == 'pin' and
                'pin' not in self.p2p_connect_arguements):
            # Printed as text: converting to int would strip leading zeros.
            print("Connect return with pin value of %s " % result_pin)
        gobject.MainLoop().run()


def main():
    """Parse the command line, validate it and run the connect test."""
    # Required
    interface_name = None
    wps_method = None
    addr = None

    # Conditionally optional
    pin = None

    # Optional
    wpas_dbus_interface = 'fi.w1.wpa_supplicant1'
    go_intent = None

    # Using getopts to handle options
    try:
        options, args = getopt.getopt(sys.argv[1:], "hi:m:a:p:g:w:")
    except getopt.GetoptError:
        usage()
        sys.exit()

    # If theres a switch, override default option
    for key, value in options:
        # Help
        if key == "-h":
            usage()
            sys.exit()
        # Interface Name
        elif key == "-i":
            interface_name = value
        # WPS Method
        elif key == "-m":
            wps_method = value
        # Address
        elif key == "-a":
            addr = value
        # Pin
        elif key == "-p":
            pin = value
        # Group Owner Intent
        elif key == "-g":
            go_intent = value
        # Dbus interface
        elif key == "-w":
            wpas_dbus_interface = value
        else:
            raise AssertionError("unhandled option")

    # Required Arguements check
    if interface_name is None or wps_method is None or addr is None:
        _die("Error:\n Required arguements not specified")

    # Group Owner Intent Check
    if go_intent is not None:
        intent_error = ("Error:\n Group Owner Intent must be between "
                        "0 and 15 inclusive")
        try:
            intent_value = int(go_intent)
        except ValueError:
            # Non-numeric input is reported like an out-of-range value
            # instead of surfacing as a traceback.
            _die(intent_error)
        if intent_value > 15 or intent_value < 0:
            _die(intent_error)

    # Pin Check
    if pin is not None and (len(pin) != 8 or not pin.isdigit()):
        _die("Error:\n Pin is not 8 digits")

    try:
        p2p_connect_test = P2P_Connect(interface_name, wpas_dbus_interface,
                                       addr, pin, wps_method, go_intent)
    except Exception as exc:
        print("Error:\n Invalid Arguements")
        # Show the underlying cause rather than hiding it.
        print(" %s" % exc)
        usage()
        sys.exit()

    p2p_connect_test.constructArguements()
    p2p_connect_test.run()
    os._exit(0)


if __name__ == "__main__":
    main()