# wpa_supplicant D-Bus old interface tests
# Copyright (c) 2014-2015, Jouni Malinen
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import gobject
import logging
logger = logging.getLogger()

try:
    import dbus
    dbus_imported = True
except ImportError:
    dbus_imported = False

import hostapd
from test_dbus import TestDbus, start_ap

# Bus name, root object path and interface names of the old D-Bus API
WPAS_DBUS_OLD_SERVICE = "fi.epitest.hostap.WPASupplicant"
WPAS_DBUS_OLD_PATH = "/fi/epitest/hostap/WPASupplicant"
WPAS_DBUS_OLD_IFACE = "fi.epitest.hostap.WPASupplicant.Interface"
WPAS_DBUS_OLD_BSSID = "fi.epitest.hostap.WPASupplicant.BSSID"
WPAS_DBUS_OLD_NETWORK = "fi.epitest.hostap.WPASupplicant.Network"


def prepare_dbus(dev):
    """Connect to the old D-Bus interface for the given device.

    Returns a (bus, wpas_obj, path, if_obj) tuple: the system bus, the proxy
    for the root service object, the object path returned by getInterface()
    for dev.ifname, and the proxy object for that path.

    Raises Exception("hwsim-SKIP") when the dbus module could not be imported
    or when any step of the D-Bus setup fails.
    """
    if not dbus_imported:
        logger.info("No dbus module available")
        raise Exception("hwsim-SKIP")
    try:
        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)
        bus = dbus.SystemBus()
        wpas_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, WPAS_DBUS_OLD_PATH)
        wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)
        path = wpas.getInterface(dev.ifname)
        if_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
        return (bus, wpas_obj, path, if_obj)
    except Exception as e:
        # Any failure here is reported as a skip rather than a test failure
        logger.info("No D-Bus support available: " + str(e))
        raise Exception("hwsim-SKIP")


class TestDbusOldWps(TestDbus):
    """Common main loop driver for the WPS tests.

    Subclasses must provide run_wps(); it is scheduled from __enter__().
    self.loop, add_signal() and timeout() are not defined here and are
    expected to come from TestDbus.
    """
    def __init__(self, bus):
        TestDbus.__init__(self, bus)
        self.event_ok = False

    def __enter__(self):
        # Start the WPS operation almost immediately and give up after 15 s
        gobject.timeout_add(1, self.run_wps)
        gobject.timeout_add(15000, self.timeout)
        self.add_signal(self.wpsCred, WPAS_DBUS_OLD_IFACE, "WpsCred")
        # The main loop is run to completion before the with-body executes
        self.loop.run()
        return self

    def wpsCred(self, cred):
        """WpsCred signal handler: record success and stop the main loop."""
        logger.debug("wpsCred: " + str(cred))
        self.event_ok = True
        self.loop.quit()

    def success(self):
        """Return True if the WpsCred signal was received."""
        return self.event_ok


def test_dbus_old(dev, apdev):
    """The old D-Bus interface"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    res = if_obj.capabilities(dbus_interface=WPAS_DBUS_OLD_IFACE)
    logger.debug("capabilities(): " + str(res))
    if 'auth_alg' not in res or "OPEN" not in res['auth_alg']:
        raise Exception("Unexpected capabilities")
    res2 = if_obj.capabilities(dbus.Boolean(True),
                               dbus_interface=WPAS_DBUS_OLD_IFACE)
    logger.debug("capabilities(strict): " + str(res2))

    res = if_obj.state(dbus_interface=WPAS_DBUS_OLD_IFACE)
    logger.debug("State: " + res)

    res = if_obj.scanning(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if res != 0:
        raise Exception("Unexpected scanning: " + str(res))

    if_obj.setAPScan(dbus.UInt32(1), dbus_interface=WPAS_DBUS_OLD_IFACE)

    # Out-of-range value and wrong argument type must both be rejected
    for t in [ dbus.UInt32(123), "foo" ]:
        try:
            if_obj.setAPScan(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid setAPScan() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidOptions" not in str(e):
                raise Exception("Unexpected error message for invalid setAPScan: " + str(e))

    # Network object paths that do not refer to an existing network
    for p in [ path + "/Networks/12345",
               path + "/Networks/foo" ]:
        obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
        try:
            obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
            raise Exception("Invalid disable() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidNetwork" not in str(e):
                raise Exception("Unexpected error message for invalid disable: " + str(e))

    # BSSID object paths that do not refer to a known BSS
    for p in [ path + "/BSSIDs/foo",
               path + "/BSSIDs/001122334455"]:
        obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
        try:
            obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID)
            raise Exception("Invalid properties() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidBSSID" not in str(e):
                raise Exception("Unexpected error message for invalid properties: " + str(e))


def test_dbus_old_scan(dev, apdev):
    """The old D-Bus interface - scanning"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })

    params = hostapd.wpa2_eap_params(ssid="test-wpa2-eap")
    params['wpa'] = '3'
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], params)

    class TestDbusScan(TestDbus):
        """Trigger a scan and wait for the ScanResultsAvailable signal."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(7000, self.timeout)
            self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
                            "ScanResultsAvailable")
            self.loop.run()
            return self

        def scanDone(self):
            logger.debug("scanDone")
            self.scan_completed = True
            self.loop.quit()

        def run_scan(self, *args):
            logger.debug("run_scan")
            if not if_obj.scan(dbus_interface=WPAS_DBUS_OLD_IFACE):
                raise Exception("Failed to trigger scan")
            # Returning False keeps the timeout from firing again
            return False

        def success(self):
            return self.scan_completed

    with TestDbusScan(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of scan results: " + str(res))
    for bss_path in res:
        logger.debug("Scan result BSS path: " + bss_path)
        bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, bss_path)
        bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
                                 byte_arrays=True)
        logger.debug("BSS: " + str(bss))

    obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, res[0])
    try:
        bss_obj.properties2(dbus_interface=WPAS_DBUS_OLD_BSSID)
        raise Exception("Unknown BSSID method accepted")
    except dbus.exceptions.DBusException as e:
        # Only the D-Bus error is expected here; catching plain Exception
        # would also swallow the "accepted" failure raised just above.
        logger.debug("Unknown BSSID method exception: " + str(e))

    if not if_obj.flush(0, dbus_interface=WPAS_DBUS_OLD_IFACE):
        raise Exception("Failed to issue flush(0)")
    res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected BSS entry after flush")
    if not if_obj.flush(1, dbus_interface=WPAS_DBUS_OLD_IFACE):
        raise Exception("Failed to issue flush(1)")
    try:
        if_obj.flush("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
        raise Exception("Invalid flush arguments accepted")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
            raise Exception("Unexpected error message for invalid flush: " + str(e))
    # NOTE(review): a successful properties() call on the flushed BSS is not
    # treated as a failure here - confirm whether that is intentional.
    try:
        bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
                           byte_arrays=True)
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidBSSID"):
            raise Exception("Unexpected error message for invalid BSS: " + str(e))


def test_dbus_old_debug(dev, apdev):
    """The old D-Bus interface - debug"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)

    try:
        wpas.setDebugParams(123)
        raise Exception("Invalid setDebugParams accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidOptions" not in str(e):
            raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))

    try:
        wpas.setDebugParams(123, True, True)
        raise Exception("Invalid setDebugParams accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidOptions" not in str(e):
            raise Exception("Unexpected error message for invalid setDebugParam: " + str(e))

    wpas.setDebugParams(1, True, True)
    # Set the log level over the control interface after the D-Bus change
    dev[0].request("LOG_LEVEL MSGDUMP")


def test_dbus_old_smartcard(dev, apdev):
    """The old D-Bus interface - smartcard"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # An empty dictionary is expected to be accepted
    params = dbus.Dictionary(signature='sv')
    if_obj.setSmartcardModules(params, dbus_interface=WPAS_DBUS_OLD_IFACE)

    # Each dictionary below includes a key ('foo', 'foo2', 'foo3') that is
    # not one of the module path keys; all are expected to be rejected.
    params = dbus.Dictionary({ 'opensc_engine_path': "foobar1",
                               'pkcs11_engine_path': "foobar2",
                               'pkcs11_module_path': "foobar3",
                               'foo': 'bar' },
                             signature='sv')
    params2 = dbus.Dictionary({ 'pkcs11_engine_path': "foobar2",
                                'foo': 'bar' },
                              signature='sv')
    params3 = dbus.Dictionary({ 'pkcs11_module_path': "foobar3",
                                'foo2': 'bar' },
                              signature='sv')
    params4 = dbus.Dictionary({ 'opensc_engine_path': "foobar4",
                                'foo3': 'bar' },
                              signature='sv')
    tests = [ 1, params, params2, params3, params4 ]
    for t in tests:
        try:
            if_obj.setSmartcardModules(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid setSmartcardModules accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
                raise Exception("Unexpected error message for invalid setSmartcardModules(%s): %s" % (str(t), str(e)))


def test_dbus_old_interface(dev, apdev):
    """The old D-Bus interface - interface get/add/remove"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_OLD_SERVICE)

    # (argument, expected error substring)
    tests = [ (123, "InvalidOptions"),
              ("foo", "InvalidInterface") ]
    for (ifname, err) in tests:
        try:
            wpas.getInterface(ifname)
            raise Exception("Invalid getInterface accepted")
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid getInterface: " + str(e))

    # Valid add/get/remove cycle; a second remove of the same path must fail
    params = dbus.Dictionary({ 'driver': 'none' }, signature='sv')
    wpas.addInterface("lo", params)
    path = wpas.getInterface("lo")
    logger.debug("New interface path: " + str(path))
    wpas.removeInterface(path)
    try:
        wpas.removeInterface(path)
        raise Exception("Invalid removeInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidInterface" not in str(e):
            raise Exception("Unexpected error message for invalid removeInterface: " + str(e))

    params1 = dbus.Dictionary({ 'driver': 'foo',
                                'driver-params': 'foo',
                                'config-file': 'foo',
                                'bridge-ifname': 'foo' },
                              signature='sv')
    params2 = dbus.Dictionary({ 'foo': 'bar' }, signature='sv')
    # (ifname, params or None for "no params argument", expected error)
    tests = [ (123, None, "InvalidOptions"),
              ("", None, "InvalidOptions"),
              ("foo", None, "AddError"),
              ("foo", params1, "AddError"),
              ("foo", params2, "InvalidOptions"),
              ("foo", 1234, "InvalidOptions"),
              (dev[0].ifname, None, "ExistsError" ) ]
    for (ifname, params, err) in tests:
        try:
            if params is None:
                wpas.addInterface(ifname)
            else:
                wpas.addInterface(ifname, params)
            raise Exception("Invalid addInterface accepted: " + str(params))
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid addInterface(%s): %s" % (str(params), str(e)))

    try:
        wpas.removeInterface(123)
        raise Exception("Invalid removeInterface accepted")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
            raise Exception("Unexpected error message for invalid removeInterface: " + str(e))


def test_dbus_old_blob(dev, apdev):
    """The old D-Bus interface - blob operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    param1 = dbus.Dictionary({ 'blob3': 123 }, signature='sv')
    param2 = dbus.Dictionary({ 'blob3': "foo" })
    param3 = dbus.Dictionary({ '': dbus.ByteArray([ 1, 2 ]) },
                             signature='sv')
    # (argument, expected error substring)
    tests = [ (1, "InvalidOptions"),
              (param1, "InvalidOptions"),
              (param2, "InvalidOptions"),
              (param3, "InvalidOptions") ]
    for (arg, err) in tests:
        try:
            if_obj.setBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid setBlobs() accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            logger.debug("setBlobs(%s): %s" % (str(arg), str(e)))
            if err not in str(e):
                raise Exception("Unexpected error message for invalid setBlobs: " + str(e))

    tests = [ (["foo"], "RemoveError: Error removing blob"),
              ([""], "RemoveError: Invalid blob name"),
              ([1], "InvalidOptions"),
              ("foo", "InvalidOptions") ]
    for (arg, err) in tests:
        try:
            if_obj.removeBlobs(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid removeBlobs() accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            logger.debug("removeBlobs(%s): %s" % (str(arg), str(e)))
            if err not in str(e):
                raise Exception("Unexpected error message for invalid removeBlobs: " + str(e))

    # Valid set followed by removal of the same blobs
    blobs = dbus.Dictionary({ 'blob1': dbus.ByteArray([ 1, 2, 3 ]),
                              'blob2': dbus.ByteArray([ 1, 2 ]) },
                            signature='sv')
    if_obj.setBlobs(blobs, dbus_interface=WPAS_DBUS_OLD_IFACE)
    if_obj.removeBlobs(['blob1', 'blob2'], dbus_interface=WPAS_DBUS_OLD_IFACE)


def test_dbus_old_connect(dev, apdev):
    """The old D-Bus interface - add a network and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    # Object paths that do not identify a network on this interface
    for p in [ "/no/where/to/be/found",
               path + "/Networks/12345",
               path + "/Networks/foo",
               "/fi/epitest/hostap/WPASupplicant/Interfaces",
               "/fi/epitest/hostap/WPASupplicant/Interfaces/12345/Networks/0" ]:
        obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, p)
        try:
            if_obj.removeNetwork(obj, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid removeNetwork accepted: " + p)
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
                raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))

    try:
        if_obj.removeNetwork("foo", dbus_interface=WPAS_DBUS_OLD_IFACE)
        raise Exception("Invalid removeNetwork accepted")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
            raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))

    try:
        if_obj.removeNetwork(path, dbus_interface=WPAS_DBUS_OLD_IFACE)
        raise Exception("Invalid removeNetwork accepted")
    except dbus.exceptions.DBusException as e:
        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
            raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))

    # (argument, expected error substring)
    tests = [ (path, "InvalidNetwork"),
              (bus.get_object(WPAS_DBUS_OLD_SERVICE, "/no/where"),
               "InvalidInterface"),
              (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/1234"),
               "InvalidNetwork"),
              (bus.get_object(WPAS_DBUS_OLD_SERVICE, path + "/Networks/foo"),
               "InvalidNetwork"),
              (1, "InvalidOptions") ]
    for t, err in tests:
        try:
            if_obj.selectNetwork(t, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid selectNetwork accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid selectNetwork(%s): %s" % (str(t), str(e)))

    npath = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if not npath.startswith(WPAS_DBUS_OLD_PATH):
        # Report the path that addNetwork() returned, not the interface path
        raise Exception("Unexpected addNetwork result: " + npath)
    netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, npath)
    tests = [ 123,
              dbus.Dictionary({ 'foo': 'bar' }, signature='sv') ]
    for t in tests:
        try:
            netw_obj.set(t, dbus_interface=WPAS_DBUS_OLD_NETWORK)
            raise Exception("Invalid set() accepted: " + str(t))
        except dbus.exceptions.DBusException as e:
            if "InvalidOptions" not in str(e):
                raise Exception("Unexpected error message for invalid set: " + str(e))
    params = dbus.Dictionary({ 'ssid': ssid,
                               'key_mgmt': 'WPA-PSK',
                               'psk': passphrase,
                               'identity': dbus.ByteArray([ 1, 2 ]),
                               'priority': dbus.Int32(0),
                               'scan_freq': dbus.UInt32(2412) },
                             signature='sv')
    netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
    if_obj.removeNetwork(npath, dbus_interface=WPAS_DBUS_OLD_IFACE)

    class TestDbusConnect(TestDbus):
        """Drive connect/disconnect cycles from StateChange signals.

        self.state counts the steps taken so far:
          0 -> 1: first COMPLETED, network is disabled
          1 -> 2: DISCONNECTED, network is enabled again
          2 -> 3: COMPLETED, disconnect() is issued
          3 -> 4: DISCONNECTED, selectNetwork() without arguments
          4 -> 5: COMPLETED, disconnect() is issued
          5 -> 6: DISCONNECTED, selectNetwork(path)
          6 -> 7: COMPLETED, network is removed and the loop is stopped
        """
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.scanDone, WPAS_DBUS_OLD_IFACE,
                            "ScanResultsAvailable")
            self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
                            "StateChange")
            self.loop.run()
            return self

        def scanDone(self):
            logger.debug("scanDone")

        def stateChange(self, new, old):
            logger.debug("stateChange(%d): %s --> %s" % (self.state, old, new))
            if new == "COMPLETED":
                if self.state == 0:
                    self.state = 1
                    self.netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
                elif self.state == 2:
                    self.state = 3
                    if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
                elif self.state == 4:
                    self.state = 5
                    if_obj.disconnect(dbus_interface=WPAS_DBUS_OLD_IFACE)
                elif self.state == 6:
                    self.state = 7
                    if_obj.removeNetwork(self.path,
                                         dbus_interface=WPAS_DBUS_OLD_IFACE)
                    # Removing the same network twice must be rejected
                    try:
                        if_obj.removeNetwork(self.path,
                                             dbus_interface=WPAS_DBUS_OLD_IFACE)
                        raise Exception("Invalid removeNetwork accepted")
                    except dbus.exceptions.DBusException as e:
                        if not str(e).startswith("fi.epitest.hostap.WPASupplicant.Interface.InvalidNetwork"):
                            raise Exception("Unexpected error message for invalid removeNetwork: " + str(e))

                    self.loop.quit()
            elif new == "DISCONNECTED":
                if self.state == 1:
                    self.state = 2
                    self.netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
                elif self.state == 3:
                    self.state = 4
                    if_obj.selectNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
                elif self.state == 5:
                    self.state = 6
                    if_obj.selectNetwork(self.path,
                                         dbus_interface=WPAS_DBUS_OLD_IFACE)

        def run_connect(self, *args):
            logger.debug("run_connect")
            path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
            netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
            netw_obj.disable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
            params = dbus.Dictionary({ 'ssid': ssid,
                                       'key_mgmt': 'WPA-PSK',
                                       'psk': passphrase,
                                       'scan_freq': 2412 },
                                     signature='sv')
            netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
            netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
            # Remembered for the signal handlers
            self.path = path
            self.netw_obj = netw_obj
            return False

        def success(self):
            return self.state == 7

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    if len(dev[0].list_networks()) != 0:
        raise Exception("Unexpected network")


def test_dbus_old_connect_eap(dev, apdev):
    """The old D-Bus interface - add an EAP network and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    ssid = "test-wpa2-eap"
    params = hostapd.wpa2_eap_params(ssid=ssid)
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)

    class TestDbusConnect(TestDbus):
        """Connect with EAP-TTLS/PAP and expect a Certification signal."""
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.connected = False
            self.certification_received = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.stateChange, WPAS_DBUS_OLD_IFACE,
                            "StateChange")
            self.add_signal(self.certification, WPAS_DBUS_OLD_IFACE,
                            "Certification")
            self.loop.run()
            return self

        def stateChange(self, new, old):
            logger.debug("stateChange: %s --> %s" % (old, new))
            if new == "COMPLETED":
                self.connected = True
                self.loop.quit()

        def certification(self, depth, subject, hash, cert_hex):
            logger.debug("certification: depth={} subject={} hash={} cert_hex={}".format(depth, subject, hash, cert_hex))
            self.certification_received = True

        def run_connect(self, *args):
            logger.debug("run_connect")
            path = if_obj.addNetwork(dbus_interface=WPAS_DBUS_OLD_IFACE)
            netw_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, path)
            params = dbus.Dictionary({ 'ssid': ssid,
                                       'key_mgmt': 'WPA-EAP',
                                       'eap': 'TTLS',
                                       'anonymous_identity': 'ttls',
                                       'identity': 'pap user',
                                       'ca_cert': 'auth_serv/ca.pem',
                                       'phase2': 'auth=PAP',
                                       'password': 'password',
                                       'scan_freq': 2412 },
                                     signature='sv')
            netw_obj.set(params, dbus_interface=WPAS_DBUS_OLD_NETWORK)
            netw_obj.enable(dbus_interface=WPAS_DBUS_OLD_NETWORK)
            self.path = path
            self.netw_obj = netw_obj
            return False

        def success(self):
            return self.connected and self.certification_received

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")


def test_dbus_old_wps_pbc(dev, apdev):
    """The old D-Bus interface and WPS/PBC"""
    try:
        return _test_dbus_old_wps_pbc(dev, apdev)
    finally:
        # Always restore the value changed by the test body
        dev[0].request("SET wps_cred_processing 0")


def _test_dbus_old_wps_pbc(dev, apdev):
    """Body of test_dbus_old_wps_pbc; the caller resets wps_cred_processing."""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    for arg in [ 123, "123" ]:
        try:
            if_obj.wpsPbc(arg, dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid wpsPbc arguments accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
                raise Exception("Unexpected error message for invalid wpsPbc: " + str(e))

    class TestDbusWps(TestDbusOldWps):
        """Run wpsPbc() with the given argument ("any" or a BSSID)."""
        def __init__(self, bus, pbc_param):
            TestDbusOldWps.__init__(self, bus)
            self.pbc_param = pbc_param

        def run_wps(self, *args):
            logger.debug("run_wps: pbc_param=" + self.pbc_param)
            if_obj.wpsPbc(self.pbc_param, dbus_interface=WPAS_DBUS_OLD_IFACE)
            return False

    with TestDbusWps(bus, "any") as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    res = if_obj.scanResults(dbus_interface=WPAS_DBUS_OLD_IFACE)
    if len(res) != 1:
        raise Exception("Unexpected number of scan results: " + str(res))
    for bss_path in res:
        logger.debug("Scan result BSS path: " + bss_path)
        bss_obj = bus.get_object(WPAS_DBUS_OLD_SERVICE, bss_path)
        bss = bss_obj.properties(dbus_interface=WPAS_DBUS_OLD_BSSID,
                                 byte_arrays=True)
        logger.debug("BSS: " + str(bss))

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected(timeout=10)
    dev[0].request("FLUSH")

    # Second round: address the AP by BSSID instead of "any"
    hapd.request("WPS_PBC")
    dev[0].scan_for_bss(bssid, freq="2412")

    with TestDbusWps(bus, bssid) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected(timeout=10)

    hapd.disable()
    dev[0].flush_scan_cache()


def test_dbus_old_wps_pin(dev, apdev):
    """The old D-Bus interface and WPS/PIN"""
    try:
        return _test_dbus_old_wps_pin(dev, apdev)
    finally:
        # Always restore the value changed by the test body
        dev[0].request("SET wps_cred_processing 0")


def _test_dbus_old_wps_pin(dev, apdev):
    """Body of test_dbus_old_wps_pin; the caller resets wps_cred_processing."""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    for arg in [ (123, "12345670"), ("123", "12345670") ]:
        try:
            if_obj.wpsPin(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid wpsPin arguments accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
                raise Exception("Unexpected error message for invalid wpsPin: " + str(e))

    class TestDbusWps(TestDbusOldWps):
        """Run wpsPin() with the given BSSID argument and PIN."""
        def __init__(self, bus, bssid, pin):
            TestDbusOldWps.__init__(self, bus)
            self.bssid = bssid
            self.pin = pin

        def run_wps(self, *args):
            logger.debug("run_wps %s %s" % (self.bssid, self.pin))
            pin = if_obj.wpsPin(self.bssid, self.pin,
                                dbus_interface=WPAS_DBUS_OLD_IFACE)
            if len(self.pin) == 0:
                # No PIN was supplied, so hand the PIN returned by wpsPin()
                # to the AP.
                h = hostapd.Hostapd(apdev[0]['ifname'])
                h.request("WPS_PIN any " + pin)
            return False

    with TestDbusWps(bus, bssid, "12345670") as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected(timeout=10)
    dev[0].request("FLUSH")

    dev[0].scan_for_bss(bssid, freq="2412")

    with TestDbusWps(bus, "any", "") as t:
        if not t.success():
            raise Exception("Expected signals not seen")


def test_dbus_old_wps_reg(dev, apdev):
    """The old D-Bus interface and WPS/Registrar"""
    try:
        return _test_dbus_old_wps_reg(dev, apdev)
    finally:
        # Always restore the value changed by the test body
        dev[0].request("SET wps_cred_processing 0")


def _test_dbus_old_wps_reg(dev, apdev):
    """Body of test_dbus_old_wps_reg; the caller resets wps_cred_processing."""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    for arg in [ (123, "12345670"), ("123", "12345670") ]:
        try:
            if_obj.wpsReg(arg[0], arg[1], dbus_interface=WPAS_DBUS_OLD_IFACE)
            raise Exception("Invalid wpsReg arguments accepted: " + str(arg))
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith("fi.epitest.hostap.WPASupplicant.InvalidOptions"):
                raise Exception("Unexpected error message for invalid wpsReg: " + str(e))

    class TestDbusWps(TestDbusOldWps):
        """Run wpsReg() against the AP's BSSID with a fixed PIN."""
        def run_wps(self, *args):
            logger.debug("run_wps")
            if_obj.wpsReg(bssid, "12345670",
                          dbus_interface=WPAS_DBUS_OLD_IFACE)
            return False

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[0].wait_connected(timeout=10)