kernel_samsung_a53x/tools/testing/kunit/kunit_kernel.py
2024-06-15 16:02:09 -03:00

218 lines
6.6 KiB
Python
Executable file

# SPDX-License-Identifier: GPL-2.0
import logging
import subprocess
import os
import kunit_config
import kunit_parser
KCONFIG_PATH = os.path.join(os.environ.get('KBUILD_OUTPUT', ''), '.config')
from collections import namedtuple
ConfigResult = namedtuple('ConfigResult', ['status','info'])
BuildResult = namedtuple('BuildResult', ['status','info'])
class ConfigStatus(object):
SUCCESS = 'SUCCESS'
FAILURE = 'FAILURE'
class BuildStatus(object):
SUCCESS = 'SUCCESS'
FAILURE = 'FAILURE'
class ConfigError(Exception):
"""Represents an error trying to configure the Linux kernel."""
class BuildError(Exception):
"""Represents an error trying to build the Linux kernel."""
class LinuxSourceTreeOperations(object):
"""An abstraction over command line operations performed on a source tree."""
def make_mrproper(self):
try:
subprocess.check_output(['make', 'mrproper'])
except OSError as e:
raise ConfigError('Could not call make command: ' + str(e))
except subprocess.CalledProcessError as e:
raise ConfigError(e.output)
def make_olddefconfig(self):
try:
subprocess.check_output(['make', 'ARCH=um', 'olddefconfig'])
except OSError as e:
raise ConfigError('Could not call make command: ' + str(e))
except subprocess.CalledProcessError as e:
raise ConfigError(e.output)
def make(self, jobs):
try:
subprocess.check_output(['make', 'ARCH=um', '--jobs=' + str(jobs)])
except OSError as e:
raise BuildError('Could not call execute make: ' + str(e))
except subprocess.CalledProcessError as e:
raise BuildError(e.output.decode())
def linux_bin(self, params, timeout):
"""Runs the Linux UML binary. Must be named 'linux'."""
process = subprocess.Popen(
[os.path.join(os.environ.get('KBUILD_OUTPUT', ''), './linux')] + params,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
timed_out = False
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
process.terminate()
timed_out = True
output, _ = process.communicate()
output = output.decode('ascii')
if timed_out:
output += kunit_parser.TIMED_OUT_LOG_ENTRY + '\n'
return output
def throw_error_if_not_subset(expected_superset: kunit_config.Kconfig,
expected_subset: kunit_config.Kconfig) -> None:
if not expected_subset.is_subset_of(expected_superset):
missing = expected_subset.entries() - expected_superset.entries()
message = 'Provided Kconfig contains fields not in validated .config: %s' % (
', '.join([str(e) for e in missing]),
)
raise ConfigError(message)
class ExtKunitconfigGenerator():
'''
Generate a new kunitconfig what you interested in.
kunitconfigs/kunitconfig : original kunitconfig (when there is no external-config opt)
kunitconfigs/.kunitconfig : re-generated kunitconfig
'''
def __init__(self, ex_config, path='.'):
self.rootdir = os.path.join(path, 'kunitconfigs')
self.final_config_file = os.path.join(self.rootdir, '.kunitconfig')
orig_config_file = os.path.join(self.rootdir, 'kunitconfig')
conf = []
merge_configs = self.gen_merge_config(ex_config)
conf.append(self.read_config(orig_config_file))
for c in merge_configs:
conf.append(self.read_config(c))
self.write_config(self.final_config_file, conf)
def read_config(self, fname):
with open(fname, 'r') as fp:
ret = fp.read()
return ret
def write_config(self, fname, li):
with open(fname, 'w') as fp:
for l in li:
fp.write(l)
def get_sub_config(self, parent):
childs = []
kuconf_list = os.listdir(self.rootdir)
for f in kuconf_list:
srch = '.' + parent
if srch in f.split('kunitconfig')[1]:
#FIXME: matching in the right side
childs.append(os.path.join(self.rootdir, f))
return childs
def gen_merge_config(self, ex_li):
ret = []
for module in ex_li:
sub_conf = self.get_sub_config(module)
for c in sub_conf:
ret.append(c)
return ret
class LinuxSourceTree(object):
"""Represents a Linux kernel source tree with KUnit tests."""
def __init__(self,
kconfig_provider=kunit_config.KunitConfigProvider(),
linux_build_operations=LinuxSourceTreeOperations()):
self._kconfig = kconfig_provider.get_kconfig()
self._ops = linux_build_operations
self.kconf_provider = kconfig_provider
def make_external_config(self, ex_config):
if not ex_config:
return
conf = ExtKunitconfigGenerator(ex_config).final_config_file
self._kconfig = self.kconf_provider.get_kconfig(conf)
def clean(self):
try:
self._ops.make_mrproper()
except ConfigError as e:
logging.error(e)
return False
return True
def build_config(self):
self._kconfig.write_to_file(KCONFIG_PATH)
try:
self._ops.make_olddefconfig()
except ConfigError as e:
logging.error(e)
return ConfigResult(ConfigStatus.FAILURE, str(e))
validated_kconfig = kunit_config.Kconfig()
validated_kconfig.read_from_file(KCONFIG_PATH)
try:
throw_error_if_not_subset(expected_subset=self._kconfig,
expected_superset=validated_kconfig)
except ConfigError as e:
logging.error(e)
return ConfigResult(ConfigStatus.FAILURE, str(e))
return ConfigResult(ConfigStatus.SUCCESS, 'Build config!')
def build_reconfig(self):
"""Creates a new .config if it is not a subset of the kunitconfig."""
if os.path.exists(KCONFIG_PATH):
existing_kconfig = kunit_config.Kconfig()
existing_kconfig.read_from_file(KCONFIG_PATH)
if not self._kconfig.is_subset_of(existing_kconfig):
print('Regenerating .config ...')
os.remove(KCONFIG_PATH)
return self.build_config()
else:
return ConfigResult(ConfigStatus.SUCCESS, 'Already built.')
else:
print('Generating .config ...')
return self.build_config()
def build_um_kernel(self, jobs):
try:
self._ops.make_olddefconfig()
self._ops.make(jobs)
except (ConfigError, BuildError) as e:
logging.error(e)
return BuildResult(BuildStatus.FAILURE, str(e))
used_kconfig = kunit_config.Kconfig()
used_kconfig.read_from_file(KCONFIG_PATH)
try:
throw_error_if_not_subset(expected_subset=self._kconfig,
expected_superset=used_kconfig)
except ConfigError as e:
logging.error(e)
return ConfigResult(ConfigStatus.FAILURE, str(e))
return BuildResult(BuildStatus.SUCCESS, 'Built kernel!')
def run_kernel(self, args=[], timeout=None):
args.extend(['mem=256M kunit_shutdown=1'])
raw_log = self._ops.linux_bin(args, timeout)
with open('test.log', 'w') as f:
for line in raw_log.split('\n'):
f.write(line.rstrip() + '\n')
yield line.rstrip()