tests: Use python selector in the parallel-vm.py main loop

This gets rid of the loop that was polling for things to do every 0.25
seconds and instead, reacts to any data from VMs as soon as it becomes
available. This avoids unnecessary operations when no new data is
available and avoids unnecessary waits when new data becomes available
more quickly.

Signed-off-by: Jouni Malinen <j@w1.fi>
This commit is contained in:
Jouni Malinen 2019-12-27 17:12:34 +02:00
parent 0075df74df
commit c64b6f62cd

View File

@ -11,6 +11,7 @@ import curses
import fcntl import fcntl
import logging import logging
import os import os
import selectors
import subprocess import subprocess
import sys import sys
import time import time
@ -87,7 +88,7 @@ def get_failed(vm):
failed += vm[i]['failed'] failed += vm[i]['failed']
return failed return failed
def vm_read_stdout(vm, i, test_queue): def vm_read_stdout(vm, test_queue):
global total_started, total_passed, total_failed, total_skipped global total_started, total_passed, total_failed, total_skipped
global rerun_failures global rerun_failures
global first_run_failures global first_run_failures
@ -102,7 +103,7 @@ def vm_read_stdout(vm, i, test_queue):
if e.errno == errno.EAGAIN: if e.errno == errno.EAGAIN:
return False return False
raise raise
logger.debug("VM[%d] stdout.read[%s]" % (i, out)) logger.debug("VM[%d] stdout.read[%s]" % (vm['idx'], out))
pending = vm['pending'] + out pending = vm['pending'] + out
lines = [] lines = []
while True: while True:
@ -111,7 +112,7 @@ def vm_read_stdout(vm, i, test_queue):
break break
line = pending[0:pos].rstrip() line = pending[0:pos].rstrip()
pending = pending[(pos + 1):] pending = pending[(pos + 1):]
logger.debug("VM[%d] stdout full line[%s]" % (i, line)) logger.debug("VM[%d] stdout full line[%s]" % (vm['idx'], line))
if line.startswith("READY"): if line.startswith("READY"):
vm['starting'] = False vm['starting'] = False
vm['started'] = True vm['started'] = True
@ -124,14 +125,15 @@ def vm_read_stdout(vm, i, test_queue):
total_failed += 1 total_failed += 1
vals = line.split(' ') vals = line.split(' ')
if len(vals) < 2: if len(vals) < 2:
logger.info("VM[%d] incomplete FAIL line: %s" % (i, line)) logger.info("VM[%d] incomplete FAIL line: %s" % (vm['idx'],
line))
name = line name = line
else: else:
name = vals[1] name = vals[1]
logger.debug("VM[%d] test case failed: %s" % (i, name)) logger.debug("VM[%d] test case failed: %s" % (vm['idx'], name))
vm['failed'].append(name) vm['failed'].append(name)
if name != vm['current_name']: if name != vm['current_name']:
logger.info("VM[%d] test result mismatch: %s (expected %s)" % (i, name, vm['current_name'])) logger.info("VM[%d] test result mismatch: %s (expected %s)" % (vm['idx'], name, vm['current_name']))
else: else:
count = vm['current_count'] count = vm['current_count']
if count == 0: if count == 0:
@ -142,7 +144,7 @@ def vm_read_stdout(vm, i, test_queue):
elif line.startswith("NOT-FOUND"): elif line.startswith("NOT-FOUND"):
ready = True ready = True
total_failed += 1 total_failed += 1
logger.info("VM[%d] test case not found" % i) logger.info("VM[%d] test case not found" % vm['idx'])
elif line.startswith("SKIP"): elif line.startswith("SKIP"):
ready = True ready = True
total_skipped += 1 total_skipped += 1
@ -159,7 +161,8 @@ def vm_read_stdout(vm, i, test_queue):
vm['pending'] = pending vm['pending'] = pending
return ready return ready
def start_vm(vm): def start_vm(vm, sel):
logger.info("VM[%d] starting up" % (vm['idx'] + 1))
vm['starting'] = True vm['starting'] = True
vm['proc'] = subprocess.Popen(vm['cmd'], vm['proc'] = subprocess.Popen(vm['cmd'],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
@ -170,6 +173,7 @@ def start_vm(vm):
fd = stream.fileno() fd = stream.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL) fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
sel.register(stream, selectors.EVENT_READ, vm)
def num_vm_starting(): def num_vm_starting():
count = 0 count = 0
@ -178,113 +182,83 @@ def num_vm_starting():
count += 1 count += 1
return count return count
def show_progress(scr): def vm_read_stderr(vm):
global num_servers try:
global vm err = vm['proc'].stderr.read()
global dir if err != None:
global timestamp err = err.decode()
global tests if len(err) > 0:
global first_run_failures vm['err'] += err
global total_started, total_passed, total_failed, total_skipped logger.info("VM[%d] stderr.read[%s]" % (vm['idx'], err))
global rerun_failures except IOError as e:
if e.errno != errno.EAGAIN:
raise
total_tests = len(tests) def vm_next_step(_vm, scr, test_queue):
logger.info("Total tests: %d" % total_tests) scr.move(_vm['idx'] + 1, 10)
test_queue = [(t, 0) for t in tests] scr.clrtoeol()
start_vm(vm[0]) if not test_queue:
_vm['proc'].stdin.write(b'\n')
_vm['proc'].stdin.flush()
scr.addstr("shutting down")
logger.info("VM[%d] shutting down" % _vm['idx'])
return
(name, count) = test_queue.pop(0)
_vm['current_name'] = name
_vm['current_count'] = count
_vm['proc'].stdin.write(name.encode() + b'\n')
_vm['proc'].stdin.flush()
scr.addstr(name)
logger.debug("VM[%d] start test %s" % (_vm['idx'], name))
scr.leaveok(1) def check_vm_start(scr, sel, test_queue):
scr.addstr(0, 0, "Parallel test execution status", curses.A_BOLD)
for i in range(0, num_servers):
scr.addstr(i + 1, 0, "VM %d:" % (i + 1), curses.A_BOLD)
status = "starting VM" if vm[i]['proc'] else "not yet started"
scr.addstr(i + 1, 10, status)
scr.addstr(num_servers + 1, 0, "Total:", curses.A_BOLD)
scr.addstr(num_servers + 1, 20, "TOTAL={} STARTED=0 PASS=0 FAIL=0 SKIP=0".format(total_tests))
scr.refresh()
while True:
running = False running = False
updated = False updated = False
for i in range(num_servers): for i in range(num_servers):
if not vm[i]['proc']: if not vm[i]['proc']:
# Either not yet started or already stopped VM
if test_queue and vm[i]['cmd'] and num_vm_starting() < 2: if test_queue and vm[i]['cmd'] and num_vm_starting() < 2:
scr.move(i + 1, 10) scr.move(i + 1, 10)
scr.clrtoeol() scr.clrtoeol()
scr.addstr(i + 1, 10, "starting VM") scr.addstr(i + 1, 10, "starting VM")
updated = True updated = True
start_vm(vm[i]) start_vm(vm[i], sel)
else:
continue continue
if vm[i]['proc'].poll() is not None:
vm[i]['proc'] = None running = True
scr.move(i + 1, 10) return running, updated
def vm_terminated(_vm, scr, sel, test_queue):
updated = False
for stream in [_vm['proc'].stdout, _vm['proc'].stderr]:
sel.unregister(stream)
_vm['proc'] = None
scr.move(_vm['idx'] + 1, 10)
scr.clrtoeol() scr.clrtoeol()
log = '{}/{}.srv.{}/console'.format(dir, timestamp, i + 1) log = '{}/{}.srv.{}/console'.format(dir, timestamp, _vm['idx'] + 1)
with open(log, 'r') as f: with open(log, 'r') as f:
if "Kernel panic" in f.read(): if "Kernel panic" in f.read():
scr.addstr("kernel panic") scr.addstr("kernel panic")
logger.info("VM[%d] kernel panic" % i) logger.info("VM[%d] kernel panic" % _vm['idx'])
updated = True updated = True
if test_queue: if test_queue:
num_vm = 0 num_vm = 0
for i in range(num_servers): for i in range(num_servers):
if vm[i]['proc']: if _vm['proc']:
num_vm += 1 num_vm += 1
if len(test_queue) > num_vm: if len(test_queue) > num_vm:
scr.addstr("unexpected exit") scr.addstr("unexpected exit")
logger.info("VM[%d] unexpected exit" % i) logger.info("VM[%d] unexpected exit" % i)
updated = True updated = True
continue return updated
running = True def update_screen(scr, total_tests):
try:
err = vm[i]['proc'].stderr.read()
if err != None:
err = err.decode()
vm[i]['err'] += err
logger.info("VM[%d] stderr.read[%s]" % (i, err))
except IOError as e:
if e.errno != errno.EAGAIN:
raise
if vm_read_stdout(vm[i], i, test_queue):
scr.move(i + 1, 10)
scr.clrtoeol()
updated = True
if not test_queue:
vm[i]['proc'].stdin.write(b'\n')
vm[i]['proc'].stdin.flush()
scr.addstr("shutting down")
logger.info("VM[%d] shutting down" % i)
continue
else:
(name, count) = test_queue.pop(0)
vm[i]['current_name'] = name
vm[i]['current_count'] = count
vm[i]['proc'].stdin.write(name.encode() + b'\n')
vm[i]['proc'].stdin.flush()
scr.addstr(name)
logger.debug("VM[%d] start test %s" % (i, name))
try:
err = vm[i]['proc'].stderr.read()
if err != None:
err = err.decode()
vm[i]['err'] += err
logger.debug("VM[%d] stderr.read[%s]" % (i, err))
except IOError as e:
if e.errno != errno.EAGAIN:
raise
if not running:
break
if updated:
scr.move(num_servers + 1, 10) scr.move(num_servers + 1, 10)
scr.clrtoeol() scr.clrtoeol()
scr.addstr("{} %".format(int(100.0 * (total_passed + total_failed + total_skipped) / total_tests))) scr.addstr("{} %".format(int(100.0 * (total_passed + total_failed + total_skipped) / total_tests)))
scr.addstr(num_servers + 1, 20, "TOTAL={} STARTED={} PASS={} FAIL={} SKIP={}".format(total_tests, total_started, total_passed, total_failed, total_skipped)) scr.addstr(num_servers + 1, 20,
"TOTAL={} STARTED={} PASS={} FAIL={} SKIP={}".format(total_tests, total_started, total_passed, total_failed, total_skipped))
failed = get_failed(vm) failed = get_failed(vm)
if len(failed) > 0: if len(failed) > 0:
scr.move(num_servers + 2, 0) scr.move(num_servers + 2, 0)
@ -299,10 +273,56 @@ def show_progress(scr):
break break
scr.addstr(f) scr.addstr(f)
scr.addstr(' ') scr.addstr(' ')
scr.refresh() scr.refresh()
time.sleep(0.25) def show_progress(scr):
global num_servers
global vm
global dir
global timestamp
global tests
global first_run_failures
global total_started, total_passed, total_failed, total_skipped
global rerun_failures
sel = selectors.DefaultSelector()
total_tests = len(tests)
logger.info("Total tests: %d" % total_tests)
test_queue = [(t, 0) for t in tests]
start_vm(vm[0], sel)
scr.leaveok(1)
scr.addstr(0, 0, "Parallel test execution status", curses.A_BOLD)
for i in range(0, num_servers):
scr.addstr(i + 1, 0, "VM %d:" % (i + 1), curses.A_BOLD)
status = "starting VM" if vm[i]['proc'] else "not yet started"
scr.addstr(i + 1, 10, status)
scr.addstr(num_servers + 1, 0, "Total:", curses.A_BOLD)
scr.addstr(num_servers + 1, 20, "TOTAL={} STARTED=0 PASS=0 FAIL=0 SKIP=0".format(total_tests))
scr.refresh()
while True:
updated = False
events = sel.select(timeout=1)
for key, mask in events:
_vm = key.data
if not _vm['proc']:
continue
vm_read_stderr(_vm)
if vm_read_stdout(_vm, test_queue):
vm_next_step(_vm, scr, test_queue)
updated = True
vm_read_stderr(_vm)
if _vm['proc'].poll() is not None:
if vm_terminated(_vm, scr, sel, test_queue):
updated = True
running, run_update = check_vm_start(scr, sel, test_queue)
if updated or run_update:
update_screen(scr, total_tests)
if not running:
break
sel.close()
for i in range(num_servers): for i in range(num_servers):
if not vm[i]['proc']: if not vm[i]['proc']:
@ -436,9 +456,6 @@ def main():
vm = {} vm = {}
for i in range(0, num_servers): for i in range(0, num_servers):
print("\rStarting virtual machine {}/{}".format(i + 1, num_servers),
end='')
logger.info("Starting virtual machine {}/{}".format(i + 1, num_servers))
cmd = [os.path.join(scriptsdir, 'vm-run.sh'), cmd = [os.path.join(scriptsdir, 'vm-run.sh'),
'--timestamp', str(timestamp), '--timestamp', str(timestamp),
'--ext', 'srv.%d' % (i + 1), '--ext', 'srv.%d' % (i + 1),
@ -446,6 +463,7 @@ def main():
if args.telnet: if args.telnet:
cmd += ['--telnet', str(args.telnet + i)] cmd += ['--telnet', str(args.telnet + i)]
vm[i] = {} vm[i] = {}
vm[i]['idx'] = i
vm[i]['starting'] = False vm[i]['starting'] = False
vm[i]['started'] = False vm[i]['started'] = False
vm[i]['cmd'] = cmd vm[i]['cmd'] = cmd