# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging, os
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
import tempfile
class security_ModuleLocking(test.test):
"""
Handle examining the system for specific module loading capabilities.
"""
version = 1
def _passed(self, msg):
logging.info('ok: %s', msg)
def _failed(self, msg):
logging.error('FAIL: %s', msg)
self._failures.append(msg)
def _fatal(self, msg):
logging.error('FATAL: %s', msg)
raise error.TestError(msg)
def check(self, boolean, msg, fatal=False):
"""
Check boolean state and report condition to log.
@param boolean: condition to examine
@param msg: what the condition is testing
@param fatal: should the test full abort on the condition failing
"""
if boolean == True:
self._passed(msg)
else:
msg = "could not satisfy '%s'" % (msg)
if fatal:
self._fatal(msg)
else:
self._failed(msg)
def module_loaded(self, module):
"""
Detect if the given module is already loaded in the kernel.
@param module: name of module to check
"""
module = module.replace('-', '_')
match = "%s " % (module)
for line in open("/proc/modules"):
if line.startswith(match):
return True
return False
def rmmod(self, module):
"""
Unload a module if it is already loaded in the kernel.
@param module: name of module to unload
"""
if self.module_loaded(module):
utils.system("rmmod %s" % (module))
def modprobe(self, module):
"""
If a module is not already loaded in the kernel, load it via modprobe.
@param module: name of module to load
"""
if not self.module_loaded(module):
utils.system("modprobe %s" % (module))
def _module_path(self, module):
"""
Locate a kernel module's full filesystem path.
@param module: name of module to locate
"""
ko = utils.system_output("find /lib/modules -name '%s.ko'" % (module))
return ko.splitlines()[0]
def module_loads_outside_rootfs(self, module):
"""
Copies the given module into /tmp and tries to load it from there
using insmod directly.
@param module: name of module to test
"""
# Start from a clean slate.
self.rmmod(module)
# Make sure we can load with standard mechanisms.
self.modprobe(module)
self.rmmod(module)
# Load module directly with insmod from root filesystem.
ko = self._module_path(module)
utils.system("insmod %s" % (ko))
self.rmmod(module)
# Load module directly with insmod from /tmp.
tmp = "/tmp/%s.ko" % (module)
utils.system("cp %s %s" % (ko, tmp))
rc = utils.system("insmod %s" % (tmp), ignore_status=True)
# Clean up.
self.rmmod(module)
utils.system("rm %s" % (tmp))
if rc == 0:
return True
return False
def module_loads_old_api(self, module):
"""
Loads a module using the old blob-style kernel syscall. With
kmod, this requires compressing the module first to trigger
in-memory decompression and loading.
@param module: name of module to test
"""
# Start from a clean slate.
self.rmmod(module)
# Compress module to trigger the old API.
tmp = "/tmp/%s.ko.gz" % (module)
ko = self._module_path(module)
utils.system("gzip -c %s > %s" % (ko, tmp))
rc = utils.system("insmod %s" % (tmp), ignore_status=True)
# Clean up.
self.rmmod(module)
utils.system("rm %s" % (tmp))
if rc == 0:
return True
return False
def module_loads_after_bind_umount(self, module):
"""
Makes sure modules can still load after a bind mount of the
filesystem is umounted.
@param module: name of module to test
"""
# Start from a clean slate.
self.rmmod(module)
# Make sure we can load with standard mechanisms.
self.modprobe(module)
self.rmmod(module)
# Create and umount a bind mount of the root filesystem.
bind = tempfile.mkdtemp(prefix=module)
rc = utils.system("mount -o bind / %s && umount %s" % (bind, bind))
utils.system("rmdir %s" % (bind))
# Attempt to load again.
self.modprobe(module)
self.rmmod(module)
if rc == 0:
return True
return False
def run_once(self):
"""
Check that the fd-based module loading syscall is enforcing the
module fd origin to the root filesystem, and that it can be
disabled and will allow the old syscall API as well.
TODO(keescook): add production test to make sure that on a verified
boot, "/proc/sys/kernel/chromiumos/module_locking" does not exist.
"""
# Empty failure list means test passes.
self._failures = []
# Check that the sysctl is either missing or set to 1.
sysctl = "/proc/sys/kernel/chromiumos/module_locking"
if os.path.exists(sysctl):
self.check(open(sysctl).read() == '1\n', "%s enabled" % (sysctl))
# Check the enforced state is to deny non-rootfs module loads.
module = "test_module"
loaded = self.module_loads_outside_rootfs(module)
self.check(loaded == False, "cannot load %s from /tmp" % (module))
# Check old API fails when enforcement enabled.
loaded = self.module_loads_old_api(module)
self.check(loaded == False, "cannot load %s with old API" % (module))
# Make sure the bind umount bug is not present.
loaded = self.module_loads_after_bind_umount(module)
self.check(loaded == True, "can load %s after bind umount" % (module))
# If the sysctl exists, verify that it will disable the restriction.
if os.path.exists(sysctl):
# Disable restriction.
open(sysctl, "w").write("0\n")
self.check(open(sysctl).read() == '0\n', "%s disabled" % (sysctl))
# Check enforcement is disabled.
loaded = self.module_loads_outside_rootfs(module)
self.check(loaded == True, "can load %s from /tmp" % (module))
# Check old API works when enforcement disabled.
loaded = self.module_loads_old_api(module)
self.check(loaded == True, "can load %s with old API" % (module))
# Clean up.
open(sysctl, "w").write("1\n")
self.check(open(sysctl).read() == '1\n', "%s enabled" % (sysctl))
# Raise a failure if anything unexpected was seen.
if len(self._failures):
raise error.TestFail((", ".join(self._failures)))