fragattacks/tests/hwsim/test_ap_dynamic.py
Jouni Malinen df20cdc457 tests: Increase timeout in ap_bss_config_file for CTRL-EVENT-TERMINATING
If the kernel is built with CONFIG_DEBUG_KOBJECT_RELEASE=y, the hostapd
termination event for the wlan3 interface may be delayed beyond the
previous five second timeout. This could result in the test case failing
and the following test case failing as well due to the separate hostapd
process being still in the process of cleaning up.

Fix this by increasing the timeout to avoid forcing test termination in
such cases.

Signed-off-by: Jouni Malinen <j@w1.fi>
2017-01-29 14:32:17 +02:00

568 lines
21 KiB
Python

# Test cases for dynamic BSS changes with hostapd
# Copyright (c) 2013, Qualcomm Atheros, Inc.
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
from remotehost import remote_compatible
import time
import subprocess
import logging
logger = logging.getLogger()
import os
import hwsim_utils
import hostapd
from utils import alloc_fail, require_under_vm, get_phy
from test_ap_acs import force_prev_ap_on_24g
@remote_compatible
def test_ap_change_ssid(dev, apdev):
"""Dynamic SSID change with hostapd and WPA2-PSK"""
params = hostapd.wpa2_params(ssid="test-wpa2-psk-start",
passphrase="12345678")
hapd = hostapd.add_ap(apdev[0], params)
id = dev[0].connect("test-wpa2-psk-start", psk="12345678",
scan_freq="2412")
dev[0].request("DISCONNECT")
logger.info("Change SSID dynamically")
res = hapd.request("SET ssid test-wpa2-psk-new")
if "OK" not in res:
raise Exception("SET command failed")
res = hapd.request("RELOAD")
if "OK" not in res:
raise Exception("RELOAD command failed")
dev[0].set_network_quoted(id, "ssid", "test-wpa2-psk-new")
dev[0].connect_network(id)
def multi_check(dev, check, scan_opt=True):
id = []
num_bss = len(check)
for i in range(0, num_bss):
dev[i].request("BSS_FLUSH 0")
dev[i].dump_monitor()
for i in range(0, num_bss):
if check[i]:
continue
id.append(dev[i].connect("bss-" + str(i + 1), key_mgmt="NONE",
scan_freq="2412", wait_connect=False))
for i in range(num_bss):
if not check[i]:
continue
bssid = '02:00:00:00:03:0' + str(i)
if scan_opt:
dev[i].scan_for_bss(bssid, freq=2412)
id.append(dev[i].connect("bss-" + str(i + 1), key_mgmt="NONE",
scan_freq="2412", wait_connect=True))
first = True
for i in range(num_bss):
if not check[i]:
timeout=0.2 if first else 0.01
first = False
ev = dev[i].wait_event(["CTRL-EVENT-CONNECTED"], timeout=timeout)
if ev:
raise Exception("Unexpected connection")
for i in range(0, num_bss):
dev[i].remove_network(id[i])
for i in range(num_bss):
if check[i]:
dev[i].wait_disconnected(timeout=5)
res = ''
for i in range(0, num_bss):
res = res + dev[i].request("BSS RANGE=ALL MASK=0x2")
for i in range(0, num_bss):
if not check[i]:
bssid = '02:00:00:00:03:0' + str(i)
if bssid in res:
raise Exception("Unexpected BSS" + str(i) + " in scan results")
def test_ap_bss_add_remove(dev, apdev):
"""Dynamic BSS add/remove operations with hostapd"""
try:
_test_ap_bss_add_remove(dev, apdev)
finally:
for i in range(3):
dev[i].request("SCAN_INTERVAL 5")
def _test_ap_bss_add_remove(dev, apdev):
for i in range(3):
dev[i].flush_scan_cache()
dev[i].request("SCAN_INTERVAL 1")
ifname1 = apdev[0]['ifname']
ifname2 = apdev[0]['ifname'] + '-2'
ifname3 = apdev[0]['ifname'] + '-3'
logger.info("Set up three BSSes one by one")
hostapd.add_bss(apdev[0], ifname1, 'bss-1.conf')
multi_check(dev, [ True, False, False ])
hostapd.add_bss(apdev[0], ifname2, 'bss-2.conf')
multi_check(dev, [ True, True, False ])
hostapd.add_bss(apdev[0], ifname3, 'bss-3.conf')
multi_check(dev, [ True, True, True ])
logger.info("Remove the last BSS and re-add it")
hostapd.remove_bss(apdev[0], ifname3)
multi_check(dev, [ True, True, False ])
hostapd.add_bss(apdev[0], ifname3, 'bss-3.conf')
multi_check(dev, [ True, True, True ])
logger.info("Remove the middle BSS and re-add it")
hostapd.remove_bss(apdev[0], ifname2)
multi_check(dev, [ True, False, True ])
hostapd.add_bss(apdev[0], ifname2, 'bss-2.conf')
multi_check(dev, [ True, True, True ])
logger.info("Remove the first BSS and re-add it and other BSSs")
hostapd.remove_bss(apdev[0], ifname1)
multi_check(dev, [ False, False, False ])
hostapd.add_bss(apdev[0], ifname1, 'bss-1.conf')
hostapd.add_bss(apdev[0], ifname2, 'bss-2.conf')
hostapd.add_bss(apdev[0], ifname3, 'bss-3.conf')
multi_check(dev, [ True, True, True ])
logger.info("Remove two BSSes and re-add them")
hostapd.remove_bss(apdev[0], ifname2)
multi_check(dev, [ True, False, True ])
hostapd.remove_bss(apdev[0], ifname3)
multi_check(dev, [ True, False, False ])
hostapd.add_bss(apdev[0], ifname2, 'bss-2.conf')
multi_check(dev, [ True, True, False ])
hostapd.add_bss(apdev[0], ifname3, 'bss-3.conf')
multi_check(dev, [ True, True, True ])
logger.info("Remove three BSSes in and re-add them")
hostapd.remove_bss(apdev[0], ifname3)
multi_check(dev, [ True, True, False ])
hostapd.remove_bss(apdev[0], ifname2)
multi_check(dev, [ True, False, False ])
hostapd.remove_bss(apdev[0], ifname1)
multi_check(dev, [ False, False, False ])
hostapd.add_bss(apdev[0], ifname1, 'bss-1.conf')
multi_check(dev, [ True, False, False ])
hostapd.add_bss(apdev[0], ifname2, 'bss-2.conf')
multi_check(dev, [ True, True, False ])
hostapd.add_bss(apdev[0], ifname3, 'bss-3.conf')
multi_check(dev, [ True, True, True ])
logger.info("Test error handling if a duplicate ifname is tried")
hostapd.add_bss(apdev[0], ifname3, 'bss-3.conf', ignore_error=True)
multi_check(dev, [ True, True, True ])
def test_ap_bss_add_remove_during_ht_scan(dev, apdev):
"""Dynamic BSS add during HT40 co-ex scan"""
for i in range(3):
dev[i].flush_scan_cache()
ifname1 = apdev[0]['ifname']
ifname2 = apdev[0]['ifname'] + '-2'
hostapd.add_bss(apdev[0], ifname1, 'bss-ht40-1.conf')
hostapd.add_bss(apdev[0], ifname2, 'bss-ht40-2.conf')
multi_check(dev, [ True, True ], scan_opt=False)
hostapd.remove_bss(apdev[0], ifname2)
hostapd.remove_bss(apdev[0], ifname1)
hostapd.add_bss(apdev[0], ifname1, 'bss-ht40-1.conf')
hostapd.add_bss(apdev[0], ifname2, 'bss-ht40-2.conf')
hostapd.remove_bss(apdev[0], ifname2)
multi_check(dev, [ True, False ], scan_opt=False)
hostapd.remove_bss(apdev[0], ifname1)
hostapd.add_bss(apdev[0], ifname1, 'bss-ht40-1.conf')
hostapd.add_bss(apdev[0], ifname2, 'bss-ht40-2.conf')
hostapd.remove_bss(apdev[0], ifname1)
multi_check(dev, [ False, False ])
def test_ap_multi_bss_config(dev, apdev):
"""hostapd start with a multi-BSS configuration file"""
for i in range(3):
dev[i].flush_scan_cache()
ifname1 = apdev[0]['ifname']
ifname2 = apdev[0]['ifname'] + '-2'
ifname3 = apdev[0]['ifname'] + '-3'
logger.info("Set up three BSSes with one configuration file")
hapd = hostapd.add_iface(apdev[0], 'multi-bss.conf')
hapd.enable()
multi_check(dev, [ True, True, True ])
hostapd.remove_bss(apdev[0], ifname2)
multi_check(dev, [ True, False, True ])
hostapd.remove_bss(apdev[0], ifname3)
multi_check(dev, [ True, False, False ])
hostapd.remove_bss(apdev[0], ifname1)
multi_check(dev, [ False, False, False ])
hapd = hostapd.add_iface(apdev[0], 'multi-bss.conf')
hapd.enable()
hostapd.remove_bss(apdev[0], ifname1)
multi_check(dev, [ False, False, False ])
def invalid_ap(ap):
logger.info("Trying to start AP " + ap['ifname'] + " with invalid configuration")
hapd = hostapd.add_ap(ap, {}, no_enable=True)
hapd.set("ssid", "invalid-config")
hapd.set("channel", "12345")
try:
hapd.enable()
started = True
except Exception, e:
started = False
if started:
raise Exception("ENABLE command succeeded unexpectedly")
return hapd
@remote_compatible
def test_ap_invalid_config(dev, apdev):
"""Try to start AP with invalid configuration and fix configuration"""
hapd = invalid_ap(apdev[0])
logger.info("Fix configuration and start AP again")
hapd.set("channel", "1")
hapd.enable()
dev[0].connect("invalid-config", key_mgmt="NONE", scan_freq="2412")
@remote_compatible
def test_ap_invalid_config2(dev, apdev):
"""Try to start AP with invalid configuration and remove interface"""
hapd = invalid_ap(apdev[0])
logger.info("Remove interface with failed configuration")
hostapd.remove_bss(apdev[0])
def test_ap_remove_during_acs(dev, apdev):
"""Remove interface during ACS"""
force_prev_ap_on_24g(apdev[0])
params = hostapd.wpa2_params(ssid="test-acs-remove", passphrase="12345678")
params['channel'] = '0'
hostapd.add_ap(apdev[0], params)
hostapd.remove_bss(apdev[0])
def test_ap_remove_during_acs2(dev, apdev):
"""Remove BSS during ACS in multi-BSS configuration"""
force_prev_ap_on_24g(apdev[0])
ifname = apdev[0]['ifname']
ifname2 = ifname + "-2"
hapd = hostapd.add_ap(apdev[0], {}, no_enable=True)
hapd.set("ssid", "test-acs-remove")
hapd.set("channel", "0")
hapd.set("bss", ifname2)
hapd.set("ssid", "test-acs-remove2")
hapd.enable()
hostapd.remove_bss(apdev[0])
def test_ap_remove_during_acs3(dev, apdev):
"""Remove second BSS during ACS in multi-BSS configuration"""
force_prev_ap_on_24g(apdev[0])
ifname = apdev[0]['ifname']
ifname2 = ifname + "-2"
hapd = hostapd.add_ap(apdev[0], {}, no_enable=True)
hapd.set("ssid", "test-acs-remove")
hapd.set("channel", "0")
hapd.set("bss", ifname2)
hapd.set("ssid", "test-acs-remove2")
hapd.enable()
hostapd.remove_bss(apdev[0], ifname2)
@remote_compatible
def test_ap_remove_during_ht_coex_scan(dev, apdev):
"""Remove interface during HT co-ex scan"""
params = hostapd.wpa2_params(ssid="test-ht-remove", passphrase="12345678")
params['channel'] = '1'
params['ht_capab'] = "[HT40+]"
ifname = apdev[0]['ifname']
hostapd.add_ap(apdev[0], params)
hostapd.remove_bss(apdev[0])
def test_ap_remove_during_ht_coex_scan2(dev, apdev):
"""Remove BSS during HT co-ex scan in multi-BSS configuration"""
ifname = apdev[0]['ifname']
ifname2 = ifname + "-2"
hapd = hostapd.add_ap(apdev[0], {}, no_enable=True)
hapd.set("ssid", "test-ht-remove")
hapd.set("channel", "1")
hapd.set("ht_capab", "[HT40+]")
hapd.set("bss", ifname2)
hapd.set("ssid", "test-ht-remove2")
hapd.enable()
hostapd.remove_bss(apdev[0])
def test_ap_remove_during_ht_coex_scan3(dev, apdev):
"""Remove second BSS during HT co-ex scan in multi-BSS configuration"""
ifname = apdev[0]['ifname']
ifname2 = ifname + "-2"
hapd = hostapd.add_ap(apdev[0], {}, no_enable=True)
hapd.set("ssid", "test-ht-remove")
hapd.set("channel", "1")
hapd.set("ht_capab", "[HT40+]")
hapd.set("bss", ifname2)
hapd.set("ssid", "test-ht-remove2")
hapd.enable()
hostapd.remove_bss(apdev[0], ifname2)
@remote_compatible
def test_ap_enable_disable_reenable(dev, apdev):
"""Enable, disable, re-enable AP"""
hapd = hostapd.add_ap(apdev[0], {}, no_enable=True)
hapd.set("ssid", "dynamic")
hapd.enable()
ev = hapd.wait_event(["AP-ENABLED"], timeout=30)
if ev is None:
raise Exception("AP startup timed out")
dev[0].connect("dynamic", key_mgmt="NONE", scan_freq="2412")
hapd.disable()
ev = hapd.wait_event(["AP-DISABLED"], timeout=30)
if ev is None:
raise Exception("AP disabling timed out")
dev[0].wait_disconnected(timeout=10)
hapd.enable()
ev = hapd.wait_event(["AP-ENABLED"], timeout=30)
if ev is None:
raise Exception("AP startup timed out")
dev[1].connect("dynamic", key_mgmt="NONE", scan_freq="2412")
dev[0].wait_connected(timeout=10)
def test_ap_double_disable(dev, apdev):
"""Double DISABLE regression test"""
hapd = hostapd.add_bss(apdev[0], apdev[0]['ifname'], 'bss-1.conf')
hostapd.add_bss(apdev[0], apdev[0]['ifname'] + '-2', 'bss-2.conf')
hapd.disable()
if "FAIL" not in hapd.request("DISABLE"):
raise Exception("Second DISABLE accepted unexpectedly")
hapd.enable()
hapd.disable()
if "FAIL" not in hapd.request("DISABLE"):
raise Exception("Second DISABLE accepted unexpectedly")
def test_ap_bss_add_many(dev, apdev):
"""Large number of BSS add operations with hostapd"""
try:
_test_ap_bss_add_many(dev, apdev)
finally:
dev[0].request("SCAN_INTERVAL 5")
ifname = apdev[0]['ifname']
hapd = hostapd.HostapdGlobal(apdev[0])
hapd.flush()
for i in range(16):
ifname2 = ifname + '-' + str(i)
hapd.remove(ifname2)
try:
os.remove('/tmp/hwsim-bss.conf')
except:
pass
def _test_ap_bss_add_many(dev, apdev):
ifname = apdev[0]['ifname']
hostapd.add_bss(apdev[0], ifname, 'bss-1.conf')
fname = '/tmp/hwsim-bss.conf'
for i in range(16):
ifname2 = ifname + '-' + str(i)
with open(fname, 'w') as f:
f.write("driver=nl80211\n")
f.write("hw_mode=g\n")
f.write("channel=1\n")
f.write("ieee80211n=1\n")
f.write("interface=%s\n" % ifname2)
f.write("bssid=02:00:00:00:03:%02x\n" % (i + 1))
f.write("ctrl_interface=/var/run/hostapd\n")
f.write("ssid=test-%d\n" % i)
hostapd.add_bss(apdev[0], ifname2, fname)
os.remove(fname)
dev[0].request("SCAN_INTERVAL 1")
dev[0].connect("bss-1", key_mgmt="NONE", scan_freq="2412")
dev[0].request("DISCONNECT")
dev[0].wait_disconnected(timeout=5)
for i in range(16):
dev[0].connect("test-%d" % i, key_mgmt="NONE", scan_freq="2412")
dev[0].request("DISCONNECT")
dev[0].wait_disconnected(timeout=5)
ifname2 = ifname + '-' + str(i)
hostapd.remove_bss(apdev[0], ifname2)
def test_ap_bss_add_reuse_existing(dev, apdev):
"""Dynamic BSS add operation reusing existing interface"""
ifname1 = apdev[0]['ifname']
ifname2 = apdev[0]['ifname'] + '-2'
hostapd.add_bss(apdev[0], ifname1, 'bss-1.conf')
subprocess.check_call(["iw", "dev", ifname1, "interface", "add", ifname2,
"type", "__ap"])
hostapd.add_bss(apdev[0], ifname2, 'bss-2.conf')
hostapd.remove_bss(apdev[0], ifname2)
subprocess.check_call(["iw", "dev", ifname2, "del"])
def hapd_bss_out_of_mem(hapd, phy, confname, count, func):
with alloc_fail(hapd, count, func):
hapd_global = hostapd.HostapdGlobal()
res = hapd_global.ctrl.request("ADD bss_config=" + phy + ":" + confname)
if "OK" in res:
raise Exception("add_bss succeeded")
def test_ap_bss_add_out_of_memory(dev, apdev):
"""Running out of memory while adding a BSS"""
hapd2 = hostapd.add_ap(apdev[1], { "ssid": "open" })
ifname1 = apdev[0]['ifname']
ifname2 = apdev[0]['ifname'] + '-2'
hapd_bss_out_of_mem(hapd2, 'phy3', 'bss-1.conf', 1, 'hostapd_add_iface')
for i in range(1, 3):
hapd_bss_out_of_mem(hapd2, 'phy3', 'bss-1.conf',
i, 'hostapd_interface_init_bss')
hapd_bss_out_of_mem(hapd2, 'phy3', 'bss-1.conf',
1, 'ieee802_11_build_ap_params')
hostapd.add_bss(apdev[0], ifname1, 'bss-1.conf')
hapd_bss_out_of_mem(hapd2, 'phy3', 'bss-2.conf',
1, 'hostapd_interface_init_bss')
hapd_bss_out_of_mem(hapd2, 'phy3', 'bss-2.conf',
1, 'ieee802_11_build_ap_params')
hostapd.add_bss(apdev[0], ifname2, 'bss-2.conf')
hostapd.remove_bss(apdev[0], ifname2)
hostapd.remove_bss(apdev[0], ifname1)
def test_ap_multi_bss(dev, apdev):
"""Multiple BSSes with hostapd"""
ifname1 = apdev[0]['ifname']
ifname2 = apdev[0]['ifname'] + '-2'
hapd1 = hostapd.add_bss(apdev[0], ifname1, 'bss-1.conf')
hapd2 = hostapd.add_bss(apdev[0], ifname2, 'bss-2.conf')
dev[0].connect("bss-1", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("bss-2", key_mgmt="NONE", scan_freq="2412")
hwsim_utils.test_connectivity(dev[0], hapd1)
hwsim_utils.test_connectivity(dev[1], hapd2)
sta0 = hapd1.get_sta(dev[0].own_addr())
sta1 = hapd2.get_sta(dev[1].own_addr())
if 'rx_packets' not in sta0 or int(sta0['rx_packets']) < 1:
raise Exception("sta0 did not report receiving packets")
if 'rx_packets' not in sta1 or int(sta1['rx_packets']) < 1:
raise Exception("sta1 did not report receiving packets")
@remote_compatible
def test_ap_add_with_driver(dev, apdev):
"""Add hostapd interface with driver specified"""
ifname = apdev[0]['ifname']
try:
hostname = apdev[0]['hostname']
except:
hostname = None
hapd_global = hostapd.HostapdGlobal(apdev[0])
hapd_global.add(ifname, driver="nl80211")
port = hapd_global.get_ctrl_iface_port(ifname)
hapd = hostapd.Hostapd(ifname, hostname, port)
hapd.set_defaults()
hapd.set("ssid", "dynamic")
hapd.enable()
ev = hapd.wait_event(["AP-ENABLED"], timeout=30)
if ev is None:
raise Exception("AP startup timed out")
dev[0].connect("dynamic", key_mgmt="NONE", scan_freq="2412")
dev[0].request("DISCONNECT")
dev[0].wait_disconnected()
hapd.disable()
def test_ap_iapp(dev, apdev):
"""IAPP and multiple BSSes"""
require_under_vm()
try:
_test_ap_iapp(dev, apdev)
finally:
subprocess.call(['ifconfig', 'br-multicast', 'down'],
stderr=open('/dev/null', 'w'))
subprocess.call(['brctl', 'delbr', 'br-multicast'],
stderr=open('/dev/null', 'w'))
def _test_ap_iapp(dev, apdev):
br_ifname = 'br-multicast'
subprocess.call(['brctl', 'addbr', br_ifname])
subprocess.call(['brctl', 'setfd', br_ifname, '0'])
subprocess.call(['ip', 'addr', 'add', '10.174.65.206/31', 'dev', br_ifname])
subprocess.call(['ip', 'link', 'set', 'dev', br_ifname, 'up'])
subprocess.call(['ip', 'route', 'add', '224.0.0.0/4', 'dev', br_ifname])
params = { "ssid": "test-1",
"bridge": br_ifname,
"iapp_interface": br_ifname }
hapd = hostapd.add_ap(apdev[0], params)
dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412)
dev[0].connect("test-1", key_mgmt="NONE", scan_freq="2412")
dev[1].connect("test-1", key_mgmt="NONE", scan_freq="2412")
hapd2 = hostapd.add_ap(apdev[1], params)
dev[0].scan_for_bss(apdev[1]['bssid'], freq=2412)
dev[0].roam(apdev[1]['bssid'])
dev[0].roam(apdev[0]['bssid'])
dev[0].request("DISCONNECT")
dev[1].request("DISCONNECT")
dev[0].wait_disconnected()
dev[1].wait_disconnected()
hapd.disable()
hapd2.disable()
def test_ap_duplicate_bssid(dev, apdev):
"""Duplicate BSSID"""
params = { "ssid": "test" }
hapd = hostapd.add_ap(apdev[0], params, no_enable=True)
hapd.enable()
ifname2 = apdev[0]['ifname'] + '-2'
ifname3 = apdev[0]['ifname'] + '-3'
# "BSS 'wlan3-2' may not have BSSID set to the MAC address of the radio"
try:
hostapd.add_bss(apdev[0], ifname2, 'bss-2-dup.conf')
raise Exception("BSS add succeeded unexpectedly")
except Exception, e:
if "Could not add hostapd BSS" in str(e):
pass
else:
raise
hostapd.add_bss(apdev[0], ifname3, 'bss-3.conf')
dev[0].connect("test", key_mgmt="NONE", scan_freq="2412")
dev[0].request("DISCONNECT")
dev[0].wait_disconnected()
hapd.set("bssid", "02:00:00:00:03:02")
hapd.disable()
# "Duplicate BSSID 02:00:00:00:03:02 on interface 'wlan3-3' and 'wlan3'."
if "FAIL" not in hapd.request("ENABLE"):
raise Exception("ENABLE with duplicate BSSID succeeded unexpectedly")
def test_ap_bss_config_file(dev, apdev, params):
"""hostapd BSS config file"""
pidfile = os.path.join(params['logdir'], "ap_bss_config_file-hostapd.pid")
logfile = os.path.join(params['logdir'], "ap_bss_config_file-hostapd-log")
prg = os.path.join(params['logdir'], 'alt-hostapd/hostapd/hostapd')
if not os.path.exists(prg):
prg = '../../hostapd/hostapd'
phy = get_phy(apdev[0])
cmd = [ prg, '-B', '-dddt', '-P', pidfile, '-f', logfile, '-S', '-T',
'-b', phy + ':bss-1.conf', '-b', phy + ':bss-2.conf',
'-b', phy + ':bss-3.conf' ]
res = subprocess.check_call(cmd)
if res != 0:
raise Exception("Could not start hostapd: %s" % str(res))
multi_check(dev, [ True, True, True ])
for i in range(0, 3):
dev[i].request("DISCONNECT")
hapd = hostapd.Hostapd(apdev[0]['ifname'])
hapd.ping()
if "OK" not in hapd.request("TERMINATE"):
raise Exception("Failed to terminate hostapd process")
ev = hapd.wait_event(["CTRL-EVENT-TERMINATING"], timeout=15)
if ev is None:
raise Exception("CTRL-EVENT-TERMINATING not seen")
for i in range(30):
time.sleep(0.1)
if not os.path.exists(pidfile):
break
if os.path.exists(pidfile):
raise Exception("PID file exits after process termination")