tests: Verify QoS Mapping results in Data frames

This verifies that IP packets with various DSCP values are mapped to the
correct TID both with default mapping and with custom QoS mappings.

Signed-hostap: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2013-12-24 20:21:58 +02:00
parent 99d7c1dedf
commit 65249f6c4a
3 changed files with 87 additions and 11 deletions

View File

@ -11,7 +11,7 @@ import subprocess
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
def test_connectivity(ifname1, ifname2): def test_connectivity(ifname1, ifname2, dscp=None, tos=None):
if os.path.isfile("../../mac80211_hwsim/tools/hwsim_test"): if os.path.isfile("../../mac80211_hwsim/tools/hwsim_test"):
hwsim_test = "../../mac80211_hwsim/tools/hwsim_test" hwsim_test = "../../mac80211_hwsim/tools/hwsim_test"
else: else:
@ -20,6 +20,12 @@ def test_connectivity(ifname1, ifname2):
hwsim_test, hwsim_test,
ifname1, ifname1,
ifname2] ifname2]
if dscp:
cmd.append('-D')
cmd.append(str(dscp))
elif tos:
cmd.append('-t')
cmd.append(str(tos))
try: try:
s = subprocess.check_output(cmd) s = subprocess.check_output(cmd)
logger.debug(s) logger.debug(s)
@ -28,17 +34,17 @@ def test_connectivity(ifname1, ifname2):
logger.info(e.output) logger.info(e.output)
raise raise
def test_connectivity_p2p(dev1, dev2): def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname
ifname2 = dev2.group_ifname if dev2.group_ifname else dev2.ifname ifname2 = dev2.group_ifname if dev2.group_ifname else dev2.ifname
test_connectivity(ifname1, ifname2) test_connectivity(ifname1, ifname2, dscp, tos)
def test_connectivity_p2p_sta(dev1, dev2): def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname ifname1 = dev1.group_ifname if dev1.group_ifname else dev1.ifname
ifname2 = dev2.ifname ifname2 = dev2.ifname
test_connectivity(ifname1, ifname2) test_connectivity(ifname1, ifname2, dscp, tos)
def test_connectivity_sta(dev1, dev2): def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
ifname1 = dev1.ifname ifname1 = dev1.ifname
ifname2 = dev2.ifname ifname2 = dev2.ifname
test_connectivity(ifname1, ifname2) test_connectivity(ifname1, ifname2, dscp, tos)

View File

@ -13,6 +13,21 @@ logger = logging.getLogger()
import hwsim_utils import hwsim_utils
import hostapd import hostapd
from wlantest import Wlantest
def check_qos_map(ap, dev, dscp, tid):
bssid = ap['bssid']
sta = dev.p2p_interface_addr()
wt = Wlantest()
wt.clear_sta_counters(bssid, sta)
hwsim_utils.test_connectivity(dev.ifname, ap['ifname'], dscp=dscp)
[ tx, rx ] = wt.get_tid_counters(bssid, sta)
if tx[tid] == 0:
logger.info("Expected TX DSCP " + str(dscp) + " with TID " + str(tid) + " but counters: " + str(tx))
raise Exception("No STA->AP data frame using the expected TID")
if rx[tid] == 0:
logger.info("Expected RX DSCP " + str(dscp) + " with TID " + str(tid) + " but counters: " + str(rx))
raise Exception("No AP->STA data frame using the expected TID")
def test_ap_qosmap(dev, apdev): def test_ap_qosmap(dev, apdev):
"""QoS mapping""" """QoS mapping"""
@ -21,11 +36,38 @@ def test_ap_qosmap(dev, apdev):
return "skip" return "skip"
ssid = "test-qosmap" ssid = "test-qosmap"
params = { "ssid": ssid } params = { "ssid": ssid }
params['qos_map_set'] = '53,2,22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,255,255' params['qos_map_set'] = '53,2,22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55'
hostapd.add_ap(apdev[0]['ifname'], params) hostapd.add_ap(apdev[0]['ifname'], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412") dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
hwsim_utils.test_connectivity(dev[0].ifname, apdev[0]['ifname']) check_qos_map(apdev[0], dev[0], 53, 2)
check_qos_map(apdev[0], dev[0], 22, 6)
check_qos_map(apdev[0], dev[0], 8, 0)
check_qos_map(apdev[0], dev[0], 15, 0)
check_qos_map(apdev[0], dev[0], 0, 1)
check_qos_map(apdev[0], dev[0], 7, 1)
check_qos_map(apdev[0], dev[0], 16, 3)
check_qos_map(apdev[0], dev[0], 31, 3)
check_qos_map(apdev[0], dev[0], 32, 4)
check_qos_map(apdev[0], dev[0], 39, 4)
check_qos_map(apdev[0], dev[0], 40, 6)
check_qos_map(apdev[0], dev[0], 47, 6)
check_qos_map(apdev[0], dev[0], 48, 7)
check_qos_map(apdev[0], dev[0], 55, 7)
hapd = hostapd.Hostapd(apdev[0]['ifname']) hapd = hostapd.Hostapd(apdev[0]['ifname'])
hapd.request("SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,255,255") hapd.request("SET_QOS_MAP_SET 22,6,8,15,0,7,255,255,16,31,32,39,255,255,40,47,48,55")
hapd.request("SEND_QOS_MAP_CONF " + dev[0].get_status_field("address")) hapd.request("SEND_QOS_MAP_CONF " + dev[0].get_status_field("address"))
hwsim_utils.test_connectivity(dev[0].ifname, apdev[0]['ifname']) check_qos_map(apdev[0], dev[0], 53, 7)
check_qos_map(apdev[0], dev[0], 22, 6)
check_qos_map(apdev[0], dev[0], 48, 7)
check_qos_map(apdev[0], dev[0], 55, 7)
check_qos_map(apdev[0], dev[0], 56, 56 >> 3)
check_qos_map(apdev[0], dev[0], 63, 63 >> 3)
def test_ap_qosmap_default(dev, apdev):
"""QoS mapping with default values"""
ssid = "test-qosmap-default"
params = { "ssid": ssid }
hostapd.add_ap(apdev[0]['ifname'], params)
dev[0].connect(ssid, key_mgmt="NONE", scan_freq="2412")
for dscp in [ 0, 7, 8, 15, 16, 23, 24, 31, 32, 39, 40, 47, 48, 55, 56, 63]:
check_qos_map(apdev[0], dev[0], dscp, dscp >> 3)

View File

@ -63,6 +63,12 @@ class Wlantest:
raise Exception("wlantest_cli command failed") raise Exception("wlantest_cli command failed")
return int(res) return int(res)
def clear_sta_counters(self, bssid, addr):
res = subprocess.check_output([self.wlantest_cli, "clear_sta_counters",
bssid, addr]);
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
def tdls_clear(self, bssid, addr1, addr2): def tdls_clear(self, bssid, addr1, addr2):
res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters", res = subprocess.check_output([self.wlantest_cli, "clear_tdls_counters",
bssid, addr1, addr2]); bssid, addr1, addr2]);
@ -114,3 +120,25 @@ class Wlantest:
res = self.info_sta("key_mgmt", bssid, addr) res = self.info_sta("key_mgmt", bssid, addr)
if key_mgmt not in res: if key_mgmt not in res:
raise Exception("Unexpected STA key_mgmt") raise Exception("Unexpected STA key_mgmt")
def get_tx_tid(self, bssid, addr, tid):
res = subprocess.check_output([self.wlantest_cli, "get_tx_tid",
bssid, addr, str(tid)]);
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def get_rx_tid(self, bssid, addr, tid):
res = subprocess.check_output([self.wlantest_cli, "get_rx_tid",
bssid, addr, str(tid)]);
if "FAIL" in res:
raise Exception("wlantest_cli command failed")
return int(res)
def get_tid_counters(self, bssid, addr):
tx = {}
rx = {}
for tid in range(0, 17):
tx[tid] = self.get_tx_tid(bssid, addr, tid)
rx[tid] = self.get_rx_tid(bssid, addr, tid)
return [ tx, rx ]