mirror of
https://github.com/vanhoefm/fragattacks.git
synced 2025-02-21 03:23:04 -05:00
tests: Extend RADIUS protocol testing coverage
Test RADIUS client behavior with various invalid Access-Accept messages. Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
parent
737754dc2b
commit
a47f815ff3
@ -4,9 +4,11 @@
|
|||||||
# This software may be distributed under the terms of the BSD license.
|
# This software may be distributed under the terms of the BSD license.
|
||||||
# See README for more details.
|
# See README for more details.
|
||||||
|
|
||||||
|
import hmac
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
import select
|
import select
|
||||||
|
import struct
|
||||||
import subprocess
|
import subprocess
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
@ -593,8 +595,8 @@ def test_radius_failover(dev, apdev):
|
|||||||
if req_e <= req_s:
|
if req_e <= req_s:
|
||||||
raise Exception("Unexpected RADIUS server acct MIB value")
|
raise Exception("Unexpected RADIUS server acct MIB value")
|
||||||
|
|
||||||
def run_pyrad_server(srv, stop_event):
|
def run_pyrad_server(srv, t_events):
|
||||||
srv.RunWithStop(stop_event)
|
srv.RunWithStop(t_events)
|
||||||
|
|
||||||
def test_radius_protocol(dev, apdev):
|
def test_radius_protocol(dev, apdev):
|
||||||
"""RADIUS Authentication protocol tests with a fake server"""
|
"""RADIUS Authentication protocol tests with a fake server"""
|
||||||
@ -611,16 +613,41 @@ def test_radius_protocol(dev, apdev):
|
|||||||
logger.info("Received authentication request")
|
logger.info("Received authentication request")
|
||||||
reply = self.CreateReplyPacket(pkt)
|
reply = self.CreateReplyPacket(pkt)
|
||||||
reply.code = pyrad.packet.AccessAccept
|
reply.code = pyrad.packet.AccessAccept
|
||||||
# TODO: Add attributes. For now, this works as a test case for
|
if self.t_events['msg_auth'].is_set():
|
||||||
# missing Message-Authenticator.
|
logger.info("Add Message-Authenticator")
|
||||||
|
if self.t_events['wrong_secret'].is_set():
|
||||||
|
logger.info("Use incorrect RADIUS shared secret")
|
||||||
|
pw = "incorrect"
|
||||||
|
else:
|
||||||
|
pw = reply.secret
|
||||||
|
hmac_obj = hmac.new(pw)
|
||||||
|
hmac_obj.update(struct.pack("B", reply.code))
|
||||||
|
hmac_obj.update(struct.pack("B", reply.id))
|
||||||
|
|
||||||
|
# reply attributes
|
||||||
|
reply.AddAttribute("Message-Authenticator",
|
||||||
|
"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")
|
||||||
|
attrs = reply._PktEncodeAttributes()
|
||||||
|
|
||||||
|
# Length
|
||||||
|
flen = 4 + 16 + len(attrs)
|
||||||
|
hmac_obj.update(struct.pack(">H", flen))
|
||||||
|
hmac_obj.update(pkt.authenticator)
|
||||||
|
hmac_obj.update(attrs)
|
||||||
|
if self.t_events['double_msg_auth'].is_set():
|
||||||
|
logger.info("Include two Message-Authenticator attributes")
|
||||||
|
else:
|
||||||
|
del reply[80]
|
||||||
|
reply.AddAttribute("Message-Authenticator", hmac_obj.digest())
|
||||||
self.SendReplyPacket(pkt.fd, reply)
|
self.SendReplyPacket(pkt.fd, reply)
|
||||||
|
|
||||||
def RunWithStop(self, stop_event):
|
def RunWithStop(self, t_events):
|
||||||
self._poll = select.poll()
|
self._poll = select.poll()
|
||||||
self._fdmap = {}
|
self._fdmap = {}
|
||||||
self._PrepareSockets()
|
self._PrepareSockets()
|
||||||
|
self.t_events = t_events
|
||||||
|
|
||||||
while not stop_event.is_set():
|
while not t_events['stop'].is_set():
|
||||||
for (fd, event) in self._poll.poll(1000):
|
for (fd, event) in self._poll.poll(1000):
|
||||||
if event == select.POLLIN:
|
if event == select.POLLIN:
|
||||||
try:
|
try:
|
||||||
@ -639,8 +666,12 @@ def test_radius_protocol(dev, apdev):
|
|||||||
"radius",
|
"radius",
|
||||||
"localhost")
|
"localhost")
|
||||||
srv.BindToAddress("")
|
srv.BindToAddress("")
|
||||||
t_stop = threading.Event()
|
t_events = {}
|
||||||
t = threading.Thread(target=run_pyrad_server, args=(srv, t_stop))
|
t_events['stop'] = threading.Event()
|
||||||
|
t_events['msg_auth'] = threading.Event()
|
||||||
|
t_events['wrong_secret'] = threading.Event()
|
||||||
|
t_events['double_msg_auth'] = threading.Event()
|
||||||
|
t = threading.Thread(target=run_pyrad_server, args=(srv, t_events))
|
||||||
t.start()
|
t.start()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -652,6 +683,25 @@ def test_radius_protocol(dev, apdev):
|
|||||||
if ev is None:
|
if ev is None:
|
||||||
raise Exception("Timeout on EAP start")
|
raise Exception("Timeout on EAP start")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
dev[0].request("REMOVE_NETWORK all")
|
||||||
|
time.sleep(0.1)
|
||||||
|
dev[0].dump_monitor()
|
||||||
|
t_events['msg_auth'].set()
|
||||||
|
t_events['wrong_secret'].set()
|
||||||
|
connect(dev[0], "radius-test", wait_connect=False)
|
||||||
|
time.sleep(1)
|
||||||
|
dev[0].request("REMOVE_NETWORK all")
|
||||||
|
time.sleep(0.1)
|
||||||
|
dev[0].dump_monitor()
|
||||||
|
t_events['wrong_secret'].clear()
|
||||||
|
connect(dev[0], "radius-test", wait_connect=False)
|
||||||
|
time.sleep(1)
|
||||||
|
dev[0].request("REMOVE_NETWORK all")
|
||||||
|
time.sleep(0.1)
|
||||||
|
dev[0].dump_monitor()
|
||||||
|
t_events['double_msg_auth'].set()
|
||||||
|
connect(dev[0], "radius-test", wait_connect=False)
|
||||||
|
time.sleep(1)
|
||||||
finally:
|
finally:
|
||||||
t_stop.set()
|
t_events['stop'].set()
|
||||||
t.join()
|
t.join()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user