fragattack: several changes and injection self-test functionality

This commit is contained in:
Mathy Vanhoef 2020-05-11 22:54:13 +04:00 committed by Mathy Vanhoef
parent 0b83439fdb
commit 70f2cc33b7

View File

@ -28,6 +28,16 @@ from tests_qca import *
# driver to send all frames using the transmission queue of priority zero,
# independent of the actual QoS priority value used in the frame.
# Intel 8265 / 8275 (rev 78)
# - Had to patch driver to prevent sequence number and QoS TID to be overwritten.
# - Unable to transmit any frames from a different transmitter address. This is
# because in ieee80211_monitor_start_xmit it cannot find a channel to transmit
# on (finding a valid chandef fails).
# - Cannot inject frames using a TID that is used for the first time. There's no
# queue in the driver allocated for it yet it seems, and this causes issues.
# To prevent this, and prevent frame reordering, we inject all frames on the
# same queue in the driver.
# ----------------------------------- Utility Commands -----------------------------------
def wpaspy_clear_messages(ctrl):
@ -85,7 +95,7 @@ def freebsd_create_eapolmsdu(src, dst, payload):
"""
FreeBSD doesn't properly parse EAPOL/MSDU frames for some reason.
It's unclear why. But this code puts the length and addresses at
the rigth positions so FreeBSD will parse the A-MSDU frame
the right positions so FreeBSD will parse the A-MSDU frame
successfully, so that we can even attack bad implementations.
"""
@ -136,7 +146,50 @@ def get_channel(iface):
def set_channel(iface, channel):
subprocess.check_output(["iw", iface, "set", "channel", str(channel)])
# ----------------------------------- Tests -----------------------------------
# ----------------------------------- Injection Tests -----------------------------------
def test_packet_injection(sout, sin, p, test_func):
# Append unique label to recognize frame & inject it
label = b"AAAA" + struct.pack(">II", random.randint(0, 2**32), random.randint(0, 2**32))
sout.send(RadioTap()/p/Raw(label))
# 1. When using a 2nd interface: capture the actual packet that was injected in the air.
# 2. Not using 2nd interface: capture the "reflected" frame sent back by the kernel. This allows
# us to at least detect if the kernel (and perhaps driver) is overwriting fields. It generally
# doesn't allow us to detect if the device/firmware itself is overwriting fields.
packets = sniff(opened_socket=sin, timeout=2, count=1, lfilter=lambda p: p != None and label in raw(p))
if len(packets) < 1:
raise IOError("Unable to inject test frame. Does your driver/device support monitor mode?")
# Property must hold for all frames
return all([test_func(cap) for cap in packets])
def test_injection(iface_out, iface_in=None):
# We start monitoring iface_in already so injected frame won't be missed
sout = L2Socket(type=ETH_P_ALL, iface=iface_out)
if iface_in == None:
sin = sout
else:
sin = L2Socket(type=ETH_P_ALL, iface=iface_in)
p = Dot11(addr1="00:11:11:11:11:11", addr2="00:22:22:22:22:22", type=2, SC=33<<4)
if not test_packet_injection(sout, sin, p, lambda cap: cap.SC == p.SC):
raise IOError("Sequence number of injected frames is being overwritten!")
p = Dot11(addr1="00:11:11:11:11:11", addr2="00:22:22:22:22:22", type=2, SC=(33<<4)|1)
if not test_packet_injection(sout, sin, p, lambda cap: (cap.SC & 0xf) == 1):
raise IOError("Fragment number of injected frames is being overwritten!")
p = Dot11(addr1="00:11:11:11:11:11", addr2="00:22:22:22:22:22", type=2, subtype=8, SC=33)/Dot11QoS(TID=2)
if not test_packet_injection(sout, sin, p, lambda cap: cap.TID == p.TID):
raise IOError("QoS TID of injected frames is being overwritten!")
sout.close()
sin.close()
# ----------------------------------- Vulnerability Tests -----------------------------------
# XXX --- We should always first see how the DUT reactions to a normal packet.
# For example, Aruba only responded to DHCP after reconnecting, and
@ -328,7 +381,7 @@ class Test(metaclass=abc.ABCMeta):
if self.inc_pn == None:
return
# Add a delay between injected fragments if requested
# Use specific PN increments between frames if requested
for frag in self.get_actions(Action.Inject)[1:]:
frag.inc_pn = self.inc_pn
@ -584,7 +637,7 @@ class Station():
# Contains either the "to-DS" or "from-DS" flag.
self.FCfield = Dot11(FCfield=ds_status).FCfield
self.seqnum = 1
self.seqnum = 16
# MAC address and IP of the station that our script controls.
# Can be either an AP or client.
@ -885,6 +938,7 @@ class Daemon(metaclass=abc.ABCMeta):
self.nic_iface = None
self.nic_mon = None
self.performed_injection_selftest = False
self.process = None
self.sock_eth = None
@ -938,10 +992,8 @@ class Daemon(metaclass=abc.ABCMeta):
# Use the provided interface to monitor/inject frames
self.nic_mon = self.options.inject
# This should avoid the monitor interface from retransmitting frames? But it
# may also cause the monitor interface to ACK frames, while they may not have
# been received by the normal interface...
# TODO: Test this with the Intel/mvm and others. For the ath9k_htc this is not needed.
# Avoid the monitor interface from retransmitting frames? This was not needed for the
# ath9k_htc and intel/mvm device that I tested. Perhaps for others it might be needed.
#subprocess.call(["ifconfig", self.nic_mon, "down"])
#subprocess.call(["macchanger", "-m", scapy.arch.get_if_hwaddr(self.nic_iface), self.nic_mon])
@ -968,7 +1020,7 @@ class Daemon(metaclass=abc.ABCMeta):
log(WARNING, "Injecting fragments may be unreliable.")
elif driver == "ath9k_htc":
options.inject_workaround = True
log(STATUS, "Detect ath9k_htc, using injection bug workarounds")
log(STATUS, "Detected ath9k_htc, using injection bug workarounds")
def inject_mon(self, p):
self.sock_mon.send(p)
@ -976,14 +1028,8 @@ class Daemon(metaclass=abc.ABCMeta):
def inject_eth(self, p):
self.sock_eth.send(p)
def run(self):
self.configure_interfaces()
# Remove old occurrences of the control interface that didn't get cleaned properly
subprocess.call(["rm", "-f", "wpaspy_ctrl/" + self.nic_iface])
def connect_wpaspy(self):
# Wait until daemon started
self.start_daemon()
while not os.path.exists("wpaspy_ctrl/" + self.nic_iface):
time.sleep(0.1)
@ -997,6 +1043,33 @@ class Daemon(metaclass=abc.ABCMeta):
log(ERROR, "Did you disable Wi-Fi in the network manager? Otherwise it won't start properly.")
raise
def injection_selftest(self):
if self.performed_injection_selftest:
return
# - When using a 2nd interface to inject frames, this self-test should trivially succeed.
# - This is mainly useful when using one interface as both client and monitor interface,
# in which case a modified kernel/driver must be used.
# - With the Intel/mvm injection in this case only works if hostapd/wpa_supp started.
# - When in client mode, it seems the scanning operation interferes with this test. So it
# must be executed once we are trying to connect so the channel is "stable".
try: test_injection(self.nic_mon)
except IOError as ex:
log(WARNING, "IOError: " + ex.args[0])
log(ERROR, "Injection self-test failed. Are you using the correct kernel/driver/device for injection?")
quit(1)
log(DEBUG, f"Passed injection self-test on interface {self.nic_mon}.")
self.performed_injection_selftest = True
def run(self):
self.configure_interfaces()
# Remove old occurrences of the control interface that didn't get cleaned properly
subprocess.call(["rm", "-rf", "wpaspy_ctrl/"])
self.start_daemon()
self.sock_mon = MonitorSocket(type=ETH_P_ALL, iface=self.nic_mon)
self.sock_eth = L2Socket(type=ETH_P_ALL, iface=self.nic_iface)
@ -1021,7 +1094,7 @@ class Daemon(metaclass=abc.ABCMeta):
self.time_tick()
def stop(self):
log(STATUS, "Closing Hostap daemon and cleaning up ...")
log(STATUS, "Closing daemon and cleaning up ...")
if self.process:
self.process.terminate()
self.process.wait()
@ -1148,6 +1221,7 @@ class Authenticator(Daemon):
log(ERROR, "hostapd executable not found. Did you compile hostapd?")
raise
self.connect_wpaspy()
self.apmac = scapy.arch.get_if_hwaddr(self.nic_iface)
def configure_daemon(self):
@ -1175,6 +1249,7 @@ class Authenticator(Daemon):
if self.options.inject:
channel = get_channel(self.nic_iface)
set_channel(self.nic_mon, channel)
self.injection_selftest()
class Supplicant(Daemon):
@ -1306,6 +1381,7 @@ class Supplicant(Daemon):
if self.options.inject:
channel = get_channel(self.nic_iface)
set_channel(self.nic_mon, channel)
self.injection_selftest()
p = re.compile("Trying to authenticate with (.*) \(SSID")
bss = p.search(msg).group(1)
@ -1331,9 +1407,6 @@ class Supplicant(Daemon):
wpaspy_command(self.wpaspy_ctrl, "REASSOCIATE")
def configure_daemon(self):
# TODO: Only enable networks once our script is ready, to prevent
# wpa_supplicant from connecting before our start started.
# Optimize reassoc-to-same-BSS. This makes the "REASSOCIATE" command skip the
# authentication phase (reducing the chance that packet queues are reset).
wpaspy_command(self.wpaspy_ctrl, "SET ext_eapol_frame_io 1")
@ -1342,9 +1415,11 @@ class Supplicant(Daemon):
if self.options.ip and self.options.peerip:
self.initialize_ips(self.options.ip, self.options.peerip)
wpaspy_command(self.wpaspy_ctrl, "ENABLE_NETWORK all")
def start_daemon(self):
cmd = ["../wpa_supplicant/wpa_supplicant", "-Dnl80211", "-i", self.nic_iface,
"-cclient.conf"] + log_level2switch()
"-cclient.conf", "-W"] + log_level2switch()
log(STATUS, "Starting wpa_supplicant using: " + " ".join(cmd))
try:
self.process = subprocess.Popen(cmd)
@ -1353,6 +1428,9 @@ class Supplicant(Daemon):
log(ERROR, "wpa_supplicant executable not found. Did you compile wpa_supplicant?")
raise
self.connect_wpaspy()
wpaspy_command(self.wpaspy_ctrl, "DISABLE_NETWORK all")
clientmac = scapy.arch.get_if_hwaddr(self.nic_iface)
self.station = Station(self, clientmac, "to-DS")
@ -1533,7 +1611,6 @@ def args2msdu(args):
return None
if __name__ == "__main__":
log(WARNING, "Remember to use a modified backports and ath9k_htc firmware!")
@ -1542,6 +1619,7 @@ if __name__ == "__main__":
parser.add_argument('testname', help="Name or identifier of the test to run.")
parser.add_argument('actions', nargs='?', help="Optional textual descriptions of actions")
parser.add_argument('--inject', default=None, help="Interface to use to inject frames.")
parser.add_argument('--inject-test', default=None, help="Use main interface to test injection through this interface.")
parser.add_argument('--ip', help="IP we as a sender should use.")
parser.add_argument('--peerip', help="IP of the device we will test.")
parser.add_argument('--ap', default=False, action='store_true', help="Act as an AP to test clients.")
@ -1568,6 +1646,14 @@ if __name__ == "__main__":
parser.add_argument('--to-self', default=False, action='store_true', help="Send ARP/DHCP/ICMP with same src and dst MAC address.")
options = parser.parse_args()
# Default value for options that should not be command line parameters
options.inject_workaround = False
# Perform test using a second interface if requested
if options.inject_test:
log(WARNING, "TODO: Perform proper injection tests (including with active AP/STA")
quit(1)
# Sanity check and convert some arguments to more usable form
options.ptype = args2ptype(options)
options.as_msdu = args2msdu(options)