#!/usr/bin/python # -*- coding: utf-8 -*- import dbus import dbus.service import gobject from dbus.mainloop.glib import DBusGMainLoop import sys DBusGMainLoop(set_as_default=True) loop = gobject.MainLoop() bus = dbus.SystemBus() def sig_received(*args, **kwargs): if "member" not in kwargs: return if "path" not in kwargs: return; sig_name = kwargs["member"] path = kwargs["path"] print sig_name print path if sig_name == "PropertyChanged": k, v = args print k print v else: ob = args[0] print ob def enter_mainloop(): bus.add_signal_receiver(sig_received, bus_name="org.bluez", dbus_interface = "org.bluez.HealthDevice", path_keyword="path", member_keyword="member", interface_keyword="interface") try: print "Entering main lopp, push Ctrl+C for finish" mainloop = gobject.MainLoop() mainloop.run() except KeyboardInterrupt: pass finally: print "Exiting, bye" hdp_manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.HealthManager") role = None while role == None: print "Select 1. source or 2. sink: ", try: sel = int(sys.stdin.readline()) if sel == 1: role = "Source" elif sel == 2: role = "Sink" else: raise ValueError except (TypeError, ValueError): print "Wrong selection, try again: ", except KeyboardInterrupt: sys.exit() dtype = None while dtype == None: print "Select a data type: ", try: sel = int(sys.stdin.readline()) if (sel < 0) or (sel > 65535): raise ValueError dtype = sel; except (TypeError, ValueError): print "Wrong selection, try again: ", except KeyboardInterrupt: sys.exit() pref = None if role == "Source": while pref == None: try: print "Select a prefered data channel type 1.", print "reliable 2. streaming: ", sel = int(sys.stdin.readline()) if sel == 1: pref = "Reliable" elif sel == 2: pref = "Streaming" else: raise ValueError except (TypeError, ValueError): print "Wrong selection, try again" except KeyboardInterrupt: sys.exit() app_path = hdp_manager.CreateApplication({ "DataType": dbus.types.UInt16(dtype), "Role": role, "Description": "Test Source", "ChannelType": pref}) else: app_path = hdp_manager.CreateApplication({ "DataType": dbus.types.UInt16(dtype), "Description": "Test sink", "Role": role}) print "New application created:", app_path con = None while con == None: try: print "Connect to a remote device (y/n)? ", sel = sys.stdin.readline() if sel in ("y\n", "yes\n", "Y\n", "YES\n"): con = True elif sel in ("n\n", "no\n", "N\n", "NO\n"): con = False else: print "Wrong selection, try again." except KeyboardInterrupt: sys.exit() if not con: enter_mainloop() sys.exit() manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.bluez.Manager") adapters = manager.ListAdapters() i = 1 for ad in adapters: print "%d. %s" % (i, ad) i = i + 1 print "Select an adapter: ", select = None while select == None: try: pos = int(sys.stdin.readline()) - 1 if pos < 0: raise TypeError select = adapters[pos] except (TypeError, IndexError, ValueError): print "Wrong selection, try again: ", except KeyboardInterrupt: sys.exit() adapter = dbus.Interface(bus.get_object("org.bluez", select), "org.bluez.Adapter") devices = adapter.ListDevices() if len(devices) == 0: print "No devices available" sys.exit() i = 1 for dev in devices: print "%d. %s" % (i, dev) i = i + 1 print "Select a device: ", select = None while select == None: try: pos = int(sys.stdin.readline()) - 1 if pos < 0: raise TypeError select = devices[pos] except (TypeError, IndexError, ValueError): print "Wrong selection, try again: ", except KeyboardInterrupt: sys.exit() device = dbus.Interface(bus.get_object("org.bluez", select), "org.bluez.HealthDevice") echo = None while echo == None: try: print "Perform an echo (y/n)? ", sel = sys.stdin.readline() if sel in ("y\n", "yes\n", "Y\n", "YES\n"): echo = True elif sel in ("n\n", "no\n", "N\n", "NO\n"): echo = False else: print "Wrong selection, try again." except KeyboardInterrupt: sys.exit() if echo: if device.Echo(): print "Echo was ok" else: print "Echo war wrong, exiting" sys.exit() print "Connecting to device %s" % (select) if role == "Source": chan = device.CreateChannel(app_path, "Reliable") else: chan = device.CreateChannel(app_path, "Any") print chan enter_mainloop() hdp_manager.DestroyApplication(app_path)