mirror of
https://github.com/vanhoefm/fragattacks.git
synced 2025-01-31 01:04:03 -05:00
tests: Extend D-Bus test cases to cover separate P2P Device operations
Number of the P2P test cases through D-Bus commands were not prepared for there being a separate group interface when the P2P Device concept is used. Signed-off-by: Jouni Malinen <jouni@qca.qualcomm.com>
This commit is contained in:
parent
e8181e26ef
commit
cb346b49d2
@ -2095,6 +2095,9 @@ def _test_dbus_country(dev, apdev):
|
|||||||
raise Exception("Unexpected Country value %s (expected FI)" % res)
|
raise Exception("Unexpected Country value %s (expected FI)" % res)
|
||||||
|
|
||||||
ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
|
ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
|
||||||
|
if ev is None:
|
||||||
|
# For now, work around separate P2P Device interface event delivery
|
||||||
|
ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("regdom change event not seen")
|
raise Exception("regdom change event not seen")
|
||||||
if "init=USER type=COUNTRY alpha2=FI" not in ev:
|
if "init=USER type=COUNTRY alpha2=FI" not in ev:
|
||||||
@ -2120,6 +2123,9 @@ def _test_dbus_country(dev, apdev):
|
|||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
|
|
||||||
ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
|
ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
|
||||||
|
if ev is None:
|
||||||
|
# For now, work around separate P2P Device interface event delivery
|
||||||
|
ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("regdom change event not seen")
|
raise Exception("regdom change event not seen")
|
||||||
if "init=CORE type=WORLD" not in ev:
|
if "init=CORE type=WORLD" not in ev:
|
||||||
@ -2164,9 +2170,7 @@ def _test_dbus_scan_interval(dev, apdev):
|
|||||||
def test_dbus_probe_req_reporting(dev, apdev):
|
def test_dbus_probe_req_reporting(dev, apdev):
|
||||||
"""D-Bus Probe Request reporting"""
|
"""D-Bus Probe Request reporting"""
|
||||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||||
iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
|
|
||||||
|
|
||||||
dev[0].p2p_start_go(freq=2412)
|
|
||||||
dev[1].p2p_find(social=True)
|
dev[1].p2p_find(social=True)
|
||||||
|
|
||||||
class TestDbusProbe(TestDbus):
|
class TestDbusProbe(TestDbus):
|
||||||
@ -2177,12 +2181,21 @@ def test_dbus_probe_req_reporting(dev, apdev):
|
|||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
gobject.timeout_add(1, self.run_test)
|
gobject.timeout_add(1, self.run_test)
|
||||||
gobject.timeout_add(15000, self.timeout)
|
gobject.timeout_add(15000, self.timeout)
|
||||||
|
self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
|
||||||
|
"GroupStarted")
|
||||||
self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
|
self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
|
||||||
byte_arrays=True)
|
byte_arrays=True)
|
||||||
iface.SubscribeProbeReq()
|
|
||||||
self.loop.run()
|
self.loop.run()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def groupStarted(self, properties):
|
||||||
|
logger.debug("groupStarted: " + str(properties))
|
||||||
|
g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
|
self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE)
|
||||||
|
self.iface.SubscribeProbeReq()
|
||||||
|
self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
|
||||||
def probeRequest(self, args):
|
def probeRequest(self, args):
|
||||||
logger.debug("probeRequest: args=%s" % str(args))
|
logger.debug("probeRequest: args=%s" % str(args))
|
||||||
self.reported = True
|
self.reported = True
|
||||||
@ -2190,6 +2203,9 @@ def test_dbus_probe_req_reporting(dev, apdev):
|
|||||||
|
|
||||||
def run_test(self, *args):
|
def run_test(self, *args):
|
||||||
logger.debug("run_test")
|
logger.debug("run_test")
|
||||||
|
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
params = dbus.Dictionary({ 'frequency': 2412 })
|
||||||
|
p2p.GroupAdd(params)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def success(self):
|
def success(self):
|
||||||
@ -2198,14 +2214,14 @@ def test_dbus_probe_req_reporting(dev, apdev):
|
|||||||
with TestDbusProbe(bus) as t:
|
with TestDbusProbe(bus) as t:
|
||||||
if not t.success():
|
if not t.success():
|
||||||
raise Exception("Expected signals not seen")
|
raise Exception("Expected signals not seen")
|
||||||
iface.UnsubscribeProbeReq()
|
t.iface.UnsubscribeProbeReq()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
iface.UnsubscribeProbeReq()
|
t.iface.UnsubscribeProbeReq()
|
||||||
raise Exception("Invalid UnsubscribeProbeReq() accepted")
|
raise Exception("Invalid UnsubscribeProbeReq() accepted")
|
||||||
except dbus.exceptions.DBusException, e:
|
except dbus.exceptions.DBusException, e:
|
||||||
if "NoSubscription" not in str(e):
|
if "NoSubscription" not in str(e):
|
||||||
raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
|
raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
|
||||||
|
t.group_p2p.Disconnect()
|
||||||
|
|
||||||
with TestDbusProbe(bus) as t:
|
with TestDbusProbe(bus) as t:
|
||||||
if not t.success():
|
if not t.success():
|
||||||
@ -2214,7 +2230,6 @@ def test_dbus_probe_req_reporting(dev, apdev):
|
|||||||
# cleanup.
|
# cleanup.
|
||||||
|
|
||||||
dev[1].p2p_stop_find()
|
dev[1].p2p_stop_find()
|
||||||
dev[0].remove_group()
|
|
||||||
|
|
||||||
def test_dbus_probe_req_reporting_oom(dev, apdev):
|
def test_dbus_probe_req_reporting_oom(dev, apdev):
|
||||||
"""D-Bus Probe Request reporting (OOM)"""
|
"""D-Bus Probe Request reporting (OOM)"""
|
||||||
@ -2935,7 +2950,6 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
"""D-Bus P2P autonomous GO"""
|
"""D-Bus P2P autonomous GO"""
|
||||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||||
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
|
|
||||||
|
|
||||||
addr0 = dev[0].p2p_dev_addr()
|
addr0 = dev[0].p2p_dev_addr()
|
||||||
|
|
||||||
@ -2972,22 +2986,26 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
self.group = properties['group_object']
|
self.group = properties['group_object']
|
||||||
role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
|
self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
|
role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if role != "GO":
|
if role != "GO":
|
||||||
raise Exception("Unexpected role reported: " + role)
|
raise Exception("Unexpected role reported: " + role)
|
||||||
group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
|
group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if group != properties['group_object']:
|
if group != properties['group_object']:
|
||||||
raise Exception("Unexpected Group reported: " + str(group))
|
raise Exception("Unexpected Group reported: " + str(group))
|
||||||
go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
|
go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if go != '/':
|
if go != '/':
|
||||||
raise Exception("Unexpected PeerGO value: " + str(go))
|
raise Exception("Unexpected PeerGO value: " + str(go))
|
||||||
if self.first:
|
if self.first:
|
||||||
self.first = False
|
self.first = False
|
||||||
logger.info("Remove persistent group instance")
|
logger.info("Remove persistent group instance")
|
||||||
p2p.Disconnect()
|
group_p2p = dbus.Interface(self.g_if_obj,
|
||||||
|
WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
group_p2p.Disconnect()
|
||||||
else:
|
else:
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
|
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
|
||||||
@ -3035,6 +3053,7 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
'P2PDeviceAddress': self.peer['DeviceAddress'],
|
'P2PDeviceAddress': self.peer['DeviceAddress'],
|
||||||
'Bssid': self.peer['DeviceAddress'],
|
'Bssid': self.peer['DeviceAddress'],
|
||||||
'Type': 'pin' }
|
'Type': 'pin' }
|
||||||
|
wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
|
||||||
try:
|
try:
|
||||||
wps.Start(params)
|
wps.Start(params)
|
||||||
raise Exception("Invalid WPS.Start() accepted")
|
raise Exception("Invalid WPS.Start() accepted")
|
||||||
@ -3043,7 +3062,6 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
raise Exception("Unexpected error message: " + str(e))
|
raise Exception("Unexpected error message: " + str(e))
|
||||||
params = { 'Role': 'registrar',
|
params = { 'Role': 'registrar',
|
||||||
'P2PDeviceAddress': self.peer['DeviceAddress'],
|
'P2PDeviceAddress': self.peer['DeviceAddress'],
|
||||||
'Bssid': self.peer['DeviceAddress'],
|
|
||||||
'Type': 'pin',
|
'Type': 'pin',
|
||||||
'Pin': '12345670' }
|
'Pin': '12345670' }
|
||||||
logger.info("Authorize peer to connect to the group")
|
logger.info("Authorize peer to connect to the group")
|
||||||
@ -3138,7 +3156,9 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
||||||
|
|
||||||
self.waiting_end = True
|
self.waiting_end = True
|
||||||
p2p.Disconnect()
|
group_p2p = dbus.Interface(self.g_if_obj,
|
||||||
|
WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
group_p2p.Disconnect()
|
||||||
|
|
||||||
def run_test(self, *args):
|
def run_test(self, *args):
|
||||||
logger.debug("run_test")
|
logger.debug("run_test")
|
||||||
@ -3161,7 +3181,6 @@ def test_dbus_p2p_autogo_pbc(dev, apdev):
|
|||||||
"""D-Bus P2P autonomous GO and PBC"""
|
"""D-Bus P2P autonomous GO and PBC"""
|
||||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||||
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
|
|
||||||
|
|
||||||
addr0 = dev[0].p2p_dev_addr()
|
addr0 = dev[0].p2p_dev_addr()
|
||||||
|
|
||||||
@ -3192,6 +3211,8 @@ def test_dbus_p2p_autogo_pbc(dev, apdev):
|
|||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
self.group = properties['group_object']
|
self.group = properties['group_object']
|
||||||
|
self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
|
dev1.global_request("P2P_CONNECT " + addr0 + " pbc join")
|
||||||
|
|
||||||
@ -3219,14 +3240,16 @@ def test_dbus_p2p_autogo_pbc(dev, apdev):
|
|||||||
addr += '%02x' % ord(p)
|
addr += '%02x' % ord(p)
|
||||||
params = { 'Role': 'registrar',
|
params = { 'Role': 'registrar',
|
||||||
'P2PDeviceAddress': self.peer['DeviceAddress'],
|
'P2PDeviceAddress': self.peer['DeviceAddress'],
|
||||||
'Bssid': self.peer['DeviceAddress'],
|
|
||||||
'Type': 'pbc' }
|
'Type': 'pbc' }
|
||||||
logger.info("Authorize peer to connect to the group")
|
logger.info("Authorize peer to connect to the group")
|
||||||
|
wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
|
||||||
wps.Start(params)
|
wps.Start(params)
|
||||||
|
|
||||||
def staAuthorized(self, name):
|
def staAuthorized(self, name):
|
||||||
logger.debug("staAuthorized: " + name)
|
logger.debug("staAuthorized: " + name)
|
||||||
p2p.Disconnect()
|
group_p2p = dbus.Interface(self.g_if_obj,
|
||||||
|
WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
group_p2p.Disconnect()
|
||||||
|
|
||||||
def run_test(self, *args):
|
def run_test(self, *args):
|
||||||
logger.debug("run_test")
|
logger.debug("run_test")
|
||||||
@ -3248,7 +3271,6 @@ def test_dbus_p2p_autogo_legacy(dev, apdev):
|
|||||||
"""D-Bus P2P autonomous GO and legacy STA"""
|
"""D-Bus P2P autonomous GO and legacy STA"""
|
||||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||||
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
|
|
||||||
|
|
||||||
addr0 = dev[0].p2p_dev_addr()
|
addr0 = dev[0].p2p_dev_addr()
|
||||||
|
|
||||||
@ -3271,14 +3293,25 @@ def test_dbus_p2p_autogo_legacy(dev, apdev):
|
|||||||
|
|
||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
|
g_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['group_object'])
|
||||||
|
res = g_obj.GetAll(WPAS_DBUS_GROUP,
|
||||||
|
dbus_interface=dbus.PROPERTIES_IFACE,
|
||||||
|
byte_arrays=True)
|
||||||
|
bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
|
||||||
|
|
||||||
pin = '12345670'
|
pin = '12345670'
|
||||||
params = { 'Role': 'enrollee',
|
params = { 'Role': 'enrollee',
|
||||||
'Type': 'pin',
|
'Type': 'pin',
|
||||||
'Pin': pin }
|
'Pin': pin }
|
||||||
|
g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
|
wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS)
|
||||||
wps.Start(params)
|
wps.Start(params)
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
dev1.scan_for_bss(addr0, freq=2412)
|
dev1.scan_for_bss(bssid, freq=2412)
|
||||||
dev1.request("WPS_PIN " + addr0 + " " + pin)
|
dev1.request("WPS_PIN " + bssid + " " + pin)
|
||||||
|
self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
|
||||||
def groupFinished(self, properties):
|
def groupFinished(self, properties):
|
||||||
logger.debug("groupFinished: " + str(properties))
|
logger.debug("groupFinished: " + str(properties))
|
||||||
@ -3289,7 +3322,7 @@ def test_dbus_p2p_autogo_legacy(dev, apdev):
|
|||||||
logger.debug("staAuthorized: " + name)
|
logger.debug("staAuthorized: " + name)
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
dev1.request("DISCONNECT")
|
dev1.request("DISCONNECT")
|
||||||
p2p.Disconnect()
|
self.group_p2p.Disconnect()
|
||||||
|
|
||||||
def run_test(self, *args):
|
def run_test(self, *args):
|
||||||
logger.debug("run_test")
|
logger.debug("run_test")
|
||||||
@ -3312,6 +3345,7 @@ def test_dbus_p2p_join(dev, apdev):
|
|||||||
addr1 = dev[1].p2p_dev_addr()
|
addr1 = dev[1].p2p_dev_addr()
|
||||||
addr2 = dev[2].p2p_dev_addr()
|
addr2 = dev[2].p2p_dev_addr()
|
||||||
dev[1].p2p_start_go(freq=2412)
|
dev[1].p2p_start_go(freq=2412)
|
||||||
|
dev1_group_ifname = dev[1].group_ifname
|
||||||
dev[2].p2p_listen()
|
dev[2].p2p_listen()
|
||||||
|
|
||||||
class TestDbusP2p(TestDbus):
|
class TestDbusP2p(TestDbus):
|
||||||
@ -3356,19 +3390,22 @@ def test_dbus_p2p_join(dev, apdev):
|
|||||||
pin = p2p.Connect(args)
|
pin = p2p.Connect(args)
|
||||||
|
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
dev1.request("WPS_PIN any " + pin)
|
dev1.group_ifname = dev1_group_ifname
|
||||||
|
dev1.group_request("WPS_PIN any " + pin)
|
||||||
|
|
||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
role = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
|
g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
|
role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if role != "client":
|
if role != "client":
|
||||||
raise Exception("Unexpected role reported: " + role)
|
raise Exception("Unexpected role reported: " + role)
|
||||||
group = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
|
group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if group != properties['group_object']:
|
if group != properties['group_object']:
|
||||||
raise Exception("Unexpected Group reported: " + str(group))
|
raise Exception("Unexpected Group reported: " + str(group))
|
||||||
go = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
|
go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if go != self.go:
|
if go != self.go:
|
||||||
raise Exception("Unexpected PeerGO value: " + str(go))
|
raise Exception("Unexpected PeerGO value: " + str(go))
|
||||||
@ -3390,12 +3427,13 @@ def test_dbus_p2p_join(dev, apdev):
|
|||||||
if "Error.Failed: Failed to set property" not in str(e):
|
if "Error.Failed: Failed to set property" not in str(e):
|
||||||
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
||||||
|
|
||||||
|
group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
args = { 'duration1': 30000, 'interval1': 102400,
|
args = { 'duration1': 30000, 'interval1': 102400,
|
||||||
'duration2': 20000, 'interval2': 102400 }
|
'duration2': 20000, 'interval2': 102400 }
|
||||||
p2p.PresenceRequest(args)
|
group_p2p.PresenceRequest(args)
|
||||||
|
|
||||||
args = { 'peer': self.peer }
|
args = { 'peer': self.peer }
|
||||||
p2p.Invite(args)
|
group_p2p.Invite(args)
|
||||||
|
|
||||||
def groupFinished(self, properties):
|
def groupFinished(self, properties):
|
||||||
logger.debug("groupFinished: " + str(properties))
|
logger.debug("groupFinished: " + str(properties))
|
||||||
@ -3407,6 +3445,7 @@ def test_dbus_p2p_join(dev, apdev):
|
|||||||
if result['status'] != 1:
|
if result['status'] != 1:
|
||||||
raise Exception("Unexpected invitation result: " + str(result))
|
raise Exception("Unexpected invitation result: " + str(result))
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
|
dev1.group_ifname = dev1_group_ifname
|
||||||
dev1.remove_group()
|
dev1.remove_group()
|
||||||
|
|
||||||
def run_test(self, *args):
|
def run_test(self, *args):
|
||||||
@ -3543,7 +3582,10 @@ def test_dbus_p2p_persistent(dev, apdev):
|
|||||||
|
|
||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
p2p.Disconnect()
|
g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
|
group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
group_p2p.Disconnect()
|
||||||
|
|
||||||
def groupFinished(self, properties):
|
def groupFinished(self, properties):
|
||||||
logger.debug("groupFinished: " + str(properties))
|
logger.debug("groupFinished: " + str(properties))
|
||||||
@ -3613,7 +3655,6 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev):
|
|||||||
"""D-Bus P2P reinvoke persistent group"""
|
"""D-Bus P2P reinvoke persistent group"""
|
||||||
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
(bus,wpas_obj,path,if_obj) = prepare_dbus(dev[0])
|
||||||
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
|
|
||||||
|
|
||||||
addr0 = dev[0].p2p_dev_addr()
|
addr0 = dev[0].p2p_dev_addr()
|
||||||
|
|
||||||
@ -3647,9 +3688,17 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev):
|
|||||||
|
|
||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
|
self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
if not self.invited:
|
if not self.invited:
|
||||||
|
g_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['group_object'])
|
||||||
|
res = g_obj.GetAll(WPAS_DBUS_GROUP,
|
||||||
|
dbus_interface=dbus.PROPERTIES_IFACE,
|
||||||
|
byte_arrays=True)
|
||||||
|
bssid = ':'.join([binascii.hexlify(l) for l in res['BSSID']])
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
dev1.scan_for_bss(addr0, freq=2412)
|
dev1.scan_for_bss(bssid, freq=2412)
|
||||||
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
|
dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")
|
||||||
|
|
||||||
def groupFinished(self, properties):
|
def groupFinished(self, properties):
|
||||||
@ -3659,7 +3708,7 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev):
|
|||||||
self.loop.quit()
|
self.loop.quit()
|
||||||
else:
|
else:
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
dev1.request("SET persistent_reconnect 1")
|
dev1.global_request("SET persistent_reconnect 1")
|
||||||
dev1.p2p_listen()
|
dev1.p2p_listen()
|
||||||
|
|
||||||
args = { 'persistent_group_object': dbus.ObjectPath(path),
|
args = { 'persistent_group_object': dbus.ObjectPath(path),
|
||||||
@ -3676,6 +3725,11 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev):
|
|||||||
pin = p2p.Invite(args)
|
pin = p2p.Invite(args)
|
||||||
self.invited = True
|
self.invited = True
|
||||||
|
|
||||||
|
self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
|
||||||
|
timeout=15)
|
||||||
|
if self.sta_group_ev is None:
|
||||||
|
raise Exception("P2P-GROUP-STARTED event not seen")
|
||||||
|
|
||||||
def persistentGroupAdded(self, path, properties):
|
def persistentGroupAdded(self, path, properties):
|
||||||
logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
|
logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
|
||||||
self.persistent = path
|
self.persistent = path
|
||||||
@ -3702,16 +3756,24 @@ def test_dbus_p2p_reinvoke_persistent(dev, apdev):
|
|||||||
'Type': 'pin',
|
'Type': 'pin',
|
||||||
'Pin': '12345670' }
|
'Pin': '12345670' }
|
||||||
logger.info("Authorize peer to connect to the group")
|
logger.info("Authorize peer to connect to the group")
|
||||||
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
|
wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
|
||||||
wps.Start(params)
|
wps.Start(params)
|
||||||
|
self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
|
||||||
|
timeout=15)
|
||||||
|
if self.sta_group_ev is None:
|
||||||
|
raise Exception("P2P-GROUP-STARTED event not seen")
|
||||||
|
|
||||||
def staAuthorized(self, name):
|
def staAuthorized(self, name):
|
||||||
logger.debug("staAuthorized: " + name)
|
logger.debug("staAuthorized: " + name)
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
|
dev1.group_form_result(self.sta_group_ev)
|
||||||
dev1.remove_group()
|
dev1.remove_group()
|
||||||
ev = dev1.wait_event(["P2P-GROUP-REMOVED"], timeout=10)
|
ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("Group removal timed out")
|
raise Exception("Group removal timed out")
|
||||||
p2p.Disconnect()
|
group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
group_p2p.Disconnect()
|
||||||
|
|
||||||
def run_test(self, *args):
|
def run_test(self, *args):
|
||||||
logger.debug("run_test")
|
logger.debug("run_test")
|
||||||
@ -3784,7 +3846,10 @@ def test_dbus_p2p_go_neg_rx(dev, apdev):
|
|||||||
|
|
||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
p2p.Disconnect()
|
g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
|
group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
group_p2p.Disconnect()
|
||||||
|
|
||||||
def groupFinished(self, properties):
|
def groupFinished(self, properties):
|
||||||
logger.debug("groupFinished: " + str(properties))
|
logger.debug("groupFinished: " + str(properties))
|
||||||
@ -3865,18 +3930,23 @@ def test_dbus_p2p_go_neg_auth(dev, apdev):
|
|||||||
ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
|
ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("Group formation timed out")
|
raise Exception("Group formation timed out")
|
||||||
|
self.sta_group_ev = ev
|
||||||
|
|
||||||
def goNegotiationSuccess(self, properties):
|
def goNegotiationSuccess(self, properties):
|
||||||
logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
|
logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
|
||||||
|
|
||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
|
self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
|
dev1.group_form_result(self.sta_group_ev)
|
||||||
dev1.remove_group()
|
dev1.remove_group()
|
||||||
|
|
||||||
def staDeauthorized(self, name):
|
def staDeauthorized(self, name):
|
||||||
logger.debug("staDeuthorized: " + name)
|
logger.debug("staDeuthorized: " + name)
|
||||||
p2p.Disconnect()
|
group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
group_p2p.Disconnect()
|
||||||
|
|
||||||
def peerJoined(self, peer):
|
def peerJoined(self, peer):
|
||||||
logger.debug("peerJoined: " + peer)
|
logger.debug("peerJoined: " + peer)
|
||||||
@ -3945,14 +4015,19 @@ def test_dbus_p2p_go_neg_init(dev, apdev):
|
|||||||
ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
|
ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
|
||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("Group formation timed out")
|
raise Exception("Group formation timed out")
|
||||||
|
self.sta_group_ev = ev
|
||||||
|
|
||||||
def goNegotiationSuccess(self, properties):
|
def goNegotiationSuccess(self, properties):
|
||||||
logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
|
logger.debug("goNegotiationSuccess: properties=%s" % str(properties))
|
||||||
|
|
||||||
def groupStarted(self, properties):
|
def groupStarted(self, properties):
|
||||||
logger.debug("groupStarted: " + str(properties))
|
logger.debug("groupStarted: " + str(properties))
|
||||||
p2p.Disconnect()
|
g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
|
||||||
|
properties['interface_object'])
|
||||||
|
group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
|
||||||
|
group_p2p.Disconnect()
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
|
dev1.group_form_result(self.sta_group_ev)
|
||||||
dev1.remove_group()
|
dev1.remove_group()
|
||||||
|
|
||||||
def groupFinished(self, properties):
|
def groupFinished(self, properties):
|
||||||
@ -4047,6 +4122,7 @@ def test_dbus_p2p_two_groups(dev, apdev):
|
|||||||
addr1 = dev[1].p2p_dev_addr()
|
addr1 = dev[1].p2p_dev_addr()
|
||||||
addr2 = dev[2].p2p_dev_addr()
|
addr2 = dev[2].p2p_dev_addr()
|
||||||
dev[1].p2p_start_go(freq=2412)
|
dev[1].p2p_start_go(freq=2412)
|
||||||
|
dev1_group_ifname = dev[1].group_ifname
|
||||||
|
|
||||||
class TestDbusP2p(TestDbus):
|
class TestDbusP2p(TestDbus):
|
||||||
def __init__(self, bus):
|
def __init__(self, bus):
|
||||||
@ -4088,7 +4164,8 @@ def test_dbus_p2p_two_groups(dev, apdev):
|
|||||||
p2p.StopFind()
|
p2p.StopFind()
|
||||||
pin = '12345670'
|
pin = '12345670'
|
||||||
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
|
||||||
dev1.request("WPS_PIN any " + pin)
|
dev1.group_ifname = dev1_group_ifname
|
||||||
|
dev1.group_request("WPS_PIN any " + pin)
|
||||||
args = { 'peer': self.go,
|
args = { 'peer': self.go,
|
||||||
'join': True,
|
'join': True,
|
||||||
'wps_method': 'pin',
|
'wps_method': 'pin',
|
||||||
@ -4140,6 +4217,10 @@ def test_dbus_p2p_two_groups(dev, apdev):
|
|||||||
dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
|
dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
|
||||||
dev2.scan_for_bss(bssid, freq=2412)
|
dev2.scan_for_bss(bssid, freq=2412)
|
||||||
dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
|
dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
|
||||||
|
ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15);
|
||||||
|
if ev is None:
|
||||||
|
raise Exception("Group join timed out")
|
||||||
|
self.dev2_group_ev = ev
|
||||||
|
|
||||||
def groupFinished(self, properties):
|
def groupFinished(self, properties):
|
||||||
logger.debug("groupFinished: " + str(properties))
|
logger.debug("groupFinished: " + str(properties))
|
||||||
@ -4158,6 +4239,7 @@ def test_dbus_p2p_two_groups(dev, apdev):
|
|||||||
self.check_results()
|
self.check_results()
|
||||||
|
|
||||||
dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
|
dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
|
||||||
|
dev2.group_form_result(self.dev2_group_ev)
|
||||||
dev2.remove_group()
|
dev2.remove_group()
|
||||||
|
|
||||||
logger.info("Disconnect group2")
|
logger.info("Disconnect group2")
|
||||||
|
Loading…
Reference in New Issue
Block a user