# Open mode AP tests
# Copyright (c) 2014, Qualcomm Atheros, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import logging
logger = logging.getLogger()
import struct
import subprocess
import time
import os

import hostapd
import hwsim_utils
from tshark import run_tshark
from utils import alloc_fail
from wpasupplicant import WpaSupplicant

def test_ap_open(dev, apdev):
    """AP with open mode (no security) configuration"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bg_scan_period="0")
    ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No connection event received from hostapd")
    hwsim_utils.test_connectivity(dev[0], hapd)

    dev[0].request("DISCONNECT")
    ev = hapd.wait_event([ "AP-STA-DISCONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No disconnection event received from hostapd")

def test_ap_open_packet_loss(dev, apdev):
    """AP with open mode configuration and large packet loss"""
    params = { "ssid": "open",
               "ignore_probe_probability": "0.5",
               "ignore_auth_probability": "0.5",
               "ignore_assoc_probability": "0.5",
               "ignore_reassoc_probability": "0.5" }
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
    # Start all three connection attempts first, then wait for each of them.
    for i in range(0, 3):
        dev[i].connect("open", key_mgmt="NONE", scan_freq="2412",
                       wait_connect=False)
    for i in range(0, 3):
        dev[i].wait_connected(timeout=20)

def test_ap_open_unknown_action(dev, apdev):
    """AP with open mode configuration and unknown Action frame"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    bssid = apdev[0]['bssid']
    cmd = "MGMT_TX {} {} freq=2412 action=765432".format(bssid, bssid)
    if "FAIL" in dev[0].request(cmd):
        raise Exception("Could not send test Action frame")
    ev = dev[0].wait_event(["MGMT-TX-STATUS"], timeout=10)
    if ev is None:
        raise Exception("Timeout on MGMT-TX-STATUS")
    if "result=SUCCESS" not in ev:
        raise Exception("AP did not ack Action frame")

def test_ap_open_invalid_wmm_action(dev, apdev):
    """AP with open mode configuration and invalid WMM Action frame"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    bssid = apdev[0]['bssid']
    cmd = "MGMT_TX {} {} freq=2412 action=1100".format(bssid, bssid)
    if "FAIL" in dev[0].request(cmd):
        raise Exception("Could not send test Action frame")
    ev = dev[0].wait_event(["MGMT-TX-STATUS"], timeout=10)
    if ev is None or "result=SUCCESS" not in ev:
        raise Exception("AP did not ack Action frame")

def test_ap_open_reconnect_on_inactivity_disconnect(dev, apdev):
    """Reconnect to open mode AP after inactivity related disconnection"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    hapd.request("DEAUTHENTICATE " + dev[0].p2p_interface_addr() + " reason=4")
    dev[0].wait_disconnected(timeout=5)
    dev[0].wait_connected(timeout=2, error="Timeout on reconnection")

def test_ap_open_assoc_timeout(dev, apdev):
    """AP timing out association"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].scan(freq="2412")
    # Have the test script receive management frames via mgmt_rx() below.
    hapd.set("ext_mgmt_frame_handling", "1")
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   wait_connect=False)
    # Wait for the Authentication frame (subtype 11); req is reset to None
    # for any other frame so that the check after the loop detects a miss.
    for i in range(0, 10):
        req = hapd.mgmt_rx()
        if req is None:
            raise Exception("MGMT RX wait timed out")
        if req['subtype'] == 11:
            break
        req = None
    if not req:
        raise Exception("Authentication frame not received")

    # Reply to the sender with the addresses swapped.
    resp = {}
    resp['fc'] = req['fc']
    resp['da'] = req['sa']
    resp['sa'] = req['da']
    resp['bssid'] = req['bssid']
    # NOTE(review): presumably auth alg=0 (open), transaction seq=2,
    # status=0 (success) - confirm against the frame format.
    resp['payload'] = struct.pack('<HHH', 0, 2, 0)
    hapd.mgmt_tx(resp)

    # Leave Association Request frames (subtype 0) unanswered until three
    # of them have been seen.
    assoc = 0
    for i in range(0, 10):
        req = hapd.mgmt_rx()
        if req is None:
            raise Exception("MGMT RX wait timed out")
        if req['subtype'] == 0:
            assoc += 1
            if assoc == 3:
                break
    if assoc != 3:
        raise Exception("Association Request frames not received: assoc=%d" % assoc)
    hapd.set("ext_mgmt_frame_handling", "0")
    dev[0].wait_connected(timeout=15)

def test_ap_open_id_str(dev, apdev):
    """AP with open mode and id_str"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412", id_str="foo",
                   wait_connect=False)
    ev = dev[0].wait_connected(timeout=10)
    if "id_str=foo" not in ev:
        raise Exception("CTRL-EVENT-CONNECT did not have matching id_str: " + ev)
    if dev[0].get_status_field("id_str") != "foo":
        raise Exception("id_str mismatch")

def test_ap_open_select_any(dev, apdev):
    """AP with open mode and select any network"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    net_id = dev[0].connect("unknown", key_mgmt="NONE", scan_freq="2412",
                            only_add_network=True)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   only_add_network=True)

    # Selecting the "unknown" network must not result in a connection.
    dev[0].select_network(net_id)
    ev = dev[0].wait_event(["CTRL-EVENT-NETWORK-NOT-FOUND",
                            "CTRL-EVENT-CONNECTED"], timeout=10)
    if ev is None:
        raise Exception("No result reported")
    if "CTRL-EVENT-CONNECTED" in ev:
        raise Exception("Unexpected connection")

    dev[0].select_network("any")
    dev[0].wait_connected(timeout=10)

def test_ap_open_unexpected_assoc_event(dev, apdev):
    """AP with open mode and unexpected association event"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected(timeout=15)
    dev[0].dump_monitor()
    # This will be accepted due to matching network
    subprocess.call(['iw', 'dev', dev[0].ifname, 'connect', 'open', "2412",
                     apdev[0]['bssid']])
    dev[0].wait_connected(timeout=15)
    dev[0].dump_monitor()

    dev[0].request("REMOVE_NETWORK all")
    dev[0].wait_disconnected(timeout=5)
    dev[0].dump_monitor()
    # This will result in disconnection due to no matching network
    subprocess.call(['iw', 'dev', dev[0].ifname, 'connect', 'open', "2412",
                     apdev[0]['bssid']])
    dev[0].wait_disconnected(timeout=15)

def test_ap_bss_load(dev, apdev):
    """AP with open mode and bss_load_update_period configured"""
    hapd = hostapd.add_ap(apdev[0]['ifname'],
                          { "ssid": "open",
                            "bss_load_update_period": "10" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    # this does not really get much useful output with mac80211_hwsim currently,
    # but run through the channel survey update couple of times
    for i in range(0, 10):
        hwsim_utils.test_connectivity(dev[0], hapd)
        hwsim_utils.test_connectivity(dev[0], hapd)
        hwsim_utils.test_connectivity(dev[0], hapd)
        time.sleep(0.15)

def hapd_out_of_mem(hapd, apdev, count, func):
    """Check that adding an AP fails while alloc_fail() is active

    hapd, count and func are passed through to alloc_fail(); apdev is the
    AP device dictionary whose 'ifname' is used for the add_ap() attempt.
    Raises Exception if the interface got started anyway.
    """
    with alloc_fail(hapd, count, func):
        started = False
        try:
            hostapd.add_ap(apdev['ifname'], { "ssid": "open" })
            started = True
        except Exception:
            # Failure to add the interface is the expected outcome here.
            pass
        if started:
            raise Exception("hostapd interface started even with memory allocation failure: " + func)

def test_ap_open_out_of_memory(dev, apdev):
    """hostapd failing to setup interface due to allocation failure"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    hapd_out_of_mem(hapd, apdev[1], 1, "hostapd_alloc_bss_data")

    for i in range(1, 3):
        hapd_out_of_mem(hapd, apdev[1], i, "hostapd_iface_alloc")

    for i in range(1, 5):
        hapd_out_of_mem(hapd, apdev[1], i, "hostapd_config_defaults;hostapd_config_alloc")

    hapd_out_of_mem(hapd, apdev[1], 1, "hostapd_config_alloc")

    hapd_out_of_mem(hapd, apdev[1], 1, "hostapd_driver_init")

    for i in range(1, 4):
        hapd_out_of_mem(hapd, apdev[1], i, "=wpa_driver_nl80211_drv_init")

    # eloop_register_read_sock() call from i802_init()
    hapd_out_of_mem(hapd, apdev[1], 1, "eloop_sock_table_add_sock;eloop_register_sock;?eloop_register_read_sock;=i802_init")

    # verify that a new interface can still be added when memory allocation does
    # not fail
    hostapd.add_ap(apdev[1]['ifname'], { "ssid": "open" })

def test_bssid_black_white_list(dev, apdev):
    """BSSID black/white list"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], { "ssid": "open" })

    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_whitelist=apdev[1]['bssid'])
    dev[1].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_blacklist=apdev[1]['bssid'])
    dev[2].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_whitelist="00:00:00:00:00:00/00:00:00:00:00:00",
                   bssid_blacklist=apdev[1]['bssid'])
    if dev[0].get_status_field('bssid') != apdev[1]['bssid']:
        raise Exception("dev[0] connected to unexpected AP")
    if dev[1].get_status_field('bssid') != apdev[0]['bssid']:
        raise Exception("dev[1] connected to unexpected AP")
    if dev[2].get_status_field('bssid') != apdev[0]['bssid']:
        raise Exception("dev[2] connected to unexpected AP")
    dev[0].request("REMOVE_NETWORK all")
    dev[1].request("REMOVE_NETWORK all")
    dev[2].request("REMOVE_NETWORK all")

    # dev[2] is started without waiting; it is checked at the end to confirm
    # that no connection was reported for it.
    dev[2].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_whitelist="00:00:00:00:00:00", wait_connect=False)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_whitelist="11:22:33:44:55:66/ff:00:00:00:00:00 " + apdev[1]['bssid'] + " aa:bb:cc:dd:ee:ff")
    dev[1].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bssid_blacklist="11:22:33:44:55:66/ff:00:00:00:00:00 " + apdev[1]['bssid'] + " aa:bb:cc:dd:ee:ff")
    if dev[0].get_status_field('bssid') != apdev[1]['bssid']:
        raise Exception("dev[0] connected to unexpected AP")
    if dev[1].get_status_field('bssid') != apdev[0]['bssid']:
        raise Exception("dev[1] connected to unexpected AP")
    dev[0].request("REMOVE_NETWORK all")
    dev[1].request("REMOVE_NETWORK all")
    ev = dev[2].wait_event(["CTRL-EVENT-CONNECTED"], timeout=0.1)
    if ev is not None:
        raise Exception("Unexpected dev[2] connectin")
    dev[2].request("REMOVE_NETWORK all")

def test_ap_open_wpas_in_bridge(dev, apdev):
    """Open mode AP and wpas interface in a bridge"""
    br_ifname='sta-br0'
    ifname='wlan5'
    try:
        _test_ap_open_wpas_in_bridge(dev, apdev)
    finally:
        # Undo the bridge setup regardless of the test result.
        subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'down'])
        subprocess.call(['brctl', 'delif', br_ifname, ifname])
        subprocess.call(['brctl', 'delbr', br_ifname])
        subprocess.call(['iw', ifname, 'set', '4addr', 'off'])

def _test_ap_open_wpas_in_bridge(dev, apdev):
    """Test body for test_ap_open_wpas_in_bridge(); the caller cleans up"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })

    br_ifname='sta-br0'
    ifname='wlan5'
    wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
    # First, try a failure case of adding an interface
    try:
        wpas.interface_add(ifname, br_ifname=br_ifname)
        raise Exception("Interface addition succeeded unexpectedly")
    except Exception as e:
        if "Failed to add" in str(e):
            logger.info("Ignore expected interface_add failure due to missing bridge interface: " + str(e))
        else:
            # Includes the "succeeded unexpectedly" exception raised above.
            raise

    # Next, add the bridge interface and add the interface again
    subprocess.call(['brctl', 'addbr', br_ifname])
    subprocess.call(['brctl', 'setfd', br_ifname, '0'])
    subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'up'])
    subprocess.call(['iw', ifname, 'set', '4addr', 'on'])
    subprocess.check_call(['brctl', 'addif', br_ifname, ifname])
    wpas.interface_add(ifname, br_ifname=br_ifname)

    wpas.connect("open", key_mgmt="NONE", scan_freq="2412")

def test_ap_open_start_disabled(dev, apdev):
    """AP with open mode and beaconing disabled"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open",
                                                "start_disabled": "1" })
    bssid = apdev[0]['bssid']

    dev[0].flush_scan_cache()
    dev[0].scan(freq=2412, only_new=True)
    if dev[0].get_bss(bssid) is not None:
        raise Exception("AP was seen beaconing")
    if "OK" not in hapd.request("RELOAD"):
        raise Exception("RELOAD failed")
    dev[0].scan_for_bss(bssid, freq=2412)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")

def test_ap_open_start_disabled2(dev, apdev):
    """AP with open mode and beaconing disabled (2)"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open",
                                                "start_disabled": "1" })
    bssid = apdev[0]['bssid']

    dev[0].flush_scan_cache()
    dev[0].scan(freq=2412, only_new=True)
    if dev[0].get_bss(bssid) is not None:
        raise Exception("AP was seen beaconing")
    if "OK" not in hapd.request("UPDATE_BEACON"):
        raise Exception("UPDATE_BEACON failed")
    dev[0].scan_for_bss(bssid, freq=2412)
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    if "OK" not in hapd.request("UPDATE_BEACON"):
        raise Exception("UPDATE_BEACON failed")
    dev[0].request("DISCONNECT")
    dev[0].wait_disconnected()
    dev[0].request("RECONNECT")
    dev[0].wait_connected()

def test_ap_open_ifdown(dev, apdev):
    """AP with open mode and external ifconfig down"""
    params = { "ssid": "open",
               "ap_max_inactivity": "1" }
    hapd = hostapd.add_ap(apdev[0]['ifname'], params)
    bssid = apdev[0]['bssid']

    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
    dev[1].connect("open", key_mgmt="NONE", scan_freq="2412")
    subprocess.call(['ip', 'link', 'set', 'dev', apdev[0]['ifname'], 'down'])
    # One AP-STA-DISCONNECTED event is expected for each of the two stations.
    ev = hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=10)
    if ev is None:
        raise Exception("Timeout on AP-STA-DISCONNECTED (1)")
    ev = hapd.wait_event(["AP-STA-DISCONNECTED"], timeout=5)
    if ev is None:
        raise Exception("Timeout on AP-STA-DISCONNECTED (2)")
    ev = hapd.wait_event(["INTERFACE-DISABLED"], timeout=5)
    if ev is None:
        raise Exception("No INTERFACE-DISABLED event")
    # The following wait tests beacon loss detection in mac80211 on dev0.
    # dev1 is used to test stopping of AP side functionality on client polling.
    dev[1].request("REMOVE_NETWORK all")
    subprocess.call(['ip', 'link', 'set', 'dev', apdev[0]['ifname'], 'up'])
    dev[0].wait_disconnected()
    dev[1].wait_disconnected()
    ev = hapd.wait_event(["INTERFACE-ENABLED"], timeout=10)
    if ev is None:
        raise Exception("No INTERFACE-ENABLED event")
    dev[0].wait_connected()
    hwsim_utils.test_connectivity(dev[0], hapd)

def test_ap_open_disconnect_in_ps(dev, apdev, params):
    """Disconnect with the client in PS to regression-test a kernel bug"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bg_scan_period="0")
    ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
    if ev is None:
        raise Exception("No connection event received from hostapd")

    time.sleep(0.2)
    hwsim_utils.set_powersave(dev[0], hwsim_utils.PS_MANUAL_POLL)
    try:
        # inject some traffic
        sa = hapd.own_addr()
        da = dev[0].own_addr()
        hapd.request('DATA_TEST_CONFIG 1')
        hapd.request('DATA_TEST_TX {} {} 0'.format(da, sa))
        hapd.request('DATA_TEST_CONFIG 0')

        # let the AP send couple of Beacon frames
        time.sleep(0.3)

        # disconnect - with traffic pending - shouldn't cause kernel warnings
        dev[0].request("DISCONNECT")
    finally:
        # Always restore the powersave setting, even if a request failed.
        hwsim_utils.set_powersave(dev[0], hwsim_utils.PS_DISABLED)

    time.sleep(0.2)
    out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
                     "wlan_mgt.tim.partial_virtual_bitmap",
                     ["wlan_mgt.tim.partial_virtual_bitmap"])
    if out is not None:
        # state: 0 = nothing seen yet, 1 = non-zero bitmap seen,
        # 2 = bitmap seen going back to zero after having been non-zero
        state = 0
        for line in out.splitlines():
            pvb = int(line, 16)
            if pvb > 0 and state == 0:
                state = 1
            elif pvb == 0 and state == 1:
                state = 2
        if state != 2:
            raise Exception("Didn't observe TIM bit getting set and unset (state=%d)" % state)

def test_ap_open_select_network(dev, apdev):
    """Open mode connection and SELECT_NETWORK to change network"""
    hapd1 = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    bssid1 = apdev[0]['bssid']
    hapd2 = hostapd.add_ap(apdev[1]['ifname'], { "ssid": "open2" })
    bssid2 = apdev[1]['bssid']

    id1 = dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                         only_add_network=True)
    id2 = dev[0].connect("open2", key_mgmt="NONE", scan_freq="2412")
    hwsim_utils.test_connectivity(dev[0], hapd2)

    dev[0].select_network(id1)
    dev[0].wait_connected()
    # Switching networks must not leave either BSSID in the BLACKLIST output.
    res = dev[0].request("BLACKLIST")
    if bssid1 in res or bssid2 in res:
        raise Exception("Unexpected blacklist entry")
    hwsim_utils.test_connectivity(dev[0], hapd1)

    dev[0].select_network(id2)
    dev[0].wait_connected()
    hwsim_utils.test_connectivity(dev[0], hapd2)
    res = dev[0].request("BLACKLIST")
    if bssid1 in res or bssid2 in res:
        raise Exception("Unexpected blacklist entry(2)")

def test_ap_open_disable_enable(dev, apdev):
    """AP with open mode getting disabled and re-enabled"""
    hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
    dev[0].connect("open", key_mgmt="NONE", scan_freq="2412",
                   bg_scan_period="0")

    for i in range(2):
        hapd.request("DISABLE")
        dev[0].wait_disconnected()
        hapd.request("ENABLE")
        dev[0].wait_connected()
        hwsim_utils.test_connectivity(dev[0], hapd)