fragattacks: directly track libwifi and not as submodule

This will make it easier for users to clone the repository and will
assure that they always use the correct version of libwifi.
This commit is contained in:
Mathy Vanhoef 2021-05-08 19:35:46 +04:00
parent 95affbcaa9
commit 7f93c1cec7
13 changed files with 1488 additions and 4 deletions

3
.gitmodules vendored
View File

@ -1,3 +0,0 @@
[submodule "research/libwifi"]
path = research/libwifi
url = https://github.com/vanhoefm/libwifi.git

@ -1 +0,0 @@
Subproject commit 35b3f4fafcbc0227c4d800e941bccee4789f9f33

View File

@ -0,0 +1 @@
This is an experimental library that I internally use in some projects. One day this might be useful for others too.

View File

@ -0,0 +1,4 @@
from .wifi import *
from .dragonfly import *
from .crypto import *
from .injectiontest import *

169
research/libwifi/crypto.py Normal file
View File

@ -0,0 +1,169 @@
#!/usr/bin/env python3
import struct, binascii
from .wifi import *
#from binascii import a2b_hex
#from struct import unpack,pack
from Crypto.Cipher import AES, ARC4
from scapy.layers.dot11 import Dot11, Dot11CCMP, Dot11QoS
import zlib
def pn2bytes(pn):
pn_bytes = [0] * 6
for i in range(6):
pn_bytes[i] = pn & 0xFF
pn >>= 8
return pn_bytes
def pn2bin(pn):
return struct.pack(">Q", pn)[2:]
def dot11ccmp_get_pn(p):
pn = p.PN5
pn = (pn << 8) | p.PN4
pn = (pn << 8) | p.PN3
pn = (pn << 8) | p.PN2
pn = (pn << 8) | p.PN1
pn = (pn << 8) | p.PN0
return pn
def ccmp_get_nonce(priority, addr, pn):
return struct.pack("B", priority) + addr2bin(addr) + pn2bin(pn)
def ccmp_get_aad(p, amsdu_spp=False):
# FC field with masked values
fc = raw(p)[:2]
fc = struct.pack("<BB", fc[0] & 0x8f, fc[1] & 0xc7)
# Sequence number is masked, but fragment number is included
sc = struct.pack("<H", p.SC & 0xf)
addr1 = addr2bin(p.addr1)
addr2 = addr2bin(p.addr2)
addr3 = addr2bin(p.addr3)
aad = fc + addr1 + addr2 + addr3 + sc
if Dot11QoS in p:
if not amsdu_spp:
# Everything except the TID is masked
aad += struct.pack("<H", p[Dot11QoS].TID)
else:
# TODO: Mask unrelated fields
aad += raw(p[Dot11QoS])[:2]
return aad
def Raw(x):
return x
def encrypt_ccmp(p, tk, pn, keyid=0, amsdu_spp=False):
"""Takes a plaintext Dot11 frame, encrypts it, and adds all the necessairy headers"""
# Update the FC field
p = p.copy()
p.FCfield |= Dot11(FCfield="protected").FCfield
if Dot11QoS in p:
payload = raw(p[Dot11QoS].payload)
p[Dot11QoS].remove_payload()
# Explicitly set TID so we can assume it's an integer
if p[Dot11QoS].TID == None:
p[Dot11QoS].TID = 0
priority = p[Dot11QoS].TID
else:
payload = raw(p.payload)
p.remove_payload()
priority = 0
# Add the CCMP header. res0 and res1 are by default set to zero.
newp = p/Dot11CCMP()
pn_bytes = pn2bytes(pn)
newp.PN0, newp.PN1, newp.PN2, newp.PN3, newp.PN4, newp.PN5 = pn_bytes
newp.key_id = keyid
newp.ext_iv = 1
# Generate the CCMP Header and AAD for encryption.
ccm_nonce = ccmp_get_nonce(priority, newp.addr2, pn)
ccm_aad = ccmp_get_aad(newp, amsdu_spp)
#print("CCM Nonce:", ccm_nonce.hex())
#print("CCM aad :", ccm_aad.hex())
# Encrypt the plaintext using AES in CCM Mode.
#print("Payload:", payload.hex())
cipher = AES.new(tk, AES.MODE_CCM, ccm_nonce, mac_len=8)
cipher.update(ccm_aad)
ciphertext = cipher.encrypt(payload)
digest = cipher.digest()
newp = newp/Raw(ciphertext)
newp = newp/Raw(digest)
#print("Ciphertext:", ciphertext.hex())
#print(repr(newp))
#print(raw(newp).hex())
return newp
def decrypt_ccmp(p, tk, verify=True):
"""Takes a Dot11CCMP frame and decrypts it"""
p = p.copy()
# Get used CCMP parameters
keyid = p.key_id
priority = dot11_get_priority(p)
pn = dot11ccmp_get_pn(p)
# TODO: Mask flags in p.FCfield that are not part of the AAD
fc = p.FCfield
payload = get_ccmp_payload(p)
if Dot11QoS in p:
p[Dot11QoS].remove_payload()
else:
p.remove_payload()
# Prepare for CCMP decryption
ccm_nonce = ccmp_get_nonce(priority, p.addr2, pn)
ccm_aad = ccmp_get_aad(p)
# Decrypt using AES in CCM Mode.
cipher = AES.new(tk, AES.MODE_CCM, ccm_nonce, mac_len=8)
cipher.update(ccm_aad)
plaintext = cipher.decrypt(payload[:-8])
try:
if verify:
cipher.verify(payload[-8:])
except ValueError:
return None
return p/LLC(plaintext)
def encrypt_wep(p, key, pn, keyid=0):
"""Takes a plaintext Dot11 frame, encrypts it, and adds all the necessairy headers"""
# Update the FC field --- XXX share this with encrypt_ccmp
p = p.copy()
p.FCfield |= Dot11(FCfield="protected").FCfield
if Dot11QoS in p:
payload = raw(p[Dot11QoS].payload)
p[Dot11QoS].remove_payload()
# Explicitly set TID so we can assume it's an integer
if p[Dot11QoS].TID == None:
p[Dot11QoS].TID = 0
priority = p[Dot11QoS].TID
else:
payload = raw(p.payload)
p.remove_payload()
priority = 0
# Add the WEP ICV which will be encrypted
payload += struct.pack("<I", zlib.crc32(payload) & 0xffffffff)
iv = struct.pack(">I", pn)[1:]
cipher = ARC4.new(iv + key)
ciphertext = cipher.encrypt(payload)
# Construct packet ourselves to avoid scapy bugs
newp = p/iv/struct.pack("<B", keyid)/ciphertext
return newp

View File

@ -0,0 +1,334 @@
# TODO: For now only include the code we actually used for EAP-pwd
# TODO: Program unit tests so we can easily keep our EAP-pwd code correct
#!/usr/bin/env python3
from scapy.all import *
from .wifi import *
import sys, struct, math, random, select, time, binascii
from Crypto.Hash import HMAC, SHA256
from Crypto.PublicKey import ECC
from Crypto.Math.Numbers import Integer
# Alternative is https://eli.thegreenplace.net/2009/03/07/computing-modular-square-roots-in-python
from sympy.ntheory.residue_ntheory import sqrt_mod_iter
# ----------------------- Utility ---------------------------------
def int_to_data(num):
return binascii.unhexlify("%064x" % num)
def zeropoint_to_data():
return int_to_data(0) + int_to_data(0)
#TODO: Not sure if this actually works under python2...
def str2bytes(password):
if not isinstance(password, str): return password
if sys.version_info < (3, 0):
return bytes(password)
else:
return bytes(password, 'utf8')
def getord(value):
if isinstance(value, int):
return value
else:
return ord(value)
def HMAC256(pw, data):
h = HMAC.new(pw, digestmod=SHA256)
h.update(data)
return h.digest()
# ----------------------- Elliptic Curve Operations ---------------------------------
# This is group 19. Support of it is required by WPA3.
secp256r1_p = 0xffffffff00000001000000000000000000000000ffffffffffffffffffffffff
secp256r1_r = 0xffffffff00000000ffffffffffffffffbce6faada7179e84f3b9cac2fc632551
def legendre_symbol(a, p):
"""Compute the Legendre symbol."""
if a % p == 0: return 0
ls = pow(a, (p - 1)//2, p)
return -1 if ls == p - 1 else ls
def point_on_curve(x, y, curve="p256"):
try:
point = ECC.EccPoint(x, y)
except ValueError:
return False
return True
def point_to_data(p):
if p is None:
return zeropoint_to_data()
return int_to_data(p.x) + int_to_data(p.y)
# ----------------------- WPA3 ---------------------------------
def is_sae(p):
if not Dot11Auth in p:
return False
return p[Dot11Auth].algo == 3
def is_sae_commit(p):
return is_sae(p) and p[Dot11Auth].seqnum == 1
def is_sae_confirm(p):
return is_sae(p) and p[Dot11Auth].seqnum == 2
def KDF_Length(data, label, context, length):
iterations = int(math.ceil(length / 256.0))
result = b""
for i in range(1, iterations + 1):
hash_data = struct.pack("<H", i) + str2bytes(label) + context + struct.pack("<H", length)
result += HMAC256(data, hash_data)
return result
# TODO: Also modify to support curve 521
def derive_pwe_ecc(password, addr1, addr2, curve_name="p256"):
curve = ECC._curves[curve_name]
bits = curve.modulus_bits
assert bits % 8 == 0
addr1 = binascii.unhexlify(addr1.replace(':', ''))
addr2 = binascii.unhexlify(addr2.replace(':', ''))
hash_pw = addr1 + addr2 if addr1 > addr2 else addr2 + addr1
for counter in range(1, 100):
hash_data = str2bytes(password) + struct.pack("<B", counter)
pwd_seed = HMAC256(hash_pw, hash_data)
log(DEBUG, "PWD-seed: %s" % pwd_seed)
pwd_value = KDF_Length(pwd_seed, "SAE Hunting and Pecking", curve.p.to_bytes(bits // 8), bits)
log(DEBUG, "PWD-value: %s" % pwd_value)
pwd_value = int(binascii.hexlify(pwd_value), 16)
if pwd_value >= curve.p:
continue
x = Integer(pwd_value)
y_sqr = (x**3 - x * 3 + curve.b) % curve.p
if legendre_symbol(y_sqr, curve.p) != 1:
continue
y = y_sqr.sqrt(curve.p)
y_bit = getord(pwd_seed[-1]) & 1
if y & 1 == y_bit:
return ECC.EccPoint(x, y, curve_name)
else:
return ECC.EccPoint(x, curve.p - y, curve_name)
# TODO: Use this somewhere???
def calc_k_kck_pmk(pwe, peer_element, peer_scalar, my_rand, my_scalar):
k = ((pwe * peer_scalar + peer_element) * my_rand).x
keyseed = HMAC256(b"\x00" * 32, int_to_data(k))
kck_and_pmk = KDF_Length(keyseed, "SAE KCK and PMK",
int_to_data((my_scalar + peer_scalar) % secp256r1_r), 512)
kck = kck_and_pmk[0:32]
pmk = kck_and_pmk[32:]
return k, kck, pmk
def calculate_confirm_hash(kck, send_confirm, scalar, element, peer_scalar, peer_element):
return HMAC256(kck, struct.pack("<H", send_confirm) + int_to_data(scalar) + point_to_data(element)
+ int_to_data(peer_scalar) + point_to_data(peer_element))
def build_sae_commit(srcaddr, dstaddr, scalar, element, token=""):
p = Dot11(addr1=dstaddr, addr2=srcaddr, addr3=dstaddr)
p = p/Dot11Auth(algo=3, seqnum=1, status=0)
group_id = 19
scalar_blob = ("%064x" % scalar).decode("hex")
element_blob = ("%064x" % element.x).decode("hex") + ("%064x" % element.y).decode("hex")
return p/Raw(struct.pack("<H", group_id) + token + scalar_blob + element_blob)
def build_sae_confirm(srcaddr, dstaddr, send_confirm, confirm):
p = Dot11(addr1=dstaddr, addr2=srcaddr, addr3=dstaddr)
p = p/Dot11Auth(algo=3, seqnum=2, status=0)
return p/Raw(struct.pack("<H", send_confirm) + confirm)
class SAEHandshake():
def __init__(self, password, srcaddr, dstaddr):
self.password = password
self.srcaddr = srcaddr
self.dstaddr = dstaddr
self.pwe = None
self.rand = None
self.scalar = None
self.element = None
self.kck = None
self.pmk = None
def send_commit(self, password):
self.pwe = derive_pwe_ecc(password, self.dstaddr, self.srcaddr)
# After generation of the PWE, each STA shall generate a secret value, rand, and a temporary secret value,
# mask, each of which shall be chosen randomly such that 1 < rand < r and 1 < mask < r and (rand + mask)
# mod r is greater than 1, where r is the (prime) order of the group.
self.rand = random.randint(0, secp256r1_r - 1)
mask = random.randint(0, secp256r1_r - 1)
# commit-scalar = (rand + mask) mod r
self.scalar = (self.rand + mask) % secp256r1_r
assert self.scalar > 1
# COMMIT-ELEMENT = inverse(mask * PWE)
temp = self.pwe * mask
self.element = ECC.EccPoint(temp.x, Integer(secp256r1_p) - temp.y)
auth = build_sae_commit(self.srcaddr, self.dstaddr, self.scalar, self.element)
sendp(RadioTap()/auth)
def process_commit(self, p):
payload = str(p[Dot11Auth].payload)
group_id = struct.unpack("<H", payload[:2])[0]
pos = 2
self.peer_scalar = int(payload[pos:pos+32].encode("hex"), 16)
pos += 32
peer_element_x = int(payload[pos:pos+32].encode("hex"), 16)
peer_element_y = int(payload[pos+32:pos+64].encode("hex"), 16)
self.peer_element = ECC.EccPoint(peer_element_x, peer_element_y)
pos += 64
k = ((self.pwe * self.peer_scalar + self.peer_element) * self.rand).x
keyseed = HMAC256("\x00"*32, int_to_data(k))
kck_and_pmk = KDF_Length(keyseed, "SAE KCK and PMK",
int_to_data((self.scalar + self.peer_scalar) % secp256r1_r), 512)
self.kck = kck_and_pmk[0:32]
self.pmk = kck_and_pmk[32:]
self.send_confirm()
def send_confirm(self):
send_confirm = 0
confirm = calculate_confirm_hash(self.kck, send_confirm, self.scalar, self.element, self.peer_scalar, self.peer_element)
auth = build_sae_confirm(self.srcaddr, self.dstaddr, send_confirm, confirm)
sendp(RadioTap()/auth)
def process_confirm(self, p):
payload = str(p[Dot11Auth].payload)
send_confirm = struct.unpack("<H", payload[:2])[0]
pos = 2
received_confirm = payload[pos:pos+32]
pos += 32
expected_confirm = calculate_confirm_hash(self.kck, send_confirm, self.peer_scalar, self.peer_element, self.scalar, self.element)
# ----------------------- EAP-pwd (TODO Test with Python3) ---------------------------------
def KDF_Length_eappwd(data, label, length):
num_bytes = (length + 7) // 8
iterations = (num_bytes + 31) // 32
# TODO: EAP-pwd uses a different byte ordering for the counter and length?!? WTF!
result = b""
for i in range(1, iterations + 1):
hash_data = digest if i > 1 else b""
hash_data += struct.pack(">H", i) + str2bytes(label) + struct.pack(">H", length)
digest = HMAC256(data, hash_data)
result += digest
result = result[:num_bytes]
if length % 8 != 0:
num_clear = 8 - (length % 8)
trailbyte = result[-1] >> num_clear << num_clear
result = result[:-1] + struct.pack(">B", trailbyte)
return result
def derive_pwe_ecc_eappwd(password, peer_id, server_id, token, curve_name="p256", info=None):
curve = ECC._curves[curve_name]
bits = curve.modulus_bits
hash_pw = struct.pack(">I", token) + str2bytes(peer_id + server_id + password)
for counter in range(1, 100):
hash_data = hash_pw + struct.pack("<B", counter)
pwd_seed = HMAC256(b"\x00", hash_data)
log(DEBUG, "PWD-Seed: %s" % pwd_seed)
pwd_value = KDF_Length_eappwd(pwd_seed, "EAP-pwd Hunting And Pecking", bits)
log(DEBUG, "PWD-Value: %s" % pwd_value)
pwd_value = int(binascii.hexlify(pwd_value), 16)
if bits % 8 != 0:
pwd_value = pwd_value >> (8 - (521 % 8))
if pwd_value >= curve.p:
continue
x = Integer(pwd_value)
log(DEBUG, "X-candidate: %x" % x)
y_sqr = (x**3 - x * 3 + curve.b) % curve.p
if legendre_symbol(y_sqr, curve.p) != 1:
continue
y = y_sqr.sqrt(curve.p)
y_bit = getord(pwd_seed[-1]) & 1
if y & 1 == y_bit:
if not info is None: info["counter"] = counter
return ECC.EccPoint(x, y, curve_name)
else:
if not info is None: info["counter"] = counter
return ECC.EccPoint(x, curve.p - y, curve_name)
def calculate_confirm_eappwd(k, element1, scalar1, element2, scalar2, group_num=19, rand_func=1, prf=1):
hash_data = int_to_data(k)
hash_data += point_to_data(element1)
hash_data += int_to_data(scalar1)
hash_data += point_to_data(element2)
hash_data += int_to_data(scalar2)
hash_data += struct.pack(">HBB", group_num, rand_func, prf)
confirm = HMAC256(b"\x00" * 32, hash_data)
return confirm
# ----------------------- Fuzzing/Testing ---------------------------------
def inject_sae_auth(srcaddr, bssid):
p = Dot11(addr1=bssid, addr2=srcaddr, addr3=bssid)
p = p/Dot11Auth(algo=3, seqnum=1, status=0)
group_id = 19
scalar = 0
element_x = 0
element_y = 0
p = p/Raw(struct.pack("<H", group_id))
if False:
# Convert to octets
commit_scalar = ("%064x" % scalar).decode("hex")
commit_element = ("%064x" % element_x).decode("hex") + ("%064x" % element_y).decode("hex")
p = p / Raw(commit_scalar + commit_element)
else:
p = p / Raw(open("/dev/urandom").read(32*3))
sendp(RadioTap()/p)
def forge_sae_confirm(bssid, stamac):
kck = "\x00" * 32
send_confirm = "\x00\x00"
confirm = HMAC256(kck, send_confirm + int_to_data(0) + zeropoint_to_data()
+ int_to_data(0) + zeropoint_to_data())
auth = Dot11(addr1=bssid, addr2=stamac, addr3=bssid)
auth = auth/Dot11Auth(algo=3, seqnum=2, status=0)
auth = auth/Raw(struct.pack("<H", 0) + confirm)
sendp(RadioTap()/auth)

View File

@ -0,0 +1,268 @@
# Copyright (c) 2020, Mathy Vanhoef <mathy.vanhoef@nyu.edu>
#
# This code may be distributed under the terms of the BSD license.
# See README for more details.
from scapy.all import *
from .wifi import *
FLAG_FAIL, FLAG_NOCAPTURE = [2**i for i in range(2)]
#### Utility ####
def get_nearby_ap_addr(sin):
# If this interface itself is also hosting an AP, the beacons transmitted by it might be
# returned as well. We filter these out by the condition `p.dBm_AntSignal != None`.
beacons = list(sniff(opened_socket=sin, timeout=0.5, lfilter=lambda p: (Dot11 in p or Dot11FCS in p) \
and p.type == 0 and p.subtype == 8 \
and p.dBm_AntSignal != None))
if len(beacons) == 0:
return None, None
beacons.sort(key=lambda p: p.dBm_AntSignal, reverse=True)
return beacons[0].addr2, get_ssid(beacons[0])
def inject_and_capture(sout, sin, p, count=0, retries=1):
# Append unique label to recognize injected frame
label = b"AAAA" + struct.pack(">II", random.randint(0, 2**32), random.randint(0, 2**32))
toinject = p/Raw(label)
attempt = 0
while True:
log(DEBUG, "Injecting test frame: " + repr(toinject))
sout.send(RadioTap(present="TXFlags", TXFlags="NOSEQ+ORDER")/toinject)
# TODO:Move this to a shared socket interface?
# Note: this workaround for Intel is only needed if the fragmented frame is injected using
# valid MAC addresses. But for simplicity just execute it after any fragmented frame.
if sout.mf_workaround and toinject.FCfield & Dot11(FCfield="MF").FCfield != 0:
sout.send(RadioTap(present="TXFlags", TXFlags="NOSEQ+ORDER")/Dot11())
log(DEBUG, "Sending dummy frame after injecting frame with MF flag set")
# 1. When using a 2nd interface: capture the actual packet that was injected in the air.
# 2. Not using 2nd interface: capture the "reflected" frame sent back by the kernel. This allows
# us to at least detect if the kernel (and perhaps driver) is overwriting fields. It generally
# doesn't allow us to detect if the device/firmware itself is overwriting fields.
packets = sniff(opened_socket=sin, timeout=1, count=count, lfilter=lambda p: p != None and label in raw(p))
if len(packets) > 0 or attempt >= retries:
break
log(STATUS, " Unable to capture injected frame, retrying.")
attempt += 1
return packets
#### Injection tests ####
def test_injection_fragment(sout, sin, ref):
log(STATUS, "--- Testing injection of fragmented frame using (partly) valid MAC addresses")
p = Dot11(FCfield=ref.FCfield, addr1=ref.addr1, addr2=ref.addr2, type=2, subtype=8, SC=33<<4)
p = p/Dot11QoS(TID=2)/LLC()/SNAP()/EAPOL()/EAP()
p.FCfield |= Dot11(FCfield="MF").FCfield
captured = inject_and_capture(sout, sin, p, count=1)
if len(captured) == 0:
log(ERROR, "[-] Unable to inject frame with More Fragment flag using (partly) valid MAC addresses.")
else:
log(STATUS, "[+] Frame with More Fragment flag using (partly) valid MAC addresses can be injected.", color="green")
return FLAG_FAIL if len(captured) == 0 else 0
def test_packet_injection(sout, sin, p, test_func, frametype, msgfail):
"""Check if given property holds of all injected frames"""
packets = inject_and_capture(sout, sin, p, count=1)
if len(packets) < 1:
log(ERROR, f"[-] Unable to capture injected {frametype}.")
return FLAG_NOCAPTURE
if not all([test_func(cap) for cap in packets]):
log(ERROR, f"[-] " + msgfail.format(frametype=frametype))
return FLAG_FAIL
log(STATUS, f" Properly captured injected {frametype}.")
return 0
def test_injection_fields(sout, sin, ref, strtype):
log(STATUS, f"--- Testing injection of fields using {strtype}")
status = 0
p = Dot11(FCfield=ref.FCfield, addr1=ref.addr1, addr2=ref.addr2, addr3=ref.addr3, type=2, SC=30<<4)/LLC()/SNAP()/EAPOL()/EAP()
status |= test_packet_injection(sout, sin, p, lambda cap: EAPOL in cap, f"EAPOL frame with {strtype}",
"Scapy thinks injected {frametype} is a different frame?")
p = Dot11(FCfield=ref.FCfield, addr1=ref.addr1, addr2=ref.addr2, addr3=ref.addr3, type=2, SC=31<<4)
status |= test_packet_injection(sout, sin, p, lambda cap: cap.SC == p.SC, f"empty data frame with {strtype}",
"Sequence number of injected {frametype} is being overwritten!")
p = Dot11(FCfield=ref.FCfield, addr1=ref.addr1, addr2=ref.addr2, addr3=ref.addr3, type=2, SC=(32<<4)|1)
status |= test_packet_injection(sout, sin, p, lambda cap: (cap.SC & 0xf) == 1, f"fragmented empty data frame with {strtype}",
"Fragment number of injected {frametype} is being overwritten!")
p = Dot11(FCfield=ref.FCfield, addr1=ref.addr1, addr2=ref.addr2, addr3=ref.addr3, type=2, subtype=8, SC=33<<4)/Dot11QoS(TID=2)
status |= test_packet_injection(sout, sin, p, lambda cap: cap.TID == p.TID, f"empty QoS data frame with {strtype}",
"QoS TID of injected {frametype} is being overwritten!")
p = Dot11(FCfield=ref.FCfield, addr1=ref.addr1, addr2=ref.addr2, addr3=ref.addr3, type=2, subtype=8, SC=33<<4)/Dot11QoS(TID=2)/Raw("BBBB")
set_amsdu(p[Dot11QoS])
status |= test_packet_injection(sout, sin, p, \
lambda cap: cap.TID == p.TID and is_amsdu(cap) and b"BBBB" in raw(cap), \
f"A-MSDU frame with {strtype}", "A-MSDU frame is not properly injected!")
if status == 0: log(STATUS, f"[+] All tested fields are properly injected when using {strtype}.", color="green")
return status
def test_injection_order(sout, sin, ref, strtype, retries=1):
log(STATUS, f"--- Testing order of injected QoS frames using {strtype}")
label = b"AAAA" + struct.pack(">II", random.randint(0, 2**32), random.randint(0, 2**32))
p2 = Dot11(FCfield=ref.FCfield, addr1=ref.addr1, addr2=ref.addr2, type=2, subtype=8, SC=33<<4)/Dot11QoS(TID=2)
p6 = Dot11(FCfield=ref.FCfield, addr1=ref.addr1, addr2=ref.addr2, type=2, subtype=8, SC=33<<4)/Dot11QoS(TID=6)
for i in range(retries + 1):
# First frame causes Tx queue to be busy. Next two frames tests if frames are reordered.
for p in [p2] * 4 + [p6]:
sout.send(RadioTap(present="TXFlags", TXFlags="NOSEQ+ORDER")/p/Raw(label))
packets = sniff(opened_socket=sin, timeout=1.5, lfilter=lambda p: Dot11QoS in p and label in raw(p))
tids = [p[Dot11QoS].TID for p in packets]
log(STATUS, "Captured TIDs: " + str(tids))
# Sanity check the captured TIDs, and then analyze the results
if not (2 in tids and 6 in tids):
log(STATUS, f"We didn't capture all injected QoS TID frames, retrying.")
else:
break
if not (2 in tids and 6 in tids):
log(ERROR, f"[-] We didn't capture all injected QoS TID frames with {strtype}. Test failed.")
return FLAG_NOCAPTURE
elif tids != sorted(tids):
log(ERROR, f"[-] Frames with different QoS TIDs are reordered during injection with {strtype}.")
return FLAG_FAIL
else:
log(STATUS, f"[+] Frames with different QoS TIDs are not reordered during injection with {strtype}.", color="green")
return 0
def test_injection_ack(sout, sin, addr1, addr2):
suspicious = False
test_fail = False
# Test number of retransmissions
p = Dot11(FCfield="to-DS", addr1="00:11:00:00:02:01", addr2="00:11:00:00:02:01", type=2, SC=33<<4)
num = len(inject_and_capture(sout, sin, p, retries=1))
log(STATUS, f"Injected frames seem to be (re)transitted {num} times")
if num == 0:
log(ERROR, "Couldn't capture injected frame. Please restart the test.")
test_fail = True
elif num == 1:
log(WARNING, "Injected frames don't seem to be retransmitted!")
suspicious = True
# Test ACK towards an unassigned MAC address
p = Dot11(FCfield="to-DS", addr1=addr1, addr2="00:22:00:00:00:01", type=2, SC=33<<4)
num = len(inject_and_capture(sout, sin, p, retries=1))
log(STATUS, f"Captured {num} (re)transmitted frames to the AP when using a spoofed sender address")
if num == 0:
log(ERROR, "Couldn't capture injected frame. Please restart the test.")
test_fail = True
if num > 2:
log(STATUS, " => Acknowledged frames with a spoofed sender address are still retransmitted. This has low impact.")
# Test ACK towards an assigned MAC address
p = Dot11(FCfield="to-DS", addr1=addr1, addr2=addr2, type=2, SC=33<<4)
num = len(inject_and_capture(sout, sin, p, retries=1))
log(STATUS, f"Captured {num} (re)transmitted frames to the AP when using the real sender address")
if num == 0:
log(ERROR, "Couldn't capture injected frame. Please restart the test.")
test_fail = True
elif num > 2:
log(STATUS, " => Acknowledged frames with real sender address are still retransmitted. This might impact time-sensitive tests.")
suspicious = True
if suspicious:
log(WARNING, "[-] Retransmission behaviour isn't ideal. This test can be unreliable (e.g. due to background noise).")
elif not test_fail:
log(STATUS, "[+] Retransmission behaviour is good. This test can be unreliable (e.g. due to background noise).", color="green")
#### Main test function ####
def test_injection(iface_out, iface_in=None, peermac=None, ownmac=None, testack=True):
status = 0
# We start monitoring iface_in already so injected frame won't be missed
sout = L2Socket(type=ETH_P_ALL, iface=iface_out)
driver_out = get_device_driver(iface_out)
# Workaround to properly inject fragmented frames (and prevent it from blocking Tx queue).
sout.mf_workaround = driver_out in ["iwlwifi", "ath9k_htc"]
if sout.mf_workaround:
log(WARNING, f"Detected {driver_out}, using workaround to reliably inject fragmented frames.")
# Print out what we are tested. Abort if the driver is known not to support a self-test.
log(STATUS, f"Injection test: using {iface_out} ({driver_out}) to inject frames")
if iface_in == None:
log(WARNING, f"Injection selftest: also using {iface_out} to capture frames. This means the tests can detect if the kernel")
log(WARNING, f" interferes with injection, but it cannot check the behaviour of the device itself.")
if driver_out in ["mt76x2u"]:
log(WARNING, f" WARNING: self-test with the {driver_out} driver can be unreliable.")
elif not driver_out in ["iwlwifi", "ath9k_htc"]:
log(WARNING, f" WARNING: it is unknown whether a self-test works with the {driver_out} driver.")
sin = sout
else:
driver_in = get_device_driver(iface_in)
log(STATUS, f"Injection test: using {iface_in} ({driver_in}) to capture frames")
sin = L2Socket(type=ETH_P_ALL, iface=iface_in)
# Injection using the "own" MAC address is mainly a problem when using a second virtual
# interface for injection when the first interface is used as client or AP. We want to
# test injection when using the MAC address of the client or AP. The caller should supply
# this address because the MAC address of the second virtual interface may be different
# from the MAC address used by the client or AP. Only use the MAC address of sout.iface
# if no "own" address is supplied by the caller.
if ownmac == None:
ownmac = get_macaddress(sout.iface)
# Some devices only properly inject frames when either the to-DS or from-DS flag is set,
# so set one of them as well.
spoofed = Dot11(FCfield="from-DS", addr1="00:11:00:00:02:01", addr2="00:22:00:00:02:01")
valid = Dot11(FCfield="from-DS", addr1=peermac, addr2=ownmac)
# This tests basic injection capabilities
status |= test_injection_fragment(sout, sin, valid)
# Perform some actual injection tests
status |= test_injection_fields(sout, sin, spoofed, "spoofed MAC addresses")
status |= test_injection_fields(sout, sin, valid, "(partly) valid MAC addresses")
status |= test_injection_order(sout, sin, spoofed, "spoofed MAC addresses")
status |= test_injection_order(sout, sin, valid, "(partly) valid MAC addresses")
# Acknowledgement behaviour tests
if iface_in != None and testack:
# We search for an AP on the interface that injects frames because:
# 1. In mixed managed/monitor mode, we will otherwise detect our own AP on the sout interface
# 2. If sout interface "sees" the AP this assure it will also receive its ACK frames
# 3. The given peermac might be a client that goes into sleep mode
channel = get_channel(sin.iface)
log(STATUS, f"--- Searching for AP on channel {channel} to test ACK behaviour.")
apmac, ssid = get_nearby_ap_addr(sout)
if apmac == None and peermac == None:
raise IOError("Unable to find nearby AP to test injection")
elif apmac == None:
log(WARNING, f"Unable to find AP. Try a different channel? Testing ACK behaviour with peer {peermac}.")
destmac = peermac
else:
log(STATUS, f"Testing ACK behaviour by injecting frames to AP {ssid} ({apmac}).")
destmac = apmac
test_injection_ack(sout, sin, addr1=destmac, addr2=ownmac)
# Show a summary of results/advice
log(STATUS, "")
if status == 0:
log(STATUS, "==> The most important tests have been passed successfully!", color="green")
if status & FLAG_NOCAPTURE != 0:
log(WARNING, f"==> Failed to capture some frames. Try another channel or use another monitoring device.")
if status & FLAG_FAIL !=0 :
log(ERROR, f"==> Some tests failed. Are you using patched drivers/firmware?")
sout.close()
sin.close()

View File

@ -0,0 +1,77 @@
#!/usr/bin/env python3
import binascii, struct
from Crypto.Hash import MD4, SHA
from Crypto.Cipher import DES
def des_encrypt(clear, key, offset):
cNext = 0
cWorking = 0
hexKey = {}
for x in range(0,8):
cWorking = 0xFF & key[x + offset]
hexKey[x] = ((cWorking >> x) | cNext | 1) & 0xFF
cWorking = 0xFF & key[x + offset]
cNext = ((cWorking << (7 - x)))
newKey = b""
for x in range(0, len(hexKey)):
newKey += struct.pack(">B", hexKey[x])
des = DES.new(newKey, DES.MODE_ECB)
return des.encrypt(clear)
def challenge_hash(peer_challenge, authenticator_challenge, username):
challenge = SHA.new(peer_challenge + authenticator_challenge + username).digest()
return challenge[0:8]
def nt_password_hash(password):
unicode_pw = password.encode("utf-16-le")
return MD4.new(unicode_pw).digest()
def hash_nt_password_hash(password_hash):
md4 = MD4.new()
md4.update(password_hash)
return md4.digest()
def challenge_response(challenge, pwhash):
# for some reason in python we need to pad an extra byte so that
# the offset works out correctly when we call DesEncrypt
pwhash += b'\x00' * (22 - len(pwhash))
response = b""
for x in range(0, 3):
encrypted = des_encrypt(challenge, pwhash, x * 7)
response += encrypted
return response
def generate_nt_response_mschap2(authenticator_challenge, peer_challenge, username, password):
challenge = challenge_hash(peer_challenge, authenticator_challenge, username)
password_hash = nt_password_hash(password)
return challenge_response(challenge, password_hash)
def generate_authenticator_response(password, nt_response, peer_challenge, authenticator_challenge, username):
magic1 = b"\x4D\x61\x67\x69\x63\x20\x73\x65\x72\x76\x65\x72\x20\x74\x6F\x20\x63\x6C\x69\x65\x6E\x74\x20\x73\x69\x67\x6E\x69\x6E\x67\x20\x63\x6F\x6E\x73\x74\x61\x6E\x74"
magic2 = b"\x50\x61\x64\x20\x74\x6F\x20\x6D\x61\x6B\x65\x20\x69\x74\x20\x64\x6F\x20\x6D\x6F\x72\x65\x20\x74\x68\x61\x6E\x20\x6F\x6E\x65\x20\x69\x74\x65\x72\x61\x74\x69\x6F\x6E"
password_hash = nt_password_hash(password)
password_hash_hash = hash_nt_password_hash(password_hash)
sha_hash = SHA.new()
sha_hash.update(password_hash_hash)
sha_hash.update(nt_response)
sha_hash.update(magic1)
digest = sha_hash.digest()
challenge = challenge_hash(peer_challenge, authenticator_challenge, username)
sha_hash = SHA.new()
sha_hash.update(digest)
sha_hash.update(challenge)
sha_hash.update(magic2)
digest = sha_hash.digest()
return digest

3
research/libwifi/run-tests.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/bash
cd ..
python -m pytest $@

View File

@ -0,0 +1,41 @@
from libwifi.crypto import *
def get_ciphertext_mic(encrypted):
dot11ccmp = encrypted[Dot11CCMP].payload
ciphertext = dot11ccmp.load
mic = dot11ccmp.payload.load
return ciphertext, mic
def test_ccmp():
payload = b"A" * 16
ptk = b'\x00' * 48
tk = ptk[32:48]
pn = 0
plaintext = Dot11(type="Data", subtype=0, FCfield="to-DS", addr1="11:11:11:11:11:11",\
addr2="22:22:22:22:22:22", addr3="33:33:33:33:33:33", SC=0)
plaintext = plaintext/Raw(payload)
encrypted = encrypt_ccmp(plaintext, tk, pn)
ciphertext, mic = get_ciphertext_mic(encrypted)
assert ciphertext == bytes.fromhex("bedf2769dcdde9e002ab5b9df9342bc6")
assert mic == bytes.fromhex("3a49543fa1ecb1e0")
plaintext.SC = 1
encrypted = encrypt_ccmp(plaintext, tk, pn)
ciphertext, mic = get_ciphertext_mic(encrypted)
assert ciphertext == bytes.fromhex("bedf2769dcdde9e002ab5b9df9342bc6")
assert mic == bytes.fromhex("1fdbedc0538f98f2")
plaintext.FCfield |= Dot11(FCfield="MF").FCfield
encrypted = encrypt_ccmp(plaintext, tk, pn)
ciphertext, mic = get_ciphertext_mic(encrypted)
assert ciphertext == bytes.fromhex("bedf2769dcdde9e002ab5b9df9342bc6")
assert mic == bytes.fromhex("8795d9c3fba25e76")
pn = 0x1122
plaintext.FCfield |= Dot11(FCfield="MF").FCfield
encrypted = encrypt_ccmp(plaintext, tk, pn)
ciphertext, mic = get_ciphertext_mic(encrypted)
assert ciphertext == bytes.fromhex("ff76206822afb77decc7ee87568a02c6")
assert mic == bytes.fromhex("8d6fd7578170ecb1")

View File

@ -0,0 +1,83 @@
from libwifi import *
def test_crypto():
x = 32774075109236952337158599048510140249162039589740847669274255820096074575478
y = 65816200486131266053931191249788950977703402544735864608496951271815280382692
assert point_on_curve(x, y)
assert not point_on_curve(x, y + 1)
def test_sae():
# KDF_Length with 256 bits
data = b'p(b\x08\x84%\xd4\xfc\x85\x02`>Z\x8e8\x02\x06\xdcak1_\x8a\xca\x90D2[\xda\x88\x87\xbe'
label = "SAE Hunting and Pecking"
context = b'\xff\xff\xff\xff\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
result = KDF_Length(data, label, context, 256)
assert result == b"\xce\xd7\x0c`n\xc3\xa0V\xbc\xe5<Y\xd3\x80f@'.\xa3\xd8\x18\xedr\xd4/\xc8\x8a)\x18?G\xc4"
# TODO: Test KDF_Length and derive_pwe_ecc with 521 bits
# Needs one iteration
pwe = derive_pwe_ecc("password", "01:02:03:04:05:06", "11:22:33:44:55:66")
assert pwe.x == 93556404347856098254252288489266236507096062950733110787978910790160278833092
assert pwe.y == 78525808305378972147720640984042921538517312040012375950098621853813571375494
# Needs 4 iterations
pwe = derive_pwe_ecc("OtherPassword4", "01:02:03:04:05:06", "11:22:33:44:55:66")
assert pwe.x == 64608214587651293351943984050978725016684752726028646409621871614902214025509
assert pwe.y == 6010654006319004793785415601018381818802850726803703928480923751376254632130
def test_eappwd():
data = b'yKY2\xdfVP_\x84R\x04d\t\xa9\xac\xc0\xd0\x81\xe7\x01\xaa5\xe0\xd5r\xb6K\xb1g\xe0\xc8\xca'
label = "EAP-pwd Hunting And Pecking"
result = KDF_Length_eappwd(data, label, 256)
assert result == b'\x82\xe7\x92\xd75\xfc\xef\x0e:\xb1\xea\x85E\xe6\rt\x849\xd6\xc3l\xcd\x00\x8e\xde\x94\xe4\xde\xa6q\x91\x1e'
data = b'\xa8\xd5\x0c\xf47\x9b\xd0\x1d\x89\x1d\x1f\xf4\xa1\xeb\xdd\x9e\x17\x9d\xecm\xd8\xe6A2\x9c\xde$p\x9a\xcb\xd5N'
result = KDF_Length_eappwd(data, label, 256)
assert result == b'\xc8r\xfdke\xce\xfa\xbb\x0e\xc3\xb8\x83\xc2\x04\x95\x9fT\xc3P\xa9\x1be\x84\x16\xfba&\xc0!\xfbMs'
data = binascii.unhexlify("8726279f4137e57cc040cc23bdf7053217dc613bae7defe2549c5bb75ad72a79")
expect = binascii.unhexlify("ab8c540cf06cb38138670bb4e64b93d1f1232ae94f27bc19106bc84be6d0297bc2bf4b30418e12eed79462304bf12563a6b211984acf4c95005875f5b94d19c06400")
result = KDF_Length_eappwd(data, label, 521)
assert result == expect
data = binascii.unhexlify("2683a19c41fd2ad1736f3efdccfee34afcd9866ee4e213b8e23d191f4ce5a4f7")
expect = binascii.unhexlify("60d5d03a90e5ee8d08e1390f50b070330b4680dc8cf974e3227a8c09eedc56d975b191c7ea3ef5d0adcc0fa777bedcea9910098d3b02d1741699bfe8fa39be69a880")
result = KDF_Length_eappwd(data, label, 521)
assert result == expect
data = b'\x82\x02\x07\xceX\xf5\xbb\x11s\xdc\xe4\xcb\xcc"\xa9\x1f\'\x19\xe0\xee\x84w\xb7\xd6G\xad\xa6w_\xf1\x8eg'
label = b'4%\xfe\x8e\xd4\x93YD\xb3\xe7\xea\x8coN\xac\xf1 r\x83\x86u\xe1I\xbeS\xa7\x12l\xea\xbdhw\xc7'
result = KDF_Length_eappwd(data, label, 1024)
expected = b's\xdd\xb9\xcf]\x80\x08\xf6\xa08\xf1J L\x8d\xd5\xa9j\x83J\x9d\x04T(\x1a\xbc\x11o\xab\x015\x82V\xd8\xff>J'
expected += b'\x80P\x1f[PB\xfaF\x13\xae\xef\xfd\xddBkX\xff\xe03\xd3q(\x80\x9f"S\xad\xfeK:\x98\x90\xde\xdfw'
expected += b'\xd5\xe4\xf0O\xcf\x9c\xa1\r^\xf3\xf6\x1fp_\xd9\x13<\xf7\x0e\xf11\x81\x88\xb9\xfe\x80mn\xed\xf2'
expected += b'\xdel\x8f8f\\\xd4\x91\x83}|\xd0\x90\xa6I\xac\xca\xdb\x1d4\x00\xf5\xf8\xf5/\xa1'
assert result == expected
k = 34571911558658786479991772754682275693090212979118519100150784653967632958583
e1x = 35056562182093533434520846036041768262744712948121085631797144112066231820275
e1y = 30178867311470377631935198005507521070028138958470370567962433403317268006022
e1 = ECC.EccPoint(e1x, e1y)
s1 = 25673957538626389018921350300691255233489834439459044820849488820063961042178
e2x = 55846548926525467025361797152934092596912359473099878093027981331310692689958
e2y = 25540727936496301520339336932631497861346599764823572263118430938562903665071
e2 = ECC.EccPoint(e2x, e2y)
s2 = 89671311642711662572527453485728796207545960881415665173397225314404138450610
confirm = calculate_confirm_eappwd(k, e1, s1, e2, s2)
assert confirm == b"n\xe1N\xc1\x86\x0f\x94\x85W*Y\xf8\xf2'\x19\xac\x9c\xf6\xe6\xe6\x14\x8c+\xf7\x0e\xd0\xfdF\x87\x03G\xcc"
pwe = derive_pwe_ecc_eappwd("password", "user", "server", 2903600207)
assert pwe.x == 65324672961960968584356420288746215288928369435013474055323481826901726558522
assert pwe.y == 81287605691219879983190651062276165371083848816381214499332721121120114417256
pwe = derive_pwe_ecc_eappwd("password", "user", "server", 2546484939)
assert pwe.x == 32774075109236952337158599048510140249162039589740847669274255820096074575478
assert pwe.y == 65816200486131266053931191249788950977703402544735864608496951271815280382692
pwe = derive_pwe_ecc_eappwd("hello", "bob", "theserver@example.com", 0xEE04524, curve_name="p521")
assert pwe.x == 3008622341264366589487649162226557348235630833654679745848438214237061388319208914517686003128943873854271397962689455307621303688693893126759626682265352869
assert pwe.y == 649775647643090676911381912723346979966421674682002310678312738784243727860456911539411456724737204490685667258758093054491548052506429972664016924839683943

View File

@ -0,0 +1,14 @@
from libwifi.mschap import *
def test_mschap():
username = b"peapuser"
password = "password"
auth_challenge = binascii.unhexlify("59 ff 64 4c 14 62 df 4d 59 a4 46 5d 6b c8 09 6c".replace(" ", ""))
peer_challenge = binascii.unhexlify("0d 60 5a 24 da 8d 6e f7 58 ee 23 69 8f 37 04 46".replace(" ", ""))
nt_response = generate_nt_response_mschap2(auth_challenge, peer_challenge, username, password)
assert nt_response == b"\xd8\xf7\xd6\x10\xa6\x1f\x0c\x0b\x49\x1d\x21\xac\xbb\xd3\x6d\x86\xb9\x91\x6f\x8e\x69\xa6\x5f\x97"
auth_resp = generate_authenticator_response(password, nt_response, peer_challenge, auth_challenge, username)
assert auth_resp == b"\x0f\x91\x69\x7e\x8e\x8f\xd6\xb7\x25\xf3\x3c\x30\xd8\x1d\x67\xa7\x47\xfc\xba\x01"

494
research/libwifi/wifi.py Normal file
View File

@ -0,0 +1,494 @@
# Copyright (c) 2019-2021, Mathy Vanhoef <mathy.vanhoef@nyu.edu>
#
# This code may be distributed under the terms of the BSD license.
# See README for more details.
from scapy.all import *
from Crypto.Cipher import AES
from datetime import datetime
import binascii
#### Constants ####
IEEE_TLV_TYPE_SSID = 0
IEEE_TLV_TYPE_CHANNEL = 3
IEEE_TLV_TYPE_RSN = 48
IEEE_TLV_TYPE_CSA = 37
IEEE_TLV_TYPE_FT = 55
IEEE_TLV_TYPE_VENDOR = 221
WLAN_REASON_DISASSOC_DUE_TO_INACTIVITY = 4
WLAN_REASON_CLASS2_FRAME_FROM_NONAUTH_STA = 6
WLAN_REASON_CLASS3_FRAME_FROM_NONASSOC_STA = 7
#TODO: Not sure if really needed...
IEEE80211_RADIOTAP_RATE = (1 << 2)
IEEE80211_RADIOTAP_CHANNEL = (1 << 3)
IEEE80211_RADIOTAP_TX_FLAGS = (1 << 15)
IEEE80211_RADIOTAP_DATA_RETRIES = (1 << 17)
#### Basic output and logging functionality ####
ALL, DEBUG, INFO, STATUS, WARNING, ERROR = range(6)
COLORCODES = { "gray" : "\033[0;37m",
"green" : "\033[0;32m",
"orange": "\033[0;33m",
"red" : "\033[0;31m" }
global_log_level = INFO
def log(level, msg, color=None, showtime=True):
if level < global_log_level: return
if level == DEBUG and color is None: color="gray"
if level == WARNING and color is None: color="orange"
if level == ERROR and color is None: color="red"
msg = (datetime.now().strftime('[%H:%M:%S] ') if showtime else " "*11) + COLORCODES.get(color, "") + msg + "\033[1;0m"
print(msg)
def change_log_level(delta):
global global_log_level
global_log_level += delta
def croprepr(p, length=175):
string = repr(p)
if len(string) > length:
return string[:length - 3] + "..."
return string
#### Back-wards compatibility with older scapy
if not "Dot11FCS" in locals():
class Dot11FCS():
pass
if not "Dot11Encrypted" in locals():
class Dot11Encrypted():
pass
class Dot11CCMP():
pass
class Dot11TKIP():
pass
#### Linux ####
def get_device_driver(iface):
path = "/sys/class/net/%s/device/driver" % iface
try:
output = subprocess.check_output(["readlink", "-f", path])
return output.decode('utf-8').strip().split("/")[-1]
except:
return None
#### Utility ####
def get_macaddress(iface):
try:
# This method has been widely tested.
s = get_if_raw_hwaddr(iface)[1]
return ("%02x:" * 6)[:-1] % tuple(orb(x) for x in s)
except:
# Keep the old method as a backup though.
return open("/sys/class/net/%s/address" % iface).read().strip()
def addr2bin(addr):
return binascii.a2b_hex(addr.replace(':', ''))
def get_channel(iface):
output = str(subprocess.check_output(["iw", iface, "info"]))
p = re.compile("channel (\d+)")
m = p.search(output)
if m == None: return None
return int(m.group(1))
def set_channel(iface, channel):
if isinstance(channel, int):
# Compatibility with old channels encoded as simple integers
subprocess.check_output(["iw", iface, "set", "channel", str(channel)])
else:
# Channels represented as strings with extra info (e.g "11 HT40-")
subprocess.check_output(["iw", iface, "set", "channel"] + channel.split())
def set_macaddress(iface, macaddr):
# macchanger throws an error if the interface already has the given MAC address
if get_macaddress(iface) != macaddr:
subprocess.check_output(["ifconfig", iface, "down"])
subprocess.check_output(["macchanger", "-m", macaddr, iface])
def get_iface_type(iface):
output = str(subprocess.check_output(["iw", iface, "info"]))
p = re.compile("type (\w+)")
return str(p.search(output).group(1))
def set_monitor_mode(iface, up=True, mtu=1500):
# Note: we let the user put the device in monitor mode, such that they can control optional
# parameters such as "iw wlan0 set monitor active" for devices that support it.
if get_iface_type(iface) != "monitor":
# Some kernels (Debian jessie - 3.16.0-4-amd64) don't properly add the monitor interface. The following ugly
# sequence of commands assures the virtual interface is properly registered as a 802.11 monitor interface.
subprocess.check_output(["ifconfig", iface, "down"])
subprocess.check_output(["iw", iface, "set", "type", "monitor"])
time.sleep(0.5)
subprocess.check_output(["iw", iface, "set", "type", "monitor"])
if up:
subprocess.check_output(["ifconfig", iface, "up"])
subprocess.check_output(["ifconfig", iface, "mtu", str(mtu)])
def rawmac(addr):
return bytes.fromhex(addr.replace(':', ''))
def set_amsdu(p):
if "A_MSDU_Present" in [field.name for field in Dot11QoS.fields_desc]:
p.A_MSDU_Present = 1
else:
p.Reserved = 1
def is_amsdu(p):
if "A_MSDU_Present" in [field.name for field in Dot11QoS.fields_desc]:
return p.A_MSDU_Present == 1
else:
return p.Reserved == 1
#### Packet Processing Functions ####
class DHCP_sock(DHCP_am):
def __init__(self, **kwargs):
self.sock = kwargs.pop("sock")
self.server_ip = kwargs["gw"]
super(DHCP_sock, self).__init__(**kwargs)
def prealloc_ip(self, clientmac, ip=None):
"""Allocate an IP for the client before it send DHCP requests"""
if clientmac not in self.leases:
if ip == None:
ip = self.pool.pop()
self.leases[clientmac] = ip
return self.leases[clientmac]
def make_reply(self, req):
rep = super(DHCP_sock, self).make_reply(req)
# Fix scapy bug: set broadcast IP if required
if rep is not None and BOOTP in req and IP in rep:
if req[BOOTP].flags & 0x8000 != 0 and req[BOOTP].giaddr == '0.0.0.0' and req[BOOTP].ciaddr == '0.0.0.0':
rep[IP].dst = "255.255.255.255"
# Explicitly set source IP if requested
if not self.server_ip is None:
rep[IP].src = self.server_ip
return rep
def send_reply(self, reply):
self.sock.send(reply, **self.optsend)
def print_reply(self, req, reply):
log(STATUS, "%s: DHCP reply %s to %s" % (reply.getlayer(Ether).dst, reply.getlayer(BOOTP).yiaddr, reply.dst), color="green")
def remove_client(self, clientmac):
clientip = self.leases[clientmac]
self.pool.append(clientip)
del self.leases[clientmac]
class ARP_sock(ARP_am):
def __init__(self, **kwargs):
self.sock = kwargs.pop("sock")
super(ARP_am, self).__init__(**kwargs)
def send_reply(self, reply):
self.sock.send(reply, **self.optsend)
def print_reply(self, req, reply):
log(STATUS, "%s: ARP: %s ==> %s on %s" % (reply.getlayer(Ether).dst, req.summary(), reply.summary(), self.iff))
#### Packet Processing Functions ####
# Compatibility with older Scapy versions
if not "ORDER" in scapy.layers.dot11._rt_txflags:
scapy.layers.dot11._rt_txflags.append("ORDER")
class MonitorSocket(L2Socket):
def __init__(self, iface, dumpfile=None, detect_injected=False, **kwargs):
super(MonitorSocket, self).__init__(iface, **kwargs)
self.pcap = None
if dumpfile:
self.pcap = PcapWriter("%s.%s.pcap" % (dumpfile, self.iface), append=False, sync=True)
self.detect_injected = detect_injected
self.default_rate = None
def set_channel(self, channel):
subprocess.check_output(["iw", self.iface, "set", "channel", str(channel)])
def attach_filter(self, bpf):
log(DEBUG, "Attaching filter to %s: <%s>" % (self.iface, bpf))
attach_filter(self.ins, bpf, self.iface)
def set_default_rate(self, rate):
self.default_rate = rate
def send(self, p, rate=None):
# Hack: set the More Data flag so we can detect injected frames (and so clients stay awake longer)
if self.detect_injected:
p.FCfield |= 0x20
# Control data rate injected frames
if rate is None and self.default_rate is None:
rtap = RadioTap(present="TXFlags", TXFlags="NOSEQ+ORDER")
else:
use_rate = rate if rate != None else self.default_rate
rtap = RadioTap(present="TXFlags+Rate", Rate=use_rate, TXFlags="NOSEQ+ORDER")
L2Socket.send(self, rtap/p)
if self.pcap: self.pcap.write(RadioTap()/p)
def _strip_fcs(self, p):
"""
Scapy may throw exceptions when handling malformed short frames,
so we need to catch these exceptions and just ignore these frames.
This in particular happened with short malformed beacons.
"""
try:
return Dot11(raw(p[Dot11FCS])[:-4])
except:
return None
def _detect_and_strip_fcs(self, p):
# Older scapy can't handle the optional Frame Check Sequence (FCS) field automatically
if p[RadioTap].present & 2 != 0 and not Dot11FCS in p:
rawframe = raw(p[RadioTap])
pos = 8
while orb(rawframe[pos - 1]) & 0x80 != 0: pos += 4
# If the TSFT field is present, it must be 8-bytes aligned
if p[RadioTap].present & 1 != 0:
pos += (8 - (pos % 8))
pos += 8
# Remove FCS if present
if orb(rawframe[pos]) & 0x10 != 0:
return self._strip_fcs(p)
return p[Dot11]
def recv(self, x=MTU, reflected=False):
p = L2Socket.recv(self, x)
if p == None or not (Dot11 in p or Dot11FCS in p):
return None
if self.pcap:
self.pcap.write(p)
# Hack: ignore frames that we just injected and are echoed back by the kernel
if self.detect_injected and p.FCfield & 0x20 != 0:
return None
# Ignore reflection of injected frames. These have a small RadioTap header.
if not reflected and p[RadioTap].len < 13:
return None
# Strip the FCS if present, and drop the RadioTap header
if Dot11FCS in p:
return self._strip_fcs(p)
else:
return self._detect_and_strip_fcs(p)
def close(self):
if self.pcap: self.pcap.close()
super(MonitorSocket, self).close()
# For backwards compatibility
class MitmSocket(MonitorSocket):
pass
def dot11_get_seqnum(p):
return p.SC >> 4
def dot11_is_encrypted_data(p):
# All these different cases are explicitly tested to handle older scapy versions
return (p.FCfield & 0x40) or Dot11CCMP in p or Dot11TKIP in p or Dot11WEP in p or Dot11Encrypted in p
def payload_to_iv(payload):
iv0 = payload[0]
iv1 = payload[1]
wepdata = payload[4:8]
# FIXME: Only CCMP is supported (TKIP uses a different IV structure)
return orb(iv0) + (orb(iv1) << 8) + (struct.unpack(">I", wepdata)[0] << 16)
def dot11_get_iv(p):
"""
This function assumes the frame is encrypted using either CCMP or WEP.
It does not work for other encrypion protocol (e.g. TKIP).
"""
# The simple and default case
if Dot11CCMP in p:
payload = raw(p[Dot11CCMP])
return payload_to_iv(payload)
# Scapy uses a heuristic to differentiate CCMP/TKIP and this may be wrong.
# So even when we get a Dot11TKIP frame, we'll still treat it like a Dot11CCMP frame.
elif Dot11TKIP in p:
payload = raw(p[Dot11TKIP])
return payload_to_iv(payload)
elif Dot11WEP in p:
wep = p[Dot11WEP]
# Older Scapy versions parse CCMP-encrypted frames as Dot11WEP. So we check if the
# extended IV flag is set, and if so, treat it like a CCMP frame.
if wep.keyid & 32:
# This only works for CCMP (TKIP uses a different IV structure).
return orb(wep.iv[0]) + (orb(wep.iv[1]) << 8) + (struct.unpack(">I", wep.wepdata[:4])[0] << 16)
# If the extended IV flag is not set meaning it's indeed WEP.
else:
return orb(wep.iv[0]) + (orb(wep.iv[1]) << 8) + (orb(wep.iv[2]) << 16)
# Scapy uses Dot11Encrypted if it couldn't determine how the frame was encrypted. Assume CCMP.
elif Dot11Encrypted in p:
payload = raw(p[Dot11Encrypted])
return payload_to_iv(payload)
# Manually detect encrypted frames in case (older versions of) Scapy failed to do this. Assume CCMP.
elif p.FCfield & 0x40:
return payload_to_iv(p[Raw].load)
# Couldn't determine the IV
return None
def dot11_get_priority(p):
if not Dot11QoS in p: return 0
return p[Dot11QoS].TID
#### Crypto functions and util ####
def get_ccmp_payload(p):
if Dot11WEP in p:
# Extract encrypted payload:
# - Skip extended IV (4 bytes in total)
# - Exclude first 4 bytes of the CCMP MIC (note that last 4 are saved in the WEP ICV field)
return str(p.wepdata[4:-4])
elif Dot11CCMP in p:
return p[Dot11CCMP].data
elif Dot11TKIP in p:
return p[Dot11TKIP].data
elif Dot11Encrypted in p:
return p[Dot11Encrypted].data
else:
return p[Raw].load
class IvInfo():
def __init__(self, p):
self.iv = dot11_get_iv(p)
self.seq = dot11_get_seqnum(p)
self.time = p.time
def is_reused(self, p):
"""Return true if frame p reuses an IV and if p is not a retransmitted frame"""
iv = dot11_get_iv(p)
seq = dot11_get_seqnum(p)
return self.iv == iv and self.seq != seq and p.time >= self.time + 1
class IvCollection():
def __init__(self):
self.ivs = dict() # maps IV values to IvInfo objects
def reset(self):
self.ivs = dict()
def track_used_iv(self, p):
iv = dot11_get_iv(p)
self.ivs[iv] = IvInfo(p)
def is_iv_reused(self, p):
"""Returns True if this is an *observed* IV reuse and not just a retransmission"""
iv = dot11_get_iv(p)
return iv in self.ivs and self.ivs[iv].is_reused(p)
def is_new_iv(self, p):
"""Returns True if the IV in this frame is higher than all previously observed ones"""
iv = dot11_get_iv(p)
if len(self.ivs) == 0: return True
return iv > max(self.ivs.keys())
def create_fragments(header, data, num_frags):
# This special case is useful so scapy keeps the full "interpretation" of the frame
# instead of afterwards treating/displaying the payload as just raw data.
if num_frags == 1: return [header/data]
data = raw(data)
fragments = []
fragsize = (len(data) + num_frags - 1) // num_frags
for i in range(num_frags):
frag = header.copy()
frag.SC |= i
if i < num_frags - 1:
frag.FCfield |= Dot11(FCfield="MF").FCfield
payload = data[fragsize * i : fragsize * (i + 1)]
frag = frag/Raw(payload)
fragments.append(frag)
return fragments
def get_element(el, id):
if not Dot11Elt in el: return None
el = el[Dot11Elt]
while not el is None:
if el.ID == id:
return el
el = el.payload
return None
def get_ssid(beacon):
if not (Dot11 in beacon or Dot11FCS in beacon): return
if Dot11Elt not in beacon: return
if beacon[Dot11].type != 0 and beacon[Dot11].subtype != 8: return
el = get_element(beacon, IEEE_TLV_TYPE_SSID)
return el.info.decode()
def is_from_sta(p, macaddr):
if not (Dot11 in p or Dot11FCS in p):
return False
if p.addr1 != macaddr and p.addr2 != macaddr:
return False
return True
def get_bss(iface, clientmac, timeout=20):
ps = sniff(count=1, timeout=timeout, lfilter=lambda p: is_from_sta(p, clientmac) and p.addr2 != None, iface=iface)
if len(ps) == 0:
return None
return ps[0].addr1 if ps[0].addr1 != clientmac else ps[0].addr2
def create_msdu_subframe(src, dst, payload, last=False):
length = len(payload)
p = Ether(dst=dst, src=src, type=length)
payload = raw(payload)
total_length = len(p) + len(payload)
padding = ""
if not last and total_length % 4 != 0:
padding = b"\x00" * (4 - (total_length % 4))
return p / payload / Raw(padding)
def find_network(iface, ssid):
ps = sniff(count=1, timeout=0.3, lfilter=lambda p: get_ssid(p) == ssid, iface=iface)
if ps is None or len(ps) < 1:
log(STATUS, "Searching for target network on other channels")
for chan in [1, 6, 11, 3, 8, 2, 7, 4, 10, 5, 9, 12, 13]:
set_channel(iface, chan)
log(DEBUG, "Listening on channel %d" % chan)
ps = sniff(count=1, timeout=0.3, lfilter=lambda p: get_ssid(p) == ssid, iface=iface)
if ps and len(ps) >= 1: break
if ps and len(ps) >= 1:
# Even though we capture the beacon we might still be on another channel,
# so it's important to explicitly switch to the correct channel.
actual_chan = orb(get_element(ps[0], IEEE_TLV_TYPE_CHANNEL).info)
set_channel(iface, actual_chan)
# Return the beacon that we captured
return ps[0]
return None