# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus
import json
import logging
import os.path
from xml.dom.minidom import parse, parseString
from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import constants, login
# Autotest security test that enumerates the interfaces, methods and signals
# reachable over D-Bus and checks them against stored baseline files.
class security_DbusMap(test.test):
# Test version attribute; presumably read by the autotest framework
# (TODO confirm against test.test).
version = 2
def policy_sort_priority(self, policy):
    """
    Map one <policy> DOMElement from a dbus configuration file to the
    position of its category in dbus-daemon's evaluation order.

    The returned number is meant to be used as a sort key:
        0  context="default"
        1  group="..."        (any non-empty value)
        2  user="..."         (any non-empty value)
        3  at_console="true"
        4  at_console="false"
        5  context="mandatory"

    The categories are tested in the order listed, so an element that
    carries several of these attributes gets the lowest matching rank.
    Returns None when the element matches none of them.
    """
    # Category order as derived from the dbus-daemon(1) manpage.
    attr = policy.getAttribute
    context = attr('context')
    console = attr('at_console')
    console_ranks = {'true': 3, 'false': 4}

    if context == 'default':
        rank = 0
    elif attr('group') != '':
        rank = 1
    elif attr('user') != '':
        rank = 2
    elif console in console_ranks:
        rank = console_ranks[console]
    elif context == 'mandatory':
        rank = 5
    else:
        # Not a recognized policy category.
        rank = None
    return rank
def sort_policies(self, policies):
    """
    Return a new list holding the given <policy> DOMElements in the
    order dbus-daemon(1) would consider them.

    Policies are ordered by policy_sort_priority(); policies with the
    same priority keep their relative position from the input, which
    matters wherever dbus applies "last rule wins."  The input sequence
    itself is not modified.
    """
    # The key is computed once per element, so policy_sort_priority()
    # is still called only once per policy.  Including the input index
    # makes every key unique and keeps ties in input order.
    def sort_key(indexed):
        position, policy = indexed
        return (self.policy_sort_priority(policy), position)

    ranked = sorted(enumerate(policies), key=sort_key)
    return [policy for _, policy in ranked]
def check_policies(self, config_doms, dest, iface, member,
                   user='chronos', at_console=None):
    """
    Decide whether `user` may send to the <destination, interface,
    member> triplet according to one or more parsed dbus
    configuration DOMs.

    Every <busconfig> element of every DOM is visited in the order
    given; within each one, its <policy> blocks are evaluated in
    sort_policies() order.  The last policy that expresses an opinion
    decides the result; if none does, access is denied.

    Returns True if permitted, False otherwise.

    See also http://dbus.freedesktop.org/doc/busconfig.dtd
    """
    # cros-specific heuristic: when the caller leaves at_console
    # unspecified, the chronos user is assumed to be at the console.
    if at_console is None and user == 'chronos':
        at_console = True

    verdict = False
    for dom in config_doms:
        for buscfg in dom.getElementsByTagName('busconfig'):
            ordered = self.sort_policies(
                    buscfg.getElementsByTagName('policy'))
            for policy in ordered:
                ruling = self.check_one_policy(policy, dest, iface,
                                               member, user, at_console)
                if ruling is None:
                    # This policy has nothing to say about the triplet.
                    continue
                verdict = ruling
    return verdict
def check_one_policy(self, policy, dest, iface, member,
                     user='chronos', at_console=True):
    """
    Evaluate a single <policy> DOMElement against the <destination,
    interface, member> triplet for the given user.

    Returns True if the policy permits the triplet, False if it
    prohibits it, or None if the policy does not apply (either the
    whole block is not addressed to this user/situation, or none of
    its <allow>/<deny> children match).
    """
    console_policy = policy.getAttribute('at_console') == 'true'

    # The block is relevant if it is a default policy, names this user,
    # or is an at_console policy.
    # TODO(jimhebert) group='...' is not currently used by any
    # Chrome OS dbus policies but could be in the future so
    # we should add a check for it here.
    applies = (policy.getAttribute('context') == 'default' or
               policy.getAttribute('user') == user or
               console_policy)
    if not applies:
        return None

    # An at_console policy is also irrelevant when the situation being
    # checked is not an "at_console" situation.
    if console_policy and not at_console:
        return None

    # D-Bus overall is default-deny, but an earlier policy may already
    # have said "allow".  Start from "doesn't apply" rather than "deny"
    # so a non-matching block never masks a previous ruling.
    verdict = None
    for node in policy.childNodes:
        if node.nodeType != node.ELEMENT_NODE:
            continue
        if node.localName not in ['allow', 'deny']:
            continue
        ruling = self.check_one_node(node, dest, iface, member)
        if ruling is not None:
            # Later matching rules override earlier ones.
            verdict = ruling
    return verdict
def check_one_node(self, node, dest, iface, member):
    """
    Evaluate a single <allow> or <deny> DOMElement against the
    <destination, interface, member> triplet.

    Returns True if the rule matches and is an <allow>, False if it
    matches and is anything else, or None if the rule does not match
    the triplet.
    """
    # send_destination must be present and equal.  Accepting a missing
    # send_destination would make us process unrelated tags such as
    # <allow own="...">.
    if node.getAttribute('send_destination') != dest:
        return None

    # send_interface and send_member are optional: omitting them is how
    # dbus configuration expresses a wildcard.  When present, they must
    # match.
    if (node.hasAttribute('send_interface') and
            node.getAttribute('send_interface') != iface):
        return None
    if (node.hasAttribute('send_member') and
            node.getAttribute('send_member') != member):
        return None

    # The rule applies.
    logging.debug(('%s send_destination=%s send_interface=%s '
                   'send_member=%s applies to %s %s %s.'),
                  node.localName,
                  node.getAttribute('send_destination'),
                  node.getAttribute('send_interface'),
                  node.getAttribute('send_member'),
                  dest, iface, member)
    return node.localName == 'allow'
def load_dbus_config_doms(self, dbusdir='/etc/dbus-1/system.d'):
    """
    Parse every dbus configuration file found directly inside
    `dbusdir` (http://dbus.freedesktop.org/doc/busconfig.dtd).

    Only regular files whose name ends in ".conf" are considered,
    which is the same naming requirement dbus-daemon itself imposes.

    Args:
      dbusdir: path to the directory holding the configuration files.
    Returns: a list of DOMs, ordered by file name.
    """
    config_doms = []
    # os.listdir() returns entries in arbitrary order.  check_policies()
    # lets later DOMs override earlier ones, so sort the names to make
    # the outcome reproducible from run to run.
    for name in sorted(os.listdir(dbusdir)):
        path = os.path.join(dbusdir, name)
        if name.endswith('.conf') and os.path.isfile(path):
            config_doms.append(parse(path))
    return config_doms
def mutual_compare(self, dbus_list, baseline, context='all'):
    """
    Front-end for compare_dbus_trees() that compares in both
    directions: what the system exposes that the baseline lacks, and
    what the baseline expects that the system lacks.

    Both trees are sorted in place before comparing.  Any differences
    are reported through logging.error as pretty-printed JSON.

    The optional 'context' argument is (only) used to provide more
    detail in those log messages.

    Returns: True if the two exactly match. False otherwise.
    """
    for tree in (dbus_list, baseline):
        self.sort_dbus_tree(tree)

    # Observed but not baselined: added API's.
    added = self.compare_dbus_trees(dbus_list, baseline)
    if added:
        logging.error("New (accessible to %s) API's to review:" % context)
        logging.error(json.dumps(added, sort_keys=True, indent=2))

    # Baselined but not observed: same comparison, arguments swapped.
    removed = self.compare_dbus_trees(baseline, dbus_list)
    if removed:
        logging.error("Missing API's (expected to be accessible to %s):" %
                      context)
        logging.error(json.dumps(removed, sort_keys=True, indent=2))

    return not added and not removed
def add_member(self, dbus_list, dest, iface, member):
    """Record method `member` of dest/iface in dbus_list (see _add_surface)."""
    return self._add_surface(dbus_list, dest, iface, member,
                             slot='methods')
def add_signal(self, dbus_list, dest, iface, signal):
    """Record signal `signal` of dest/iface in dbus_list (see _add_surface)."""
    return self._add_surface(dbus_list, dest, iface, signal,
                             slot='signals')
def add_property(self, dbus_list, dest, iface, signal):
    """
    Record a property of dest/iface in dbus_list (see _add_surface).

    Despite its name, the `signal` argument carries the property entry.
    """
    return self._add_surface(dbus_list, dest, iface, signal,
                             slot='properties')
def _add_surface(self, dbus_list, dest, iface, member, slot):
"""
This can add an entry for a member function to a given
dbus list. It behaves somewhat like "mkdir -p" in that
it creates any missing, necessary intermediate portions
of the data structure. For example, if this is the first
member being added for a given interface, the interface
will not already be mentioned in dbus_list, and this
function initializes the interface dictionary appropriately.
Returns: None
"""
# Ensure the Destination object exists in the data structure.
dest_idx = -1
for (i, objdict) in enumerate(dbus_list):
if objdict['Object_name'] == dest:
dest_idx = i
if dest_idx == -1:
dbus_list.append({'Object_name': dest, 'interfaces': []})
# Ensure the Interface entry exists for that Destination object.
iface_idx = -1
for (i, ifacedict) in enumerate(dbus_list[dest_idx]['interfaces']):
if ifacedict['interface'] == iface:
iface_idx = i
if iface_idx == -1:
dbus_list[dest_idx]['interfaces'].append({'interface': iface,
'signals': [],
'properties': [],
'methods': []})
# Ensure the slot exists.
if not slot in dbus_list[dest_idx]['interfaces'][iface_idx]:
dbus_list[dest_idx]['interfaces'][iface_idx][slot] = []
# Add member so long as it's not a duplicate.
if not member in (
dbus_list[dest_idx]['interfaces'][iface_idx][slot]):
dbus_list[dest_idx]['interfaces'][iface_idx][slot].append(
member)
def list_baselined_users(self):
    """
    Return the usernames that have a user-specific attack-surface
    baseline, i.e. one entry per 'baseline.<username>' file found in
    the directory containing this source file.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    # Everything after the first '.' is the username.  Names ending in
    # '~' are emacs backups and are ignored.
    return [name.partition('.')[2]
            for name in os.listdir(here)
            if name.startswith('baseline.') and not name.endswith('~')]
def load_baseline(self, user=''):
    """
    Load a baseline file stored next to this source file and return
    its parsed contents (see load_dbus_data_from_disk).

    The overall baseline lives in a file named 'baseline'; when `user`
    is non-empty, the user-specific file 'baseline.<user>' is loaded
    instead.
    """
    filename = 'baseline'
    if user:
        filename = '%s.%s' % (filename, user)
    # Baselines are located relative to our own path.
    here = os.path.dirname(os.path.abspath(__file__))
    return self.load_dbus_data_from_disk(os.path.join(here, filename))
def write_dbus_data_to_disk(self, dbus_list, file_path):
    """Writes the given dbus data to a given path to a json file.

    The data is dumped with sorted keys and a 2-space indent, and the
    file always ends with a newline.

    Args:
        dbus_list: list of dbus dictionaries to write to disk.
        file_path: the path to the file to write the data to.
    """
    # Serialize first, so a serialization error cannot leave behind a
    # freshly truncated file.
    my_json = json.dumps(dbus_list, sort_keys=True, indent=2)
    # The json dumper has a trailing whitespace problem, and lacks
    # a final newline. Fix both here.
    text = my_json.replace(', \n', ',\n') + '\n'
    # 'with' closes the handle even if the write raises.
    with open(file_path, 'w') as file_handle:
        file_handle.write(text)
def load_dbus_data_from_disk(self, file_path):
    """Loads dbus data from a given path to a json file.

    Args:
        file_path: path to the file as a string.
    Returns:
        A list of the dictionary representation of the dbus data loaded.
        The dictionary format is the same as returned by walk_object().
    """
    # 'with' closes the handle even if reading or JSON parsing raises.
    with open(file_path, 'r') as file_handle:
        return json.load(file_handle)
def sort_dbus_tree(self, tree):
    """Sorts an array of dbus dictionaries in alphabetical order.

    Objects are ordered by 'Object_name', each object's interfaces by
    'interface', and each interface's 'methods', 'signals' and
    'properties' lists by value.

    Args:
        tree: the array to sort. Modified in-place.
    """
    def object_name(entry):
        return entry['Object_name']

    def interface_name(entry):
        return entry['interface']

    tree.sort(key=object_name)
    for dbus_object in tree:
        interfaces = dbus_object['interfaces']
        interfaces.sort(key=interface_name)
        for interface in interfaces:
            for slot in ('methods', 'signals', 'properties'):
                interface[slot].sort()
def compare_dbus_trees(self, current, baseline):
    """Compares two dbus dictionaries and return the delta.

    The comparison only returns what is in the current (LHS) and not
    in the baseline (RHS). If you want the reverse, call again
    with the arguments reversed.

    Args:
        current: dbus tree you want to compare against the baseline.
        baseline: dbus tree baseline.
    Returns:
        A list of dictionary representations of the additional dbus
        objects, if there is a difference. Otherwise it returns an
        empty list. The format of the dictionaries is the same as the
        one returned in walk_object(). An object that is absent from
        the baseline is returned as-is; an object that is present
        appears at most once, holding only its new interfaces and,
        for baselined interfaces, only the new members (sorted).
    """
    slots = ('methods', 'signals', 'properties')

    # Index the baseline by object name.  setdefault keeps the first
    # entry should a name occur more than once.
    bl_objects = {}
    for bl_object in baseline:
        bl_objects.setdefault(bl_object['Object_name'], bl_object)

    new_items = []
    for dbus_object in current:
        bl_object = bl_objects.get(dbus_object['Object_name'])
        if bl_object is None:
            # This is a new object we need to track.
            new_items.append(dbus_object)
            continue

        bl_interfaces = {}
        for bl_interface in bl_object['interfaces']:
            bl_interfaces.setdefault(bl_interface['interface'],
                                     bl_interface)

        new_interfaces = []
        for interface in dbus_object['interfaces']:
            bl_interface = bl_interfaces.get(interface['interface'])
            if bl_interface is None:
                # This is a new interface we need to track.
                new_interfaces.append(interface)
                continue

            # The interface was baselined, check everything.  A slot
            # missing on either side is treated as empty.
            delta = {}
            for slot in slots:
                known = set(bl_interface.get(slot, ()))
                seen = set(interface.get(slot, ()))
                # Sorted so the report is stable from run to run.
                delta[slot] = sorted(seen - known)
            if any(delta.values()):
                delta['interface'] = interface['interface']
                new_interfaces.append(delta)

        # Report the object once, however many of its interfaces differ.
        if new_interfaces:
            new_items.append({'Object_name': dbus_object['Object_name'],
                              'interfaces': new_interfaces})
    return new_items
def walk_object(self, bus, object_name, start_path, dbus_objects):
    """Walks the given bus and object and returns a dictionary representation.

    The format of the dictionary is as follows:
    {
        Object_name: "string"
        interfaces:
        [
            {
                interface: "string"
                methods: ["string1", "string2"]
                signals: ["string1", "string2"]
                properties: ["string1", "string2"]
            }
        ]
    }
    Note that the decision to capitalize Object_name is just
    a way to force it to appear above the interface-list it
    corresponds to, when pretty-printed by the json dumper.
    This makes it more logical for humans to read/edit.

    The object is introspected at start_path, and the walk recurses
    into every <node> element reported below it. Interface entries
    are accumulated in dbus_objects, skipping exact duplicates.

    Args:
        bus: the bus to query, usually system.
        object_name: the name of the dbus object to walk.
        start_path: the path inside of the object in which to start walking
        dbus_objects: current list of dbus objects in the given object.
                Extended in place and used as the 'interfaces' value
                of the result.
    Returns:
        A dictionary representation of a dbus object
    """
    remote_object = bus.get_object(object_name, start_path)
    unknown_iface = dbus.Interface(remote_object,
                                   'org.freedesktop.DBus.Introspectable')
    # Convert the string to an xml DOM object we can walk.
    xml = parseString(unknown_iface.Introspect())
    for child in xml.childNodes:
        if (child.nodeType != child.ELEMENT_NODE or
                child.localName != 'node'):
            continue

        for interface in child.getElementsByTagName('interface'):
            interface_name = interface.getAttribute('name')
            # Methods and signals come straight from the introspection.
            method_list = [
                    method.getAttribute('name') for method in
                    interface.getElementsByTagName('method')]
            signal_list = [
                    signal.getAttribute('name') for signal in
                    interface.getElementsByTagName('signal')]
            # Properties have to be discovered via API call.
            prop_list = []
            try:
                prop_iface = dbus.Interface(
                        remote_object, 'org.freedesktop.DBus.Properties')
                # Materialize the keys as a real list: on Python 3
                # keys() is a view, which can neither be sorted in
                # place nor dumped as JSON later on.
                prop_list = list(prop_iface.GetAll(interface_name).keys())
            except dbus.exceptions.DBusException:
                # Many daemons do not support this interface,
                # which means they have no properties.
                pass
            # Create the dictionary with all the above.
            dictionary = {'interface': interface_name,
                          'methods': method_list,
                          'signals': signal_list,
                          'properties': prop_list}
            if dictionary not in dbus_objects:
                dbus_objects.append(dictionary)

        for node in child.getElementsByTagName('node'):
            name = node.getAttribute('name')
            # Child paths are built as <start_path>/<name>.
            if start_path[-1] != '/':
                start_path = start_path + '/'
            new_name = start_path + name
            self.walk_object(bus, object_name, new_name, dbus_objects)
    return {'Object_name': ('%s' % object_name), 'interfaces': dbus_objects}
def mapper_main(self):
    """
    Map the D-Bus attack surface and compare it to the baselines.

    Walks every well-known name on the bus, writes the observed tree
    to '<outputdir>/observed', and compares it with the overall
    baseline. Then, for every user returned by list_baselined_users(),
    derives the subset reachable by that user under the installed dbus
    policies, writes it to '<outputdir>/observed.<user>', and compares
    it with that user's baseline.

    Raises:
        error.TestFail: if any of the comparisons found a mismatch.
    """
    # Currently we only dump the SystemBus. Accessing the SessionBus says:
    # "ExecFailed: /usr/bin/dbus-launch terminated abnormally with the
    # following error: Autolaunch requested, but X11 support not compiled
    # in."
    # If this changes at a later date, add dbus.SessionBus() to the dict.
    # We've left the code structured to support walking more than one bus
    # for such an eventuality.
    buses = {'System Bus': dbus.SystemBus()}

    # One list collects what is seen on all buses; with the single bus
    # configured above this is simply that bus's objects.
    dbus_list = []
    for bus in buses.values():
        remote_dbus_object = bus.get_object('org.freedesktop.DBus',
                                            '/org/freedesktop/DBus')
        iface = dbus.Interface(remote_dbus_object, 'org.freedesktop.DBus')
        for name in iface.ListNames():
            # There are some strange listings like ":1" which appear after
            # certain names. Ignore these since we just need the names.
            if name.startswith(':'):
                continue
            dbus_list.append(self.walk_object(bus, name, '/', []))

    # Dump the complete observed dataset to disk. In the somewhat
    # typical case, that we will want to rev the baseline to
    # match current reality, these files are easily copied and
    # checked in as a new baseline.
    self.sort_dbus_tree(dbus_list)
    observed_data_path = os.path.join(self.outputdir, 'observed')
    self.write_dbus_data_to_disk(dbus_list, observed_data_path)

    baseline = self.load_baseline()
    test_pass = self.mutual_compare(dbus_list, baseline)

    # Figure out which of the observed API's are callable by specific users
    # whose attack surface we are particularly sensitive to:
    dbus_cfg = self.load_dbus_config_doms()
    for user in self.list_baselined_users():
        user_baseline = self.load_baseline(user)
        user_observed = self._observed_for_user(dbus_list, dbus_cfg, user)
        self.write_dbus_data_to_disk(user_observed,
                                     '%s.%s' % (observed_data_path, user))
        # Run the comparison unconditionally: it is what logs the
        # differences, and folding it into "test_pass and ..." would
        # skip it as soon as any earlier comparison had failed.
        user_pass = self.mutual_compare(user_observed, user_baseline, user)
        test_pass = test_pass and user_pass

    if not test_pass:
        raise error.TestFail('Baseline mismatch(es)')

def _observed_for_user(self, dbus_list, dbus_cfg, user):
    """Return the subset of dbus_list reachable by user under dbus_cfg."""
    props_iface = 'org.freedesktop.DBus.Properties'
    user_observed = []
    # user_observed will be a subset of dbus_list. Iterate and check
    # against the configured dbus policies as we go:
    for objdict in dbus_list:
        obj_name = objdict['Object_name']

        # A property might be readable, or even writable, to
        # a given user if they can reach the Get/Set interface.
        # This depends only on the object and the user, so it is
        # worked out once per object rather than once per interface.
        access = []
        if self.check_policies(dbus_cfg, obj_name, props_iface,
                               'Set', user=user):
            access.append('Set')
        if (self.check_policies(dbus_cfg, obj_name, props_iface,
                                'Get', user=user) or
            self.check_policies(dbus_cfg, obj_name, props_iface,
                                'GetAll', user=user)):
            access.append('Get')
        access = ','.join(access)

        for ifacedict in objdict['interfaces']:
            iface_name = ifacedict['interface']
            for meth in ifacedict['methods']:
                if self.check_policies(dbus_cfg, obj_name, iface_name,
                                       meth, user=user):
                    self.add_member(user_observed, obj_name, iface_name,
                                    meth)
            # We don't do permission-checking on signals because
            # signals are allow-all by default. Just copy them over.
            for sig in ifacedict['signals']:
                self.add_signal(user_observed, obj_name, iface_name, sig)
            # Properties are only listed when the user has some access.
            if access:
                for prop in ifacedict['properties']:
                    self.add_property(user_observed, obj_name, iface_name,
                                      '%s (%s)' % (prop, access))
    return user_observed
def run_once(self):
    """
    Test entry point: once the browser is up, enumerate all
    discoverable interfaces, methods, and signals in dbus-land and
    verify that they match the expected set (see mapper_main).
    """
    # Block until login reports the browser is available, then map.
    login.wait_for_browser()
    self.mapper_main()