tests: More thorough P2P GO Negotiation Request protocol checks

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2014-03-05 00:26:16 +02:00
parent 11d78bb1ee
commit b4343b9c94

View File

@ -7,6 +7,8 @@
import binascii
import struct
import time
import logging
logger = logging.getLogger()
import hostapd
@ -219,7 +221,54 @@ def parse_p2p_public_action(payload):
p2p = {}
p2p['subtype'] = subtype
p2p['dialog_token'] = dialog_token
p2p['data'] = pos[2:]
pos = pos[2:]
p2p['elements'] = pos
while len(pos) > 2:
(id,elen) = struct.unpack('BB', pos[0:2])
pos = pos[2:]
if elen > len(pos):
raise Exception("Truncated IE in P2P Public Action frame (elen=%d left=%d)" % (elen, len(pos)))
if id == WLAN_EID_VENDOR_SPECIFIC:
if elen < 4:
raise Exception("Too short vendor specific IE in P2P Public Action frame (elen=%d)" % elen)
(oui1,oui2,oui3,subtype) = struct.unpack('BBBB', pos[0:4])
if oui1 == 0x50 and oui2 == 0x6f and oui3 == 0x9a and subtype == 9:
if 'p2p' in p2p:
p2p['p2p'] += pos[4:elen]
else:
p2p['p2p'] = pos[4:elen]
if oui1 == 0x00 and oui2 == 0x50 and oui3 == 0xf2 and subtype == 4:
p2p['wsc'] = pos[4:elen]
pos = pos[elen:]
if len(pos) > 0:
raise Exception("Invalid element in P2P Public Action frame")
if 'p2p' in p2p:
p2p['p2p_attrs'] = {}
pos = p2p['p2p']
while len(pos) >= 3:
(id,alen) = struct.unpack('<BH', pos[0:3])
pos = pos[3:]
if alen > len(pos):
logger.info("P2P payload: " + binascii.hexlify(p2p['p2p']))
raise Exception("Truncated P2P attribute in P2P Public Action frame (alen=%d left=%d p2p-payload=%d)" % (alen, len(pos), len(p2p['p2p'])))
p2p['p2p_attrs'][id] = pos[0:alen]
pos = pos[alen:]
if P2P_ATTR_STATUS in p2p['p2p_attrs']:
p2p['p2p_status'] = struct.unpack('B', p2p['p2p_attrs'][P2P_ATTR_STATUS])[0]
if 'wsc' in p2p:
p2p['wsc_attrs'] = {}
pos = p2p['wsc']
while len(pos) >= 4:
(id,alen) = struct.unpack('>HH', pos[0:4])
pos = pos[4:]
if alen > len(pos):
logger.info("WSC payload: " + binascii.hexlify(p2p['wsc']))
raise Exception("Truncated WSC attribute in P2P Public Action frame (alen=%d left=%d wsc-payload=%d)" % (alen, len(pos), len(p2p['wsc'])))
p2p['wsc_attrs'][id] = pos[0:alen]
pos = pos[alen:]
return p2p
def test_p2p_msg_empty(dev, apdev):
@ -822,6 +871,18 @@ def test_p2p_msg_pd(dev, apdev):
if ev is not None:
raise Exception("Unexpected PD result event")
def check_p2p_response(hapd, dialog_token, status):
resp = hapd.mgmt_rx(timeout=1)
if resp is None:
raise Exception("No GO Neg Response " + str(dialog_token))
p2p = parse_p2p_public_action(resp['payload'])
if p2p is None:
raise Exception("Not a P2P Public Action frame " + str(dialog_token))
if dialog_token != p2p['dialog_token']:
raise Exception("Unexpected dialog token in response")
if p2p['p2p_status'] != status:
raise Esception("Unexpected status code %s in response (expected %d)" % (p2p['p2p_status'], status))
def test_p2p_msg_go_neg_req(dev, apdev):
"""P2P protocol tests for invitation request from unknown peer"""
dst, src, hapd, channel = start_p2p(dev, apdev)
@ -975,8 +1036,8 @@ def test_p2p_msg_go_neg_req(dev, apdev):
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
hapd.mgmt_tx(msg)
if hapd.mgmt_rx(timeout=1) is None:
raise Exception("No GO Neg Response " + str(dialog_token))
check_p2p_response(hapd, dialog_token,
P2P_SC_FAIL_INFO_CURRENTLY_UNAVAILABLE)
ev = dev[0].wait_event(["P2P-GO-NEG-REQUEST"], timeout=1)
if ev is None:
raise Exception("Timeout on GO Neg event " + str(dialog_token))
@ -1016,8 +1077,26 @@ def test_p2p_msg_go_neg_req(dev, apdev):
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
hapd.mgmt_tx(msg)
if hapd.mgmt_rx(timeout=1) is None:
raise Exception("No GO Neg Response " + str(dialog_token))
check_p2p_response(hapd, dialog_token, P2P_SC_FAIL_INVALID_PARAMS)
# ready - invalid Channel List
time.sleep(0.1)
dialog_token += 1
msg = p2p_hdr(dst, src, type=P2P_GO_NEG_REQ, dialog_token=dialog_token)
attrs = p2p_attr_capability()
attrs += p2p_attr_go_intent()
attrs += p2p_attr_config_timeout()
attrs += p2p_attr_listen_channel()
attrs += p2p_attr_ext_listen_timing()
attrs += p2p_attr_intended_interface_addr("02:02:02:02:02:02")
attrs += struct.pack("<BH3BBB11B", P2P_ATTR_CHANNEL_LIST, 16,
0x58, 0x58, 0x04,
81, 12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
attrs += p2p_attr_device_info(src, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
hapd.mgmt_tx(msg)
check_p2p_response(hapd, dialog_token, P2P_SC_FAIL_NO_COMMON_CHANNELS)
# ready - invalid GO Neg Req (unsupported Device Password ID)
time.sleep(0.1)
@ -1029,10 +1108,14 @@ def test_p2p_msg_go_neg_req(dev, apdev):
attrs += p2p_attr_listen_channel()
attrs += p2p_attr_ext_listen_timing()
attrs += p2p_attr_intended_interface_addr("02:02:02:02:02:02")
attrs += p2p_attr_channel_list()
# very long channel list
attrs += struct.pack("<BH3BBB11B30B", P2P_ATTR_CHANNEL_LIST, 46,
0x58, 0x58, 0x04,
81, 11, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
1, 1, 1, 2, 1, 2, 3, 1, 3, 4, 1, 4, 5, 1, 5,
6, 1, 6, 7, 1, 7, 8, 1, 8, 9, 1, 9, 10, 1, 10)
attrs += p2p_attr_device_info(src, config_methods=0x0108)
attrs += p2p_attr_operating_channel()
msg['payload'] += ie_p2p(attrs)
hapd.mgmt_tx(msg)
if hapd.mgmt_rx(timeout=1) is None:
raise Exception("No GO Neg Response " + str(dialog_token))
check_p2p_response(hapd, dialog_token, P2P_SC_FAIL_INCOMPATIBLE_PROV_METHOD)