# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import re
from collections import namedtuple
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
ShmRecord = namedtuple('ShmRecord', ['owner', 'perms', 'attached'])
SemaphoreRecord = namedtuple('SemaphoreRecord', ['owner', 'perms'])
class security_SysVIPC(test.test):
"""Detect emergence of new attack surfaces in SysV IPC."""
version = 1
expected_shm = set([ShmRecord(owner='cras', perms='640',
attached=('/usr/bin/cras',))])
expected_sem = set([SemaphoreRecord(owner='root', perms='600')])
def dump_ipcs_to_results(self):
"""Writes a copy of the 'ipcs' output to the autotest results dir."""
utils.system_output('ipcs > "%s/ipcs-output.txt"' % self.resultsdir)
def find_attached(self, shmid):
"""Find programs attached to a given shared memory segment.
Returns full paths to each program identified.
Args:
@param shmid: the id as shown in ipcs and related utilities.
"""
# This finds /proc/*/exe entries where maps shows they have
# attached to the specified shm segment.
cmd = 'grep "%s */SYSV" /proc/*/maps | sed "s/maps.*/exe/g"' % shmid
# Then we just need to readlink each of the links. Even though
# we ultimately convert to a sorted tuple, we use a set to avoid
# accumulating duplicates as we go along.
exes = set()
for link in utils.system_output(cmd).splitlines():
exes.add(os.readlink(link))
return tuple(sorted(exes))
def observe_shm(self):
"""Return a set of ShmRecords representing current system shm usage."""
seen = set()
cmd = 'ipcs -m | grep ^0'
for line in utils.system_output(cmd, ignore_status=True).splitlines():
fields = re.split('\s+', line)
shmid = fields[1]
owner = fields[2]
perms = fields[3]
attached = self.find_attached(shmid)
seen.add(ShmRecord(owner=owner, perms=perms, attached=attached))
return seen
def observe_sems(self):
"""Return a set of SemaphoreRecords representing current usage."""
seen = set()
cmd = 'ipcs -s | grep ^0'
for line in utils.system_output(cmd, ignore_status=True).splitlines():
fields = re.split('\s+', line)
seen.add(SemaphoreRecord(owner=fields[2], perms=fields[3]))
return seen
def run_once(self):
"""Main entry point to run the security_SysVIPC autotest."""
test_fail = False
self.dump_ipcs_to_results()
# Check Shared Memory.
observed_shm = self.observe_shm()
missing = self.expected_shm.difference(observed_shm)
extra = observed_shm.difference(self.expected_shm)
if missing:
logging.error('Expected shm(s) not found:')
logging.error(missing)
if extra:
test_fail = True
logging.error('Unexpected shm(s) found:')
logging.error(extra)
# Check Semaphores.
observed_sem = self.observe_sems()
missing = self.expected_sem.difference(observed_sem)
extra = observed_sem.difference(self.expected_sem)
if missing:
logging.error('Expected semaphore(s) not found:')
logging.error(missing)
if extra:
test_fail = True
logging.error('Unexpected semaphore(s) found:')
logging.error(extra)
# Also check Message Queues. Since we currently expect
# none, we can avoid over-engineering this check.
queues = utils.system_output('ipcs -q | grep ^0', ignore_status=True)
if queues:
test_fail = True
logging.error('Unexpected message queues found:')
logging.error(queues)
if test_fail:
raise error.TestFail('SysV IPCs did not match expectations')