| #!/usr/bin/env python |
| |
| import dbus |
| import dbus.decorators |
| import dbus.glib |
| import gobject |
| import sys |
| import getopt |
| from signal import * |
| |
# Commands that run() dispatches to the org.bluez.Manager interface.
mgr_cmds = [
    "InterfaceVersion",
    "ListAdapters",
    "DefaultAdapter",
]

# Signal names subscribed to on the org.bluez.Manager interface when listening.
mgr_signals = [
    "AdapterAdded",
    "AdapterRemoved",
]
| |
# Commands that run() dispatches to the org.bluez.Adapter interface.
# The order is the order in which usage() lists them.
dev_cmds = [
    # Local adapter information and settings
    "GetAddress",
    "GetVersion",
    "GetRevision",
    "GetManufacturer",
    "GetCompany",
    "GetMode",
    "SetMode",
    "GetDiscoverableTimeout",
    "SetDiscoverableTimeout",
    "IsConnectable",
    "IsDiscoverable",
    "IsConnected",
    "ListConnections",
    "GetMajorClass",
    "ListAvailableMinorClasses",
    "GetMinorClass",
    "SetMinorClass",
    "GetServiceClasses",
    "GetName",
    "SetName",
    # Remote device information
    "GetRemoteVersion",
    "GetRemoteRevision",
    "GetRemoteManufacturer",
    "GetRemoteCompany",
    "GetRemoteMajorClass",
    "GetRemoteMinorClass",
    "GetRemoteServiceClasses",
    "GetRemoteClass",
    "GetRemoteName",
    "GetRemoteAlias",
    "SetRemoteAlias",
    "ClearRemoteAlias",
    "LastSeen",
    "LastUsed",
    "DisconnectRemoteDevice",
    # Bonding
    "CreateBonding",
    "CancelBondingProcess",
    "RemoveBonding",
    "HasBonding",
    "ListBondings",
    "GetPinCodeLength",
    "GetEncryptionKeySize",
    # Discovery
    "DiscoverDevices",
    "DiscoverDevicesWithoutNameResolving",
    "CancelDiscovery",
    "ListRemoteDevices",
    "ListRecentRemoteDevices",
]
# Signal names subscribed to on the org.bluez.Adapter interface when listening.
# Every entry needs its own trailing comma: adjacent string literals without a
# comma are silently concatenated into a single (non-existent) signal name.
dev_signals = [
    "ModeChanged",
    "NameChanged",
    "MinorClassChanged",
    "DiscoveryStarted",
    "DiscoveryCompleted",
    "RemoteDeviceFound",
    "RemoteNameUpdated",
    "RemoteNameFailed",
    "RemoteAliasChanged",
    "RemoteAliasCleared",
    "RemoteDeviceConnected",
    "RemoteDeviceDisconnectRequested",
    "RemoteDeviceDisconnected",
    "BondingCreated",
    "BondingRemoved",
]
| |
# Object paths on which adapter signals are subscribed: hci0 through hci7.
# A generator is used (rather than a list comprehension) so that no loop
# variable is left behind in the module namespace.
dev_signals_filter = list("/org/bluez/hci%d" % hci_index
                          for hci_index in range(8))
| |
class Tester:
    """Command-line driver for the org.bluez Manager and Adapter D-Bus interfaces.

    An instance parses its command line, connects to the system bus and then,
    from run(), optionally subscribes to Manager/Adapter signals (-l) and/or
    invokes one Manager or Adapter method and prints its result.

    Output uses single-argument print(...) calls, which behave the same as
    the print statement on Python 2 and as the print function on Python 3.
    """

    exit_events = []
    dev_path = None
    need_dev = False
    listen = False
    at_interrupt = None
    # Defaults so that "-l" without a command leaves these attributes defined.
    cmd = None
    cmd_args = ()

    # Manager methods handled by call_mgr_dbus_func().
    _MGR_METHODS = ('InterfaceVersion', 'ListAdapters', 'DefaultAdapter')

    # Adapter methods that take no argument and whose result is printed as-is.
    _DEV_PLAIN = (
        'GetAddress', 'GetVersion', 'GetRevision', 'GetManufacturer',
        'GetCompany', 'GetMode', 'IsConnectable', 'IsDiscoverable',
        'ListConnections', 'GetMajorClass', 'ListAvailableMinorClasses',
        'GetMinorClass', 'GetName', 'DiscoverDevices',
        'DiscoverDevicesWithoutNameResolving', 'CancelDiscovery',
    )

    # Adapter methods that take no argument and whose result is printed as a
    # space-separated list.
    _DEV_LISTS = ('GetServiceClasses', 'ListBondings', 'ListRemoteDevices')

    # Adapter methods that take exactly one "address" argument and whose
    # result is printed as-is.
    _DEV_BY_ADDRESS = (
        'IsConnected', 'GetRemoteVersion', 'GetRemoteRevision',
        'GetRemoteManufacturer', 'GetRemoteCompany', 'GetRemoteMajorClass',
        'GetRemoteMinorClass', 'GetRemoteServiceClasses', 'GetRemoteClass',
        'GetRemoteName', 'GetRemoteAlias', 'ClearRemoteAlias', 'LastSeen',
        'LastUsed', 'DisconnectRemoteDevice', 'CreateBonding',
        'CancelBondingProcess', 'RemoveBonding', 'HasBonding',
        'GetPinCodeLength', 'GetEncryptionKeySize',
    )

    # Remaining adapter methods: name -> (argument usage text, output mode).
    # Output modes: 'value' prints the result, 'uint' prints it with %u,
    # 'list' prints the items space-separated, 'silent' prints nothing.
    _DEV_SPECIAL = {
        'GetDiscoverableTimeout': ('', 'uint'),
        'SetMode': ('scan_mode', 'silent'),
        'SetDiscoverableTimeout': ('timeout', 'silent'),
        'SetMinorClass': ('minor', 'silent'),
        'SetName': ('newname', 'silent'),
        'SetRemoteAlias': ('address alias', 'silent'),
        'ListRecentRemoteDevices': ('date', 'list'),
    }

    def __init__(self, argv):
        """Parse argv (program name first) and connect to the system bus.

        Exits the process with status 1 if the D-Bus connection fails.
        """
        self.name = argv[0]

        self.parse_args(argv[1:])

        try:
            self.dbus_setup()
        except dbus.DBusException as e:
            print('Failed to do D-Bus setup: %s' % e)
            sys.exit(1)

    def parse_args(self, argv):
        """Parse options and the optional command from argv (without argv[0]).

        Sets self.listen, self.dev_path, self.cmd and self.cmd_args.  Prints
        the usage text and exits when the options are invalid, when -h is
        given, or when neither a command nor -l is present.
        """
        try:
            opts, args = getopt.getopt(argv, "hli:")
        except getopt.GetoptError:
            self.usage()
            sys.exit(1)

        for opt, value in opts:
            if opt == "-h":
                self.usage()
                sys.exit()
            elif opt == "-l":
                self.listen = True
            elif opt == "-i":
                # startswith() is safe for an empty argument, unlike value[0].
                if value.startswith('/'):
                    self.dev_path = value
                else:
                    self.dev_path = '/org/bluez/%s' % value

        if not (args or self.listen):
            self.usage()
            sys.exit(1)

        # Always define both attributes so run() can test self.cmd even in
        # listen-only mode.
        self.cmd = None
        self.cmd_args = []
        if args:
            self.cmd = args[0]
            self.cmd_args = args[1:]

    def dbus_dev_setup(self):
        """Create self.device, an org.bluez.Adapter proxy for self.dev_path.

        When no device path was given, the manager's DefaultAdapter() is
        used.  Exits with status 1 on any D-Bus error.
        """
        if not self.dev_path:
            try:
                self.dbus_mgr_setup()
                self.dev_path = self.manager.DefaultAdapter()
            except dbus.DBusException as e:
                print('Failed to get default device: %s' % e)
                sys.exit(1)
        try:
            obj = self.bus.get_object('org.bluez', self.dev_path)
            self.device = dbus.Interface(obj, 'org.bluez.Adapter')
        except dbus.DBusException as e:
            print('Failed to setup device path: %s' % e)
            sys.exit(1)

    def dbus_dev_sig_setup(self):
        """Subscribe dev_signal_handler to every adapter signal on every path
        in dev_signals_filter.  Exits with status 1 on a D-Bus error.
        """
        try:
            # "sig_name" rather than "signal": the latter would shadow the
            # signal() function imported at module level.
            for sig_name in dev_signals:
                for path in dev_signals_filter:
                    self.bus.add_signal_receiver(self.dev_signal_handler,
                                                 sig_name, 'org.bluez.Adapter',
                                                 'org.bluez', path,
                                                 message_keyword='dbus_message')
        except dbus.DBusException as e:
            print('Failed to setup signal handler for device path: %s' % e)
            sys.exit(1)

    def dbus_mgr_sig_setup(self):
        """Subscribe mgr_signal_handler to every manager signal on /org/bluez.

        Exits with status 1 on a D-Bus error.
        """
        try:
            for sig_name in mgr_signals:
                # message_keyword is required: the handler reads the signal
                # name from keywords['dbus_message'].
                self.bus.add_signal_receiver(self.mgr_signal_handler,
                                             sig_name, 'org.bluez.Manager',
                                             'org.bluez', '/org/bluez',
                                             message_keyword='dbus_message')
        except dbus.DBusException as e:
            print('Failed to setup signal handler for manager path: %s' % e)
            sys.exit(1)

    def dbus_mgr_setup(self):
        """Create self.manager, an org.bluez.Manager proxy for /org/bluez.

        D-Bus errors propagate to the caller.
        """
        self.manager_obj = self.bus.get_object('org.bluez', '/org/bluez')
        self.manager = dbus.Interface(self.manager_obj, 'org.bluez.Manager')

    def dbus_setup(self):
        """Connect to the system bus and store the connection in self.bus."""
        self.bus = dbus.SystemBus()

    def usage(self):
        """Print the option summary followed by all known commands."""
        print('Usage: %s [-i <dev>] [-l] [-h] <cmd> [arg1..]' % self.name)
        print(' -i <dev> Specify device (e.g. "hci0" or "/org/bluez/hci0")')
        print(' -l Listen for events (no command required)')
        print(' -h Show this help')
        print('Manager commands:')
        for cmd in mgr_cmds:
            print('\t%s' % cmd)
        print('Adapter commands:')
        for cmd in dev_cmds:
            print('\t%s' % cmd)

    def _print_signal(self, header, args):
        """Print header and the signal arguments on a single line."""
        # (arg,) keeps tuple-like arguments from being unpacked by '%'.
        parts = [header] + ['%s ' % (arg,) for arg in args]
        print(' '.join(parts))

    def dev_signal_handler(self, *args, **keywords):
        """Print an adapter signal: member name, object path and arguments.

        Expects the D-Bus message under keywords['dbus_message'].
        """
        dbus_message = keywords["dbus_message"]
        header = '%s - %s: ' % (dbus_message.get_member(),
                                dbus_message.get_path())
        self._print_signal(header, args)

    def mgr_signal_handler(self, *args, **keywords):
        """Print a manager signal: member name and arguments on one line.

        Expects the D-Bus message under keywords['dbus_message'].
        """
        dbus_message = keywords["dbus_message"]
        self._print_signal('%s: ' % dbus_message.get_member(), args)

    def signal_cb(self, sig, frame):
        """Process-signal handler: run the optional at_interrupt hook, then
        stop the main loop.
        """
        print('Caught signal, exiting')
        if self.at_interrupt:
            self.at_interrupt()
        self.main_loop.quit()

    def call_mgr_dbus_func(self):
        """Invoke the manager method named by self.cmd and print its result.

        ListAdapters prints one adapter per line; the other methods print
        their return value.  Commands that are not manager methods are
        ignored.  Exits with status 1 when the D-Bus call fails.
        """
        if self.cmd not in self._MGR_METHODS:
            return
        try:
            result = getattr(self.manager, self.cmd)()
        except dbus.DBusException as e:
            print('Sending %s failed: %s' % (self.cmd, e))
            sys.exit(1)
        if self.cmd == 'ListAdapters':
            for adapter in result:
                print(adapter)
        else:
            print(result)

    def _dev_cmd_spec(self, cmd):
        """Return (argument usage text, output mode) for an adapter command,
        or None when the command is not known.
        """
        if cmd in self._DEV_PLAIN:
            return ('', 'value')
        if cmd in self._DEV_LISTS:
            return ('', 'list')
        if cmd in self._DEV_BY_ADDRESS:
            return ('address', 'value')
        return self._DEV_SPECIAL.get(cmd)

    def call_dev_dbus_func(self):
        """Invoke the adapter method named by self.cmd with self.cmd_args.

        Prints a per-command usage line when the number of arguments is
        wrong, prints the result according to the command's output mode, and
        exits with status 1 when the D-Bus call fails.
        """
        spec = self._dev_cmd_spec(self.cmd)
        if spec is None:
            # FIXME: remove at future version
            print('Script Error: Method %s not found. Maybe a mispelled word.'
                  % self.cmd)
            return

        usage_text, mode = spec
        expected = usage_text.split()
        # Commands without parameters ignore any extra arguments.
        if expected and len(self.cmd_args) != len(expected):
            print('Usage: %s -i <dev> %s %s'
                  % (self.name, self.cmd, usage_text))
            return

        call_args = list(self.cmd_args[:len(expected)])
        if self.cmd == 'SetDiscoverableTimeout':
            # The timeout is sent as an unsigned 32-bit integer.
            call_args[0] = dbus.UInt32(call_args[0])

        try:
            result = getattr(self.device, self.cmd)(*call_args)
        except dbus.DBusException as e:
            print('%s failed: %s' % (self.cmd, e))
            sys.exit(1)

        if mode == 'value':
            print(result)
        elif mode == 'uint':
            print('%u' % result)
        elif mode == 'list':
            if result:
                print(' '.join(['%s' % (item,) for item in result]))

    def run(self):
        """Set up signal listening if requested, run the command if one was
        given, then enter the main loop when listening.

        Exits with status 1 for an unknown command (unless only listening)
        or when a D-Bus interface cannot be set up.
        """
        if self.listen:
            self.dbus_mgr_sig_setup()
            self.dbus_dev_sig_setup()
            print('Listening for events...')

        if self.cmd in mgr_cmds:
            try:
                self.dbus_mgr_setup()
            except dbus.DBusException as e:
                print('Failed to setup manager interface: %s' % e)
                sys.exit(1)
            self.call_mgr_dbus_func()
        elif self.cmd in dev_cmds:
            try:
                self.dbus_dev_setup()
            except dbus.DBusException as e:
                print('Failed to setup device interface: %s' % e)
                sys.exit(1)
            self.call_dev_dbus_func()
        elif not self.listen:
            print('Unknown command: %s' % self.cmd)
            self.usage()
            sys.exit(1)

        if self.listen:
            # Stop the main loop cleanly on Ctrl-C or termination.
            signal(SIGINT, self.signal_cb)
            signal(SIGTERM, self.signal_cb)
            self.main_loop = gobject.MainLoop()
            self.main_loop.run()
| |
if __name__ == '__main__':
    # Initialise threading in gobject and in the dbus glib bindings before
    # any bus connection is made by Tester.
    gobject.threads_init()
    dbus.glib.init_threads()

    # Build the tester from the command line and run it.
    Tester(sys.argv).run()