fragattacks/research/fragattack.py

247 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
from fraginternals import *
from tests_common import *
from tests_qca import *
from tests_attacks import *
# ----------------------------------- Main Function -----------------------------------
def cleanup():
daemon.stop()
def char2trigger(c):
if c == 'S': return Action.StartAuth
elif c == 'B': return Action.BeforeAuth
elif c == 'A': return Action.AfterAuth
elif c == 'C': return Action.Connected
else: raise Exception("Unknown trigger character " + c)
def stract2action(stract):
if len(stract) == 1:
trigger = Action.Connected
c = stract[0]
else:
trigger = char2trigger(stract[0])
c = stract[1]
if c == 'I':
return Action(trigger, action=Action.GetIp)
elif c == 'R':
return Action(trigger, action=Action.Rekey)
elif c == 'C':
return Action(trigger, action=Action.Reconnect)
elif c == 'P':
return Action(trigger, enc=False)
elif c == 'E':
return Action(trigger, enc=True)
elif c == 'D':
return Action(meta_action=Action.MetaDrop)
raise Exception("Unrecognized action")
def prepare_tests(opt):
stractions = opt.actions
if opt.testname == "ping":
if stractions != None:
actions = [stract2action(stract) for stract in stractions.split(",")]
else:
actions = [Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True)]
test = PingTest(REQ_ICMP, actions, opt=opt)
elif opt.testname == "ping-frag-sep":
# Check if we can send frames in between fragments. The seperator by default uses a different
# QoS TID. The second fragment must use an incremental PN compared to the first fragment.
# So this also tests if the receivers uses a per-QoS receive replay counter. By overriding
# the TID you can check whether fragments are cached for multiple sequence numbers in one TID.
tid = 1 if stractions == None else int(stractions)
separator = Dot11(type="Data", subtype=8, SC=(33 << 4) | 0)/Dot11QoS(TID=tid)/LLC()/SNAP()
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True, inc_pn=0)],
separate_with=separator, opt=opt)
elif opt.testname == "wep-mixed-key":
log(WARNING, "Cannot predict WEP key reotation. Fragment may time out, use very short key rotation!", color="orange")
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
# On a WEP key rotation we get a Connected event. So wait for that.
Action(Action.AfterAuth, enc=True),
])
elif opt.testname == "cache-poison":
# Cache poison attack. Worked against Linux Hostapd and RT-AC51U.
test = PingTest(REQ_ICMP,
[Action(Action.Connected, enc=True),
Action(Action.Connected, action=Action.Reconnect),
Action(Action.AfterAuth, enc=True)])
elif opt.testname == "forward":
test = ForwardTest()
elif opt.testname == "eapol-amsdu":
freebsd = False
if stractions != None:
# TODO: Clean up this parsing / specification
stractions = stractions
if stractions.startswith("M,"):
freebsd = True
stractions = stractions[2:]
prefix, specific = stractions[:-3], stractions[-2:]
actions = []
if len(prefix) > 0:
actions = [stract2action(stract) for stract in prefix.split(",")]
actions += [Action(char2trigger(t), enc=False) for t in specific]
else:
actions = [Action(Action.StartAuth, enc=False),
Action(Action.StartAuth, enc=False)]
test = EapolAmsduTest(REQ_ICMP, actions, freebsd, opt)
elif opt.testname == "linux-plain":
decoy_tid = None if stractions == None else int(stractions)
test = LinuxTest(REQ_ICMP, decoy_tid)
elif opt.testname == "macos":
if stractions != None:
actions = [Action(char2trigger(t), enc=False) for t in stractions]
else:
actions = [Action(Action.StartAuth, enc=False),
Action(Action.StartAuth, enc=False)]
test = MacOsTest(REQ_ICMP, actions, opt.bcast_dst)
elif opt.testname == "qca-test":
test = QcaDriverTest()
elif opt.testname == "qca-split":
test = QcaTestSplit()
elif opt.testname == "qca-rekey":
test = QcaDriverRekey()
elif opt.testname == "amsdu-inject":
test = AmsduInject(REQ_ICMP, stractions)
# No valid test ID/name was given
else: return None
# -----------------------------------------------------------------------------------------
# XXX TODO : Hardware decrypts it using old key, software using new key?
# So right after rekey we inject first with old key, second with new key?
# XXX TODO : What about extended functionality where we can have
# two simultaneously pairwise keys?!?!
# TODO:
# - Test case to check if the receiver supports interleaved priority
# reception. It seems Windows 10 / Intel might not support this.
# - Test case with a very lage aggregated frame (which is normally not
# allowed but some may accept it). And a variation to check how APs
# will forward such overly large frame (e.g. force fragmentation).
# - [TKIP] Encrpted, Encrypted, no global MIC
# - Plain/Enc tests but first plaintext sent before installing key
# - Test fragmentation of management frames
# - Test fragmentation of group frames (STA mode of RT-AC51u?)
# If requested, override delay and inc_pn parameters in the test.
test.set_general_options(opt.delay, opt.inc_pn)
# If requested, override the ptype
if opt.ptype != None:
if not hasattr(test, "ptype"):
log(WARNING, "Cannot override request type of the selected test.")
quit(1)
test.ptype = opt.ptype
return test
def args2ptype(args):
# Only one of these should be given
if args.arp + args.dhcp + args.icmp + args.ipv6 > 1:
log(STATUS, "You cannot combine --arp, --dhcp, --ipv6, or --icmp. Please only supply one of them.")
quit(1)
if args.arp: return REQ_ARP
if args.dhcp: return REQ_DHCP
if args.icmp: return REQ_ICMP
if args.ipv6: return REQ_ICMPv6_RA
return None
def args2msdu(args):
# Only one of these should be given
if args.amsdu + args.fake_amsdu > 1:
log(STATUS, "You cannot combine --amsdu and --fake-amsdu. Please only supply one of them.")
quit(1)
if args.amsdu: return 1
if args.fake_amsdu: return 2
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Test for fragmentation vulnerabilities.")
parser.add_argument('iface', help="Interface to use for the tests.")
parser.add_argument('testname', help="Name or identifier of the test to run.")
parser.add_argument('actions', nargs='?', help="Optional textual descriptions of actions")
parser.add_argument('--inject', default=None, help="Interface to use to inject frames.")
parser.add_argument('--inject-test', default=None, help="Use given interface to test injection through monitor interface.")
parser.add_argument('--inject-selftest', default=False, action='store_true', help="Partial injection test (checks kernel only).")
parser.add_argument('--hwsim', default=None, help="Use provided interface in monitor mode, and simulate AP/client through hwsim.")
parser.add_argument('--ip', help="IP we as a sender should use.")
parser.add_argument('--peerip', help="IP of the device we will test.")
parser.add_argument('--ap', default=False, action='store_true', help="Act as an AP to test clients.")
parser.add_argument('--debug', type=int, default=0, help="Debug output level.")
parser.add_argument('--delay', type=float, default=0, help="Delay between fragments in certain tests.")
parser.add_argument('--inc-pn', type=int, help="To test non-sequential packet number in fragments.")
parser.add_argument('--amsdu', default=False, action='store_true', help="Encapsulate pings in an A-MSDU frame.")
parser.add_argument('--fake-amsdu', default=False, action='store_true', help="Set A-MSDU flag but include normal payload.")
parser.add_argument('--arp', default=False, action='store_true', help="Override default request with ARP request.")
parser.add_argument('--dhcp', default=False, action='store_true', help="Override default request with DHCP discover.")
parser.add_argument('--icmp', default=False, action='store_true', help="Override default request with ICMP ping request.")
parser.add_argument('--ipv6', default=False, action='store_true', help="Override default request with ICMPv6 router advertisement.")
parser.add_argument('--no-dhcp', default=False, action='store_true', help="Do not reply to DHCP requests as an AP.")
parser.add_argument('--icmp-size', type=int, default=None, help="Second to wait after AfterAuth before triggering Connected event")
parser.add_argument('--padding', type=int, default=None, help="Add padding data to ARP/DHCP/ICMP requests.")
parser.add_argument('--rekey-request', default=False, action='store_true', help="Actively request PTK rekey as client.")
parser.add_argument('--rekey-plaintext', default=False, action='store_true', help="Do PTK rekey with plaintext EAPOL frames.")
parser.add_argument('--rekey-early-install', default=False, action='store_true', help="Install PTK after sending Msg3 during rekey.")
parser.add_argument('--full-reconnect', default=False, action='store_true', help="Reconnect by deauthenticating first.")
parser.add_argument('--bcast-ra', default=False, action='store_true', help="Send pings using broadcast *receiver* address (= addr1).")
parser.add_argument('--bcast-dst', default=False, action='store_true', help="Send pings using broadcast *destination* when to AP ().")
parser.add_argument('--pn-per-qos', default=False, action='store_true', help="Use separate Tx packet counter for each QoS TID.")
parser.add_argument('--freebsd-cache', default=False, action='store_true', help="Sent EAP(OL) frames as (malformed) broadcast EAPOL/A-MSDUs.")
parser.add_argument('--connected-delay', type=int, default=1, help="Second to wait after AfterAuth before triggering Connected event")
parser.add_argument('--to-self', default=False, action='store_true', help="Send ARP/DHCP/ICMP with same src and dst MAC address.")
options = parser.parse_args()
# Default value for options that should not be command line parameters
options.inject_mf_workaround = False
# Sanity check and convert some arguments to more usable form
options.ptype = args2ptype(options)
options.as_msdu = args2msdu(options)
# Construct the test
options.test = prepare_tests(options)
if options.test == None:
log(STATUS, f"Test name/id '{options.testname}' not recognized. Specify a valid test case.")
quit(1)
# Parse remaining options
change_log_level(-options.debug)
# Now start the tests --- TODO: Inject Deauths before connecting with client...
if options.ap:
daemon = Authenticator(options)
else:
daemon = Supplicant(options)
atexit.register(cleanup)
daemon.run()