#!/usr/bin/env python3
"""Command-line entry point that selects, configures and runs a fragmentation test."""
import argparse
import atexit
import sys

from fraginternals import *
from tests_common import *
from tests_qca import *
from tests_attacks import *

# ----------------------------------- Main Function -----------------------------------


def cleanup():
    """Stop the global ``daemon``; registered with ``atexit`` by the script body."""
    daemon.stop()


def char2trigger(c):
    """Translate a trigger character into the matching ``Action`` trigger constant.

    Recognized characters are ``S`` (StartAuth), ``B`` (BeforeAuth),
    ``A`` (AfterAuth) and ``C`` (Connected).

    Raises:
        Exception: if ``c`` is not one of the recognized characters.
    """
    triggers = {
        'S': Action.StartAuth,
        'B': Action.BeforeAuth,
        'A': Action.AfterAuth,
        'C': Action.Connected,
    }
    if c not in triggers:
        raise Exception("Unknown trigger character " + c)
    return triggers[c]


def stract2action(stract):
    """Convert a textual action description into an ``Action`` object.

    A one-character description is an action character with the trigger
    defaulting to ``Action.Connected``. Otherwise the first character is the
    trigger (see ``char2trigger``) and the second is the action character.

    Action characters: ``I`` get IP, ``R`` rekey, ``C`` reconnect,
    ``P`` send with ``enc=False``, ``E`` send with ``enc=True``,
    ``D`` the ``MetaDrop`` meta action (constructed without a trigger).

    Raises:
        Exception: if the description is empty or the trigger/action
            character is not recognized.
    """
    # An empty element (e.g. from "I,,P") would otherwise fail with a bare IndexError.
    if len(stract) == 0:
        raise Exception("Empty action description")

    if len(stract) == 1:
        trigger = Action.Connected
        c = stract[0]
    else:
        trigger = char2trigger(stract[0])
        c = stract[1]

    if c == 'I':
        return Action(trigger, action=Action.GetIp)
    elif c == 'R':
        return Action(trigger, action=Action.Rekey)
    elif c == 'C':
        return Action(trigger, action=Action.Reconnect)
    elif c == 'P':
        return Action(trigger, enc=False)
    elif c == 'E':
        return Action(trigger, enc=True)
    elif c == 'D':
        return Action(meta_action=Action.MetaDrop)

    raise Exception("Unrecognized action")


def prepare_tests(opt):
    """Build the test object selected by ``opt.testname``.

    ``opt.actions`` (may be ``None``) is interpreted per test: as a
    comma-separated list of action descriptions, a TID number, or a string of
    trigger characters. After construction, the delay / inc_pn options are
    applied, and the request type is overridden when ``opt.ptype`` is set.

    Returns:
        The constructed test, or ``None`` when ``opt.testname`` is not recognized.

    Exits the process with status 1 when a request type override is asked for
    but the selected test has no ``ptype`` attribute.
    """
    stractions = opt.actions
    if opt.testname == "ping":
        if stractions is not None:
            actions = [stract2action(stract) for stract in stractions.split(",")]
        else:
            actions = [Action(Action.Connected, action=Action.GetIp),
                       Action(Action.Connected, enc=True)]

        test = PingTest(REQ_ICMP, actions, opt=opt)

    elif opt.testname == "ping-frag-sep":
        # Check if we can send frames in between fragments. The separator by default uses a different
        # QoS TID. The second fragment must use an incremental PN compared to the first fragment.
        # So this also tests if the receiver uses a per-QoS receive replay counter. By overriding
        # the TID you can check whether fragments are cached for multiple sequence numbers in one TID.
        tid = 1 if stractions is None else int(stractions)
        separator = Dot11(type="Data", subtype=8, SC=(33 << 4) | 0)/Dot11QoS(TID=tid)/LLC()/SNAP()
        test = PingTest(REQ_ICMP,
                        [Action(Action.Connected, action=Action.GetIp),
                         Action(Action.Connected, enc=True),
                         Action(Action.Connected, enc=True, inc_pn=0)],
                        separate_with=separator, opt=opt)

    elif opt.testname == "wep-mixed-key":
        log(WARNING, "Cannot predict WEP key reotation. Fragment may time out, use very short key rotation!", color="orange")
        test = PingTest(REQ_ICMP,
                        [Action(Action.Connected, action=Action.GetIp),
                         Action(Action.Connected, enc=True),
                         # On a WEP key rotation we get a Connected event. So wait for that.
                         # NOTE(review): the trigger used below is AfterAuth, not Connected -- confirm intent.
                         Action(Action.AfterAuth, enc=True),
                         ])

    elif opt.testname == "cache-poison":
        # Cache poison attack. Worked against Linux Hostapd and RT-AC51U.
        test = PingTest(REQ_ICMP,
                        [Action(Action.Connected, enc=True),
                         Action(Action.Connected, action=Action.Reconnect),
                         Action(Action.AfterAuth, enc=True)])

    elif opt.testname == "forward":
        test = ForwardTest()

    elif opt.testname == "eapol-amsdu":
        freebsd = False
        if stractions is not None:
            # TODO: Clean up this parsing / specification
            # An optional leading "M," selects the FreeBSD variant.
            if stractions.startswith("M,"):
                freebsd = True
                stractions = stractions[2:]
            # The last two characters are trigger characters for plaintext
            # actions; everything before the comma preceding them (if any)
            # is a regular comma-separated action list.
            prefix, specific = stractions[:-3], stractions[-2:]
            actions = []
            if len(prefix) > 0:
                actions = [stract2action(stract) for stract in prefix.split(",")]
            actions += [Action(char2trigger(t), enc=False) for t in specific]
        else:
            actions = [Action(Action.StartAuth, enc=False),
                       Action(Action.StartAuth, enc=False)]

        test = EapolAmsduTest(REQ_ICMP, actions, freebsd, opt)

    elif opt.testname == "linux-plain":
        decoy_tid = None if stractions is None else int(stractions)
        test = LinuxTest(REQ_ICMP, decoy_tid)

    elif opt.testname == "macos":
        if stractions is not None:
            # Each character of the string is a trigger for one plaintext action.
            actions = [Action(char2trigger(t), enc=False) for t in stractions]
        else:
            actions = [Action(Action.StartAuth, enc=False),
                       Action(Action.StartAuth, enc=False)]

        test = MacOsTest(REQ_ICMP, actions, opt.bcast_dst)

    elif opt.testname == "qca-test":
        test = QcaDriverTest()

    elif opt.testname == "qca-split":
        test = QcaTestSplit()

    elif opt.testname == "qca-rekey":
        test = QcaDriverRekey()

    elif opt.testname == "amsdu-inject":
        test = AmsduInject(REQ_ICMP, stractions)

    # No valid test ID/name was given
    else:
        return None

    # -----------------------------------------------------------------------------------------

    # XXX TODO : Hardware decrypts it using old key, software using new key?
    #            So right after rekey we inject first with old key, second with new key?

    # XXX TODO : What about extended functionality where we can have
    #            two simultaneously pairwise keys?!?!

    # TODO:
    # - Test case to check if the receiver supports interleaved priority
    #   reception. It seems Windows 10 / Intel might not support this.
    # - Test case with a very large aggregated frame (which is normally not
    #   allowed but some may accept it). And a variation to check how APs
    #   will forward such overly large frame (e.g. force fragmentation).
    # - [TKIP] Encrypted, Encrypted, no global MIC
    # - Plain/Enc tests but first plaintext sent before installing key
    # - Test fragmentation of management frames
    # - Test fragmentation of group frames (STA mode of RT-AC51u?)

    # If requested, override delay and inc_pn parameters in the test.
    test.set_general_options(opt.delay, opt.inc_pn)

    # If requested, override the ptype
    if opt.ptype is not None:
        if not hasattr(test, "ptype"):
            log(WARNING, "Cannot override request type of the selected test.")
            sys.exit(1)
        test.ptype = opt.ptype

    return test


def args2ptype(args):
    """Map the --arp/--dhcp/--icmp/--ipv6 flags to a request type constant.

    Returns ``REQ_ARP``, ``REQ_DHCP``, ``REQ_ICMP`` or ``REQ_ICMPv6_RA`` for
    the flag that is set, or ``None`` when none is set. Exits the process with
    status 1 when more than one of the flags is given.
    """
    # Only one of these should be given
    if sum((args.arp, args.dhcp, args.icmp, args.ipv6)) > 1:
        log(STATUS, "You cannot combine --arp, --dhcp, --ipv6, or --icmp. Please only supply one of them.")
        sys.exit(1)

    if args.arp:
        return REQ_ARP
    if args.dhcp:
        return REQ_DHCP
    if args.icmp:
        return REQ_ICMP
    if args.ipv6:
        return REQ_ICMPv6_RA

    return None


def args2msdu(args):
    """Map the --amsdu/--fake-amsdu flags to a numeric mode.

    Returns 1 for ``--amsdu``, 2 for ``--fake-amsdu`` and ``None`` when neither
    is set. Exits the process with status 1 when both flags are given.
    """
    # Only one of these should be given
    if args.amsdu + args.fake_amsdu > 1:
        log(STATUS, "You cannot combine --amsdu and --fake-amsdu. Please only supply one of them.")
        sys.exit(1)

    if args.amsdu:
        return 1
    if args.fake_amsdu:
        return 2

    return None


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Test for fragmentation vulnerabilities.")
    parser.add_argument('iface', help="Interface to use for the tests.")
    parser.add_argument('testname', help="Name or identifier of the test to run.")
    parser.add_argument('actions', nargs='?', help="Optional textual descriptions of actions")
    parser.add_argument('--inject', default=None, help="Interface to use to inject frames.")
    parser.add_argument('--inject-test', default=None, help="Use given interface to test injection through monitor interface.")
    parser.add_argument('--inject-selftest', default=False, action='store_true', help="Partial injection test (checks kernel only).")
    parser.add_argument('--hwsim', default=None, help="Use provided interface in monitor mode, and simulate AP/client through hwsim.")
    parser.add_argument('--ip', help="IP we as a sender should use.")
    parser.add_argument('--peerip', help="IP of the device we will test.")
    parser.add_argument('--ap', default=False, action='store_true', help="Act as an AP to test clients.")
    parser.add_argument('--debug', type=int, default=0, help="Debug output level.")
    parser.add_argument('--delay', type=float, default=0, help="Delay between fragments in certain tests.")
    parser.add_argument('--inc-pn', type=int, help="To test non-sequential packet number in fragments.")
    parser.add_argument('--amsdu', default=False, action='store_true', help="Encapsulate pings in an A-MSDU frame.")
    parser.add_argument('--fake-amsdu', default=False, action='store_true', help="Set A-MSDU flag but include normal payload.")
    parser.add_argument('--arp', default=False, action='store_true', help="Override default request with ARP request.")
    parser.add_argument('--dhcp', default=False, action='store_true', help="Override default request with DHCP discover.")
    parser.add_argument('--icmp', default=False, action='store_true', help="Override default request with ICMP ping request.")
    parser.add_argument('--ipv6', default=False, action='store_true', help="Override default request with ICMPv6 router advertisement.")
    parser.add_argument('--no-dhcp', default=False, action='store_true', help="Do not reply to DHCP requests as an AP.")
    # NOTE(review): the previous help text here was a verbatim copy of the
    # --connected-delay help. The wording below is derived from the option
    # name only -- confirm against the code that consumes `icmp_size`.
    parser.add_argument('--icmp-size', type=int, default=None, help="Size to use for ICMP ping requests.")
    parser.add_argument('--padding', type=int, default=None, help="Add padding data to ARP/DHCP/ICMP requests.")
    parser.add_argument('--rekey-request', default=False, action='store_true', help="Actively request PTK rekey as client.")
    parser.add_argument('--rekey-plaintext', default=False, action='store_true', help="Do PTK rekey with plaintext EAPOL frames.")
    parser.add_argument('--rekey-early-install', default=False, action='store_true', help="Install PTK after sending Msg3 during rekey.")
    parser.add_argument('--full-reconnect', default=False, action='store_true', help="Reconnect by deauthenticating first.")
    parser.add_argument('--bcast-ra', default=False, action='store_true', help="Send pings using broadcast *receiver* address (= addr1).")
    parser.add_argument('--bcast-dst', default=False, action='store_true', help="Send pings using broadcast *destination* when to AP ().")
    parser.add_argument('--pn-per-qos', default=False, action='store_true', help="Use separate Tx packet counter for each QoS TID.")
    parser.add_argument('--freebsd-cache', default=False, action='store_true', help="Sent EAP(OL) frames as (malformed) broadcast EAPOL/A-MSDUs.")
    parser.add_argument('--connected-delay', type=float, default=1, help="Second to wait after AfterAuth before triggering Connected event")
    parser.add_argument('--to-self', default=False, action='store_true', help="Send ARP/DHCP/ICMP with same src and dst MAC address.")
    options = parser.parse_args()

    # Default value for options that should not be command line parameters
    options.inject_mf_workaround = False

    # Sanity check and convert some arguments to more usable form
    options.ptype = args2ptype(options)
    options.as_msdu = args2msdu(options)

    # Construct the test
    options.test = prepare_tests(options)
    if options.test is None:
        log(STATUS, f"Test name/id '{options.testname}' not recognized. Specify a valid test case.")
        sys.exit(1)

    # Parse remaining options
    change_log_level(-options.debug)

    # Now start the tests --- TODO: Inject Deauths before connecting with client...
    if options.ap:
        daemon = Authenticator(options)
    else:
        daemon = Supplicant(options)
    # `cleanup` reads the module-level `daemon` assigned just above.
    atexit.register(cleanup)
    daemon.run()