mirror of
https://github.com/vanhoefm/fragattacks.git
synced 2025-01-29 16:24:03 -05:00
tests: Trigger failure on exceptions in the dbus_p2p_autogo thread
This makes the test case more robust in reporting failures. Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
parent
9c3380d776
commit
035efb2c43
@ -3051,6 +3051,7 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
TestDbus.__init__(self, bus)
|
TestDbus.__init__(self, bus)
|
||||||
self.first = True
|
self.first = True
|
||||||
self.waiting_end = False
|
self.waiting_end = False
|
||||||
|
self.exceptions = False
|
||||||
self.deauthorized = False
|
self.deauthorized = False
|
||||||
self.done = False
|
self.done = False
|
||||||
|
|
||||||
@ -3087,14 +3088,17 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
|
role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if role != "GO":
|
if role != "GO":
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected role reported: " + role)
|
raise Exception("Unexpected role reported: " + role)
|
||||||
group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
|
group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if group != properties['group_object']:
|
if group != properties['group_object']:
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected Group reported: " + str(group))
|
raise Exception("Unexpected Group reported: " + str(group))
|
||||||
go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
|
go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
if go != '/':
|
if go != '/':
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected PeerGO value: " + str(go))
|
raise Exception("Unexpected PeerGO value: " + str(go))
|
||||||
if self.first:
|
if self.first:
|
||||||
self.first = False
|
self.first = False
|
||||||
@ -3152,9 +3156,11 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
|
wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
|
||||||
try:
|
try:
|
||||||
wps.Start(params)
|
wps.Start(params)
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Invalid WPS.Start() accepted")
|
raise Exception("Invalid WPS.Start() accepted")
|
||||||
except dbus.exceptions.DBusException, e:
|
except dbus.exceptions.DBusException, e:
|
||||||
if "InvalidArgs" not in str(e):
|
if "InvalidArgs" not in str(e):
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected error message: " + str(e))
|
raise Exception("Unexpected error message: " + str(e))
|
||||||
params = { 'Role': 'registrar',
|
params = { 'Role': 'registrar',
|
||||||
'P2PDeviceAddress': self.peer['DeviceAddress'],
|
'P2PDeviceAddress': self.peer['DeviceAddress'],
|
||||||
@ -3169,9 +3175,12 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
|
res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE,
|
dbus_interface=dbus.PROPERTIES_IFACE,
|
||||||
byte_arrays=True)
|
byte_arrays=True)
|
||||||
|
logger.debug("Peer properties: " + str(res))
|
||||||
if 'Groups' not in res or len(res['Groups']) != 1:
|
if 'Groups' not in res or len(res['Groups']) != 1:
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected number of peer Groups entries")
|
raise Exception("Unexpected number of peer Groups entries")
|
||||||
if res['Groups'][0] != self.group:
|
if res['Groups'][0] != self.group:
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected peer Groups[0] value")
|
raise Exception("Unexpected peer Groups[0] value")
|
||||||
|
|
||||||
g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
|
g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
|
||||||
@ -3180,6 +3189,7 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
byte_arrays=True)
|
byte_arrays=True)
|
||||||
logger.debug("Group properties: " + str(res))
|
logger.debug("Group properties: " + str(res))
|
||||||
if 'Members' not in res or len(res['Members']) != 1:
|
if 'Members' not in res or len(res['Members']) != 1:
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected number of group members")
|
raise Exception("Unexpected number of group members")
|
||||||
|
|
||||||
ext = dbus.ByteArray("\x11\x22\x33\x44")
|
ext = dbus.ByteArray("\x11\x22\x33\x44")
|
||||||
@ -3197,8 +3207,10 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
dbus_interface=dbus.PROPERTIES_IFACE,
|
dbus_interface=dbus.PROPERTIES_IFACE,
|
||||||
byte_arrays=True)
|
byte_arrays=True)
|
||||||
if len(res) != 1:
|
if len(res) != 1:
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected number of vendor extensions")
|
raise Exception("Unexpected number of vendor extensions")
|
||||||
if res[0] != ext:
|
if res[0] != ext:
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Vendor extension value changed")
|
raise Exception("Vendor extension value changed")
|
||||||
|
|
||||||
# And now verify that the more appropriate encoding is accepted as
|
# And now verify that the more appropriate encoding is accepted as
|
||||||
@ -3210,8 +3222,10 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
dbus_interface=dbus.PROPERTIES_IFACE,
|
dbus_interface=dbus.PROPERTIES_IFACE,
|
||||||
byte_arrays=True)
|
byte_arrays=True)
|
||||||
if len(res) != 2:
|
if len(res) != 2:
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected number of vendor extensions")
|
raise Exception("Unexpected number of vendor extensions")
|
||||||
if res[0] != res2[0] or res[1] != res2[1]:
|
if res[0] != res2[0] or res[1] != res2[1]:
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Vendor extension value changed")
|
raise Exception("Vendor extension value changed")
|
||||||
|
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
@ -3219,36 +3233,44 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
try:
|
try:
|
||||||
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
|
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Invalid Set(WPSVendorExtensions) accepted")
|
raise Exception("Invalid Set(WPSVendorExtensions) accepted")
|
||||||
except dbus.exceptions.DBusException, e:
|
except dbus.exceptions.DBusException, e:
|
||||||
if "Error.Failed" not in str(e):
|
if "Error.Failed" not in str(e):
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
||||||
|
|
||||||
vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
|
vals = dbus.Dictionary({ 'Foo': [ ext ]}, signature='sv')
|
||||||
try:
|
try:
|
||||||
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
|
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Invalid Set(WPSVendorExtensions) accepted")
|
raise Exception("Invalid Set(WPSVendorExtensions) accepted")
|
||||||
except dbus.exceptions.DBusException, e:
|
except dbus.exceptions.DBusException, e:
|
||||||
if "InvalidArgs" not in str(e):
|
if "InvalidArgs" not in str(e):
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
||||||
|
|
||||||
vals = [ "foo" ]
|
vals = [ "foo" ]
|
||||||
try:
|
try:
|
||||||
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
|
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Invalid Set(WPSVendorExtensions) accepted")
|
raise Exception("Invalid Set(WPSVendorExtensions) accepted")
|
||||||
except dbus.exceptions.DBusException, e:
|
except dbus.exceptions.DBusException, e:
|
||||||
if "Error.Failed" not in str(e):
|
if "Error.Failed" not in str(e):
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
||||||
|
|
||||||
vals = [ [ "foo" ] ]
|
vals = [ [ "foo" ] ]
|
||||||
try:
|
try:
|
||||||
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
|
g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
|
||||||
dbus_interface=dbus.PROPERTIES_IFACE)
|
dbus_interface=dbus.PROPERTIES_IFACE)
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Invalid Set(WPSVendorExtensions) accepted")
|
raise Exception("Invalid Set(WPSVendorExtensions) accepted")
|
||||||
except dbus.exceptions.DBusException, e:
|
except dbus.exceptions.DBusException, e:
|
||||||
if "Error.Failed" not in str(e):
|
if "Error.Failed" not in str(e):
|
||||||
|
self.exceptions = True
|
||||||
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))
|
||||||
|
|
||||||
p2p.RemoveClient({ 'peer': self.peer_path })
|
p2p.RemoveClient({ 'peer': self.peer_path })
|
||||||
@ -3271,7 +3293,7 @@ def test_dbus_p2p_autogo(dev, apdev):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def success(self):
|
def success(self):
|
||||||
return self.done and self.deauthorized
|
return self.done and self.deauthorized and not self.exceptions
|
||||||
|
|
||||||
with TestDbusP2p(bus) as t:
|
with TestDbusP2p(bus) as t:
|
||||||
if not t.success():
|
if not t.success():
|
||||||
|
Loading…
Reference in New Issue
Block a user