mirror of
https://github.com/vanhoefm/fragattacks.git
synced 2025-01-18 10:54:03 -05:00
tests: Use internal DATA_TEST_* functionality instead of hwsim_test
This replaces use of the external hwsim_test tool for most data connectivity test cases. Only the cases where a special interface (bridge/VLAN) is used are still executed through hwsim_test. The internal DATA_TEST_* functionality makes it easier to extend the connectivity test cases through an external device with real WLAN hardware instead of the hwsim test setup. In addition, the error reports from this code can be made more informative. Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
parent
4823566ca7
commit
ce9c8c4097
@ -10,6 +10,8 @@ import time
|
|||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
from wpasupplicant import WpaSupplicant
|
||||||
|
|
||||||
def test_connectivity_run(ifname1, ifname2, dscp=None, tos=None, max_tries=1):
|
def test_connectivity_run(ifname1, ifname2, dscp=None, tos=None, max_tries=1):
|
||||||
if os.path.isfile("../../mac80211_hwsim/tools/hwsim_test"):
|
if os.path.isfile("../../mac80211_hwsim/tools/hwsim_test"):
|
||||||
hwsim_test = "../../mac80211_hwsim/tools/hwsim_test"
|
hwsim_test = "../../mac80211_hwsim/tools/hwsim_test"
|
||||||
@ -40,25 +42,125 @@ def test_connectivity_run(ifname1, ifname2, dscp=None, tos=None, max_tries=1):
|
|||||||
if not success:
|
if not success:
|
||||||
raise Exception("hwsim_test failed")
|
raise Exception("hwsim_test failed")
|
||||||
|
|
||||||
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1):
|
def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False):
|
||||||
test_connectivity_run(dev1.ifname, dev2.ifname, dscp=dscp, tos=tos,
|
addr1 = dev1.own_addr()
|
||||||
max_tries=max_tries)
|
if not dev1group and isinstance(dev1, WpaSupplicant):
|
||||||
|
addr1 = dev1.get_driver_status_field('addr')
|
||||||
|
|
||||||
|
addr2 = dev2.own_addr()
|
||||||
|
if not dev2group and isinstance(dev2, WpaSupplicant):
|
||||||
|
addr2 = dev2.get_driver_status_field('addr')
|
||||||
|
|
||||||
|
try:
|
||||||
|
cmd = "DATA_TEST_CONFIG 1"
|
||||||
|
if dev1group:
|
||||||
|
res = dev1.group_request(cmd)
|
||||||
|
else:
|
||||||
|
res = dev1.request(cmd)
|
||||||
|
if "OK" not in res:
|
||||||
|
raise Exception("Failed to enable data test functionality")
|
||||||
|
|
||||||
|
if dev2group:
|
||||||
|
res = dev2.group_request(cmd)
|
||||||
|
else:
|
||||||
|
res = dev2.request(cmd)
|
||||||
|
if "OK" not in res:
|
||||||
|
raise Exception("Failed to enable data test functionality")
|
||||||
|
|
||||||
|
cmd = "DATA_TEST_TX {} {} {}".format(addr2, addr1, tos)
|
||||||
|
if dev1group:
|
||||||
|
dev1.group_request(cmd)
|
||||||
|
else:
|
||||||
|
dev1.request(cmd)
|
||||||
|
if dev2group:
|
||||||
|
ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5)
|
||||||
|
else:
|
||||||
|
ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5)
|
||||||
|
if ev is None:
|
||||||
|
raise Exception("dev1->dev2 unicast data delivery failed")
|
||||||
|
if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
|
||||||
|
raise Exception("Unexpected dev1->dev2 unicast data result")
|
||||||
|
|
||||||
|
cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
|
||||||
|
if dev1group:
|
||||||
|
dev1.group_request(cmd)
|
||||||
|
else:
|
||||||
|
dev1.request(cmd)
|
||||||
|
if dev2group:
|
||||||
|
ev = dev2.wait_group_event(["DATA-TEST-RX"], timeout=5)
|
||||||
|
else:
|
||||||
|
ev = dev2.wait_event(["DATA-TEST-RX"], timeout=5)
|
||||||
|
if ev is None:
|
||||||
|
raise Exception("dev1->dev2 broadcast data delivery failed")
|
||||||
|
if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
|
||||||
|
raise Exception("Unexpected dev1->dev2 broadcast data result")
|
||||||
|
|
||||||
|
cmd = "DATA_TEST_TX {} {} {}".format(addr1, addr2, tos)
|
||||||
|
if dev2group:
|
||||||
|
dev2.group_request(cmd)
|
||||||
|
else:
|
||||||
|
dev2.request(cmd)
|
||||||
|
if dev1group:
|
||||||
|
ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5)
|
||||||
|
else:
|
||||||
|
ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5)
|
||||||
|
if ev is None:
|
||||||
|
raise Exception("dev2->dev1 unicast data delivery failed")
|
||||||
|
if "DATA-TEST-RX {} {}".format(addr1, addr2) not in ev:
|
||||||
|
raise Exception("Unexpected dev2->dev1 unicast data result")
|
||||||
|
|
||||||
|
cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr2, tos)
|
||||||
|
if dev2group:
|
||||||
|
dev2.group_request(cmd)
|
||||||
|
else:
|
||||||
|
dev2.request(cmd)
|
||||||
|
if dev1group:
|
||||||
|
ev = dev1.wait_group_event(["DATA-TEST-RX"], timeout=5)
|
||||||
|
else:
|
||||||
|
ev = dev1.wait_event(["DATA-TEST-RX"], timeout=5)
|
||||||
|
if ev is None:
|
||||||
|
raise Exception("dev2->dev1 broadcast data delivery failed")
|
||||||
|
if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr2) not in ev:
|
||||||
|
raise Exception("Unexpected dev2->dev1 broadcast data result")
|
||||||
|
finally:
|
||||||
|
if dev1group:
|
||||||
|
dev1.group_request("DATA_TEST_CONFIG 0")
|
||||||
|
else:
|
||||||
|
dev1.request("DATA_TEST_CONFIG 0")
|
||||||
|
if dev2group:
|
||||||
|
dev2.group_request("DATA_TEST_CONFIG 0")
|
||||||
|
else:
|
||||||
|
dev2.request("DATA_TEST_CONFIG 0")
|
||||||
|
|
||||||
|
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1, dev1group=False, dev2group=False):
|
||||||
|
if dscp:
|
||||||
|
tos = dscp << 2
|
||||||
|
if not tos:
|
||||||
|
tos = 0
|
||||||
|
|
||||||
|
success = False
|
||||||
|
last_err = None
|
||||||
|
for i in range(0, max_tries):
|
||||||
|
try:
|
||||||
|
run_connectivity_test(dev1, dev2, tos, dev1group, dev2group)
|
||||||
|
success = True
|
||||||
|
break
|
||||||
|
except Exception, e:
|
||||||
|
last_err = e
|
||||||
|
if i + 1 < max_tries:
|
||||||
|
time.sleep(1)
|
||||||
|
if not success:
|
||||||
|
raise Exception(last_err)
|
||||||
|
|
||||||
def test_connectivity_iface(dev1, ifname, dscp=None, tos=None, max_tries=1):
|
def test_connectivity_iface(dev1, ifname, dscp=None, tos=None, max_tries=1):
|
||||||
test_connectivity_run(dev1.ifname, ifname, dscp=dscp, tos=tos,
|
test_connectivity_run(dev1.ifname, ifname, dscp=dscp, tos=tos,
|
||||||
max_tries=max_tries)
|
max_tries=max_tries)
|
||||||
|
|
||||||
def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
|
def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
|
||||||
ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname
|
test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)
|
||||||
ifname2 = dev2.group_ifname if dev2.group_ifname else dev2.ifname
|
|
||||||
test_connectivity_run(ifname1, ifname2, dscp, tos)
|
|
||||||
|
|
||||||
def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
|
def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
|
||||||
ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname
|
test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)
|
||||||
ifname2 = dev2.ifname
|
|
||||||
test_connectivity_run(ifname1, ifname2, dscp, tos)
|
|
||||||
|
|
||||||
def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
|
def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
|
||||||
ifname1 = dev1.ifname
|
test_connectivity(dev1, dev2, dscp, tos)
|
||||||
ifname2 = dev2.ifname
|
|
||||||
test_connectivity_run(ifname1, ifname2, dscp, tos)
|
|
||||||
|
@ -17,10 +17,11 @@ def test_ap_open(dev, apdev):
|
|||||||
"""AP with open mode (no security) configuration"""
|
"""AP with open mode (no security) configuration"""
|
||||||
hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
|
hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
|
||||||
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
|
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
|
||||||
hwsim_utils.test_connectivity(dev[0], hapd)
|
|
||||||
ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
|
ev = hapd.wait_event([ "AP-STA-CONNECTED" ], timeout=5)
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("No connection event received from hostapd")
|
raise Exception("No connection event received from hostapd")
|
||||||
|
hwsim_utils.test_connectivity(dev[0], hapd)
|
||||||
|
|
||||||
dev[0].request("DISCONNECT")
|
dev[0].request("DISCONNECT")
|
||||||
ev = hapd.wait_event([ "AP-STA-DISCONNECTED" ], timeout=5)
|
ev = hapd.wait_event([ "AP-STA-DISCONNECTED" ], timeout=5)
|
||||||
if ev is None:
|
if ev is None:
|
||||||
|
Loading…
Reference in New Issue
Block a user