diff --git a/tests/hwsim/test_ap_hs20.py b/tests/hwsim/test_ap_hs20.py index 23445e730..598f7f8cd 100644 --- a/tests/hwsim/test_ap_hs20.py +++ b/tests/hwsim/test_ap_hs20.py @@ -4,12 +4,15 @@ # This software may be distributed under the terms of the BSD license. # See README for more details. +import binascii +import struct import time import subprocess import logging logger = logging.getLogger() import os import os.path +import socket import subprocess import hostapd @@ -2135,12 +2138,6 @@ def _test_ap_hs20_proxyarp(dev, apdev): dev[0].hs20_enable() subprocess.call(['brctl', 'setfd', 'ap-br0', '0']) subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up']) - subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:cccc::1/64', - 'dev', dev[0].ifname]) - subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:dddd::1/64', - 'dev', dev[1].ifname]) - subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:eeee::1/64', - 'dev', dev[1].ifname]) id = dev[0].add_cred_values({ 'realm': "example.com", 'username': "hs20-test", @@ -2157,12 +2154,40 @@ def _test_ap_hs20_proxyarp(dev, apdev): scan_freq="2412") time.sleep(0.1) - subprocess.call(['ping6', 'aaaa:bbbb:cccc::2', '-c', '1', '-w', '1'], - stdout=open('/dev/null', 'w')) + addr0 = dev[0].p2p_interface_addr() + addr1 = dev[1].p2p_interface_addr() + + src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':','')) + src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':','')) + + pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2", + opt=src_ll_opt0) + if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + + pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:dddd::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:dddd::2", + opt=src_ll_opt1) + if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + + pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:eeee::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:eeee::2", + opt=src_ll_opt1) + if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + matches = get_permanent_neighbors("ap-br0") logger.info("After connect: " + str(matches)) - if len(matches) < 1: - raise Exception("No neighbor entries after connect") + if len(matches) != 3: + raise Exception("Unexpected number of neighbor entries after connect") + if 'aaaa:bbbb:cccc::2 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches: + raise Exception("dev0 addr missing") + if 'aaaa:bbbb:dddd::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches: + raise Exception("dev1 addr(1) missing") + if 'aaaa:bbbb:eeee::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches: + raise Exception("dev1 addr(2) missing") dev[0].request("DISCONNECT") dev[1].request("DISCONNECT") time.sleep(0.5) @@ -2181,15 +2206,49 @@ def test_ap_hs20_proxyarp(dev, apdev): stderr=open('/dev/null', 'w')) subprocess.call(['brctl', 'delbr', 'ap-br0'], stderr=open('/dev/null', 'w')) - subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:cccc::1/64', - 'dev', dev[0].ifname]) - subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:dddd::1/64', - 'dev', dev[1].ifname]) - subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:eeee::1/64', - 'dev', dev[1].ifname]) return res +def ip_checksum(buf): + sum = 0 + if len(buf) & 0x01: + buf += '\0x00' + for i in range(0, len(buf), 2): + val, = struct.unpack('H', buf[i:i+2]) + sum += val + while (sum >> 16): + sum = (sum & 0xffff) + (sum >> 16) + return struct.pack('H', ~sum & 0xffff) + +def build_icmpv6(ipv6_addrs, type, code, payload): + start = struct.pack("BB", type, code) + end = payload + icmp = start + '\x00\x00' + end + pseudo = ipv6_addrs + struct.pack(">LBBBB", len(icmp), 0, 0, 0, 58) + csum = ip_checksum(pseudo + icmp) + return start + csum + end + +def build_ns(src_ll, ip_src, ip_dst, target, opt=None): + link_mc = binascii.unhexlify("3333ff000002") + _src_ll = binascii.unhexlify(src_ll.replace(':','')) + proto = '\x86\xdd' + ehdr = link_mc + _src_ll + proto + _ip_src = socket.inet_pton(socket.AF_INET6, ip_src) + _ip_dst = socket.inet_pton(socket.AF_INET6, ip_dst) + + reserved = '\x00\x00\x00\x00' + _target = socket.inet_pton(socket.AF_INET6, target) + if opt: + payload = reserved + _target + opt + else: + payload = reserved + _target + icmp = build_icmpv6(_ip_src + _ip_dst, 135, 0, payload) + + ipv6 = struct.pack('>BBBBHBB', 0x60, 0, 0, 0, len(icmp), 58, 255) + ipv6 += _ip_src + _ip_dst + + return ehdr + ipv6 + icmp + def get_permanent_neighbors(ifname): cmd = subprocess.Popen(['ip', 'nei'], stdout=subprocess.PIPE) res = cmd.stdout.read() @@ -2218,27 +2277,78 @@ def _test_proxyarp_open(dev, apdev): subprocess.call(['brctl', 'setfd', 'ap-br0', '0']) subprocess.call(['ip', 'link', 'set', 'dev', 'ap-br0', 'up']) - subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:cccc::1/64', - 'dev', dev[0].ifname]) - subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:dddd::1/64', - 'dev', dev[1].ifname]) - subprocess.call(['ip', 'addr', 'add', 'aaaa:bbbb:eeee::1/64', - 'dev', dev[1].ifname]) dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") dev[1].connect("open", key_mgmt="NONE", scan_freq="2412") time.sleep(0.1) - subprocess.call(['ping6', 'aaaa:bbbb:cccc::2', '-c', '1', '-w', '1'], - stdout=open('/dev/null', 'w')) - subprocess.call(['ping6', 'aaaa:bbbb:dddd::2', '-c', '1', '-w', '1'], - stdout=open('/dev/null', 'w')) - subprocess.call(['ping6', 'aaaa:bbbb:eeee::2', '-c', '1', '-w', '1'], - stdout=open('/dev/null', 'w')) + addr0 = dev[0].p2p_interface_addr() + addr1 = dev[1].p2p_interface_addr() + + src_ll_opt0 = "\x01\x01" + binascii.unhexlify(addr0.replace(':','')) + src_ll_opt1 = "\x01\x01" + binascii.unhexlify(addr1.replace(':','')) + + # DAD NS + pkt = build_ns(src_ll=addr0, ip_src="::", ip_dst="ff02::1:ff00:2", + target="aaaa:bbbb:cccc::2", opt=src_ll_opt0) + if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + + pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2", + opt=src_ll_opt0) + if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + # test frame without source link-layer address option + pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2") + if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + # test frame with bogus option + pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2", + opt="\x70\x01\x01\x02\x03\x04\x05\x05") + if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + # test frame with truncated source link-layer address option + pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2", + opt="\x01\x01\x01\x02\x03\x04") + if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + # test frame with foreign source link-layer address option + pkt = build_ns(src_ll=addr0, ip_src="aaaa:bbbb:cccc::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:cccc::2", + opt="\x01\x01\x01\x02\x03\x04\x05\x06") + if "OK" not in dev[0].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + + pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:dddd::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:dddd::2", + opt=src_ll_opt1) + if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + + pkt = build_ns(src_ll=addr1, ip_src="aaaa:bbbb:eeee::2", + ip_dst="ff02::1:ff00:2", target="aaaa:bbbb:eeee::2", + opt=src_ll_opt1) + if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + # another copy for additional code coverage + if "OK" not in dev[1].request("DATA_TEST_FRAME " + binascii.hexlify(pkt)): + raise Exception("DATA_TEST_FRAME failed") + matches = get_permanent_neighbors("ap-br0") logger.info("After connect: " + str(matches)) if len(matches) != 3: raise Exception("Unexpected number of neighbor entries after connect") + if 'aaaa:bbbb:cccc::2 dev ap-br0 lladdr 02:00:00:00:00:00 PERMANENT' not in matches: + raise Exception("dev0 addr missing") + if 'aaaa:bbbb:dddd::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches: + raise Exception("dev1 addr(1) missing") + if 'aaaa:bbbb:eeee::2 dev ap-br0 lladdr 02:00:00:00:01:00 PERMANENT' not in matches: + raise Exception("dev1 addr(2) missing") + dev[0].request("DISCONNECT") dev[1].request("DISCONNECT") time.sleep(0.5) @@ -2257,11 +2367,5 @@ def test_proxyarp_open(dev, apdev): stderr=open('/dev/null', 'w')) subprocess.call(['brctl', 'delbr', 'ap-br0'], stderr=open('/dev/null', 'w')) - subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:cccc::1/64', - 'dev', dev[0].ifname]) - subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:dddd::1/64', - 'dev', dev[1].ifname]) - subprocess.call(['ip', 'addr', 'del', 'aaaa:bbbb:eeee::1/64', - 'dev', dev[1].ifname]) return res