mirror of
https://github.com/vanhoefm/fragattacks.git
synced 2025-01-17 18:34:03 -05:00
tests: Re-factor PD and connection flows in P2PS tests
Reuse p2ps_provision() and p2ps_connect_pd() methods, and remove the previous PD helper functions which are no longer used. This fixes the previously "broken" p2ps_connect_keypad_method_nonautoaccept and p2ps_connect_display_method_nonautoaccept. Signed-off-by: Andrei Otcheretianski <andrei.otcheretianski@intel.com> Reviewed-by: Max Stepanov <Max.Stepanov@intel.com> Reviewed-by: Ilan Peer <ilan.peer@intel.com>
This commit is contained in:
parent
2f0f69a9ec
commit
3c72383e03
@ -227,154 +227,6 @@ def p2ps_provision(seeker, advertiser, adv_id, auto_accept=True, method="1000",
|
||||
|
||||
return ev1, ev2
|
||||
|
||||
def p2p_connect_p2ps_method(i_dev, r_dev, autoaccept):
|
||||
"""P2PS connect function with p2ps method"""
|
||||
if autoaccept == False:
|
||||
if "OK" not in i_dev.global_request("P2P_CONNECT " + r_dev.p2p_dev_addr() + " 12345670 p2ps persistent auth"):
|
||||
raise Exception("P2P_CONNECT fails on seeker side")
|
||||
ev0 = r_dev.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on Advertiser side")
|
||||
|
||||
if "OK" not in r_dev.global_request("P2P_CONNECT " + i_dev.p2p_dev_addr() + " 12345670 p2ps persistent"):
|
||||
raise Exception("P2P_CONNECT fails on Advertiser side")
|
||||
|
||||
else:
|
||||
if "OK" not in r_dev.global_request("P2P_CONNECT " + i_dev.p2p_dev_addr() + " 12345670 p2ps persistent auth"):
|
||||
raise Exception("P2P_CONNECT fails on Advertiser side")
|
||||
ev1 = i_dev.wait_global_event(["P2PS-PROV-DONE"], timeout=5)
|
||||
if ev1 is None:
|
||||
raise Exception("Failed to receive deferred acceptance at seeker")
|
||||
|
||||
if "OK" not in i_dev.global_request("P2P_CONNECT " + r_dev.p2p_dev_addr() + " 12345670 p2ps persistent"):
|
||||
raise Exception("P2P_CONNECT fails on seeker side")
|
||||
ev0 = r_dev.wait_global_event(["P2P-GO-NEG-SUCCESS"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("GO Neg did not succeed")
|
||||
ev0 = r_dev.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
|
||||
if ev0 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on advertiser side")
|
||||
r_dev.group_form_result(ev0)
|
||||
|
||||
ev1 = i_dev.wait_global_event(["P2P-GROUP-STARTED"], timeout=5)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on seeker side")
|
||||
i_dev.group_form_result(ev1)
|
||||
|
||||
def p2ps_provision_keypad_method(i_dev, r_dev, autoaccept,
|
||||
initiator_or_responder):
|
||||
"""P2PS keypad method provisioning function"""
|
||||
if autoaccept == False and initiator_or_responder == 'initiator':
|
||||
ev = i_dev.wait_global_event(["P2P-PROV-DISC-FAILURE"], timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("Provisioning deferred on seeker side")
|
||||
ev1 = i_dev.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on seeker side")
|
||||
if r_dev.p2p_dev_addr() not in ev1:
|
||||
raise Exception("Unknown peer ")
|
||||
ev = r_dev.wait_global_event(["P2PS-PROV-START"], timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2PS-PROV-START timeout on Advertiser side")
|
||||
|
||||
if autoaccept == False and initiator_or_responder == 'responder':
|
||||
ev0 = r_dev.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on seeker side")
|
||||
ev0 = r_dev.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=5)
|
||||
if ev0 is None:
|
||||
raise Exception("P2P-PROV-DISC-ENTER-PIN timeout on advertiser side")
|
||||
|
||||
if autoaccept == True and initiator_or_responder == 'initiator':
|
||||
ev1 = i_dev.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on seeker side")
|
||||
ev2 = i_dev.wait_global_event(["P2P-PROV-DISC-ENTER-PIN"], timeout=10)
|
||||
if ev2 is None:
|
||||
raise Exception("P2P-PROV-DISC-ENTER-PIN failed on seeker side")
|
||||
if r_dev.p2p_dev_addr() not in ev2:
|
||||
raise Exception("Unknown peer ")
|
||||
return ev1
|
||||
|
||||
def p2ps_provision_display_method(i_dev, r_dev, autoaccept,
|
||||
initiator_or_responder):
|
||||
"""P2PS display method provisioning function"""
|
||||
if initiator_or_responder == 'initiator':
|
||||
ev0 = r_dev.wait_global_event(["P2PS-PROV-START"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2PS-PROV-START timeout on Advertiser side")
|
||||
if autoaccept == False:
|
||||
ev = i_dev.wait_global_event(["P2P-PROV-DISC-FAILURE"], timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("Provisioning deferred on seeker side")
|
||||
ev1 = i_dev.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-PROV-DISC-SHOW-PIN timeout on Seeker side")
|
||||
if r_dev.p2p_dev_addr() not in ev1:
|
||||
raise Exception("Unknown peer ")
|
||||
pin = ev1.split(" ")[2]
|
||||
else:
|
||||
ev0 = r_dev.wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on advertiser")
|
||||
ev0 = r_dev.wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], timeout=5)
|
||||
if ev0 is None:
|
||||
raise Exception("PIN Display on advertiser side")
|
||||
pin = ev0.split(" ")[2]
|
||||
return pin
|
||||
|
||||
def p2ps_connect_pin(pin, i_dev, r_dev, initiator_method):
|
||||
"""P2PS function to perform connection using PIN method"""
|
||||
if initiator_method=="display":
|
||||
if "OK" not in i_dev.global_request("P2P_CONNECT " + r_dev.p2p_dev_addr() + " " + pin + " display persistent "):
|
||||
raise Exception("P2P_CONNECT fails on seeker side")
|
||||
ev0 = r_dev.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("Failed to receive P2P_GO-NEG-REQUEST on responder side")
|
||||
if "OK" not in r_dev.global_request("P2P_CONNECT " + i_dev.p2p_dev_addr() + " " + pin + " keypad persistent "):
|
||||
raise Exception("P2P_CONNECT fails on Advertiser side")
|
||||
else:
|
||||
if "OK" not in i_dev.global_request("P2P_CONNECT " + r_dev.p2p_dev_addr() + " " + pin + " keypad persistent "):
|
||||
raise Exception("P2P_CONNECT fails on seeker side")
|
||||
ev0 = r_dev.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("Failed to receive P2P_GO-NEG-REQUEST on responder side")
|
||||
if "OK" not in r_dev.global_request("P2P_CONNECT " + i_dev.p2p_dev_addr() + " " + pin + " display persistent "):
|
||||
raise Exception("P2P_CONNECT fails on Advertiser side")
|
||||
|
||||
ev0 = r_dev.wait_global_event(["P2P-GO-NEG-SUCCESS"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("GO Neg did not succeed on advertiser side")
|
||||
peer_mac = ev0.split("peer_dev=")[1].split(" ")[0]
|
||||
|
||||
ev1 = i_dev.wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on seeker side")
|
||||
i_dev.group_form_result(ev1)
|
||||
|
||||
ev_grpfrm = r_dev.wait_global_event(["P2P-GROUP-FORMATION-SUCCESS"],
|
||||
timeout=10)
|
||||
if ev_grpfrm is None:
|
||||
raise Exception("Group Formation failed on advertiser side")
|
||||
|
||||
ev = r_dev.wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on advertiser side")
|
||||
res = r_dev.group_form_result(ev)
|
||||
|
||||
if res['role'] == "GO":
|
||||
ev_grpfrm = r_dev.wait_global_event(["AP-STA-CONNECTED"], timeout=10)
|
||||
if ev_grpfrm is None:
|
||||
raise Exception("AP-STA-CONNECTED timeout on advertiser side")
|
||||
if i_dev.p2p_dev_addr() not in ev_grpfrm:
|
||||
raise Exception("Group formed with unknown Peer")
|
||||
else:
|
||||
ev1 = i_dev.wait_global_event(["AP-STA-CONNECTED"], timeout=5)
|
||||
if ev1 is None:
|
||||
raise Exception("AP-STA-CONNECTED timeout on Seeker side")
|
||||
if r_dev.p2p_dev_addr() not in ev1:
|
||||
raise Exception("Group formed with unknown Peer")
|
||||
|
||||
def p2ps_connect_pd(dev0, dev1, ev0, ev1, pin=None):
|
||||
conf_methods_map = {"8": "p2ps", "1": "display", "5": "keypad"}
|
||||
peer0 = ev0.split()[1]
|
||||
@ -565,30 +417,15 @@ def test_p2ps_nonexact_search_srvinfo(dev):
|
||||
|
||||
def test_p2ps_connect_p2ps_method_nonautoaccept(dev):
|
||||
"""P2PS connect for non-auto-accept and P2PS config method"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='0', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
ev_list = p2ps_nonexact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send*',
|
||||
srv_info='2 GB')
|
||||
adv_id = ev_list[0].split(" ")[0]
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=1000"):
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
ev0 = dev[0].wait_global_event(["P2PS-PROV-START"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2PS-PROV-START timeout on advertiser side")
|
||||
ev1 = dev[1].wait_global_event(["P2P-PROV-DISC-FAILURE"], timeout=15)
|
||||
if ev1 is None:
|
||||
raise Exception("Provisioning deferred timeout on seeker side")
|
||||
dev[1].p2p_listen()
|
||||
if "OK" not in dev[0].global_request("P2P_ASP_PROVISION_RESP " + addr1 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " status=12"):
|
||||
raise Exception("Failed to send deferred acceptance from advertizer")
|
||||
ev1 = dev[1].wait_global_event(["P2PS-PROV-DONE"], timeout=15)
|
||||
if ev1 is None:
|
||||
raise Exception("Failed to receive deferred acceptance at seeker")
|
||||
adv_id = ev_list[0].split()[0]
|
||||
ev1, ev0 = p2ps_provision(dev[1], dev[0], adv_id, auto_accept=False)
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1)
|
||||
|
||||
p2p_connect_p2ps_method(i_dev=dev[1], r_dev=dev[0], autoaccept=False)
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
raise Exception("Unable to remove the advertisement instance")
|
||||
@ -596,21 +433,14 @@ def test_p2ps_connect_p2ps_method_nonautoaccept(dev):
|
||||
|
||||
def test_p2ps_connect_p2ps_method_autoaccept(dev):
|
||||
"""P2PS connection with P2PS default config method and auto-accept"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='1', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
[adv_id, rcvd_svc_name] = p2ps_exact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='2 GB')
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=1000"):
|
||||
raise Exception("P2P_ASP_PROVISION failed on seeker side")
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on advertiser side")
|
||||
|
||||
p2p_connect_p2ps_method(i_dev=dev[1], r_dev=dev[0], autoaccept=True)
|
||||
ev1, ev0 = p2ps_provision(dev[1], dev[0], adv_id)
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1)
|
||||
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
@ -619,28 +449,16 @@ def test_p2ps_connect_p2ps_method_autoaccept(dev):
|
||||
|
||||
def test_p2ps_connect_keypad_method_nonautoaccept(dev):
|
||||
"""P2PS Connection with non-auto-accept and seeker having keypad method"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='0', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
ev_list = p2ps_nonexact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send*',
|
||||
srv_info='2 GB')
|
||||
adv_id = ev_list[0].split(" ")[0]
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=8"): # keypad method on seeker side
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
p2ps_provision_keypad_method(i_dev=dev[1], r_dev=dev[0], autoaccept=False,
|
||||
initiator_or_responder='initiator')
|
||||
dev[1].p2p_listen()
|
||||
adv_id = ev_list[0].split()[0]
|
||||
|
||||
if "OK" not in dev[0].global_request("P2P_ASP_PROVISION_RESP " + addr1 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " status=12"):
|
||||
raise Exception("Failed to send deferred acceptance from advertizer")
|
||||
ev1, ev0, pin = p2ps_provision(dev[1], dev[0], adv_id, auto_accept=False, method="8")
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1, pin)
|
||||
|
||||
pin = p2ps_provision_display_method(i_dev=dev[1], r_dev=dev[0],
|
||||
autoaccept=False,
|
||||
initiator_or_responder='responder')
|
||||
p2ps_connect_pin(pin, i_dev=dev[0], r_dev=dev[1],
|
||||
initiator_method="display")
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
raise Exception("Unable to remove the advertisement instance")
|
||||
@ -648,25 +466,14 @@ def test_p2ps_connect_keypad_method_nonautoaccept(dev):
|
||||
|
||||
def test_p2ps_connect_display_method_nonautoaccept(dev):
|
||||
"""P2PS connection with non-auto-accept and seeker having display method"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='0', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
ev_list = p2ps_nonexact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds*', srv_info='2 GB')
|
||||
adv_id = ev_list[0].split(" ")[0]
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=100"): # keypad method on seeker side
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
pin = p2ps_provision_display_method(i_dev=dev[1], r_dev=dev[0],
|
||||
autoaccept=False,
|
||||
initiator_or_responder='initiator')
|
||||
dev[1].p2p_listen()
|
||||
adv_id = ev_list[0].split()[0]
|
||||
|
||||
if "OK" not in dev[0].global_request("P2P_ASP_PROVISION_RESP " + addr1 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " status=12"):
|
||||
raise Exception("Failed to send deferred acceptance from advertiser")
|
||||
p2ps_provision_keypad_method(i_dev=dev[1], r_dev=dev[0], autoaccept=False,
|
||||
initiator_or_responder='responder')
|
||||
p2ps_connect_pin(pin, i_dev=dev[0], r_dev=dev[1], initiator_method="keypad")
|
||||
ev1, ev0, pin = p2ps_provision(dev[1], dev[0], adv_id, auto_accept=False, method="100")
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1, pin)
|
||||
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
@ -675,22 +482,15 @@ def test_p2ps_connect_display_method_nonautoaccept(dev):
|
||||
|
||||
def test_p2ps_connect_keypad_method_autoaccept(dev):
|
||||
"""P2PS connection with auto-accept and keypad method on seeker side"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='1', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
[adv_id, rcvd_svc_name] = p2ps_exact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='2 GB')
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=8"): # keypad method on seeker side
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
|
||||
p2ps_provision_keypad_method(i_dev=dev[1], r_dev=dev[0], autoaccept=True,
|
||||
initiator_or_responder='initiator')
|
||||
pin = p2ps_provision_display_method(i_dev=dev[1], r_dev=dev[0],
|
||||
autoaccept=True,
|
||||
initiator_or_responder='responder')
|
||||
p2ps_connect_pin(pin, i_dev=dev[1], r_dev=dev[0], initiator_method="Keypad")
|
||||
ev1, ev0, pin = p2ps_provision(dev[1], dev[0], adv_id, method="8")
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1, pin)
|
||||
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
raise Exception("Unable to remove the advertisement instance")
|
||||
@ -698,22 +498,15 @@ def test_p2ps_connect_keypad_method_autoaccept(dev):
|
||||
|
||||
def test_p2ps_connect_display_method_autoaccept(dev):
|
||||
"""P2PS connection with auto-accept and display method on seeker side"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='1', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
[adv_id, rcvd_svc_name] = p2ps_exact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='2 GB')
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=100"): # display method on seeker side
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
|
||||
pin = p2ps_provision_display_method(i_dev=dev[1], r_dev=dev[0],
|
||||
autoaccept=True,
|
||||
initiator_or_responder='initiator')
|
||||
dev[1].p2p_listen()
|
||||
ev1, ev0, pin = p2ps_provision(dev[1], dev[0], adv_id, method="100")
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1, pin)
|
||||
|
||||
p2ps_connect_pin(pin, i_dev=dev[1], r_dev=dev[0], initiator_method="display")
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
raise Exception("Unable to remove the advertisement instance")
|
||||
@ -721,47 +514,15 @@ def test_p2ps_connect_display_method_autoaccept(dev):
|
||||
|
||||
def test_p2ps_connect_adv_go_p2ps_method(dev):
|
||||
"""P2PS auto-accept connection with advertisement as GO and P2PS method"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='4', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
[adv_id, rcvd_svc_name] = p2ps_exact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='2 GB')
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=1000"):
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("Timed out while waiting for prov done on advertizer")
|
||||
if "go=" not in ev0:
|
||||
raise Exception("Advertiser failed to become GO")
|
||||
ev1, ev0 = p2ps_provision(dev[1], dev[0], adv_id)
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1)
|
||||
|
||||
adv_conncap = ev0.split("conncap=")[1].split(" ")[0]
|
||||
if adv_conncap == "4":
|
||||
logger.info("Advertiser is GO")
|
||||
ev0 = dev[0].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on advertiser side")
|
||||
dev[0].group_form_result(ev0)
|
||||
|
||||
ev1 = dev[1].wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on seeker side")
|
||||
|
||||
seeker_conncap = ev1.split("conncap=")[1].split(" ")[0]
|
||||
|
||||
if "join=" in ev1:
|
||||
if "OK" not in dev[1].global_request("P2P_CONNECT " + addr0 + " 12345670 p2ps persistent join"):
|
||||
raise Exception("P2P_CONNECT failed on seeker side")
|
||||
ev1 = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on seeker side")
|
||||
dev[1].group_form_result(ev1)
|
||||
|
||||
ev0 = dev[0].wait_global_event(["AP-STA-CONNECTED"], timeout=5)
|
||||
if ev0 is None:
|
||||
raise Exception("AP-STA-CONNECTED timeout on advertiser side")
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
raise Exception("Unable to remove the advertisement instance")
|
||||
@ -769,102 +530,28 @@ def test_p2ps_connect_adv_go_p2ps_method(dev):
|
||||
|
||||
def test_p2ps_connect_adv_client_p2ps_method(dev):
|
||||
"""P2PS auto-accept connection with advertisement as Client and P2PS method"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='2', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
[adv_id, rcvd_svc_name] = p2ps_exact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='2 GB')
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=1000"):
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on advertiser side")
|
||||
if "join=" not in ev0:
|
||||
raise Exception("Advertiser failed to become Client")
|
||||
ev1, ev0 = p2ps_provision(dev[1], dev[0], adv_id)
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1)
|
||||
|
||||
adv_conncap = ev0.split("conncap=")[1].split(" ")[0]
|
||||
if adv_conncap == "2":
|
||||
logger.info("Advertiser is Client")
|
||||
|
||||
ev1 = dev[1].wait_global_event(["P2PS-PROV-DONE"], timeout=5)
|
||||
if ev1 is None:
|
||||
raise Exception("Provisioning failed on seeker side")
|
||||
|
||||
seeker_conncap = ev1.split("conncap=")[1].split(" ")[0]
|
||||
ev1 = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on seeker side")
|
||||
dev[1].group_form_result(ev1)
|
||||
|
||||
if "join=" in ev0:
|
||||
if "OK" not in dev[0].global_request("P2P_CONNECT " + addr1 + " 12345670 p2ps persistent join"):
|
||||
raise Exception("P2P_CONNECT failed on seeker side")
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2P-GROUP-FORMATION-SUCCESS"], timeout=15)
|
||||
if ev0 is None:
|
||||
raise Exception("P2P Group Formation failed on advertiser side")
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on advertiser side")
|
||||
dev[0].group_form_result(ev0)
|
||||
|
||||
ev1 = dev[1].wait_global_event(["AP-STA-CONNECTED"], timeout=5)
|
||||
if ev1 is None:
|
||||
raise Exception("Group formation failed")
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
raise Exception("Unable to remove the advertisement instance")
|
||||
remove_group(dev[0], dev[1])
|
||||
|
||||
def p2ps_connect_adv_go_pin_method(dev, keep_group=False):
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='4', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
[adv_id, rcvd_svc_name] = p2ps_exact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='2 GB')
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=8"): # keypad method on seeker side
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
|
||||
seek_prov_ev = p2ps_provision_keypad_method(i_dev=dev[1], r_dev=dev[0],
|
||||
autoaccept=True,
|
||||
initiator_or_responder='initiator')
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on advertier side")
|
||||
adv_conncap = ev0.split("conncap=")[1].split(" ")[0]
|
||||
if adv_conncap == "4":
|
||||
logger.info("Advertiser is GO")
|
||||
ev0 = dev[0].wait_global_event(["P2P-PROV-DISC-SHOW-PIN"], timeout=5)
|
||||
if ev0 is None:
|
||||
raise Exception("PIN Display on advertiser side")
|
||||
pin = ev0.split(" ")[2]
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on advertiser side")
|
||||
dev[0].group_form_result(ev0)
|
||||
|
||||
ev0 = dev[0].group_request("WPS_PIN any " + pin)
|
||||
if ev0 is None:
|
||||
raise Exception("Failed to initiate Pin authorization on registrar side")
|
||||
if "join=" in seek_prov_ev:
|
||||
if "OK" not in dev[1].global_request("P2P_CONNECT " + addr0 + " " + pin + " keypad persistent join"):
|
||||
raise Exception("P2P_CONNECT failed on seeker side")
|
||||
ev1 = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on seeker side")
|
||||
dev[1].group_form_result(ev1)
|
||||
|
||||
ev0 = dev[0].wait_global_event(["AP-STA-CONNECTED"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("AP-STA-CONNECTED timeout on advertiser side")
|
||||
ev1, ev0, pin = p2ps_provision(dev[1], dev[0], adv_id, method="8")
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1, pin)
|
||||
|
||||
if not keep_group:
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
@ -878,53 +565,16 @@ def test_p2ps_connect_adv_go_pin_method(dev):
|
||||
|
||||
def test_p2ps_connect_adv_client_pin_method(dev):
|
||||
"""P2PS advertiser as client with keypad config method on seeker side and auto-accept"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
dev[0].flush_scan_cache()
|
||||
p2ps_advertise(r_dev=dev[0], r_role='2', svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='I can receive files upto size 2 GB')
|
||||
[adv_id, rcvd_svc_name] = p2ps_exact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='2 GB')
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=8"): # keypad method on seeker side
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
|
||||
seek_prov_ev = p2ps_provision_keypad_method(i_dev=dev[1], r_dev=dev[0],
|
||||
autoaccept=True,
|
||||
initiator_or_responder='initiator')
|
||||
ev1, ev0, pin = p2ps_provision(dev[1], dev[0], adv_id, method="8")
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1, pin)
|
||||
|
||||
adv_prov = dev[0].wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if adv_prov is None:
|
||||
raise Exception("Prov failed on advertiser")
|
||||
adv_conncap = adv_prov.split("conncap=")[1].split(" ")[0]
|
||||
if adv_conncap == "2":
|
||||
logger.info("Advertiser is Client")
|
||||
adv_pin_show_event = dev[0].wait_global_event(["P2P-PROV-DISC-SHOW-PIN"],
|
||||
timeout=5)
|
||||
if adv_pin_show_event is None:
|
||||
raise Exception("PIN Display on advertiser side")
|
||||
pin = adv_pin_show_event.split(" ")[2]
|
||||
|
||||
ev1 = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on seeker side")
|
||||
dev[1].group_form_result(ev1)
|
||||
|
||||
ev1 = dev[1].group_request("WPS_PIN any " + pin)
|
||||
if ev1 is None:
|
||||
raise Exception("Failed to initiate Pin authorization on registrar side")
|
||||
|
||||
if "join=" in adv_prov:
|
||||
if "OK" not in dev[0].global_request("P2P_CONNECT " + addr1 + " " + pin + " display persistent join"):
|
||||
raise Exception("P2P_CONNECT failed on advertiser side")
|
||||
ev0 = dev[0].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("Group formation failed to start on seeker side")
|
||||
dev[0].group_form_result(ev0)
|
||||
|
||||
ev1 = dev[1].wait_global_event(["AP-STA-CONNECTED"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("AP-STA-CONNECTED timeout on advertiser side")
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
raise Exception("Unable to remove the advertisement instance")
|
||||
@ -1116,9 +766,6 @@ def test_p2ps_connect_p2ps_method_4(dev):
|
||||
|
||||
def test_p2ps_connect_adv_go_persistent(dev):
|
||||
"""P2PS auto-accept connection with advertisement as GO and having persistent group"""
|
||||
addr0 = dev[0].p2p_dev_addr()
|
||||
addr1 = dev[1].p2p_dev_addr()
|
||||
|
||||
go_neg_pin_authorized_persistent(i_dev=dev[0], i_intent=15,
|
||||
r_dev=dev[1], r_intent=0)
|
||||
dev[0].remove_group()
|
||||
@ -1129,44 +776,11 @@ def test_p2ps_connect_adv_go_persistent(dev):
|
||||
[adv_id, rcvd_svc_name] = p2ps_exact_seek(i_dev=dev[1], r_dev=dev[0],
|
||||
svc_name='org.wi-fi.wfds.send.rx',
|
||||
srv_info='2 GB')
|
||||
if "OK" not in dev[1].global_request("P2P_ASP_PROVISION " + addr0 + " adv_id=" + str(adv_id) + " adv_mac=" + addr0 + " session=1 session_mac=" + addr1 + " info='' method=1000"):
|
||||
raise Exception("Failed to request provisioning on seeker")
|
||||
ev1, ev0 = p2ps_provision(dev[1], dev[0], adv_id)
|
||||
if "persist=" not in ev0 or "persist=" not in ev1:
|
||||
raise Exception("Persistent group isn't used by peers")
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("Timed out while waiting for prov done on advertizer")
|
||||
if "persist=" not in ev0:
|
||||
raise Exception("Advertiser did not indicate persistent group")
|
||||
id0 = ev0.split("persist=")[1].split(" ")[0]
|
||||
if "OK" not in dev[0].global_request("P2P_GROUP_ADD persistent=" + id0 + " freq=2412"):
|
||||
raise Exception("Could not re-start persistent group")
|
||||
|
||||
ev0 = dev[0].wait_global_event(["P2P-GROUP-STARTED"], timeout=10)
|
||||
if ev0 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on advertiser side")
|
||||
dev[0].group_form_result(ev0)
|
||||
|
||||
ev1 = dev[1].wait_global_event(["P2PS-PROV-DONE"], timeout=10)
|
||||
if ev1 is None:
|
||||
raise Exception("P2PS-PROV-DONE timeout on seeker side")
|
||||
|
||||
if "persist=" not in ev1:
|
||||
raise Exception("Seeker did not indicate persistent group")
|
||||
id1 = ev1.split("persist=")[1].split(" ")[0]
|
||||
if "OK" not in dev[1].global_request("P2P_GROUP_ADD persistent=" + id1 + " freq=2412"):
|
||||
raise Exception("Could not re-start persistent group")
|
||||
|
||||
ev1 = dev[1].wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
|
||||
if ev1 is None:
|
||||
raise Exception("P2P-GROUP-STARTED timeout on seeker side")
|
||||
dev[1].group_form_result(ev1)
|
||||
|
||||
ev0 = dev[0].wait_global_event(["AP-STA-CONNECTED"], timeout=15)
|
||||
if ev0 is None:
|
||||
raise Exception("AP-STA-CONNECTED timeout on advertiser side")
|
||||
ev0 = dev[0].global_request("P2P_SERVICE_DEL asp " + str(adv_id))
|
||||
if ev0 is None:
|
||||
raise Exception("Unable to remove the advertisement instance")
|
||||
p2ps_connect_pd(dev[0], dev[1], ev0, ev1)
|
||||
remove_group(dev[0], dev[1])
|
||||
|
||||
def test_p2ps_client_probe(dev):
|
||||
|
Loading…
Reference in New Issue
Block a user