#!/usr/bin/env python # Copyright (c) 2012 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Helper script to perform actions as a super-user on ChromeOS. Needs to be run with superuser privileges, typically using the suid_python binary. Usage: sudo python suid_actions.py --action=CleanFlimflamDirs """ import optparse import os import shutil import subprocess import sys import time sys.path.append('/usr/local') # to import autotest libs. from autotest.cros import constants from autotest.cros import cryptohome TEMP_BACKCHANNEL_FILE = '/tmp/pyauto_network_backchannel_file' class SuidAction(object): """Helper to perform some super-user actions on ChromeOS.""" def _ParseArgs(self): parser = optparse.OptionParser() parser.add_option( '-a', '--action', help='Action to perform.') self._options = parser.parse_args()[0] if not self._options.action: raise RuntimeError('No action specified.') def Run(self): self._ParseArgs() assert os.geteuid() == 0, 'Needs superuser privileges.' handler = getattr(self, self._options.action) assert handler and callable(handler), \ 'No handler for %s' % self._options.action handler() return 0 ## Actions ## def CleanFlimflamDirs(self): """Clean the contents of all connection manager (shill/flimflam) profiles. """ flimflam_dirs = ['/home/chronos/user/flimflam', '/home/chronos/user/shill', '/var/cache/flimflam', '/var/cache/shill'] # The stop/start flimflam command should stop/start shill respectivly if # enabled. os.system('stop flimflam') try: for flimflam_dir in flimflam_dirs: if not os.path.exists(flimflam_dir): continue for item in os.listdir(flimflam_dir): path = os.path.join(flimflam_dir, item) if os.path.isdir(path): shutil.rmtree(path) else: os.remove(path) finally: os.system('start flimflam') # TODO(stanleyw): crosbug.com/29421 This method should wait until # flimflam/shill is fully initialized and accessible via DBus again. # Otherwise, there is a race conditions and subsequent accesses to # flimflam/shill may fail. Until this is fixed, waiting for the # resolv.conf file to be created is better than nothing. begin = time.time() while not os.path.exists(constants.RESOLV_CONF_FILE): if time.time() - begin > 10: raise RuntimeError('Timeout while waiting for flimflam/shill start.') time.sleep(.25) def RemoveAllCryptohomeVaults(self): """Remove any existing cryptohome vaults.""" cryptohome.remove_all_vaults() def _GetEthInterfaces(self): """Returns a list of the eth* interfaces detected by the device.""" # Assumes ethernet interfaces all have "eth" in the name. import pyudev return sorted([iface.sys_name for iface in pyudev.Context().list_devices(subsystem='net') if 'eth' in iface.sys_name]) def _Renameif(self, old_iface, new_iface, mac_address): """Renames the interface with mac_address from old_iface to new_iface. Args: old_iface: The name of the interface you want to change. new_iface: The name of the interface you want to change to. mac_address: The mac address of the interface being changed. """ subprocess.call(['stop', 'flimflam']) subprocess.call(['ifconfig', old_iface, 'down']) subprocess.call(['nameif', new_iface, mac_address]) subprocess.call(['ifconfig', new_iface, 'up']) subprocess.call(['start', 'flimflam']) # Check and make sure interfaces have been renamed eth_ifaces = self._GetEthInterfaces() if new_iface not in eth_ifaces: raise RuntimeError('Interface %s was not renamed to %s' % (old_iface, new_iface)) elif old_iface in eth_ifaces: raise RuntimeError('Old iface %s is still present' % old_iface) def SetupBackchannel(self): """Renames the connected ethernet interface to eth_test for offline mode testing. Does nothing if no connected interface is found. """ # Return the interface with ethernet connected or returns if none found. for iface in self._GetEthInterfaces(): with open('/sys/class/net/%s/operstate' % iface, 'r') as fp: if 'up' in fp.read(): eth_iface = iface break else: return # Write backup file to be used by TeardownBackchannel to restore the # interface names. with open(TEMP_BACKCHANNEL_FILE, 'w') as fpw: with open('/sys/class/net/%s/address' % eth_iface) as fp: mac_address = fp.read().strip() fpw.write('%s, %s' % (eth_iface, mac_address)) self._Renameif(eth_iface, 'eth_test', mac_address) def TeardownBackchannel(self): """Restores the eth interface names if SetupBackchannel was called.""" if not os.path.isfile(TEMP_BACKCHANNEL_FILE): return with open(TEMP_BACKCHANNEL_FILE, 'r') as fp: eth_iface, mac_address = fp.read().split(',') self._Renameif('eth_test', eth_iface, mac_address) os.remove(TEMP_BACKCHANNEL_FILE) if __name__ == '__main__': sys.exit(SuidAction().Run())