fragattacks: various improvements to ping test for experiments

This commit is contained in:
Mathy 2020-04-23 10:19:14 -04:00 committed by Mathy Vanhoef
parent e26278f4b5
commit 1027a7f902
2 changed files with 97 additions and 68 deletions

View File

@ -64,6 +64,11 @@ class TestOptions():
self.ip = None
self.peerip = None
def log_level2switch():
if global_log_level == 1: return ["-d", "-K"]
elif global_log_level <= 0: return ["-dd", "-K"]
return ["-K"]
#TODO: Move to libwifi?
def add_msdu_frag(src, dst, payload):
length = len(payload)
@ -96,7 +101,6 @@ def generate_request(sta, ptype, prior=2):
elif ptype == REQ_ICMP:
label = b"test_ping_icmp"
check = lambda p: ICMP in p and label in raw(p)
print(f"Ping from {sta.ip} to {sta.peerip}")
request = LLC()/SNAP()/IP(src=sta.ip, dst=sta.peerip)/ICMP()/Raw(label)
elif ptype == REQ_DHCP:
@ -150,6 +154,14 @@ class Action():
def get_action(self):
return self.action
def __str__(self):
trigger = ["StartAuth", "BeforeAuth", "AfterAuth", "Connected"][self.trigger]
action = ["GetIp", "Rekey", "Reconnect", "Roam", "Inject", "Func"][self.action]
return f"Action({trigger}, {action})"
def __repr__(self):
return str(self)
class Test(metaclass=abc.ABCMeta):
"""
Base class to define tests. The default defined methods can be used,
@ -159,6 +171,8 @@ class Test(metaclass=abc.ABCMeta):
def __init__(self, actions=None):
self.actions = actions if actions != None else []
self.generated = False
self.delay = None
self.inc_pn = None
def next_trigger_is(self, trigger):
if len(self.actions) == 0:
@ -181,13 +195,38 @@ class Test(metaclass=abc.ABCMeta):
return [act for act in self.actions if act.action == action]
@abc.abstractmethod
def generate(self, station):
def prepare(self, station):
pass
def generate(self, station):
self.prepare(station)
self.enforce_delay()
self.enforce_inc_pn()
@abc.abstractmethod
def check(self, p):
return False
def set_options(self, delay=None, inc_pn=None):
self.delay = delay
self.inc_pn = inc_pn
def enforce_delay(self):
if self.delay == None or self.delay <= 0:
return
# Add a delay between injected fragments if requested
for frag in self.get_actions(Action.Inject)[1:]:
frag.delay = self.delay
def enforce_inc_pn(self):
if self.inc_pn == None:
return
# Add a delay between injected fragments if requested
for frag in self.get_actions(Action.Inject)[1:]:
frag.inc_pn = self.inc_pn
class PingTest(Test):
def __init__(self, ptype, fragments, bcast=False, separate_with=None, as_msdu=False):
super().__init__(fragments)
@ -202,7 +241,7 @@ class PingTest(Test):
return False
return self.check_fn(p)
def generate(self, station):
def prepare(self, station):
log(STATUS, "Generating ping test", color="green")
# Generate the header and payload
@ -254,7 +293,7 @@ class LinuxTest(Test):
return False
return self.check_fn(p)
def generate(self, station):
def prepare(self, station):
header, request, self.check_fn = generate_request(station, self.ptype)
frag1, frag2 = create_fragments(header, request, 2)
@ -288,7 +327,7 @@ class MacOsTest(Test):
return False
return self.check_fn(p)
def generate(self, station):
def prepare(self, station):
# First fragment is the start of an EAPOL frame
header = station.get_header(prior=2)
request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32)
@ -318,7 +357,7 @@ class EapolTest(Test):
Action(Action.BeforeAuth, enc=False)
])
def generate(self, station):
def prepare(self, station):
header = station.get_header(prior=2)
request = LLC()/SNAP()/EAPOL()/EAP()/Raw(b"A"*32)
frag1, frag2 = create_fragments(header, data=request, num_frags=2)
@ -345,7 +384,7 @@ class EapolMsduTest(Test):
return False
return self.check_fn(p)
def generate(self, station):
def prepare(self, station):
log(STATUS, "Generating ping test", color="green")
# Generate the single frame
@ -422,7 +461,7 @@ class Station():
if self.test != None and self.test.check != None and self.test.check(p):
log(STATUS, "SUCCESSFULL INJECTION", color="green")
print(repr(p))
log(STATUS, "Received packet: " + repr(p))
self.test = None
def send_mon(self, data, prior=1):
@ -451,7 +490,7 @@ class Station():
p = p/LLC()/SNAP()/payload
if self.tk: p = self.encrypt(p)
print("[Injecting]", repr(p))
log(STATUS, "[Injecting] " + repr(p))
daemon.inject_mon(p)
def set_header(self, p, forward=False, prior=None):
@ -595,7 +634,6 @@ class Station():
# Force rekey as AP, wait on rekey as client
self.daemon.rekey(self)
elif act.action == Action.Roam:
# Roam as client, TODO XXX what was AP?
self.daemon.roam(self)
@ -605,7 +643,7 @@ class Station():
self.daemon.reconnect(self)
elif act.action == Action.Inject:
if act.delay != None:
if act.delay != None and act.delay > 0:
log(STATUS, f"Sleeping {act.delay} seconds")
time.sleep(act.delay)
@ -635,7 +673,7 @@ class Station():
#
if self.options.inject_workaround and frame != None and frame.FCfield & 0x4 != 0:
self.daemon.inject_mon(Dot11(addr1="ff:ff:ff:ff:ff:ff"))
print("[Injected packet] Prevent ath9k_htc bug after fragment injection")
log(STATUS, "[Injected packet] Prevented ath9k_htc bug after fragment injection")
return result
@ -827,6 +865,7 @@ class Authenticator(Daemon):
log(STATUS, f"Waiting on client {station.get_peermac()} to get IP")
def rekey(self, station):
log(STATUS, f"Starting PTK rekey with client {station.get_peermac()}", color="green")
wpaspy_command(self.wpaspy_ctrl, "REKEY_PTK " + station.get_peermac())
def reconnect(self, station):
@ -884,7 +923,7 @@ class Authenticator(Daemon):
return
self.stations[clientmac].handle_eapol_tx(bytes.fromhex(payload))
# XXX update so this also works with rekeys
# XXX WPA1: Take into account group key handshake on initial 4-way HS
elif "AP-STA-CONNECTED" in msg:
cmd, clientmac = msg.split()
if not clientmac in self.stations:
@ -898,8 +937,7 @@ class Authenticator(Daemon):
self.process = subprocess.Popen([
"../hostapd/hostapd",
"-i", self.nic_iface,
"hostapd.conf", "-dd"
])
"hostapd.conf"] + log_level2switch())
time.sleep(1)
except:
if not os.path.exists("../hostapd/hostapd"):
@ -1086,8 +1124,7 @@ class Supplicant(Daemon):
"../wpa_supplicant/wpa_supplicant",
"-Dnl80211",
"-i", self.nic_iface,
"-cclient.conf",
"-dd"])
"-cclient.conf"] + log_level2switch())
time.sleep(1)
except:
if not os.path.exists("../wpa_supplicant/wpa_supplicant"):
@ -1102,19 +1139,36 @@ class Supplicant(Daemon):
def cleanup():
daemon.stop()
def char2action(c):
def char2trigger(c):
if c == 'S': return Action.StartAuth
elif c == 'B': return Action.BeforeAuth
elif c == 'A': return Action.AfterAuth
elif c == 'C': return Action.Connected
else: raise Exception("Unknown trigger character " + c)
def stract2action(stract):
if len(stract) == 1:
trigger = Action.Connected
c = stract[0]
else:
trigger = char2trigger(stract[0])
c = stract[1]
if c == 'I':
return Action(Action.Connected, action=Action.GetIp)
return Action(trigger, action=Action.GetIp)
elif c == 'R':
return Action(trigger, action=Action.Rekey)
elif c == 'P':
return Action(Action.Connected, enc=False)
return Action(trigger, enc=False)
elif c == 'E':
return Action(Action.Connected, enc=True)
return Action(trigger, enc=True)
raise Exception("Unrecognized action")
def prepare_tests(test_name, actions):
def prepare_tests(test_name, actions, delay=0, inc_pn=0, as_msdu=False):
# Handle action string
if actions != None:
actions = [char2action(c) for c in actions]
actions = [stract2action(stract) for stract in actions.split(",")]
if test_name == "qca_test":
test = QcaDriverTest()
@ -1126,34 +1180,22 @@ def prepare_tests(test_name, actions):
test = QcaDriverRekey()
elif test_name == "ping":
# Simple ping as sanity check
if actions == None:
actions = [Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True)]
test = PingTest(REQ_ICMP, actions)
test = PingTest(REQ_ICMP, actions, as_msdu=as_msdu)
elif test_name == "ping_msdu":
# Simple ping as sanity check
elif test_name == "ping_frag_sep":
# Check if we can send frames in between fragments. Use priority of 1 since that
# is also what we use in send_mon currently.
separator = Dot11(type="Data", subtype=8, SC=(33 << 4) | 0)/Dot11QoS(TID=1)/LLC()/SNAP()
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True)],
as_msdu=True)
elif test_name == "ping_frag":
# Simple ping as sanity check
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True),
])
elif test_name == "ping_frag_skip":
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True, inc_pn=2),
])
separate_with=separator, as_msdu=as_msdu,
)
elif test_name == "wep_mixed_key":
log(WARNING, "Cannot predict WEP key reotation. Fragment may time out, use very short key rotation!", color="orange")
@ -1164,16 +1206,6 @@ def prepare_tests(test_name, actions):
Action(Action.AfterAuth, enc=True),
])
elif test_name == "ping_frag_sep":
# Check if we can send frames in between fragments
separator = Dot11(type="Data", subtype=8, SC=(33 << 4) | 0)/Dot11QoS()/LLC()/SNAP()
test = PingTest(REQ_ICMP,
[Action(Action.Connected, action=Action.GetIp),
Action(Action.Connected, enc=True),
Action(Action.Connected, enc=True)],
separate_with=separator
)
elif test_name == "cache_poison":
# Cache poison attack. Worked against Linux Hostapd and RT-AC51U.
test = PingTest(REQ_ICMP,
@ -1223,20 +1255,14 @@ def prepare_tests(test_name, actions):
# - Test case with a very lage aggregated frame (which is normally not
# allowed but some may accept it). And a variation to check how APs
# will forward such overly large frame (e.g. force fragmentation).
# - 1.1 Encrypted (= sanity ping test)
# 1.2 Plaintext (= text plaintext injection)
# 1.3 Encrpted, Encrypted
# 1.4 [TKIP] Encrpted, Encrypted, no global MIC
# 1.5 Plaintext, plaintext
# 1.6 Encrypted, plaintext
# 1.7 Plaintext, encrypted
# 1.8 Encrypted, plaintext, encrypted
# 1.9 Plaintext, encrypted, plaintext
# 2. Test 2 but first plaintext sent before installing key
# ==> Plaintext can already be sent during 4-way HS?
# - [TKIP] Encrpted, Encrypted, no global MIC
# - Plain/Enc tests but first plaintext sent before installing key
# - Test fragmentation of management frames
# - Test fragmentation of group frames (STA mode of RT-AC51u?)
# Handle delay and inc_pn parameters automatically in all tests somehow
test.set_options(delay, inc_pn)
return test
if __name__ == "__main__":
@ -1250,12 +1276,15 @@ if __name__ == "__main__":
parser.add_argument('--peerip', help="IP of the device we will test.")
parser.add_argument('--ap', default=False, action='store_true', help="Act as an AP to test clients.")
parser.add_argument('--debug', type=int, default=0, help="Debug output level.")
parser.add_argument('--delay', type=int, default=0, help="Delay between fragments in certain tests.")
parser.add_argument('--inc_pn', type=int, default=1, help="To test non-sequential packet number in fragments.")
parser.add_argument('--msdu', default=False, action='store_true', help="Encapsulate pings in an A-MSDU frame.")
args = parser.parse_args()
# Convert parsed options to TestOptions object
options = TestOptions()
options.interface = args.iface
options.test = prepare_tests(args.testname, args.actions)
options.test = prepare_tests(args.testname, args.actions, args.delay, args.inc_pn, args.msdu)
options.ip = args.ip
options.peerip = args.peerip

View File

@ -34,7 +34,7 @@ class QcaDriverTest(Test):
return False
return self.check_fn(p)
def generate(self, station):
def prepare(self, station):
log(STATUS, "Generating QCA driver test", color="green")
# Generate the header and payload
@ -100,7 +100,7 @@ class QcaTestSplit(Test):
return False
return self.check_fn(p)
def generate(self, station):
def prepare(self, station):
log(STATUS, "Generating QCA driver test", color="green")
# Generate the header and payload
@ -177,7 +177,7 @@ class QcaDriverRekey(Test):
return False
return self.check_fn(p)
def generate(self, station):
def prepare(self, station):
log(STATUS, "Generating QCA driver test", color="green")
# Generate the header and payload