diff --git a/tests/hwsim/test_p2p_service.py b/tests/hwsim/test_p2p_service.py index 1eec0e04c..9d165043c 100644 --- a/tests/hwsim/test_p2p_service.py +++ b/tests/hwsim/test_p2p_service.py @@ -6,6 +6,7 @@ import logging logger = logging.getLogger() +import time import uuid import hwsim_utils @@ -441,3 +442,71 @@ def test_p2p_service_discovery_cancel_during_query(dev): dev[2].p2p_stop_find() dev[1].p2p_stop_find() dev[0].p2p_stop_find() + +def get_p2p_state(dev): + res = dev.global_request("STATUS") + p2p_state = None + for line in res.splitlines(): + if line.startswith("p2p_state="): + p2p_state = line.split('=')[1] + break + if p2p_state is None: + raise Exception("Could not get p2p_state") + return p2p_state + +def test_p2p_service_discovery_peer_not_listening(dev): + """P2P service discovery and peer not listening""" + addr0 = dev[0].p2p_dev_addr() + addr1 = dev[1].p2p_dev_addr() + add_bonjour_services(dev[0]) + add_upnp_services(dev[0]) + dev[0].p2p_listen() + dev[1].global_request("P2P_FIND 1 type=social") + ev = dev[1].wait_global_event(["P2P-DEVICE-FOUND"], timeout=4) + if ev is None: + raise Exception("Peer not found") + dev[0].p2p_stop_find() + ev = dev[1].wait_event(["CTRL-EVENT-SCAN-STARTED"], timeout=1) + ev = dev[1].wait_event(["CTRL-EVENT-SCAN-STARTED"], timeout=1) + time.sleep(0.03) + dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001") + ev = dev[0].wait_global_event(["P2P-SERV-DISC-REQ"], timeout=1) + if ev is not None: + raise Exception("Service discovery request unexpectedly received") + ev = dev[1].wait_global_event(["P2P-FIND-STOPPED", "P2P-SERV-DISC-RESP"], + timeout=10) + if ev is None: + raise Exception("P2P-FIND-STOPPED event timed out") + if "P2P-SERV-DISC-RESP" in ev: + raise Exception("Unexpected SD response") + p2p_state = get_p2p_state(dev[1]) + if p2p_state != "IDLE": + raise Exception("Unexpected p2p_state after P2P_FIND timeout: " + p2p_state) + +def test_p2p_service_discovery_peer_not_listening2(dev): + """P2P service discovery and peer not listening""" + addr0 = dev[0].p2p_dev_addr() + addr1 = dev[1].p2p_dev_addr() + add_bonjour_services(dev[0]) + add_upnp_services(dev[0]) + dev[0].p2p_listen() + dev[1].global_request("P2P_FIND type=social") + ev = dev[1].wait_global_event(["P2P-DEVICE-FOUND"], timeout=10) + if ev is None: + raise Exception("Peer not found") + dev[0].p2p_stop_find() + time.sleep(0.53) + dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001") + ev = dev[0].wait_global_event(["P2P-SERV-DISC-REQ"], timeout=0.5) + if ev is not None: + raise Exception("Service discovery request unexpectedly received") + dev[1].p2p_stop_find() + ev = dev[1].wait_global_event(["P2P-FIND-STOPPED", "P2P-SERV-DISC-RESP"], + timeout=10) + if ev is None: + raise Exception("P2P-FIND-STOPPED event timed out") + if "P2P-SERV-DISC-RESP" in ev: + raise Exception("Unexpected SD response") + p2p_state = get_p2p_state(dev[1]) + if p2p_state != "IDLE": + raise Exception("Unexpected p2p_state after P2P_FIND timeout: " + p2p_state)