From f7b1a750360105c6fcf083f0cf2a57b2d94da356 Mon Sep 17 00:00:00 2001 From: Jouni Malinen Date: Sat, 9 Mar 2013 15:52:57 +0200 Subject: [PATCH] tests: Validate GO Negotiation status code Signed-hostap: Jouni Malinen --- tests/hwsim/test_p2p_grpform.py | 9 +++++++-- tests/hwsim/wpasupplicant.py | 36 ++++++++++++++++++++++----------- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/tests/hwsim/test_p2p_grpform.py b/tests/hwsim/test_p2p_grpform.py index 2f830aa78..7e56ed4ed 100644 --- a/tests/hwsim/test_p2p_grpform.py +++ b/tests/hwsim/test_p2p_grpform.py @@ -11,7 +11,7 @@ logger = logging.getLogger(__name__) import hwsim_utils -def go_neg_pin_authorized(i_dev, r_dev, i_intent=None, r_intent=None, expect_failure=False): +def go_neg_pin_authorized(i_dev, r_dev, i_intent=None, r_intent=None, expect_failure=False, i_go_neg_status=None): r_dev.p2p_listen() i_dev.p2p_listen() pin = r_dev.wps_read_pin() @@ -23,6 +23,11 @@ def go_neg_pin_authorized(i_dev, r_dev, i_intent=None, r_intent=None, expect_fai logger.debug("r_res: " + str(r_res)) r_dev.dump_monitor() i_dev.dump_monitor() + if i_go_neg_status: + if i_res['result'] != 'go-neg-failed': + raise Exception("Expected GO Negotiation failure not reported") + if i_res['status'] != i_go_neg_status: + raise Exception("Expected GO Negotiation status not seen") if expect_failure: return logger.info("Group formed") @@ -45,7 +50,7 @@ def test_grpform2(dev): pass def test_both_go_intent_15(dev): - go_neg_pin_authorized(i_dev=dev[0], i_intent=15, r_dev=dev[1], r_intent=15, expect_failure=True) + go_neg_pin_authorized(i_dev=dev[0], i_intent=15, r_dev=dev[1], r_intent=15, expect_failure=True, i_go_neg_status=9) def add_tests(tests): tests.append(test_grpform) diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py index 4d9e1ad8d..a1e2e6bbc 100644 --- a/tests/hwsim/wpasupplicant.py +++ b/tests/hwsim/wpasupplicant.py @@ -82,7 +82,22 @@ class WpaSupplicant: return True return False - def group_form_result(self, ev): + def group_form_result(self, ev, expect_failure=False): + if expect_failure: + if "P2P-GROUP-STARTED" in ev: + raise Exception("Group formation succeeded when expecting failure") + exp = r'<.>(P2P-GO-NEG-FAILURE) status=([0-9]*)' + s = re.split(exp, ev) + if len(s) < 3: + return None + res = {} + res['result'] = 'go-neg-failed' + res['status'] = int(s[2]) + return res + + if "P2P-GROUP-STARTED" not in ev: + raise Exception("No P2P-GROUP-STARTED event seen") + exp = r'<.>(P2P-GROUP-STARTED) ([^ ]*) ([^ ]*) ssid="(.*)" freq=([0-9]*) ((?:psk=.*)|(?:passphrase=".*")) go_dev_addr=([0-9a-f:]*)' s = re.split(exp, ev) if len(s) < 8: @@ -114,15 +129,13 @@ class WpaSupplicant: raise Exception("P2P_CONNECT (auth) failed") def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False): - ev = self.wait_event("P2P-GROUP-STARTED", timeout); + ev = self.wait_event(["P2P-GROUP-STARTED","P2P-GO-NEG-FAILURE"], timeout); if ev is None: if expect_failure: return None raise Exception("Group formation timed out") self.dump_monitor() - if expect_failure: - raise Exception("Group formation succeeded when expecting failure") - return self.group_form_result(ev) + return self.group_form_result(ev, expect_failure) def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False): if not self.discover_peer(peer): @@ -135,18 +148,16 @@ class WpaSupplicant: if timeout == 0: self.dump_monitor() return None - ev = self.wait_event("P2P-GROUP-STARTED", timeout) + ev = self.wait_event(["P2P-GROUP-STARTED","P2P-GO-NEG-FAILURE"], timeout) if ev is None: if expect_failure: return None raise Exception("Group formation timed out") self.dump_monitor() - if expect_failure: - raise Exception("Group formation succeeded when expecting failure") - return self.group_form_result(ev) + return self.group_form_result(ev, expect_failure) raise Exception("P2P_CONNECT failed") - def wait_event(self, event, timeout): + def wait_event(self, events, timeout): count = 0 while count < timeout * 2: count = count + 1 @@ -154,8 +165,9 @@ class WpaSupplicant: while self.mon.pending(): ev = self.mon.recv() logger.debug(self.ifname + ": " + ev) - if event in ev: - return ev + for event in events: + if event in ev: + return ev return None def dump_monitor(self):