fragattacks/research/fragattack.py
Mathy Vanhoef b7a520637e fragattacks: do not use format strings
There are only supported on Python 3.6 and above. With openwifi we
likely need to support an older Python version.

This patch may be reverted in the future once support for older
Python versions is no longer needed.
2022-10-16 18:44:11 +02:00

275 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (c) 2020, Mathy Vanhoef <mathy.vanhoef@nyu.edu>
#
# This code may be distributed under the terms of the BSD license.
# See README for more details.
# Note that tests_*.py files are imported automatically
import glob, importlib, argparse
from fraginternals import *
# ----------------------------------- Main Function -----------------------------------
def cleanup():
daemon.stop()
def char2trigger(c):
if c == 'S': return Action.StartAuth
elif c == 'B': return Action.BeforeAuth
elif c == 'A': return Action.AfterAuth
elif c == 'C': return Action.Connected
else: raise Exception("Unknown trigger character " + c)
def stract2action(stract):
"""Parse a single trigger and action pair"""
if len(stract) == 1:
trigger = Action.Connected
c = stract[0]
else:
trigger = char2trigger(stract[0])
c = stract[1]
if c == 'I':
return Action(trigger, action=Action.GetIp)
elif c == 'F':
return Action(trigger, action=Action.Rekey)
elif c == 'R':
return Action(trigger, action=Action.Reconnect)
elif c == 'P':
return Action(trigger, enc=False)
elif c == 'E':
return Action(trigger, enc=True)
elif c == 'D':
# Note: the trigger condition of MetaDrop is ignored
return Action(meta_action=Action.MetaDrop)
raise Exception("Unrecognized action")
def str2actions(stractions, default):
"""Parse a list of trigger and action pairs"""
if stractions != None:
return [stract2action(stract) for stract in stractions.split(",")]
else:
return default
def prepare_tests(opt):
# --------------- Main Tests ---------------
stractions = opt.actions
if opt.testname == "ping":
actions = str2actions(stractions,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True)])
test = PingTest(REQ_ICMP, actions, opt=opt)
elif opt.testname == "ping-frag-sep":
# Check if we can send frames in between fragments. The seperator by default uses a different
# QoS TID. The second fragment of the ping request will NOT have an incremental PN compared
# to the first fragment. So this test fails if the receivers checks for consecutive PNs. By adding
# `--pn-per-qos` both ping fragments will use consecutive PNs, but this will only be accepted if
# the receiver checks PNs for each QoS TID separately.
# By overriding the TID to 2 you can check whether fragments are cached for multiple sequence numbers
# in one TID (since all fragments use the same TID but the ping fragments use a different sequence
# number compared to the seperator).
tid = 1 if stractions == None else int(stractions)
separator = Dot11(type="Data", subtype=8, SC=(33 << 4) | 0)/Dot11QoS(TID=tid)/LLC()/SNAP()
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True)],
separate_with=separator, opt=opt)
elif opt.testname in ["eapol-inject", "eapol-inject-large"]:
large = opt.testname.endswith("-large")
test = ForwardTest(eapol=True, dst=stractions, large=large)
elif opt.testname in ["eapol-amsdu", "eapol-amsdu-bad"]:
freebsd = opt.testname.endswith("-bad")
actions = str2actions(stractions,
[Action(Action.StartAuth, enc=False),
Action(Action.StartAuth, enc=False)])
test = EapolAmsduTest(REQ_ICMP, actions, freebsd, opt)
elif opt.testname == "linux-plain":
decoy_tid = None if stractions == None else int(stractions)
test = LinuxTest(REQ_ICMP, decoy_tid)
elif opt.testname in ["amsdu-inject", "amsdu-inject-bad"]:
malformed = opt.testname.endswith("-bad")
test = AmsduInject(REQ_ICMP, malformed)
elif opt.testname == "eapfrag":
actions = str2actions(stractions,
[Action(Action.StartAuth, enc=False),
Action(Action.StartAuth, enc=False)])
test = BcastEapFragTest(REQ_ICMP, actions, opt.bcast_dst)
elif opt.testname == "wep-mixed-key":
log(WARNING, "Cannot predict WEP key reotation. Fragment may time out, use very short key rotation!", color="orange")
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
# On a WEP key rotation we get a Connected event. So wait for that.
Action(Action.AfterAuth, enc=True),
])
# --------------- Research Tests ---------------
elif opt.testname == "forward":
test = ForwardTest(eapol=False, dst=stractions)
elif opt.testname == "qca-test":
test = QcaDriverTest()
elif opt.testname == "qca-split":
test = QcaTestSplit()
elif opt.testname == "qca-rekey":
test = QcaDriverRekey()
# No valid test ID/name was given
else: return None
# If requested, override delay and inc_pn parameters in the test.
test.set_general_options(opt.delay, opt.inc_pn, opt.pre_test_delay)
# If requested, override the ptype
if opt.ptype != None:
if not hasattr(test, "ptype"):
log(WARNING, "Cannot override request type of the selected test.")
quit(1)
test.ptype = opt.ptype
return test
def args2ptype(args):
# Only one of these should be given
if args.arp + args.dhcp + args.icmp + args.ipv6 > 1:
log(STATUS, "You cannot combine --arp, --dhcp, --ipv6, or --icmp. Please only supply one of them.")
quit(1)
if args.arp: return REQ_ARP
if args.dhcp: return REQ_DHCP
if args.icmp: return REQ_ICMP
if args.ipv6: return REQ_ICMPv6_RA
if args.udp: return REQ_UDP
return None
def args2msdu(args):
# Only one of these should be given
if args.amsdu + args.amsdu_fake > 1:
log(STATUS, "You cannot combine --amsdu and --amsdu-fake. Please only supply one of them.")
quit(1)
if args.amsdu: return 1
if args.amsdu_fake: return 2
return None
def get_expected_scapy_ver():
for line in open("requirements.txt"):
if line.startswith("scapy=="):
return line[7:].strip()
return None
if __name__ == "__main__":
log(STATUS, "This is FragAttack version {}.".format(FRAGVERSION))
parser = argparse.ArgumentParser(description="Test for fragmentation vulnerabilities (version {}).".format(FRAGVERSION))
parser.add_argument('iface', help="Interface to use for the tests.")
parser.add_argument('testname', help="Name or identifier of the test to run.")
parser.add_argument('actions', nargs='?', help="Optional textual descriptions of actions")
parser.add_argument('--inject', default=None, help="Interface to use to inject frames.")
parser.add_argument('--inject-test', default=None, help="Use given interface to test injection through monitor interface.")
parser.add_argument('--inject-test-postauth', default=None, help="Same as --inject-test but run the test after authenticating.")
parser.add_argument('--hwsim', default=None, help="Use provided interface in monitor mode, and simulate AP/client through hwsim.")
parser.add_argument('--ip', help="IP we as a sender should use.")
parser.add_argument('--peerip', help="IP of the device we will test.")
parser.add_argument('--ap', default=False, action='store_true', help="Act as an AP to test clients.")
parser.add_argument('--debug', type=int, default=0, help="Debug output level.")
parser.add_argument('--delay', type=float, default=0, help="Delay between fragments in certain tests.")
parser.add_argument('--inc-pn', type=int, help="To test non-sequential packet number in fragments.")
parser.add_argument('--amsdu', default=False, action='store_true', help="Encapsulate pings in an A-MSDU frame.")
parser.add_argument('--amsdu-fake', default=False, action='store_true', help="Set A-MSDU flag but include normal payload.")
parser.add_argument('--amsdu-spp', '--amsdu-ssp', default=False, action='store_true', help="Force authentication of QoS A-MSDU flag.")
parser.add_argument('--arp', default=False, action='store_true', help="Override default request with ARP request.")
parser.add_argument('--dhcp', default=False, action='store_true', help="Override default request with DHCP discover.")
parser.add_argument('--icmp', default=False, action='store_true', help="Override default request with ICMP ping request.")
parser.add_argument('--ipv6', default=False, action='store_true', help="Override default request with ICMPv6 router advertisement.")
# TODO: Test the --udp option more
parser.add_argument('--udp', type=int, default=None, help="Override default request with UDP packet to the given port.")
parser.add_argument('--no-dhcp', default=False, action='store_true', help="Do not reply to DHCP requests as an AP.")
parser.add_argument('--icmp-size', type=int, default=None, help="Size of the ICMP ping request to send.")
parser.add_argument('--padding', type=int, default=None, help="Add padding data to ARP/DHCP/ICMP requests.")
parser.add_argument('--rekey-request', default=False, action='store_true', help="Actively request PTK rekey as client.")
parser.add_argument('--rekey-plaintext', default=False, action='store_true', help="Do PTK rekey with plaintext EAPOL frames.")
parser.add_argument('--rekey-early-install', default=False, action='store_true', help="Install PTK after sending Msg3 during rekey.")
parser.add_argument('--full-reconnect', default=False, action='store_true', help="Reconnect by deauthenticating first.")
parser.add_argument('--bcast-ra', default=False, action='store_true', help="Send pings using broadcast *receiver* address (= addr1).")
parser.add_argument('--bcast-dst', default=False, action='store_true', help="Send pings using broadcast *destination* when to AP ().")
# TODO: Properly test the --bad-mic option
parser.add_argument('--bad-mic', default=False, action='store_true', help="Send pings using an invalid authentication tag.")
parser.add_argument('--pn-per-qos', default=False, action='store_true', help="Use separate Tx packet counter for each QoS TID.")
parser.add_argument('--no-qos', default=False, action='store_true', help="Don't send QoS data frames (experimental - may break some tests).")
parser.add_argument('--freebsd-cache', default=False, action='store_true', help="Sent EAP(OL) frames as (malformed) broadcast EAPOL/A-MSDUs.")
parser.add_argument('--connected-delay', type=float, default=1, help="Second to wait after AfterAuth before triggering Connected event")
parser.add_argument('--pre-test-delay', type=int, default=0, help="Delay before launching the test")
parser.add_argument('--to-self', default=False, action='store_true', help="Send ARP/DHCP/ICMP with same src and dst MAC address.")
parser.add_argument('--no-drivercheck', default=False, action='store_true', help="Don't check if patched drivers are being used.")
parser.add_argument('--stay-up', default=False, action='store_true', help="Don't quit when test has finished.")
options = parser.parse_args()
# Check if we're using the expected scapy version
expected_ver = get_expected_scapy_ver()
if expected_ver!= None and scapy.VERSION != expected_ver:
log(WARNING, "You are using scapy version {} instead of the expected {}".format(scapy.VERSION, expected_ver))
log(WARNING, "Are you executing the script from inside the correct python virtual environment?")
# Default value for options that should not be command line parameters
options.inject_mf_workaround = False
# Sanity check and convert some arguments to more usable form
options.ptype = args2ptype(options)
options.as_msdu = args2msdu(options)
if options.pn_per_qos and options.no_qos:
log(STATUS, "Cannot specify option --pn-per-qos and --no-qos simultaneously.")
quit(1)
# Make the --inject-test-postauth flags easier to check
if options.inject_test_postauth != None:
options.inject_test = options.inject_test_postauth
options.inject_test_postauth = True
else:
options.inject_test_postauth = False
# Dynamically import tests depending on their availability in the directory,
# but ignore private variables. See https://stackoverflow.com/a/44492879
for test in glob("tests_*.py"):
module = importlib.import_module(test[:-3])
globals().update(
{n: getattr(module, n) for n in module.__all__} if hasattr(module, '__all__')
else
{k: v for (k, v) in module.__dict__.items() if not k.startswith('_')
})
# Construct the test
options.test = prepare_tests(options)
if options.test == None:
log(STATUS, "Test name '{}' not recognized. Specify a valid test case.".format(options.testname))
quit(1)
# Parse remaining options
change_log_level(-options.debug)
# Now start the tests --- TODO: Inject Deauths before connecting with client...
if options.ap:
daemon = Authenticator(options)
else:
daemon = Supplicant(options)
atexit.register(cleanup)
daemon.run()