218 lines
6.6 KiB
Python
Executable file
218 lines
6.6 KiB
Python
Executable file
# SPDX-License-Identifier: GPL-2.0
|
|
|
|
import logging
|
|
import subprocess
|
|
import os
|
|
|
|
import kunit_config
|
|
import kunit_parser
|
|
|
|
KCONFIG_PATH = os.path.join(os.environ.get('KBUILD_OUTPUT', ''), '.config')
|
|
|
|
from collections import namedtuple
|
|
|
|
ConfigResult = namedtuple('ConfigResult', ['status','info'])
|
|
|
|
BuildResult = namedtuple('BuildResult', ['status','info'])
|
|
|
|
class ConfigStatus(object):
|
|
SUCCESS = 'SUCCESS'
|
|
FAILURE = 'FAILURE'
|
|
|
|
class BuildStatus(object):
|
|
SUCCESS = 'SUCCESS'
|
|
FAILURE = 'FAILURE'
|
|
|
|
class ConfigError(Exception):
|
|
"""Represents an error trying to configure the Linux kernel."""
|
|
|
|
|
|
class BuildError(Exception):
|
|
"""Represents an error trying to build the Linux kernel."""
|
|
|
|
|
|
class LinuxSourceTreeOperations(object):
|
|
"""An abstraction over command line operations performed on a source tree."""
|
|
|
|
def make_mrproper(self):
|
|
try:
|
|
subprocess.check_output(['make', 'mrproper'])
|
|
except OSError as e:
|
|
raise ConfigError('Could not call make command: ' + str(e))
|
|
except subprocess.CalledProcessError as e:
|
|
raise ConfigError(e.output)
|
|
|
|
def make_olddefconfig(self):
|
|
try:
|
|
subprocess.check_output(['make', 'ARCH=um', 'olddefconfig'])
|
|
except OSError as e:
|
|
raise ConfigError('Could not call make command: ' + str(e))
|
|
except subprocess.CalledProcessError as e:
|
|
raise ConfigError(e.output)
|
|
|
|
def make(self, jobs):
|
|
try:
|
|
subprocess.check_output(['make', 'ARCH=um', '--jobs=' + str(jobs)])
|
|
except OSError as e:
|
|
raise BuildError('Could not call execute make: ' + str(e))
|
|
except subprocess.CalledProcessError as e:
|
|
raise BuildError(e.output.decode())
|
|
|
|
def linux_bin(self, params, timeout):
|
|
"""Runs the Linux UML binary. Must be named 'linux'."""
|
|
process = subprocess.Popen(
|
|
[os.path.join(os.environ.get('KBUILD_OUTPUT', ''), './linux')] + params,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
timed_out = False
|
|
try:
|
|
process.wait(timeout=timeout)
|
|
except subprocess.TimeoutExpired:
|
|
process.terminate()
|
|
timed_out = True
|
|
output, _ = process.communicate()
|
|
output = output.decode('ascii')
|
|
|
|
if timed_out:
|
|
output += kunit_parser.TIMED_OUT_LOG_ENTRY + '\n'
|
|
|
|
return output
|
|
|
|
def throw_error_if_not_subset(expected_superset: kunit_config.Kconfig,
|
|
expected_subset: kunit_config.Kconfig) -> None:
|
|
if not expected_subset.is_subset_of(expected_superset):
|
|
missing = expected_subset.entries() - expected_superset.entries()
|
|
message = 'Provided Kconfig contains fields not in validated .config: %s' % (
|
|
', '.join([str(e) for e in missing]),
|
|
)
|
|
raise ConfigError(message)
|
|
|
|
class ExtKunitconfigGenerator():
|
|
'''
|
|
Generate a new kunitconfig what you interested in.
|
|
kunitconfigs/kunitconfig : original kunitconfig (when there is no external-config opt)
|
|
kunitconfigs/.kunitconfig : re-generated kunitconfig
|
|
'''
|
|
|
|
def __init__(self, ex_config, path='.'):
|
|
self.rootdir = os.path.join(path, 'kunitconfigs')
|
|
self.final_config_file = os.path.join(self.rootdir, '.kunitconfig')
|
|
orig_config_file = os.path.join(self.rootdir, 'kunitconfig')
|
|
conf = []
|
|
merge_configs = self.gen_merge_config(ex_config)
|
|
|
|
conf.append(self.read_config(orig_config_file))
|
|
for c in merge_configs:
|
|
conf.append(self.read_config(c))
|
|
self.write_config(self.final_config_file, conf)
|
|
|
|
def read_config(self, fname):
|
|
with open(fname, 'r') as fp:
|
|
ret = fp.read()
|
|
return ret
|
|
|
|
def write_config(self, fname, li):
|
|
with open(fname, 'w') as fp:
|
|
for l in li:
|
|
fp.write(l)
|
|
|
|
def get_sub_config(self, parent):
|
|
childs = []
|
|
kuconf_list = os.listdir(self.rootdir)
|
|
for f in kuconf_list:
|
|
srch = '.' + parent
|
|
if srch in f.split('kunitconfig')[1]:
|
|
#FIXME: matching in the right side
|
|
childs.append(os.path.join(self.rootdir, f))
|
|
return childs
|
|
|
|
def gen_merge_config(self, ex_li):
|
|
ret = []
|
|
for module in ex_li:
|
|
sub_conf = self.get_sub_config(module)
|
|
for c in sub_conf:
|
|
ret.append(c)
|
|
return ret
|
|
|
|
|
|
class LinuxSourceTree(object):
|
|
"""Represents a Linux kernel source tree with KUnit tests."""
|
|
|
|
def __init__(self,
|
|
kconfig_provider=kunit_config.KunitConfigProvider(),
|
|
linux_build_operations=LinuxSourceTreeOperations()):
|
|
self._kconfig = kconfig_provider.get_kconfig()
|
|
self._ops = linux_build_operations
|
|
self.kconf_provider = kconfig_provider
|
|
|
|
def make_external_config(self, ex_config):
|
|
if not ex_config:
|
|
return
|
|
conf = ExtKunitconfigGenerator(ex_config).final_config_file
|
|
self._kconfig = self.kconf_provider.get_kconfig(conf)
|
|
|
|
def clean(self):
|
|
try:
|
|
self._ops.make_mrproper()
|
|
except ConfigError as e:
|
|
logging.error(e)
|
|
return False
|
|
return True
|
|
|
|
def build_config(self):
|
|
self._kconfig.write_to_file(KCONFIG_PATH)
|
|
try:
|
|
self._ops.make_olddefconfig()
|
|
except ConfigError as e:
|
|
logging.error(e)
|
|
return ConfigResult(ConfigStatus.FAILURE, str(e))
|
|
validated_kconfig = kunit_config.Kconfig()
|
|
validated_kconfig.read_from_file(KCONFIG_PATH)
|
|
try:
|
|
throw_error_if_not_subset(expected_subset=self._kconfig,
|
|
expected_superset=validated_kconfig)
|
|
except ConfigError as e:
|
|
logging.error(e)
|
|
return ConfigResult(ConfigStatus.FAILURE, str(e))
|
|
return ConfigResult(ConfigStatus.SUCCESS, 'Build config!')
|
|
|
|
def build_reconfig(self):
|
|
"""Creates a new .config if it is not a subset of the kunitconfig."""
|
|
if os.path.exists(KCONFIG_PATH):
|
|
existing_kconfig = kunit_config.Kconfig()
|
|
existing_kconfig.read_from_file(KCONFIG_PATH)
|
|
if not self._kconfig.is_subset_of(existing_kconfig):
|
|
print('Regenerating .config ...')
|
|
os.remove(KCONFIG_PATH)
|
|
return self.build_config()
|
|
else:
|
|
return ConfigResult(ConfigStatus.SUCCESS, 'Already built.')
|
|
else:
|
|
print('Generating .config ...')
|
|
return self.build_config()
|
|
|
|
def build_um_kernel(self, jobs):
|
|
try:
|
|
self._ops.make_olddefconfig()
|
|
self._ops.make(jobs)
|
|
except (ConfigError, BuildError) as e:
|
|
logging.error(e)
|
|
return BuildResult(BuildStatus.FAILURE, str(e))
|
|
used_kconfig = kunit_config.Kconfig()
|
|
used_kconfig.read_from_file(KCONFIG_PATH)
|
|
try:
|
|
throw_error_if_not_subset(expected_subset=self._kconfig,
|
|
expected_superset=used_kconfig)
|
|
except ConfigError as e:
|
|
logging.error(e)
|
|
return ConfigResult(ConfigStatus.FAILURE, str(e))
|
|
return BuildResult(BuildStatus.SUCCESS, 'Built kernel!')
|
|
|
|
def run_kernel(self, args=[], timeout=None):
|
|
args.extend(['mem=256M kunit_shutdown=1'])
|
|
raw_log = self._ops.linux_bin(args, timeout)
|
|
with open('test.log', 'w') as f:
|
|
for line in raw_log.split('\n'):
|
|
f.write(line.rstrip() + '\n')
|
|
yield line.rstrip()
|