tests: Refactor tshark running

Refactor the code to run tshark into its own submodule. This allows
even remembering whether -Y or -R needs to be used for filtering.

Signed-off-by: Johannes Berg <johannes.berg@intel.com>
This commit is contained in:
Johannes Berg 2015-01-09 19:55:44 +01:00 committed by Jouni Malinen
parent 0e126c6dca
commit 2e1d7386e2
4 changed files with 64 additions and 69 deletions

View File

@ -13,6 +13,7 @@ import time
import hostapd import hostapd
import hwsim_utils import hwsim_utils
from tshark import run_tshark
from nl80211 import * from nl80211 import *
def nl80211_command(dev, cmd, attr): def nl80211_command(dev, cmd, attr):
@ -82,28 +83,9 @@ def test_cfg80211_tx_frame(dev, apdev, params):
# note: also the Deauthenticate frame sent by the GO going down ends up # note: also the Deauthenticate frame sent by the GO going down ends up
# being transmitted incorrectly on 2422 MHz. # being transmitted incorrectly on 2422 MHz.
try: out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
arg = [ "tshark", "wlan.fc.type_subtype == 13", ["radiotap.channel.freq"])
"-r", os.path.join(params['logdir'], "hwsim0.pcapng"), if out is not None:
"-Y", "wlan.fc.type_subtype == 13",
"-Tfields", "-e", "radiotap.channel.freq" ]
cmd = subprocess.Popen(arg, stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
except Exception, e:
logger.info("Could run run tshark check: " + str(e))
cmd = None
pass
if cmd:
(out,err) = cmd.communicate()
res = cmd.wait()
if res == 1:
arg[3] = '-R'
cmd = subprocess.Popen(arg, stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
(out,err) = cmd.communicate()
res = cmd.wait()
freq = out.splitlines() freq = out.splitlines()
if len(freq) != 2: if len(freq) != 2:
raise Exception("Unexpected number of Action frames (%d)" % len(freq)) raise Exception("Unexpected number of Action frames (%d)" % len(freq))

View File

@ -12,6 +12,7 @@ import time
import hostapd import hostapd
import hwsim_utils import hwsim_utils
from tshark import run_tshark
from wpasupplicant import WpaSupplicant from wpasupplicant import WpaSupplicant
from hwsim import HWSimRadio from hwsim import HWSimRadio
from test_p2p_grpform import go_neg_pin_authorized from test_p2p_grpform import go_neg_pin_authorized
@ -131,28 +132,9 @@ def test_p2p_channel_random_social_with_op_class_change(dev, apdev, params):
raise Exception("Unexpected channel %d MHz - did not pick random social channel" % freq) raise Exception("Unexpected channel %d MHz - did not pick random social channel" % freq)
remove_group(dev[0], dev[1]) remove_group(dev[0], dev[1])
try: out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
arg = [ "tshark", "wifi_p2p.public_action.subtype == 0")
"-r", os.path.join(params['logdir'], "hwsim0.pcapng"), if out is not None:
"-Y", "wifi_p2p.public_action.subtype == 0",
"-V" ]
cmd = subprocess.Popen(arg, stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
except Exception, e:
logger.info("Could run run tshark check: " + str(e))
cmd = None
pass
if cmd:
(out,err) = cmd.communicate()
res = cmd.wait()
if res == 1:
arg[3] = '-R'
cmd = subprocess.Popen(arg, stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
(out,err) = cmd.communicate()
res = cmd.wait()
last = None last = None
for l in out.splitlines(): for l in out.splitlines():
if "Operating Channel:" not in l: if "Operating Channel:" not in l:

View File

@ -13,6 +13,7 @@ import subprocess
import hostapd import hostapd
from wpasupplicant import WpaSupplicant from wpasupplicant import WpaSupplicant
from utils import HwsimSkip from utils import HwsimSkip
from tshark import run_tshark
def check_scan(dev, params, other_started=False, test_busy=False): def check_scan(dev, params, other_started=False, test_busy=False):
if not other_started: if not other_started:
@ -707,32 +708,10 @@ def _test_scan_random_mac(dev, apdev, params):
for args in tests: for args in tests:
dev[0].request("MAC_RAND_SCAN " + args) dev[0].request("MAC_RAND_SCAN " + args)
dev[0].scan_for_bss(bssid, freq=2412, force_scan=True) dev[0].scan_for_bss(bssid, freq=2412, force_scan=True)
# wait a bit to make it more likely for wlantest sniffer to have captured
# and written the results into a file that we can process here
time.sleep(1)
try:
arg = [ "tshark",
"-r", os.path.join(params['logdir'], "hwsim0.pcapng"),
"-Y", "wlan.fc.type_subtype == 4",
"-Tfields", "-e", "wlan.ta" ]
cmd = subprocess.Popen(arg, stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
except Exception, e:
logger.info("Could run run tshark check: " + str(e))
cmd = None
pass
if cmd:
(out,err) = cmd.communicate()
res = cmd.wait()
if res == 1:
arg[3] = '-R'
cmd = subprocess.Popen(arg, stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
(out,err) = cmd.communicate()
res = cmd.wait()
out = run_tshark(os.path.join(params['logdir'], "hwsim0.pcapng"),
"wlan.fc.type_subtype == 4", ["wlan.ta" ])
if out is not None:
addr = out.splitlines() addr = out.splitlines()
logger.info("Probe Request frames seen from: " + str(addr)) logger.info("Probe Request frames seen from: " + str(addr))
if dev[0].own_addr() in addr: if dev[0].own_addr() in addr:

52
tests/hwsim/tshark.py Normal file
View File

@ -0,0 +1,52 @@
#
# tshark module - refactored from test_scan.py
#
# Copyright (c) 2014, Qualcomm Atheros, Inc.
# Copyright (c) 2015, Intel Mobile Communications GmbH
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.
import time
import subprocess
import logging
logger = logging.getLogger()
_tshark_filter_arg = '-Y'
def run_tshark(filename, filter, display=None):
# wait a bit to make it more likely for wlantest sniffer to have captured
# and written the results into a file that we can process here
time.sleep(1)
try:
arg = [ "tshark", "-r", filename,
_tshark_filter_arg, "wlan.fc.type_subtype == 4",
"-Tfields", ]
if display:
arg.append('-Tfields')
for d in display:
arg.append('-e')
arg.append(d)
else:
arg.append('-V')
cmd = subprocess.Popen(arg, stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
except Exception, e:
logger.info("Could run run tshark check: " + str(e))
cmd = None
return None
out = cmd.communicate()[0]
res = cmd.wait()
if res == 1:
# remember this for efficiency
_tshark_filter_arg = '-R'
arg[3] = '-R'
cmd = subprocess.Popen(arg, stdout=subprocess.PIPE,
stderr=open('/dev/null', 'w'))
out = cmd.communicate()[0]
cmd.wait()
return out