mirror of
https://github.com/vanhoefm/fragattacks.git
synced 2025-01-17 18:34:03 -05:00
tests: WPS AP and UPnP event subscription and many events
Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
parent
61c3d464e6
commit
d91a64c426
@ -2782,6 +2782,73 @@ def test_ap_wps_upnp_subscribe(dev, apdev):
|
||||
if "FAIL" not in hapd.request("ENABLE"):
|
||||
raise Exception("ENABLE succeeded during OOM")
|
||||
|
||||
def test_ap_wps_upnp_subscribe_events(dev, apdev):
|
||||
"""WPS AP and UPnP event subscription and many events"""
|
||||
ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
|
||||
hapd = add_ssdp_ap(apdev[0]['ifname'], ap_uuid)
|
||||
|
||||
location = ssdp_get_location(ap_uuid)
|
||||
urls = upnp_get_urls(location)
|
||||
eventurl = urlparse.urlparse(urls['event_sub_url'])
|
||||
|
||||
class WPSERHTTPServer(SocketServer.StreamRequestHandler):
|
||||
def handle(self):
|
||||
data = self.rfile.readline().strip()
|
||||
logger.debug(data)
|
||||
self.wfile.write(gen_wps_event())
|
||||
|
||||
server = MyTCPServer(("127.0.0.1", 12345), WPSERHTTPServer)
|
||||
server.timeout = 1
|
||||
|
||||
url = urlparse.urlparse(location)
|
||||
conn = httplib.HTTPConnection(url.netloc)
|
||||
|
||||
headers = { "callback": '<http://127.0.0.1:12345/event>',
|
||||
"NT": "upnp:event",
|
||||
"timeout": "Second-1234" }
|
||||
conn.request("SUBSCRIBE", eventurl.path, "\r\n\r\n", headers)
|
||||
resp = conn.getresponse()
|
||||
if resp.status != 200:
|
||||
raise Exception("Unexpected HTTP response: %d" % resp.status)
|
||||
sid = resp.getheader("sid")
|
||||
logger.debug("Subscription SID " + sid)
|
||||
|
||||
# Fetch the first event message
|
||||
server.handle_request()
|
||||
|
||||
# Force subscription event queue to reach the maximum length by generating
|
||||
# new proxied events without the ER fetching any of the pending events.
|
||||
dev[1].scan_for_bss(apdev[0]['bssid'], freq=2412)
|
||||
dev[2].scan_for_bss(apdev[0]['bssid'], freq=2412)
|
||||
for i in range(16):
|
||||
dev[1].dump_monitor()
|
||||
dev[2].dump_monitor()
|
||||
dev[1].request("WPS_PIN " + apdev[0]['bssid'] + " 12345670")
|
||||
dev[2].request("WPS_PIN " + apdev[0]['bssid'] + " 12345670")
|
||||
dev[1].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)
|
||||
dev[1].request("WPS_CANCEL")
|
||||
dev[2].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 5)
|
||||
dev[2].request("WPS_CANCEL")
|
||||
if i % 4 == 1:
|
||||
time.sleep(1)
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
|
||||
hapd.request("WPS_PIN any 12345670")
|
||||
dev[1].dump_monitor()
|
||||
dev[1].request("WPS_PIN " + apdev[0]['bssid'] + " 12345670")
|
||||
ev = dev[1].wait_event(["WPS-SUCCESS"], timeout=10)
|
||||
if ev is None:
|
||||
raise Exception("WPS success not reported")
|
||||
|
||||
# Close the WPS ER HTTP server without fetching all the pending events.
|
||||
# This tests hostapd code path that clears subscription and the remaining
|
||||
# event queue when the interface is deinitialized.
|
||||
server.handle_request()
|
||||
server.server_close()
|
||||
|
||||
dev[1].wait_connected()
|
||||
|
||||
def test_ap_wps_upnp_http_proto(dev, apdev):
|
||||
"""WPS AP and UPnP/HTTP protocol testing"""
|
||||
ap_uuid = "27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"
|
||||
|
Loading…
Reference in New Issue
Block a user