tests: Add WpaSupplicant.wait_group_event()

This can be used to wait for events from a P2P group interface.

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2014-10-19 20:56:36 +03:00
parent f6420942cb
commit 4823566ca7

View File

@ -19,6 +19,7 @@ wpas_ctrl = '/var/run/wpa_supplicant'
class WpaSupplicant:
def __init__(self, ifname=None, global_iface=None):
self.group_ifname = None
self.gctrl_mon = None
if ifname:
self.set_ifname(ifname)
else:
@ -100,6 +101,12 @@ class WpaSupplicant:
self.request("SET p2p_pref_chan ")
self.request("SET p2p_no_group_iface 1")
self.request("SET p2p_go_intent 7")
if self.gctrl_mon:
try:
self.gctrl_mon.detach()
except:
pass
self.gctrl_mon = None
self.group_ifname = None
self.dump_monitor()
@ -426,6 +433,12 @@ class WpaSupplicant:
res['result'] = 'success'
res['ifname'] = s[2]
self.group_ifname = s[2]
try:
self.gctrl_mon = wpaspy.Ctrl(os.path.join(wpas_ctrl, self.group_ifname))
self.gctrl_mon.attach()
except:
logger.debug("Could not open monitor socket for group interface")
self.gctrl_mon = None
res['role'] = s[3]
res['ssid'] = s[4]
res['freq'] = s[5]
@ -570,7 +583,35 @@ class WpaSupplicant:
break
return None
def wait_group_event(self, events, timeout=10):
if self.group_ifname and self.group_ifname != self.ifname:
if self.gctrl_mon is None:
return None
start = os.times()[4]
while True:
while self.gctrl_mon.pending():
ev = self.gctrl_mon.recv()
logger.debug(self.group_ifname + ": " + ev)
for event in events:
if event in ev:
return ev
now = os.times()[4]
remaining = start + timeout - now
if remaining <= 0:
break
if not self.gctrl_mon.pending(timeout=remaining):
break
return None
return self.wait_event(events, timeout)
def wait_go_ending_session(self):
if self.gctrl_mon:
try:
self.gctrl_mon.detach()
except:
pass
self.gctrl_mon = None
ev = self.wait_event(["P2P-GROUP-REMOVED"], timeout=3)
if ev is None:
raise Exception("Group removal event timed out")
@ -586,6 +627,12 @@ class WpaSupplicant:
logger.debug(self.ifname + "(global): " + ev)
def remove_group(self, ifname=None):
if self.gctrl_mon:
try:
self.gctrl_mon.detach()
except:
pass
self.gctrl_mon = None
if ifname is None:
ifname = self.group_ifname if self.group_ifname else self.ifname
if "OK" not in self.global_request("P2P_GROUP_REMOVE " + ifname):