普通文本  |  111行  |  4.06 KB

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import re

from collections import namedtuple

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error

# One SysV shared memory segment as tracked by this test: the owner and
# perms fields taken from its 'ipcs -m' row, plus a sorted tuple holding
# the paths of the programs found attached to it.
ShmRecord = namedtuple('ShmRecord', 'owner perms attached')
# One SysV semaphore array: the owner and perms fields of its 'ipcs -s' row.
SemaphoreRecord = namedtuple('SemaphoreRecord', 'owner perms')

class security_SysVIPC(test.test):
    """Detect emergence of new attack surfaces in SysV IPC.

    Compares the shared memory segments, semaphores and message queues
    reported by 'ipcs' against the expectations below. Anything observed
    that is not expected fails the test; expected entries that are absent
    are only logged.
    """
    version = 1
    # Shared memory segments that are allowed to exist.
    expected_shm = set([ShmRecord(owner='cras', perms='640',
                                  attached=('/usr/bin/cras',))])
    # Semaphore arrays that are allowed to exist.
    expected_sem = set([SemaphoreRecord(owner='root', perms='600')])

    def dump_ipcs_to_results(self):
        """Writes a copy of the 'ipcs' output to the autotest results dir."""
        utils.system_output('ipcs > "%s/ipcs-output.txt"' % self.resultsdir)


    def find_attached(self, shmid):
        """Find programs attached to a given shared memory segment.

        @param shmid: the id as shown in ipcs and related utilities.

        @return a sorted tuple of the full paths of each program identified.
        """
        # This finds /proc/*/exe entries where maps shows they have
        # attached to the specified shm segment. In maps the segment id
        # appears in the inode column, which is preceded by a space; the
        # leading space in the pattern keeps a short id (e.g. 0) from
        # matching as the tail of a longer one (e.g. 163840).
        cmd = 'grep " %s */SYSV" /proc/*/maps | sed "s/maps.*/exe/g"' % shmid
        # Then we just need to readlink each of the links. Even though
        # we ultimately convert to a sorted tuple, we use a set to avoid
        # accumulating duplicates as we go along.
        exes = set()
        for link in utils.system_output(cmd).splitlines():
            try:
                exes.add(os.readlink(link))
            except OSError as e:
                # The process may have exited between the grep and the
                # readlink; skip it rather than abort the whole test.
                logging.warning('Could not resolve %s: %s', link, e)
        return tuple(sorted(exes))


    def _ipcs_fields(self, flag):
        """Run 'ipcs <flag>' and split each resource line into fields.

        Only lines beginning with '0' are kept (grep ^0).
        NOTE(review): this presumably selects the resource rows, whose
        first column is a hex key such as 0x00000000, and drops headers.

        @param flag: the ipcs option selecting the resource type,
                     e.g. '-m' or '-s'.

        @return a list with one list of whitespace-separated fields per
                matching line.
        """
        cmd = 'ipcs %s | grep ^0' % flag
        # ignore_status=True: presumably so that grep finding nothing
        # (non-zero exit) is treated as "no resources", not as an error.
        output = utils.system_output(cmd, ignore_status=True)
        return [re.split(r'\s+', line) for line in output.splitlines()]


    def observe_shm(self):
        """Return a set of ShmRecords representing current system shm usage."""
        seen = set()
        for fields in self._ipcs_fields('-m'):
            shmid = fields[1]
            attached = self.find_attached(shmid)
            seen.add(ShmRecord(owner=fields[2], perms=fields[3],
                               attached=attached))
        return seen


    def observe_sems(self):
        """Return a set of SemaphoreRecords representing current usage."""
        seen = set()
        for fields in self._ipcs_fields('-s'):
            seen.add(SemaphoreRecord(owner=fields[2], perms=fields[3]))
        return seen


    def _log_mismatches(self, label, expected, observed):
        """Log the differences between expected and observed records.

        @param label: text naming the resource type in the log messages.
        @param expected: set of records that are allowed to exist.
        @param observed: set of records found on the system.

        @return True if anything was observed that was not expected.
                Missing expected records are logged but do not count.
        """
        missing = expected.difference(observed)
        extra = observed.difference(expected)
        if missing:
            logging.error('Expected %s not found:', label)
            logging.error(missing)
        if extra:
            logging.error('Unexpected %s found:', label)
            logging.error(extra)
        return bool(extra)


    def run_once(self):
        """Main entry point to run the security_SysVIPC autotest.

        @raises error.TestFail if any unexpected shared memory segment,
                semaphore or message queue is found.
        """
        self.dump_ipcs_to_results()

        # Check Shared Memory.
        shm_extra = self._log_mismatches('shm(s)', self.expected_shm,
                                         self.observe_shm())

        # Check Semaphores.
        sem_extra = self._log_mismatches('semaphore(s)', self.expected_sem,
                                         self.observe_sems())

        # Also check Message Queues. Since we currently expect
        # none, we can avoid over-engineering this check.
        queues = utils.system_output('ipcs -q | grep ^0', ignore_status=True)
        if queues:
            logging.error('Unexpected message queues found:')
            logging.error(queues)

        if shm_extra or sem_extra or queues:
            raise error.TestFail('SysV IPCs did not match expectations')