| #!/usr/bin/env python |
| |
| import dbus |
| import dbus.decorators |
| import dbus.glib |
| import gobject |
| import sys |
| import getopt |
| from signal import * |
| |
# Commands that are sent to the org.bluez.Manager object. parse_args() treats
# every command NOT listed here as needing a device path.
mgr_cmds = [
    'DeviceList',
    'DefaultDevice',
]

# Commands that operate on a device object (or its Controller); this list is
# what usage() prints under "Device commands".
dev_cmds = [
    'Up',
    'Down',
    'SetProperty',
    'GetProperty',
    'Inquiry',
    'CancelInquiry',
    'PeriodicInquiry',
    'CancelPeriodic',
    'RemoteName',
    'Connections',
    'Authenticate',
    'RoleSwitch',
]

# SetProperty names (compared lower-cased) whose value is sent as a D-Bus
# boolean.
dev_setprop_bool = [
    'auth',
    'encrypt',
    'discoverable',
    'connectable',
]

# SetProperty names (compared lower-cased) whose value is sent as a D-Bus byte.
dev_setprop_byte = [
    'incmode',
]
| |
class Tester(object):
    """Command-line client that exercises the 'org.bluez' D-Bus service.

    Construction parses the command line, connects to the system bus and,
    when the chosen command needs one, binds to a device object and its
    Controller.  run() then sends the single requested command and/or enters
    a GLib main loop to print incoming signals.

    The code sticks to syntax accepted by both Python 2.6+ and Python 3
    (``except ... as``, single-argument ``print(...)``).
    """

    # Per-instance state is (re)assigned in __init__/parse_args; these are
    # only the defaults.
    dev_path = None        # D-Bus object path of the selected device
    need_dev = False       # True when the command requires a device object
    listen = False         # True when run() must enter the main loop
    at_interrupt = None    # optional callable invoked on SIGINT/SIGTERM
    main_loop = None       # gobject.MainLoop, created by run()

    # Spellings accepted for boolean SetProperty values.
    _TRUE_WORDS = ('1', 'true', 'yes', 'on')
    _FALSE_WORDS = ('0', 'false', 'no', 'off')

    def __init__(self, argv):
        """Parse *argv* (program name first) and set up all D-Bus proxies.

        Exits the process with status 1 if the D-Bus setup fails.
        """
        self.name = argv[0]
        # Signal names that terminate the main loop once received.  Kept on
        # the instance: a class-level list would be shared by all Testers.
        self.exit_events = []
        self.cmd = None
        self.cmd_args = []

        self.parse_args(argv[1:])

        try:
            self.dbus_setup()
        except dbus.DBusException as e:
            print('Failed to do D-BUS setup: %s' % e)
            sys.exit(1)

        self.dev_setup()

    def parse_args(self, argv):
        """Interpret the options and command in *argv* (program name removed).

        Sets dev_path, listen, need_dev, cmd and cmd_args.  Prints the usage
        text and exits on -h, on an invalid option, or when neither a command
        nor -l was given.
        """
        try:
            opts, args = getopt.getopt(argv, "hli:")
        except getopt.GetoptError:
            self.usage()
            sys.exit(1)

        for o, a in opts:
            if o == "-h":
                self.usage()
                sys.exit()
            elif o == "-l":
                self.need_dev = True
                self.listen = True
            elif o == "-i":
                if not a:
                    # An empty device name cannot form a valid object path.
                    self.usage()
                    sys.exit(1)
                if a.startswith('/'):
                    self.dev_path = a
                else:
                    self.dev_path = '/org/bluez/Device/%s' % a

        if not (args or self.listen):
            self.usage()
            sys.exit(1)

        if args:
            self.cmd = args[0]
            self.cmd_args = args[1:]

            if self.cmd not in mgr_cmds:
                self.need_dev = True

    def dev_setup(self):
        """Create the device and Controller proxies and hook up their signals.

        When a device is needed but none was given with -i, the Manager's
        default device is used.  Exits with status 1 on any D-Bus error.
        """
        if self.need_dev and not self.dev_path:
            try:
                self.dev_path = self.manager.DefaultDevice()
            except dbus.DBusException as e:
                print('Failed to get default device: %s' % e)
                sys.exit(1)

        if not self.dev_path:
            return

        try:
            obj = self.bus.get_object('org.bluez', self.dev_path)
            self.dev = dbus.Interface(obj, 'org.bluez.Device')

            self.dev.connect_to_signal('Up', self.dev_up)
            self.dev.connect_to_signal('Down', self.dev_down)
            # Positional arguments: handler, signal name, interface, service,
            # object path.  The path follows the selected device instead of
            # being pinned to hci0, so "-i <other>" reports the right adapter.
            self.bus.add_signal_receiver(self.dev_name_changed,
                                         'DeviceNameChanged',
                                         'org.bluez.Device', 'org.bluez',
                                         self.dev_path)

            obj = self.bus.get_object('org.bluez',
                                      '%s/Controller' % self.dev_path)
            self.ctl = dbus.Interface(obj, 'org.bluez.Device.Controller')

            for sig_name, handler in (
                    ('InquiryStart', self.inquiry_start),
                    ('InquiryResult', self.inquiry_result),
                    ('InquiryComplete', self.inquiry_complete),
                    ('RemoteName', self.remote_name),
                    ('RemoteNameFailed', self.remote_name_failed),
                    ('AuthenticationComplete', self.authentication_complete)):
                self.ctl.connect_to_signal(sig_name, handler)

        except dbus.DBusException as e:
            print('Failed to setup device path: %s' % e)
            sys.exit(1)

    def dbus_setup(self):
        """Connect to the system bus and subscribe to the Manager's signals.

        Raises dbus.DBusException on failure (handled by __init__).
        """
        self.bus = dbus.SystemBus()
        manager_obj = self.bus.get_object('org.bluez', '/org/bluez/Manager')
        self.manager = dbus.Interface(manager_obj, 'org.bluez.Manager')
        self.manager.connect_to_signal('DeviceAdded', self.device_added)
        self.manager.connect_to_signal('DeviceRemoved', self.device_removed)

    def usage(self):
        """Print the command-line help, including all known commands."""
        print('Usage: %s [-i <dev>] [-l] [-h] <cmd> [arg1..]' % self.name)
        print(' -i <dev> Specify device (e.g. "hci0" or "/org/bluez/Device/hci0")')
        print(' -l Listen for events (no command required)')
        print(' -h Show this help')
        print('Manager commands:')
        for cmd in mgr_cmds:
            print('\t%s' % cmd)
        print('Device commands:')
        for cmd in dev_cmds:
            print('\t%s' % cmd)

    # ------------------------------------------------------------------
    # Signal callbacks: each prints the event; some also end the main loop
    # when the event was registered in exit_events by the running command.
    # ------------------------------------------------------------------

    def _quit_on(self, event):
        """Stop the main loop if *event* is one of the awaited exit events."""
        if event in self.exit_events:
            self.main_loop.quit()

    def device_added(self, path):
        """Handle the Manager's DeviceAdded signal."""
        print('DeviceAdded: %s' % path)

    def device_removed(self, path):
        """Handle the Manager's DeviceRemoved signal."""
        print('DeviceRemoved: %s' % path)

    def remote_name(self, bda, name):
        """Handle the Controller's RemoteName signal."""
        print('RemoteName: %s, %s' % (bda, name))
        self._quit_on('RemoteName')

    def remote_name_failed(self, bda, status):
        """Handle the Controller's RemoteNameFailed signal."""
        print('RemoteNameFailed: %s, 0x%02X' % (bda, status))
        self._quit_on('RemoteNameFailed')

    def inquiry_start(self):
        """Handle the Controller's InquiryStart signal."""
        print('InquiryStart')

    def inquiry_complete(self):
        """Handle the Controller's InquiryComplete signal."""
        print('InquiryComplete')
        self._quit_on('InquiryComplete')

    def inquiry_result(self, bda, dev_class, rssi):
        """Handle the Controller's InquiryResult signal."""
        print('InquiryResult: %s, %06X, %02X' % (bda, dev_class, rssi))

    def authentication_complete(self, bda, status):
        """Handle the Controller's AuthenticationComplete signal."""
        print('AuthenticationComplete: %s, 0x%02X' % (bda, status))
        self._quit_on('AuthenticationComplete')

    def dev_up(self):
        """Handle the device's Up signal."""
        print('Up')

    def dev_down(self):
        """Handle the device's Down signal."""
        print('Down')

    @dbus.decorators.explicitly_pass_message
    def dev_name_changed(self, name, *args, **keywords):
        """Handle DeviceNameChanged.

        The decorator asks the binding to pass the raw message as the
        'dbus_message' keyword, from which the emitting object path is read.
        """
        dbus_message = keywords["dbus_message"]
        print('Device %s name changed: %s' % (dbus_message.get_path(), name))

    def signal_cb(self, sig, frame):
        """SIGINT/SIGTERM handler: run the interrupt hook, then stop the loop."""
        print('Caught signal, exiting')
        if self.at_interrupt:
            try:
                self.at_interrupt()
            except dbus.DBusException as e:
                # A failing cleanup call must not keep the loop running.
                print('Interrupt handler failed: %s' % e)
        self.main_loop.quit()

    # ------------------------------------------------------------------
    # Helpers shared by the command implementations
    # ------------------------------------------------------------------

    def _call(self, method, *args):
        """Invoke a D-Bus proxy method; report failure and exit with 1."""
        try:
            return method(*args)
        except dbus.DBusException as e:
            print('Sending %s failed: %s' % (self.cmd, e))
            sys.exit(1)

    def _parse_int(self, text, what, base=10):
        """Convert the argument string *text* to an int or exit with 1.

        Conversion is explicit because dbus.Byte() given a one-character
        string takes its character code rather than its numeric value.
        """
        try:
            return int(text, base)
        except ValueError:
            print('Invalid %s: %s' % (what, text))
            sys.exit(1)

    def _parse_bool(self, text):
        """Convert the argument string *text* to a bool or exit with 1.

        bool() of any non-empty string (including "0") is True, so the
        accepted spellings are matched explicitly.
        """
        word = text.strip().lower()
        if word in self._TRUE_WORDS:
            return True
        if word in self._FALSE_WORDS:
            return False
        print('Invalid boolean value: %s' % text)
        sys.exit(1)

    # ------------------------------------------------------------------
    # Manager commands
    # ------------------------------------------------------------------

    def _cmd_device_list(self):
        """Print every device path returned by Manager.DeviceList."""
        for dev in self._call(self.manager.DeviceList):
            print(dev)

    def _cmd_default_device(self):
        """Print the path returned by Manager.DefaultDevice."""
        print(self._call(self.manager.DefaultDevice))

    # ------------------------------------------------------------------
    # Device commands
    # ------------------------------------------------------------------

    def _cmd_up(self):
        """Send Device.Up."""
        self._call(self.dev.Up)

    def _cmd_down(self):
        """Send Device.Down."""
        self._call(self.dev.Down)

    def _cmd_set_property(self):
        """Send Device.SetProperty, typing the value by property name."""
        if len(self.cmd_args) < 2:
            print('Usage: %s -i <dev> SetProperty strPropName arg' % self.name)
            sys.exit(1)
        prop, raw = self.cmd_args[:2]
        key = prop.lower()
        if key in dev_setprop_bool:
            value = dbus.Boolean(self._parse_bool(raw))
        elif key in dev_setprop_byte:
            value = dbus.Byte(self._parse_int(raw, 'byte value'))
        else:
            value = raw
        self._call(self.dev.SetProperty, prop, value)

    def _cmd_get_property(self):
        """Print the result of Device.GetProperty."""
        if len(self.cmd_args) < 1:
            print('Usage: %s -i <dev> GetProperty strPropName' % self.name)
            sys.exit(1)
        print(self._call(self.dev.GetProperty, self.cmd_args[0]))

    # ------------------------------------------------------------------
    # Device.Controller commands
    # ------------------------------------------------------------------

    def _cmd_inquiry(self):
        """Start an inquiry and listen until InquiryComplete arrives.

        With exactly two arguments (length, lap) they are forwarded;
        otherwise Inquiry is called without arguments.
        """
        if len(self.cmd_args) != 2:
            self._call(self.ctl.Inquiry)
        else:
            length, lap = self.cmd_args
            self._call(self.ctl.Inquiry,
                       dbus.Byte(self._parse_int(length, 'inquiry length')),
                       dbus.UInt32(self._parse_int(lap, 'LAP', 0)))
        self.listen = True
        self.exit_events.append('InquiryComplete')
        # Cancel the running inquiry if the user interrupts the wait.
        self.at_interrupt = self.ctl.CancelInquiry

    def _cmd_cancel_inquiry(self):
        """Send Controller.CancelInquiry."""
        self._call(self.ctl.CancelInquiry)

    def _cmd_remote_name(self):
        """Request a remote name and listen for success or failure."""
        if len(self.cmd_args) < 1:
            print('Bluetooth address needed')
            sys.exit(1)
        self._call(self.ctl.RemoteName, self.cmd_args[0])
        self.listen = True
        self.exit_events.append('RemoteNameFailed')
        self.exit_events.append('RemoteName')

    def _cmd_periodic_inquiry(self):
        """Start periodic inquiry and keep listening for events.

        Arguments: length, min, max and optionally lap.  With fewer than
        three arguments the defaults (6, 20, 60) are used.
        """
        argc = len(self.cmd_args)
        if argc < 3:
            length, minimum, maximum = 6, 20, 60
        else:
            length = self._parse_int(self.cmd_args[0], 'inquiry length')
            minimum = self._parse_int(self.cmd_args[1], 'min value')
            maximum = self._parse_int(self.cmd_args[2], 'max value')
        params = [dbus.Byte(length), dbus.UInt16(minimum),
                  dbus.UInt16(maximum)]
        if argc > 3:
            params.append(
                dbus.UInt32(self._parse_int(self.cmd_args[3], 'LAP', 0)))
        self._call(self.ctl.PeriodicInquiry, *params)
        self.listen = True

    def _cmd_cancel_periodic(self):
        """Send Controller.CancelPeriodic."""
        self._call(self.ctl.CancelPeriodic)

    def _cmd_authenticate(self):
        """Request authentication and listen for AuthenticationComplete."""
        if len(self.cmd_args) < 1:
            print('Bluetooth address needed')
            sys.exit(1)
        self._call(self.ctl.Authenticate, self.cmd_args[0])
        self.listen = True
        self.exit_events.append('AuthenticationComplete')

    def _cmd_role_switch(self):
        """Send Controller.RoleSwitch for an address and role (0 or 1)."""
        if len(self.cmd_args) < 2:
            print('Bluetooth address and role needed')
            sys.exit(1)
        bda, role = self.cmd_args[:2]
        if role not in ('0', '1'):
            print('Role should be 0 (master) or 1 (slave)')
            sys.exit(1)
        self._call(self.ctl.RoleSwitch, bda, dbus.Byte(int(role)))

    def _cmd_connections(self):
        """Print every entry returned by Controller.Connections."""
        for conn in self._call(self.ctl.Connections):
            print(conn)

    def _handlers(self):
        """Return the mapping from command name to its implementation."""
        return {
            'DeviceList': self._cmd_device_list,
            'DefaultDevice': self._cmd_default_device,
            'Up': self._cmd_up,
            'Down': self._cmd_down,
            'SetProperty': self._cmd_set_property,
            'GetProperty': self._cmd_get_property,
            'Inquiry': self._cmd_inquiry,
            'CancelInquiry': self._cmd_cancel_inquiry,
            'RemoteName': self._cmd_remote_name,
            'PeriodicInquiry': self._cmd_periodic_inquiry,
            'CancelPeriodic': self._cmd_cancel_periodic,
            'Authenticate': self._cmd_authenticate,
            'RoleSwitch': self._cmd_role_switch,
            'Connections': self._cmd_connections,
        }

    def run(self):
        """Execute the requested command and, if required, the main loop.

        With -l no command is sent; the process only listens.  Commands
        that wait for a reply signal set self.listen themselves.  Exits
        with status 1 on an unknown command or a failed D-Bus call.
        """
        if self.listen:
            print('Listening for events...')
        else:
            handler = self._handlers().get(self.cmd)
            if handler is None:
                print('Unknown command: %s' % self.cmd)
                sys.exit(1)
            handler()

        if self.listen:
            # Create the loop before installing the handlers so that
            # signal_cb never sees a missing main loop.
            self.main_loop = gobject.MainLoop()
            signal(SIGINT, self.signal_cb)
            signal(SIGTERM, self.signal_cb)
            self.main_loop.run()
| |
if __name__ == '__main__':
    # Script entry point: initialise thread support in gobject and in the
    # D-Bus GLib bindings first, then build the Tester from the command line
    # and execute it.
    gobject.threads_init()
    dbus.glib.init_threads()

    Tester(sys.argv).run()
| |