tests: Use rfkill python module

Instead of calling the rfkill binary, use the built-in module.

Signed-off-by: Johannes Berg <johannes.berg@intel.com>
This commit is contained in:
Johannes Berg 2015-01-08 14:59:16 +01:00 committed by Jouni Malinen
parent bd31b92a68
commit 9c8779daf6
2 changed files with 45 additions and 49 deletions

View File

@ -6,39 +6,36 @@
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
import subprocess
import time import time
import hostapd import hostapd
from hostapd import HostapdGlobal from hostapd import HostapdGlobal
import hwsim_utils import hwsim_utils
from wpasupplicant import WpaSupplicant from wpasupplicant import WpaSupplicant
from rfkill import RFKill
def get_rfkill_id(dev): def get_rfkill(dev):
phy = dev.get_driver_status_field("phyname")
try: try:
cmd = subprocess.Popen(["rfkill", "list"], stdout=subprocess.PIPE) for r, s, h in RFKill.list():
if r.name == phy:
return r
except Exception, e: except Exception, e:
logger.info("No rfkill available: " + str(e)) logger.info("No rfkill available: " + str(e))
return None
res = cmd.stdout.read() return None
cmd.stdout.close()
phy = dev.get_driver_status_field("phyname")
matches = [ line for line in res.splitlines() if phy + ':' in line ]
if len(matches) != 1:
return None
return matches[0].split(':')[0]
def test_rfkill_open(dev, apdev): def test_rfkill_open(dev, apdev):
"""rfkill block/unblock during open mode connection""" """rfkill block/unblock during open mode connection"""
id = get_rfkill_id(dev[0]) rfk = get_rfkill(dev[0])
if id is None: if rfk is None:
return "skip" return "skip"
hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" }) hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
try: try:
logger.info("rfkill block") logger.info("rfkill block")
subprocess.call(['sudo', 'rfkill', 'block', id]) rfk.block()
dev[0].wait_disconnected(timeout=10, dev[0].wait_disconnected(timeout=10,
error="Missing disconnection event on rfkill block") error="Missing disconnection event on rfkill block")
@ -52,17 +49,17 @@ def test_rfkill_open(dev, apdev):
raise Exception("FETCH_OSU accepted while disabled") raise Exception("FETCH_OSU accepted while disabled")
logger.info("rfkill unblock") logger.info("rfkill unblock")
subprocess.call(['sudo', 'rfkill', 'unblock', id]) rfk.unblock()
dev[0].wait_connected(timeout=10, dev[0].wait_connected(timeout=10,
error="Missing connection event on rfkill unblock") error="Missing connection event on rfkill unblock")
hwsim_utils.test_connectivity(dev[0], hapd) hwsim_utils.test_connectivity(dev[0], hapd)
finally: finally:
subprocess.call(['sudo', 'rfkill', 'unblock', id]) rfk.unblock()
def test_rfkill_wpa2_psk(dev, apdev): def test_rfkill_wpa2_psk(dev, apdev):
"""rfkill block/unblock during WPA2-PSK connection""" """rfkill block/unblock during WPA2-PSK connection"""
id = get_rfkill_id(dev[0]) rfk = get_rfkill(dev[0])
if id is None: if rfk is None:
return "skip" return "skip"
ssid = "test-wpa2-psk" ssid = "test-wpa2-psk"
@ -72,25 +69,25 @@ def test_rfkill_wpa2_psk(dev, apdev):
dev[0].connect(ssid, psk=passphrase, scan_freq="2412") dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
try: try:
logger.info("rfkill block") logger.info("rfkill block")
subprocess.call(['sudo', 'rfkill', 'block', id]) rfk.block()
dev[0].wait_disconnected(timeout=10, dev[0].wait_disconnected(timeout=10,
error="Missing disconnection event on rfkill block") error="Missing disconnection event on rfkill block")
logger.info("rfkill unblock") logger.info("rfkill unblock")
subprocess.call(['sudo', 'rfkill', 'unblock', id]) rfk.unblock()
dev[0].wait_connected(timeout=10, dev[0].wait_connected(timeout=10,
error="Missing connection event on rfkill unblock") error="Missing connection event on rfkill unblock")
hwsim_utils.test_connectivity(dev[0], hapd) hwsim_utils.test_connectivity(dev[0], hapd)
finally: finally:
subprocess.call(['sudo', 'rfkill', 'unblock', id]) rfk.unblock()
def test_rfkill_autogo(dev, apdev): def test_rfkill_autogo(dev, apdev):
"""rfkill block/unblock for autonomous P2P GO""" """rfkill block/unblock for autonomous P2P GO"""
id0 = get_rfkill_id(dev[0]) rfk0 = get_rfkill(dev[0])
if id0 is None: if rfk0 is None:
return "skip" return "skip"
id1 = get_rfkill_id(dev[1]) rfk1 = get_rfkill(dev[1])
if id1 is None: if rfk1 is None:
return "skip" return "skip"
dev[0].p2p_start_go() dev[0].p2p_start_go()
@ -99,7 +96,7 @@ def test_rfkill_autogo(dev, apdev):
try: try:
logger.info("rfkill block 0") logger.info("rfkill block 0")
subprocess.call(['sudo', 'rfkill', 'block', id0]) rfk0.block()
ev = dev[0].wait_global_event(["P2P-GROUP-REMOVED"], timeout=10) ev = dev[0].wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
if ev is None: if ev is None:
raise Exception("Group removal not reported") raise Exception("Group removal not reported")
@ -111,7 +108,7 @@ def test_rfkill_autogo(dev, apdev):
raise Exception("P2P_LISTEN accepted unexpectedly") raise Exception("P2P_LISTEN accepted unexpectedly")
logger.info("rfkill block 1") logger.info("rfkill block 1")
subprocess.call(['sudo', 'rfkill', 'block', id1]) rfk1.block()
ev = dev[1].wait_global_event(["P2P-GROUP-REMOVED"], timeout=10) ev = dev[1].wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
if ev is None: if ev is None:
raise Exception("Group removal not reported") raise Exception("Group removal not reported")
@ -119,28 +116,28 @@ def test_rfkill_autogo(dev, apdev):
raise Exception("Unexpected group removal reason: " + ev) raise Exception("Unexpected group removal reason: " + ev)
logger.info("rfkill unblock 0") logger.info("rfkill unblock 0")
subprocess.call(['sudo', 'rfkill', 'unblock', id0]) rfk0.unblock()
logger.info("rfkill unblock 1") logger.info("rfkill unblock 1")
subprocess.call(['sudo', 'rfkill', 'unblock', id1]) rfk1.unblock()
time.sleep(1) time.sleep(1)
finally: finally:
subprocess.call(['sudo', 'rfkill', 'unblock', id0]) rfk0.unblock()
subprocess.call(['sudo', 'rfkill', 'unblock', id1]) rfk1.unblock()
def test_rfkill_hostapd(dev, apdev): def test_rfkill_hostapd(dev, apdev):
"""rfkill block/unblock during and prior to hostapd operations""" """rfkill block/unblock during and prior to hostapd operations"""
hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" }) hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
id = get_rfkill_id(hapd) rfk = get_rfkill(hapd)
if id is None: if rfk is None:
return "skip" return "skip"
try: try:
subprocess.call(['rfkill', 'block', id]) rfk.block()
ev = hapd.wait_event(["INTERFACE-DISABLED"], timeout=5) ev = hapd.wait_event(["INTERFACE-DISABLED"], timeout=5)
if ev is None: if ev is None:
raise Exception("INTERFACE-DISABLED event not seen") raise Exception("INTERFACE-DISABLED event not seen")
subprocess.call(['rfkill', 'unblock', id]) rfk.unblock()
ev = hapd.wait_event(["INTERFACE-ENABLED"], timeout=5) ev = hapd.wait_event(["INTERFACE-ENABLED"], timeout=5)
if ev is None: if ev is None:
raise Exception("INTERFACE-ENABLED event not seen") raise Exception("INTERFACE-ENABLED event not seen")
@ -148,7 +145,7 @@ def test_rfkill_hostapd(dev, apdev):
hapd.disable() hapd.disable()
hapd.enable() hapd.enable()
dev[0].connect("open", key_mgmt="NONE", scan_freq="2412") dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
subprocess.call(['rfkill', 'block', id]) rfk.block()
ev = hapd.wait_event(["INTERFACE-DISABLED"], timeout=5) ev = hapd.wait_event(["INTERFACE-DISABLED"], timeout=5)
if ev is None: if ev is None:
raise Exception("INTERFACE-DISABLED event not seen") raise Exception("INTERFACE-DISABLED event not seen")
@ -165,27 +162,27 @@ def test_rfkill_hostapd(dev, apdev):
if "FAIL" not in hapd.request("ENABLE"): if "FAIL" not in hapd.request("ENABLE"):
raise Exception("ENABLE succeeded unexpectedly (rfkill)") raise Exception("ENABLE succeeded unexpectedly (rfkill)")
finally: finally:
subprocess.call(['rfkill', 'unblock', id]) rfk.unblock()
def test_rfkill_wpas(dev, apdev): def test_rfkill_wpas(dev, apdev):
"""rfkill block prior to wpa_supplicant start""" """rfkill block prior to wpa_supplicant start"""
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5") wpas.interface_add("wlan5")
id = get_rfkill_id(wpas) rfk = get_rfkill(wpas)
if id is None: if rfk is None:
return "skip" return "skip"
wpas.interface_remove("wlan5") wpas.interface_remove("wlan5")
try: try:
subprocess.call(['rfkill', 'block', id]) rfk.block()
wpas.interface_add("wlan5") wpas.interface_add("wlan5")
time.sleep(0.5) time.sleep(0.5)
state = wpas.get_status_field("wpa_state") state = wpas.get_status_field("wpa_state")
if state != "INTERFACE_DISABLED": if state != "INTERFACE_DISABLED":
raise Exception("Unexpected state with rfkill blocked: " + state) raise Exception("Unexpected state with rfkill blocked: " + state)
subprocess.call(['rfkill', 'unblock', id]) rfk.unblock()
time.sleep(0.5) time.sleep(0.5)
state = wpas.get_status_field("wpa_state") state = wpas.get_status_field("wpa_state")
if state == "INTERFACE_DISABLED": if state == "INTERFACE_DISABLED":
raise Exception("Unexpected state with rfkill unblocked: " + state) raise Exception("Unexpected state with rfkill unblocked: " + state)
finally: finally:
subprocess.call(['rfkill', 'unblock', id]) rfk.unblock()

View File

@ -7,12 +7,11 @@
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
import os import os
import subprocess
import hostapd import hostapd
import hwsim_utils import hwsim_utils
from wpasupplicant import WpaSupplicant from wpasupplicant import WpaSupplicant
from test_rfkill import get_rfkill_id from test_rfkill import get_rfkill
def get_wext_interface(): def get_wext_interface():
if not os.path.exists("/proc/net/wireless"): if not os.path.exists("/proc/net/wireless"):
@ -244,8 +243,8 @@ def test_wext_rfkill(dev, apdev):
"""WEXT and rfkill block/unblock""" """WEXT and rfkill block/unblock"""
wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
wpas.interface_add("wlan5") wpas.interface_add("wlan5")
id = get_rfkill_id(wpas) rfk = get_rfkill(wpas)
if id is None: if rfk is None:
return "skip" return "skip"
wpas.interface_remove("wlan5") wpas.interface_remove("wlan5")
@ -257,14 +256,14 @@ def test_wext_rfkill(dev, apdev):
wpas.connect("open", key_mgmt="NONE", scan_freq="2412") wpas.connect("open", key_mgmt="NONE", scan_freq="2412")
try: try:
logger.info("rfkill block") logger.info("rfkill block")
subprocess.call(['rfkill', 'block', id]) rfk.block()
wpas.wait_disconnected(timeout=10, wpas.wait_disconnected(timeout=10,
error="Missing disconnection event on rfkill block") error="Missing disconnection event on rfkill block")
logger.info("rfkill unblock") logger.info("rfkill unblock")
subprocess.call(['rfkill', 'unblock', id]) rfk.unblock()
wpas.wait_connected(timeout=20, wpas.wait_connected(timeout=20,
error="Missing connection event on rfkill unblock") error="Missing connection event on rfkill unblock")
hwsim_utils.test_connectivity(wpas, hapd) hwsim_utils.test_connectivity(wpas, hapd)
finally: finally:
subprocess.call(['rfkill', 'unblock', id]) rfk.unblock()