tests: GAS comeback protocol testing

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2014-03-01 17:05:52 +02:00
parent c61e5a822c
commit 75f6134dd4

View File

@ -221,12 +221,14 @@ def test_gas_comeback_delay(dev, apdev):
if ev is None:
raise Exception("Operation timed out")
def expect_gas_result(dev, result):
def expect_gas_result(dev, result, status=None):
ev = dev.wait_event(["GAS-QUERY-DONE"], timeout=10)
if ev is None:
raise Exception("GAS query timed out")
if "result=" + result not in ev:
raise Exception("Unexpected GAS query result")
if status and "status_code=" + str(status) + ' ' not in ev:
raise Exception("Unexpected GAS status code")
def anqp_get(dev, bssid, id):
dev.request("ANQP_GET " + bssid + " " + str(id))
@ -263,13 +265,19 @@ GAS_ACTIONS = [ GAS_INITIAL_REQUEST, GAS_INITIAL_RESPONSE,
def anqp_adv_proto():
return struct.pack('BBBB', 108, 2, 127, 0)
def anqp_initial_resp(dialog_token, status_code):
def anqp_initial_resp(dialog_token, status_code, comeback_delay=0):
return struct.pack('<BBBHH', ACTION_CATEG_PUBLIC, GAS_INITIAL_RESPONSE,
dialog_token, status_code, 0) + anqp_adv_proto()
dialog_token, status_code, comeback_delay) + anqp_adv_proto()
def anqp_comeback_resp(dialog_token):
def anqp_comeback_resp(dialog_token, status_code=0, id=0, more=False, comeback_delay=0, bogus_adv_proto=False):
if more:
id |= 0x80
if bogus_adv_proto:
adv = struct.pack('BBBB', 108, 2, 127, 1)
else:
adv = anqp_adv_proto()
return struct.pack('<BBBHBH', ACTION_CATEG_PUBLIC, GAS_COMEBACK_RESPONSE,
dialog_token, 0, 0, 0) + anqp_adv_proto()
dialog_token, status_code, id, comeback_delay) + adv
def gas_rx(hapd):
count = 0
@ -300,7 +308,7 @@ def parse_gas(payload):
gas['action'] = action
pos = pos[3:]
if len(pos) < 1:
if len(pos) < 1 and action != GAS_COMEBACK_REQUEST:
return None
gas['dialog_token'] = dialog_token
@ -314,6 +322,14 @@ def action_response(req):
resp['bssid'] = req['bssid']
return resp
def send_gas_resp(hapd, resp):
hapd.mgmt_tx(resp)
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("Missing TX status for GAS response")
if "ok=1" not in ev:
raise Exception("GAS response not acknowledged")
def test_gas_invalid_response_type(dev, apdev):
"""GAS invalid response type"""
hapd = start_ap(apdev[0])
@ -330,12 +346,7 @@ def test_gas_invalid_response_type(dev, apdev):
resp = action_response(query)
# GAS Comeback Response instead of GAS Initial Response
resp['payload'] = anqp_comeback_resp(gas['dialog_token']) + struct.pack('<H', 0)
hapd.mgmt_tx(resp)
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("Missing TX status for GAS response")
if "ok=1" not in ev:
raise Exception("GAS response not acknowledged")
send_gas_resp(hapd, resp)
# station drops the invalid frame, so this needs to result in GAS timeout
expect_gas_result(dev[0], "TIMEOUT")
@ -355,12 +366,7 @@ def test_gas_failure_status_code(dev, apdev):
resp = action_response(query)
resp['payload'] = anqp_initial_resp(gas['dialog_token'], 61) + struct.pack('<H', 0)
hapd.mgmt_tx(resp)
ev = hapd.wait_event(["MGMT-TX-STATUS"], timeout=5)
if ev is None:
raise Exception("Missing TX status for GAS response")
if "ok=1" not in ev:
raise Exception("GAS response not acknowledged")
send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "FAILURE")
@ -423,3 +429,149 @@ def test_gas_malformed(dev, apdev):
# part of the response is not valid. This is reported as successfulyl
# completed GAS exchange.
expect_gas_result(dev[0], "SUCCESS")
def init_gas(hapd, bssid, dev):
anqp_get(dev, bssid, 263)
query = gas_rx(hapd)
gas = parse_gas(query['payload'])
dialog_token = gas['dialog_token']
resp = action_response(query)
resp['payload'] = anqp_initial_resp(dialog_token, 0, comeback_delay=1) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
query = gas_rx(hapd)
gas = parse_gas(query['payload'])
if gas['action'] != GAS_COMEBACK_REQUEST:
raise Exception("Unexpected request action")
if gas['dialog_token'] != dialog_token:
raise Exception("Unexpected dialog token change")
return query, dialog_token
def test_gas_malformed_comeback_resp(dev, apdev):
"""GAS malformed comeback response frames"""
hapd = start_ap(apdev[0])
bssid = apdev[0]['bssid']
dev[0].scan(freq="2412")
hapd.set("ext_mgmt_frame_handling", "1")
logger.debug("Non-zero status code in comeback response")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, status_code=2) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "FAILURE", status=2)
logger.debug("Different advertisement protocol in comeback response")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, bogus_adv_proto=True) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "PEER_ERROR")
logger.debug("Non-zero frag id and comeback delay in comeback response")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, id=1, comeback_delay=1) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "PEER_ERROR")
logger.debug("Unexpected frag id in comeback response")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, id=1) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "PEER_ERROR")
logger.debug("Empty fragment and replay in comeback response")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, more=True) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
query = gas_rx(hapd)
gas = parse_gas(query['payload'])
if gas['action'] != GAS_COMEBACK_REQUEST:
raise Exception("Unexpected request action")
if gas['dialog_token'] != dialog_token:
raise Exception("Unexpected dialog token change")
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
resp['payload'] = anqp_comeback_resp(dialog_token, id=1) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "SUCCESS")
logger.debug("Unexpected initial response when waiting for comeback response")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = anqp_initial_resp(dialog_token, 0) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
ev = hapd.wait_event(["MGMT-RX"], timeout=1)
if ev is not None:
raise Exception("Unexpected management frame")
expect_gas_result(dev[0], "TIMEOUT")
logger.debug("Too short comeback response")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = struct.pack('<BBBH', ACTION_CATEG_PUBLIC,
GAS_COMEBACK_RESPONSE, dialog_token, 0)
send_gas_resp(hapd, resp)
ev = hapd.wait_event(["MGMT-RX"], timeout=1)
if ev is not None:
raise Exception("Unexpected management frame")
expect_gas_result(dev[0], "TIMEOUT")
logger.debug("Too short comeback response(2)")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = struct.pack('<BBBHBB', ACTION_CATEG_PUBLIC,
GAS_COMEBACK_RESPONSE, dialog_token, 0, 0x80,
0)
send_gas_resp(hapd, resp)
ev = hapd.wait_event(["MGMT-RX"], timeout=1)
if ev is not None:
raise Exception("Unexpected management frame")
expect_gas_result(dev[0], "TIMEOUT")
logger.debug("Maximum comeback response fragment claiming more fragments")
query, dialog_token = init_gas(hapd, bssid, dev[0])
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, more=True) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
for i in range(1, 129):
query = gas_rx(hapd)
gas = parse_gas(query['payload'])
if gas['action'] != GAS_COMEBACK_REQUEST:
raise Exception("Unexpected request action")
if gas['dialog_token'] != dialog_token:
raise Exception("Unexpected dialog token change")
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, id=i, more=True) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "PEER_ERROR")
def test_gas_comeback_resp_additional_delay(dev, apdev):
"""GAS comeback response requesting additional delay"""
hapd = start_ap(apdev[0])
bssid = apdev[0]['bssid']
dev[0].scan(freq="2412")
hapd.set("ext_mgmt_frame_handling", "1")
query, dialog_token = init_gas(hapd, bssid, dev[0])
for i in range(0, 2):
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, status_code=95, comeback_delay=50) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
query = gas_rx(hapd)
gas = parse_gas(query['payload'])
if gas['action'] != GAS_COMEBACK_REQUEST:
raise Exception("Unexpected request action")
if gas['dialog_token'] != dialog_token:
raise Exception("Unexpected dialog token change")
resp = action_response(query)
resp['payload'] = anqp_comeback_resp(dialog_token, status_code=0) + struct.pack('<H', 0)
send_gas_resp(hapd, resp)
expect_gas_result(dev[0], "SUCCESS")