diff --git a/tests/hwsim/test_p2p_grpform.py b/tests/hwsim/test_p2p_grpform.py index 65859a5cc..2f830aa78 100644 --- a/tests/hwsim/test_p2p_grpform.py +++ b/tests/hwsim/test_p2p_grpform.py @@ -11,18 +11,20 @@ logger = logging.getLogger(__name__) import hwsim_utils -def go_neg_pin_authorized(i_dev, r_dev, i_intent=7, r_intent=7): +def go_neg_pin_authorized(i_dev, r_dev, i_intent=None, r_intent=None, expect_failure=False): r_dev.p2p_listen() i_dev.p2p_listen() pin = r_dev.wps_read_pin() logger.info("Start GO negotiation " + i_dev.ifname + " -> " + r_dev.ifname) - r_dev.p2p_go_neg_auth(i_dev.p2p_dev_addr(), pin, "display") - i_res = i_dev.p2p_go_neg_init(r_dev.p2p_dev_addr(), pin, "enter", timeout=15) - r_res = r_dev.p2p_go_neg_auth_result() + r_dev.p2p_go_neg_auth(i_dev.p2p_dev_addr(), pin, "display", go_intent=r_intent) + i_res = i_dev.p2p_go_neg_init(r_dev.p2p_dev_addr(), pin, "enter", timeout=15, go_intent=i_intent, expect_failure=expect_failure) + r_res = r_dev.p2p_go_neg_auth_result(expect_failure=expect_failure) logger.debug("i_res: " + str(i_res)) logger.debug("r_res: " + str(r_res)) r_dev.dump_monitor() i_dev.dump_monitor() + if expect_failure: + return logger.info("Group formed") hwsim_utils.test_connectivity_p2p(r_dev, i_dev) @@ -42,6 +44,10 @@ def test_grpform2(dev): except: pass +def test_both_go_intent_15(dev): + go_neg_pin_authorized(i_dev=dev[0], i_intent=15, r_dev=dev[1], r_intent=15, expect_failure=True) + def add_tests(tests): tests.append(test_grpform) tests.append(test_grpform2) + tests.append(test_both_go_intent_15) diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py index 51f9907b6..4d9e1ad8d 100644 --- a/tests/hwsim/wpasupplicant.py +++ b/tests/hwsim/wpasupplicant.py @@ -102,35 +102,47 @@ class WpaSupplicant: res['go_dev_addr'] = s[7] return res - def p2p_go_neg_auth(self, peer, pin, method): + def p2p_go_neg_auth(self, peer, pin, method, go_intent=None): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + " auth" + if go_intent: + cmd = cmd + ' go_intent=' + str(go_intent) if "OK" in self.request(cmd): return None raise Exception("P2P_CONNECT (auth) failed") - def p2p_go_neg_auth_result(self, timeout=1): + def p2p_go_neg_auth_result(self, timeout=1, expect_failure=False): ev = self.wait_event("P2P-GROUP-STARTED", timeout); if ev is None: + if expect_failure: + return None raise Exception("Group formation timed out") self.dump_monitor() + if expect_failure: + raise Exception("Group formation succeeded when expecting failure") return self.group_form_result(ev) - def p2p_go_neg_init(self, peer, pin, method, timeout=0): + def p2p_go_neg_init(self, peer, pin, method, timeout=0, go_intent=None, expect_failure=False): if not self.discover_peer(peer): raise Exception("Peer " + peer + " not found") self.dump_monitor() cmd = "P2P_CONNECT " + peer + " " + pin + " " + method + if go_intent: + cmd = cmd + ' go_intent=' + str(go_intent) if "OK" in self.request(cmd): if timeout == 0: self.dump_monitor() return None ev = self.wait_event("P2P-GROUP-STARTED", timeout) if ev is None: + if expect_failure: + return None raise Exception("Group formation timed out") self.dump_monitor() + if expect_failure: + raise Exception("Group formation succeeded when expecting failure") return self.group_form_result(ev) raise Exception("P2P_CONNECT failed") @@ -141,6 +153,7 @@ class WpaSupplicant: time.sleep(0.1) while self.mon.pending(): ev = self.mon.recv() + logger.debug(self.ifname + ": " + ev) if event in ev: return ev return None