fragattacks: initial support for hwsim simulation

This commit is contained in:
Mathy Vanhoef 2020-05-12 18:33:44 +04:00 committed by Mathy Vanhoef
parent 70f2cc33b7
commit 304d7871ce
2 changed files with 65 additions and 27 deletions

View File

@ -6,6 +6,7 @@ import argparse
import os.path
from wpaspy import Ctrl
from scapy.contrib.wpa_eapol import WPA_key
from scapy.arch.common import get_if_raw_hwaddr
from tests_qca import *
@ -137,14 +138,7 @@ def set_monitor_mode(iface):
time.sleep(0.5)
subprocess.check_output(["iw", iface, "set", "type", "monitor"])
subprocess.check_output(["ifconfig", iface, "up"])
def get_channel(iface):
output = str(subprocess.check_output(["iw", iface, "info"]))
p = re.compile("channel (\d+)")
return int(p.search(output).group(1))
def set_channel(iface, channel):
subprocess.check_output(["iw", iface, "set", "channel", str(channel)])
subprocess.check_output(["ifconfig", iface, "mtu", "2200"])
# ----------------------------------- Injection Tests -----------------------------------
@ -938,11 +932,13 @@ class Daemon(metaclass=abc.ABCMeta):
self.nic_iface = None
self.nic_mon = None
self.nic_hwsim = None
self.performed_injection_selftest = False
self.process = None
self.sock_eth = None
self.sock_mon = None
self.sock_hwsim = None
@abc.abstractmethod
def start_daemon(self):
@ -987,15 +983,22 @@ class Daemon(metaclass=abc.ABCMeta):
subprocess.check_output(["rfkill", "unblock", "wifi"])
self.nic_iface = options.iface
# 1. Set or create the monitor mode interface
if self.options.inject:
# 1. Assign/create interfaces according to provided options
if self.options.hwsim:
# TODO: Automatically create both interfaces?
self.nic_iface, self.nic_hwsim = self.options.hwsim.split(",")
self.nic_mon = options.iface
set_macaddress(self.nic_iface, get_macaddress(self.nic_mon))
log(WARNING, f"Note: you must manually set {self.nic_mon} on the channel of the AP")
elif self.options.inject:
# Use the provided interface to monitor/inject frames
self.nic_mon = self.options.inject
# Avoid the monitor interface from retransmitting frames? This was not needed for the
# ath9k_htc and intel/mvm device that I tested. Perhaps for others it might be needed.
#subprocess.call(["ifconfig", self.nic_mon, "down"])
#subprocess.call(["macchanger", "-m", scapy.arch.get_if_hwaddr(self.nic_iface), self.nic_mon])
#subprocess.call(["macchanger", "-m", get_macaddress(self.nic_iface), self.nic_mon])
else:
# Create second virtual interface in monitor mode. Note: some kernels
@ -1009,11 +1012,7 @@ class Daemon(metaclass=abc.ABCMeta):
subprocess.call(["iw", self.nic_mon, "del"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
subprocess.check_output(["iw", self.nic_iface, "interface", "add", self.nic_mon, "type", "monitor"])
# 2. Enable monitor mode
set_monitor_mode(self.nic_mon)
log(STATUS, f"Using interface {self.nic_mon} to inject frames.")
# 3. Remember whether to need to perform a workaround.
# Remember whether to need to perform a workaround.
driver = get_device_driver(self.nic_iface)
if driver == None:
log(WARNING, "Unable to detect driver of interface!")
@ -1022,6 +1021,14 @@ class Daemon(metaclass=abc.ABCMeta):
options.inject_workaround = True
log(STATUS, "Detected ath9k_htc, using injection bug workarounds")
log(WARNING, "Remember to use a modified backports and ath9k_htc firmware!")
# 2. Enable monitor mode
set_monitor_mode(self.nic_mon)
log(STATUS, f"Using interface {self.nic_mon} to inject frames.")
if self.nic_hwsim:
set_monitor_mode(self.nic_hwsim)
def inject_mon(self, p):
self.sock_mon.send(p)
@ -1062,6 +1069,17 @@ class Daemon(metaclass=abc.ABCMeta):
log(DEBUG, f"Passed injection self-test on interface {self.nic_mon}.")
self.performed_injection_selftest = True
def forward_hwsim(self, p, s):
if p == None: return
if not Dot11 in p: return
if p.type != 0 and p.type != 2: return
if len(p) >= 2200:
log(DEBUG, f"Cannot forward frame longer than MTU (length {len(p)}).")
return
s.send(p)
def run(self):
self.configure_interfaces()
@ -1070,18 +1088,28 @@ class Daemon(metaclass=abc.ABCMeta):
self.start_daemon()
self.sock_mon = MonitorSocket(type=ETH_P_ALL, iface=self.nic_mon)
self.sock_eth = L2Socket(type=ETH_P_ALL, iface=self.nic_iface)
self.sock_mon = MonitorSocket(type=ETH_P_ALL, iface=self.nic_mon)
if self.nic_hwsim:
self.sock_hwsim = MonitorSocket(type=ETH_P_ALL, iface=self.nic_hwsim)
# Post-startup configuration of the supplicant or AP
self.configure_daemon()
# Monitor the virtual monitor interface of the client and perform the needed actions
sockets = [self.sock_mon, self.sock_eth, self.wpaspy_ctrl.s]
if self.sock_hwsim: sockets.append(self.sock_hwsim)
while True:
sel = select.select([self.sock_mon, self.sock_eth, self.wpaspy_ctrl.s], [], [], 0.5)
sel = select.select(sockets, [], [], 0.5)
if self.sock_hwsim in sel[0]:
p = self.sock_hwsim.recv()
if p != None: self.forward_hwsim(p, self.sock_mon)
if self.sock_mon in sel[0]:
p = self.sock_mon.recv()
if p != None: self.handle_mon(p)
if self.sock_hwsim:
self.forward_hwsim(p, self.sock_hwsim)
if self.sock_eth in sel[0]:
p = self.sock_eth.recv()
@ -1222,7 +1250,7 @@ class Authenticator(Daemon):
raise
self.connect_wpaspy()
self.apmac = scapy.arch.get_if_hwaddr(self.nic_iface)
self.apmac = get_macaddress(self.nic_iface)
def configure_daemon(self):
# Intercept EAPOL packets that the AP wants to send
@ -1249,6 +1277,7 @@ class Authenticator(Daemon):
if self.options.inject:
channel = get_channel(self.nic_iface)
set_channel(self.nic_mon, channel)
self.injection_selftest()
@ -1370,7 +1399,7 @@ class Supplicant(Daemon):
self.station.handle_eth(p)
def handle_wpaspy(self, msg):
log(STATUS, "daemon: " + msg)
log(DEBUG, "daemon: " + msg)
if "WPA: Key negotiation completed with" in msg:
# This get's the current keys
@ -1381,6 +1410,16 @@ class Supplicant(Daemon):
if self.options.inject:
channel = get_channel(self.nic_iface)
set_channel(self.nic_mon, channel)
log(STATUS, "{self.nic_mon}: setting to channel {channel}")
elif self.options.hwsim:
# FIXME: There is some delay, causing the first authentication to fail
channel = get_channel(self.nic_iface)
set_channel(self.nic_mon, channel)
set_channel(self.nic_hwsim, channel)
log(STATUS, f"{self.nic_mon}: setting to channel {channel}")
log(STATUS, f"{self.nic_hwsim}: setting to channel {channel}")
self.injection_selftest()
p = re.compile("Trying to authenticate with (.*) \(SSID")
@ -1612,14 +1651,13 @@ def args2msdu(args):
return None
if __name__ == "__main__":
log(WARNING, "Remember to use a modified backports and ath9k_htc firmware!")
parser = argparse.ArgumentParser(description="Test for fragmentation vulnerabilities.")
parser.add_argument('iface', help="Interface to use for the tests.")
parser.add_argument('testname', help="Name or identifier of the test to run.")
parser.add_argument('actions', nargs='?', help="Optional textual descriptions of actions")
parser.add_argument('--inject', default=None, help="Interface to use to inject frames.")
parser.add_argument('--inject-test', default=None, help="Use main interface to test injection through this interface.")
parser.add_argument('--hwsim', default=None, help="Use provided interface in monitor mode, and simulate AP/client through hwsim.")
parser.add_argument('--ip', help="IP we as a sender should use.")
parser.add_argument('--peerip', help="IP of the device we will test.")
parser.add_argument('--ap', default=False, action='store_true', help="Act as an AP to test clients.")

@ -1 +1 @@
Subproject commit 95dc6b7e32b63c1e2efa50f2ba60fca1b025dca7
Subproject commit 0665c34816c64f50e8a7c3dd7b9b4134bd0d1992