#!/usr/bin/env python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# Interactive console for talking to the neard NFC daemon over D-Bus.
#
# NOTE: this script targets Python 2 (it relies on the static "gobject"
# binding and on raw_input).

import cmd
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import gobject
import threading

from functools import wraps

DBUS_ERROR = 'org.freedesktop.DBus.Error'
NEARD_PATH = '/org/neard/'
PROMPT = 'NFC> '


class NfcClientException(Exception):
    """Exception class for exceptions thrown by NfcClient."""


def print_message(message, newlines=2):
    """
    Prints the given message with extra wrapping newline characters.

    @param message: Message to print.
    @param newlines: Integer, specifying the number of newline characters
            that should be padded at the beginning and end of |message|
            before being passed to "print".

    """
    padding = newlines * '\n'
    # A single parenthesized argument prints the same text whether "print"
    # is parsed as a statement or called as a function. All output of this
    # script is routed through this one call.
    print(padding + message + padding)


def handle_errors(func):
    """
    Decorator for handling exceptions that are commonly raised by many of the
    methods in NfcClient.

    Any exception raised by |func| is reported through print_message and
    swallowed, in which case the wrapper returns None.

    @param func: The function this decorator is wrapping.

    @return The wrapped function.

    """
    @wraps(func)
    def _error_handler(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except dbus.exceptions.DBusException as e:
            error_name = e.get_dbus_name()
            if error_name == DBUS_ERROR + '.ServiceUnknown':
                print_message('neard may have crashed or disappeared. '
                              'Check if neard is running and run "initialize" '
                              'from this shell.')
                return
            if error_name == DBUS_ERROR + '.UnknownObject':
                print_message('Could not find object.')
                return
            print_message(str(e))
        except Exception as e:
            # Console boundary: report the problem instead of killing the
            # interactive shell.
            print_message(str(e))
    return _error_handler


class NfcClient(object):
    """
    neard D-Bus client

    Keeps a proxy to the neard Manager interface, a cache of adapter proxies
    keyed by object path, and the signal subscriptions that go with them.

    """
    NEARD_SERVICE_NAME = 'org.neard'
    IMANAGER = NEARD_SERVICE_NAME + '.Manager'
    IADAPTER = NEARD_SERVICE_NAME + '.Adapter'
    ITAG = NEARD_SERVICE_NAME + '.Tag'
    IRECORD = NEARD_SERVICE_NAME + '.Record'
    IDEVICE = NEARD_SERVICE_NAME + '.Device'

    # Parameters that may be copied into an NDEF record, per record type.
    _RECORD_KEYS = {
        'Text': ('Encoding', 'Language', 'Representation'),
        'URI': ('URI',),
    }

    def __init__(self):
        self._mainloop = None
        self._mainloop_thread = None
        self._run_loop = False
        self._bus = None
        self._manager = None
        # Matches returned by connect_to_signal, kept so that they can be
        # removed again when the client is reinitialized.
        self._manager_signal_matches = []
        self._adapters = {}
        self._adapter_property_handler_matches = {}
        self._polling_stopped = False


    def begin(self):
        """
        Starts the D-Bus client.

        """
        # Here we run a GLib MainLoop in its own thread, so that the client can
        # listen to D-Bus signals while keeping the console interactive.
        self._dbusmainloop = dbus.mainloop.glib.DBusGMainLoop(
                set_as_default=True)
        dbus.mainloop.glib.threads_init()
        gobject.threads_init()

        # Raise the flag before the thread exists so that an early end()
        # cannot be overwritten by the thread's own start-up.
        self._run_loop = True

        def _mainloop_thread_func():
            self._mainloop = gobject.MainLoop()
            context = self._mainloop.get_context()
            while self._run_loop:
                context.iteration(True)

        self._mainloop_thread = threading.Thread(None, _mainloop_thread_func)
        # Daemon thread: a crash in the console thread must not leave the
        # process hanging on the signal loop.
        self._mainloop_thread.daemon = True
        self._mainloop_thread.start()

        self._bus = dbus.SystemBus()
        self.setup_manager()


    def end(self):
        """
        Stops the D-Bus client.

        Safe to call more than once, and before begin() has completed.

        """
        self._run_loop = False
        if self._mainloop is not None:
            self._mainloop.quit()
        if self._mainloop_thread is not None:
            self._mainloop_thread.join()
            self._mainloop_thread = None


    def restart(self):
        """Reinitializes the NFC client."""
        self.setup_manager()


    def _make_proxy(self, object_path, interface):
        """Returns a proxy for |interface| on the neard object |object_path|."""
        return dbus.Interface(
                self._bus.get_object(self.NEARD_SERVICE_NAME, object_path),
                interface)


    @handle_errors
    def _get_manager_proxy(self):
        return self._make_proxy('/', self.IMANAGER)


    @handle_errors
    def _get_adapter_proxy(self, adapter):
        return self._make_proxy(adapter, self.IADAPTER)


    def _get_cached_adapter_proxy(self, adapter):
        """
        Returns the cached proxy for |adapter|.

        @param adapter: string, containing the adapter's D-Bus object path.

        @raises NfcClientException if no proxy is cached for |adapter|.

        """
        adapter_proxy = self._adapters.get(adapter, None)
        if not adapter_proxy:
            raise NfcClientException('Adapter "' + adapter + '" not found.')
        return adapter_proxy


    @handle_errors
    def _get_tag_proxy(self, tag):
        return self._make_proxy(tag, self.ITAG)


    @handle_errors
    def _get_device_proxy(self, device):
        return self._make_proxy(device, self.IDEVICE)


    @handle_errors
    def _get_record_proxy(self, record):
        return self._make_proxy(record, self.IRECORD)


    @handle_errors
    def _get_adapter_properties(self, adapter):
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        return adapter_proxy.GetProperties()


    def _get_adapters(self):
        props = self._manager.GetProperties()
        return props.get('Adapters', None)


    def _disconnect_signals(self):
        """Removes every signal subscription made by this client."""
        for match in self._manager_signal_matches:
            match.remove()
        self._manager_signal_matches = []
        for match in self._adapter_property_handler_matches.values():
            match.remove()
        self._adapter_property_handler_matches.clear()


    @handle_errors
    def setup_manager(self):
        """
        Creates a manager proxy and subscribes to adapter signals. This method
        will also initialize proxies for adapters if any are available.

        May be called repeatedly; subscriptions left over from a previous call
        are dropped first so that handlers never fire more than once per
        signal.

        """
        self._disconnect_signals()
        self._adapters.clear()

        # Create the manager proxy.
        self._manager = self._get_manager_proxy()
        if not self._manager:
            print_message('Failed to create a proxy to the Manager interface.')
            return

        # Listen to the adapter added and removed signals.
        self._manager_signal_matches = [
            self._manager.connect_to_signal(
                    'AdapterAdded',
                    lambda adapter: self.register_adapter(str(adapter))),
            self._manager.connect_to_signal(
                    'AdapterRemoved',
                    lambda adapter: self.unregister_adapter(str(adapter))),
        ]

        # See if there are any adapters and create proxies for each.
        adapters = self._get_adapters()
        if adapters:
            for adapter in adapters:
                self.register_adapter(str(adapter))


    def register_adapter(self, adapter):
        """
        Registers an adapter proxy with the given object path and subscribes to
        adapter signals.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter_proxy = self._get_adapter_proxy(adapter)
        if not adapter_proxy:
            # _get_adapter_proxy has already reported the error; do not cache
            # an unusable entry.
            return
        print_message('Added adapter: ' + adapter)
        self._adapters[adapter] = adapter_proxy

        # Tag found/lost currently don't get fired. Monitor property changes
        # instead.
        if self._adapter_property_handler_matches.get(adapter, None) is None:
            self._adapter_property_handler_matches[adapter] = (
                    adapter_proxy.connect_to_signal(
                            'PropertyChanged',
                            (lambda name, value:
                                    self._adapter_property_changed_signal(
                                            adapter, name, value))))


    def unregister_adapter(self, adapter):
        """
        Removes the adapter proxy for the given object path from the internal
        cache of adapters.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        print_message('Removed adapter: ' + adapter)
        match = self._adapter_property_handler_matches.pop(adapter, None)
        if match is not None:
            match.remove()
        # The adapter may never have been registered; that is not an error.
        self._adapters.pop(adapter, None)


    def _adapter_property_changed_signal(self, adapter, name, value):
        if name in ('Tags', 'Devices'):
            print_message('Found ' + name + ': ' +
                          self._dbus_array_to_string(value))


    @handle_errors
    def show_adapters(self):
        """
        Prints the D-Bus object paths of all adapters that are available.

        """
        adapters = self._get_adapters()
        if not adapters:
            print_message('No adapters found.')
            return
        for adapter in adapters:
            print_message(' ' + str(adapter), newlines=0)
        print_message('', newlines=0)


    def _dbus_array_to_string(self, array):
        """Returns a printable representation of the values in |array|."""
        return '[ ' + ''.join(' ' + str(value) + ', ' for value in array) + ' ]'


    def print_adapter_status(self, adapter):
        """
        Prints the properties of the given adapter.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        props = self._get_adapter_properties(adapter)
        if not props:
            return
        print_message('Status ' + adapter + ': ', newlines=0)
        for key, value in props.items():
            if isinstance(value, dbus.Array):
                value = self._dbus_array_to_string(value)
            else:
                value = str(value)
            print_message(' ' + key + ' = ' + value, newlines=0)
        print_message('', newlines=0)


    @handle_errors
    def set_powered(self, adapter, powered):
        """
        Enables or disables the adapter.

        @param adapter: string, containing the adapter's D-Bus object path.
        @param powered: boolean that dictates whether the adapter will be
                enabled or disabled.

        """
        # An unknown adapter raises NfcClientException, which handle_errors
        # reports.
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        adapter_proxy.SetProperty('Powered', powered)


    @handle_errors
    def start_polling(self, adapter):
        """
        Starts polling for nearby tags and devices in "Initiator" mode.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        adapter_proxy.StartPollLoop('Initiator')
        print_message('Started polling.')


    @handle_errors
    def stop_polling(self, adapter):
        """
        Stops polling for nearby tags and devices.

        @param adapter: string, containing the adapter's D-Bus object path.

        """
        adapter_proxy = self._get_cached_adapter_proxy(adapter)
        adapter_proxy.StopPollLoop()
        self._polling_stopped = True
        print_message('Stopped polling.')


    @handle_errors
    def show_tag_data(self, tag):
        """
        Prints the properties of the given tag, as well as the contents of any
        records associated with it.

        @param tag: string, containing the tag's D-Bus object path.

        """
        tag_proxy = self._get_tag_proxy(tag)
        if not tag_proxy:
            print_message('Tag "' + tag + '" not found.')
            return
        props = tag_proxy.GetProperties()
        print_message('Tag ' + tag + ': ', newlines=1)
        for key, value in props.items():
            if key != 'Records':
                print_message(' ' + key + ' = ' + str(value), newlines=0)
        # A missing 'Records' property is treated like an empty record list.
        records = props.get('Records')
        if not records:
            return
        print_message('Records: ', newlines=1)
        for record in records:
            self.show_record_data(str(record))
        print_message('', newlines=0)


    @handle_errors
    def show_device_data(self, device):
        """
        Prints the properties of the given device, as well as the contents of
        any records associated with it.

        @param device: string, containing the device's D-Bus object path.

        """
        device_proxy = self._get_device_proxy(device)
        if not device_proxy:
            print_message('Device "' + device + '" not found.')
            return
        records = device_proxy.GetProperties().get('Records')
        if not records:
            print_message('No records on device.')
            return
        print_message('Records: ', newlines=1)
        for record in records:
            self.show_record_data(str(record))
        print_message('', newlines=0)


    @handle_errors
    def show_record_data(self, record):
        """
        Prints the contents of the given record.

        @param record: string, containing the record's D-Bus object path.

        """
        record_proxy = self._get_record_proxy(record)
        if not record_proxy:
            print_message('Record "' + record + '" not found.')
            return
        props = record_proxy.GetProperties()
        print_message('Record ' + record + ': ', newlines=1)
        for key, value in props.items():
            # str() because property values are not guaranteed to be strings.
            print_message(' ' + key + ' = ' + str(value), newlines=0)
        print_message('', newlines=0)


    def _create_record_data(self, record_type, params):
        """
        Builds the dictionary describing an NDEF record.

        @param record_type: The type of the record, e.g. Text or URI.
        @param params: dictionary, containing the parameters of the NDEF.
                Keys that do not apply to |record_type| are ignored.

        @return The record dictionary, or None if |record_type| is not
                supported.

        """
        possible_keys = self._RECORD_KEYS.get(record_type)
        if possible_keys is None:
            print_message('Writing record type "' + record_type +
                          '" currently not supported.')
            return None
        record_data = {'Type': record_type}
        for key in possible_keys:
            if key in params:
                record_data[key] = params[key]
        return record_data


    @handle_errors
    def write_tag(self, tag, record_type, params):
        """
        Writes an NDEF record to the given tag.

        @param tag: string, containing the tag's D-Bus object path.
        @param record_type: The type of the record, e.g. Text or URI.
        @param params: dictionary, containing the parameters of the NDEF.

        """
        tag_data = self._create_record_data(record_type, params)
        if not tag_data:
            return
        tag_proxy = self._get_tag_proxy(tag)
        if not tag_proxy:
            print_message('Tag "' + tag + '" not found.')
            return
        tag_proxy.Write(tag_data)
        print_message('Tag written!')


    @handle_errors
    def push_to_device(self, device, record_type, params):
        """
        Pushes an NDEF record to the given device.

        @param device: string, containing the device's D-Bus object path.
        @param record_type: The type of the record, e.g. Text or URI.
        @param params: dictionary, containing the parameters of the NDEF.

        """
        record_data = self._create_record_data(record_type, params)
        if not record_data:
            return
        device_proxy = self._get_device_proxy(device)
        if not device_proxy:
            print_message('Device "' + device + '" not found.')
            return
        device_proxy.Push(record_data)
        print_message('NDEF pushed to device!')


class NfcConsole(cmd.Cmd):
    """
    Interactive console to interact with the NFC daemon.

    """
    def __init__(self):
        cmd.Cmd.__init__(self)
        self.prompt = PROMPT


    def begin(self):
        """
        Starts the interactive shell.

        The D-Bus client is always stopped when the shell is left, including
        when it is left through an exception such as KeyboardInterrupt.

        """
        print_message('NFC console! Run "help" for a list of commands.',
                      newlines=1)
        self._nfc_client = NfcClient()
        try:
            self._nfc_client.begin()
            self.cmdloop()
        finally:
            self._nfc_client.end()


    def can_exit(self):
        """Override"""
        return True


    def _parse_single_arg(self, args, usage):
        """
        Extracts the only argument of a one-argument command.

        @param args: The raw argument string passed to the command.
        @param usage: Message to print if |args| does not hold exactly one
                argument.

        @return The argument, or None after printing |usage|.

        """
        tokens = args.strip().split(' ')
        if len(tokens) != 1 or not tokens[0]:
            print_message(usage)
            return None
        return tokens[0]


    def do_initialize(self, args):
        """Handles "initialize"."""
        if args:
            print_message('Command "initialize" expects no arguments.')
            return
        self._nfc_client.restart()


    def help_initialize(self):
        """Prints the help message for "initialize"."""
        print_message('Initializes the neard D-Bus client. This can be '
                      'run many times to restart the client in case of '
                      'neard failures or crashes.')


    def do_adapters(self, args):
        """Handles "adapters"."""
        if args:
            print_message('Command "adapters" expects no arguments.')
            return
        self._nfc_client.show_adapters()


    def help_adapters(self):
        """Prints the help message for "adapters"."""
        print_message('Displays the D-Bus object paths of the available '
                      'adapter objects.')


    def do_adapter_status(self, args):
        """Handles "adapter_status"."""
        adapter = self._parse_single_arg(args,
                                         'Usage: adapter_status <adapter>')
        if adapter is None:
            return
        self._nfc_client.print_adapter_status(NEARD_PATH + adapter)


    def help_adapter_status(self):
        """Prints the help message for "adapter_status"."""
        print_message('Returns the properties of the given NFC adapter.\n\n'
                      ' Ex: "adapter_status nfc0"')


    def do_enable_adapter(self, args):
        """Handles "enable_adapter"."""
        adapter = self._parse_single_arg(args,
                                         'Usage: enable_adapter <adapter>')
        if adapter is None:
            return
        self._nfc_client.set_powered(NEARD_PATH + adapter, True)


    def help_enable_adapter(self):
        """Prints the help message for "enable_adapter"."""
        print_message('Powers up the adapter. Ex: "enable_adapter nfc0"')


    def do_disable_adapter(self, args):
        """Handles "disable_adapter"."""
        adapter = self._parse_single_arg(args,
                                         'Usage: disable_adapter <adapter>')
        if adapter is None:
            return
        self._nfc_client.set_powered(NEARD_PATH + adapter, False)


    def help_disable_adapter(self):
        """Prints the help message for "disable_adapter"."""
        print_message('Powers down the adapter. Ex: "disable_adapter nfc0"')


    def do_start_poll(self, args):
        """Handles "start_poll"."""
        adapter = self._parse_single_arg(args, 'Usage: start_poll <adapter>')
        if adapter is None:
            return
        self._nfc_client.start_polling(NEARD_PATH + adapter)


    def help_start_poll(self):
        """Prints the help message for "start_poll"."""
        print_message('Initiates a poll loop.\n\n Ex: "start_poll nfc0"')


    def do_stop_poll(self, args):
        """Handles "stop_poll"."""
        adapter = self._parse_single_arg(args, 'Usage: stop_poll <adapter>')
        if adapter is None:
            return
        self._nfc_client.stop_polling(NEARD_PATH + adapter)


    def help_stop_poll(self):
        """Prints the help message for "stop_poll"."""
        print_message('Stops a poll loop.\n\n Ex: "stop_poll nfc0"')


    def do_read_tag(self, args):
        """Handles "read_tag"."""
        tag = self._parse_single_arg(args, 'Usage: read_tag <tag>')
        if tag is None:
            return
        self._nfc_client.show_tag_data(NEARD_PATH + tag)


    def help_read_tag(self):
        """Prints the help message for "read_tag"."""
        print_message('Reads the contents of a tag. Ex: read_tag nfc0/tag0')


    def _parse_record_args(self, record_type, args, command='write_tag',
                           target='<tag>'):
        """
        Converts command line arguments into NDEF record parameters.

        @param record_type: The type of the record, i.e. Text or URI.
        @param args: List of command arguments; args[0] is the target object,
                args[1] the record type and the rest the record parameters.
        @param command: Name of the calling command, used in usage messages.
        @param target: Placeholder for the first argument of |command|, used
                in usage messages.

        @return Dictionary of record parameters, or None if |args| is invalid.

        """
        usage_prefix = 'Usage: ' + command + ' ' + target
        if record_type == 'Text':
            if len(args) < 5:
                print_message(usage_prefix + ' Text <encoding> '
                              '<language> <representation>')
                return None
            if args[2] not in ('UTF-8', 'UTF-16'):
                print_message('Encoding must be one of "UTF-8" or "UTF-16".')
                return None
            return {
                'Encoding': args[2],
                'Language': args[3],
                # The representation is free text and may contain spaces.
                'Representation': ' '.join(args[4:])
            }
        if record_type == 'URI':
            if len(args) != 3:
                print_message(usage_prefix + ' URI <uri>')
                return None
            return {
                'URI': args[2]
            }
        print_message('Only types "Text" and "URI" are supported by this '
                      'script.')
        return None


    def do_write_tag(self, args):
        """Handles "write_tag"."""
        args = args.strip().split(' ')
        if len(args) < 3:
            print_message('Usage: write_tag <tag> [params]')
            return
        record_type = args[1]
        params = self._parse_record_args(record_type, args)
        if not params:
            return
        self._nfc_client.write_tag(NEARD_PATH + args[0],
                                   record_type, params)


    def help_write_tag(self):
        """Prints the help message for "write_tag"."""
        print_message('Writes the given data to a tag. Usage:\n'
                      ' write_tag <tag> Text <encoding> <language> '
                      '<representation>\n write_tag <tag> URI <uri>')


    def do_read_device(self, args):
        """Handles "read_device"."""
        device = self._parse_single_arg(args, 'Usage: read_device <device>')
        if device is None:
            return
        self._nfc_client.show_device_data(NEARD_PATH + device)


    def help_read_device(self):
        """Prints the help message for "read_device"."""
        print_message('Reads the contents of a device. Ex: read_device '
                      'nfc0/device0')


    def do_push_to_device(self, args):
        """Handles "push_to_device"."""
        args = args.strip().split(' ')
        if len(args) < 3:
            print_message('Usage: push_to_device <device> [params]')
            return
        record_type = args[1]
        params = self._parse_record_args(record_type, args,
                                         command='push_to_device',
                                         target='<device>')
        if not params:
            return
        self._nfc_client.push_to_device(NEARD_PATH + args[0],
                                        record_type, params)


    def help_push_to_device(self):
        """Prints the help message for "push_to_device"."""
        print_message('Pushes the given data to a device. Usage:\n'
                      ' push_to_device <device> Text <encoding> <language> '
                      '<representation>\n push_to_device <device> URI <uri>')


    def do_exit(self, args):
        """
        Handles the 'exit' command.

        @param args: Arguments to the command. Must be empty.

        @return True if the console should stop, a false value otherwise.

        """
        if args:
            print_message('Command "exit" expects no arguments.')
            return
        try:
            resp = raw_input('Are you sure? (yes/no): ')
        except EOFError:
            # Standard input is exhausted, so nobody can answer (this method
            # also serves as do_EOF). Leave instead of crashing.
            resp = 'yes'
        if resp == 'yes':
            print_message('Goodbye!')
            self._nfc_client.end()
            return True
        if resp != 'no':
            print_message('Did not understand: ' + resp)
        return False


    def help_exit(self):
        """Handles the 'help exit' command."""
        print_message('Exits the console.')


    do_EOF = do_exit
    help_EOF = help_exit


def main():
    """Main function."""
    NfcConsole().begin()


if __name__ == '__main__':
    main()