blob: 00cdfedab95c4ac5cb555119c3ea01ed230c6bc0 [file] [log] [blame]
#!/usr/bin/env python
import dbus
import dbus.decorators
import dbus.glib
import gobject
import sys
import getopt
from signal import *
# Commands listed by usage() under "Manager commands".  Any command that is
# not in this list makes Tester resolve a device before running.
mgr_cmds = [
    "DeviceList",
    "DefaultDevice",
]

# Commands listed by usage() under "Device commands".
dev_cmds = [
    "Up",
    "Down",
    "SetProperty",
    "GetProperty",
    "Inquiry",
    "CancelInquiry",
    "PeriodicInquiry",
    "CancelPeriodic",
    "RemoteName",
    "Connections",
    "Authenticate",
    "RoleSwitch",
]

# Lower-case property names whose SetProperty value is sent as dbus.Boolean.
dev_setprop_bool = [
    "auth",
    "encrypt",
    "discoverable",
    "connectable",
]

# Lower-case property names whose SetProperty value is sent as dbus.Byte.
dev_setprop_byte = [
    "incmode",
]
class Tester:
    """Command-line exerciser for the org.bluez D-Bus interfaces.

    Constructing an instance parses the argument vector, connects to the
    system bus and, when a device path is known or required, creates the
    device and controller proxies and subscribes to their signals.  run()
    then either only listens for signals (-l) or sends one command and, for
    commands that complete asynchronously, waits in a GLib main loop for the
    matching signal.
    """

    # NOTE: print is always called with exactly one pre-formatted string, so
    # the output is the same whether print is a statement or a function.

    # Names of signals that terminate the main loop when received.  The
    # class-level list is a read-only default: _exit_on() rebinds a fresh
    # list on the instance instead of appending to this shared one.
    exit_events = []
    dev_path = None      # D-Bus object path of the device to operate on
    need_dev = False     # True when a device path must be resolved
    listen = False       # True when run() should enter the main loop
    at_interrupt = None  # optional callable invoked by signal_cb before quitting
    cmd = None           # command name taken from the command line, if any
    cmd_args = ()        # arguments following the command name

    def __init__(self, argv):
        """Parse *argv*, connect to D-Bus and set up the device proxies.

        argv is a full argument vector: argv[0] is the program name used in
        usage messages, the remainder are options and the command.  The
        process exits with status 1 when the D-Bus setup fails.
        """
        self.name = argv[0]
        self.parse_args(argv[1:])
        try:
            self.dbus_setup()
        except dbus.DBusException as e:
            print('Failed to do D-BUS setup: %s' % e)
            sys.exit(1)
        self.dev_setup()

    def parse_args(self, argv):
        """Interpret the options and command in *argv* (program name removed).

        Sets listen, need_dev, dev_path, cmd and cmd_args.  Prints the usage
        text and exits on -h (status 0), on an invalid option, or when neither
        a command nor -l was given (status 1).
        """
        try:
            opts, args = getopt.getopt(argv, "hli:")
        except getopt.GetoptError:
            self.usage()
            sys.exit(1)

        for opt, value in opts:
            if opt == "-h":
                self.usage()
                sys.exit()
            elif opt == "-l":
                self.need_dev = True
                self.listen = True
            elif opt == "-i":
                # startswith() rather than value[0] so an empty -i argument
                # cannot raise IndexError.
                if value.startswith('/'):
                    self.dev_path = value
                else:
                    self.dev_path = '/org/bluez/Device/%s' % value

        if not (args or self.listen):
            self.usage()
            sys.exit(1)

        if args:
            self.cmd = args[0]
            self.cmd_args = args[1:]
            # Everything except the manager commands operates on a device.
            if self.cmd not in mgr_cmds:
                self.need_dev = True

    def dev_setup(self):
        """Resolve the device path if needed and create the device proxies.

        When a device is required but none was given with -i, the manager's
        DefaultDevice() is used.  If a device path is then known, self.dev
        and self.ctl are created and the signal handlers are connected.
        Exits with status 1 on any DBusException.
        """
        if self.need_dev and not self.dev_path:
            try:
                self.dev_path = self.manager.DefaultDevice()
            except dbus.DBusException as e:
                print('Failed to get default device: %s' % e)
                sys.exit(1)

        if not self.dev_path:
            return

        try:
            obj = self.bus.get_object('org.bluez', self.dev_path)
            self.dev = dbus.Interface(obj, 'org.bluez.Device')
            self.dev.connect_to_signal('Up', self.dev_up)
            self.dev.connect_to_signal('Down', self.dev_down)
            # Subscribe on the selected device's path (not a fixed hci0) so
            # that a device chosen with -i is the one being watched.
            self.bus.add_signal_receiver(self.dev_name_changed,
                                         'DeviceNameChanged',
                                         'org.bluez.Device',
                                         'org.bluez',
                                         self.dev_path)

            obj = self.bus.get_object('org.bluez',
                                      '%s/Controller' % self.dev_path)
            self.ctl = dbus.Interface(obj, 'org.bluez.Device.Controller')
            controller_signals = (
                ('InquiryStart', self.inquiry_start),
                ('InquiryResult', self.inquiry_result),
                ('InquiryComplete', self.inquiry_complete),
                ('RemoteName', self.remote_name),
                ('RemoteNameFailed', self.remote_name_failed),
                ('AuthenticationComplete', self.authentication_complete),
            )
            for signal_name, handler in controller_signals:
                self.ctl.connect_to_signal(signal_name, handler)
        except dbus.DBusException as e:
            print('Failed to setup device path: %s' % e)
            sys.exit(1)

    def dbus_setup(self):
        """Connect to the system bus and create the manager proxy.

        Sets self.bus and self.manager and connects the DeviceAdded and
        DeviceRemoved handlers.  DBusException is left to the caller.
        """
        self.bus = dbus.SystemBus()
        manager_obj = self.bus.get_object('org.bluez', '/org/bluez/Manager')
        self.manager = dbus.Interface(manager_obj, 'org.bluez.Manager')
        self.manager.connect_to_signal('DeviceAdded', self.device_added)
        self.manager.connect_to_signal('DeviceRemoved', self.device_removed)

    def usage(self):
        """Print the option summary followed by the known commands."""
        print('Usage: %s [-i <dev>] [-l] [-h] <cmd> [arg1..]' % self.name)
        print(' -i <dev> Specify device (e.g. "hci0" or "/org/bluez/Device/hci0")')
        print(' -l Listen for events (no command required)')
        print(' -h Show this help')
        print('Manager commands:')
        for cmd in mgr_cmds:
            print('\t%s' % cmd)
        print('Device commands:')
        for cmd in dev_cmds:
            print('\t%s' % cmd)

    # ------------------------------------------------------------------
    # Signal handlers
    # ------------------------------------------------------------------

    def _quit_if_awaited(self, event):
        """Stop the main loop when *event* is one of the awaited exit events."""
        if event in self.exit_events:
            self.main_loop.quit()

    def _exit_on(self, *events):
        """Add *events* to the signals that end the main loop.

        A new list is bound on the instance so the class-level default is
        never mutated.
        """
        self.exit_events = list(self.exit_events) + list(events)

    def device_added(self, path):
        """Handler for the manager's DeviceAdded signal."""
        print('DeviceAdded: %s' % path)

    def device_removed(self, path):
        """Handler for the manager's DeviceRemoved signal."""
        print('DeviceRemoved: %s' % path)

    def remote_name(self, bda, name):
        """Handler for the controller's RemoteName signal."""
        print('RemoteName: %s, %s' % (bda, name))
        self._quit_if_awaited('RemoteName')

    def remote_name_failed(self, bda, status):
        """Handler for the controller's RemoteNameFailed signal."""
        print('RemoteNameFailed: %s, 0x%02X' % (bda, status))
        self._quit_if_awaited('RemoteNameFailed')

    def inquiry_start(self):
        """Handler for the controller's InquiryStart signal."""
        print('InquiryStart')

    def inquiry_complete(self):
        """Handler for the controller's InquiryComplete signal."""
        print('InquiryComplete')
        self._quit_if_awaited('InquiryComplete')

    def inquiry_result(self, bda, cls, rssi):
        """Handler for the controller's InquiryResult signal."""
        print('InquiryResult: %s, %06X, %02X' % (bda, cls, rssi))

    def authentication_complete(self, bda, status):
        """Handler for the controller's AuthenticationComplete signal."""
        print('AuthenticationComplete: %s, 0x%02X' % (bda, status))
        self._quit_if_awaited('AuthenticationComplete')

    def dev_up(self):
        """Handler for the device's Up signal."""
        print('Up')

    def dev_down(self):
        """Handler for the device's Down signal."""
        print('Down')

    @dbus.decorators.explicitly_pass_message
    def dev_name_changed(self, *args, **keywords):
        """Handler for DeviceNameChanged.

        The first signal argument is the new name.  The originating object
        path is read from the message passed in as the 'dbus_message'
        keyword.
        """
        name = args[0]
        dbus_message = keywords["dbus_message"]
        print('Device %s name changed: %s' % (dbus_message.get_path(), name))

    def signal_cb(self, sig, frame):
        """SIGINT/SIGTERM handler: run the interrupt hook, then stop the loop."""
        print('Caught signal, exiting')
        if self.at_interrupt:
            # A failing hook must not keep the main loop from being stopped.
            try:
                self.at_interrupt()
            except dbus.DBusException as e:
                print('Interrupt cleanup failed: %s' % e)
        self.main_loop.quit()

    # ------------------------------------------------------------------
    # Command helpers
    # ------------------------------------------------------------------

    def _call(self, method, *args):
        """Call the D-Bus proxy *method* with *args* and return its result.

        On DBusException the failure is reported together with the current
        command name and the process exits with status 1.
        """
        try:
            return method(*args)
        except dbus.DBusException as e:
            print('Sending %s failed: %s' % (self.cmd, e))
            sys.exit(1)

    def _command_table(self):
        """Return the mapping from command name to its handler method."""
        return {
            # Manager methods
            'DeviceList': self._cmd_device_list,
            'DefaultDevice': self._cmd_default_device,
            # Device methods
            'Up': self._cmd_up,
            'Down': self._cmd_down,
            'SetProperty': self._cmd_set_property,
            'GetProperty': self._cmd_get_property,
            # Device.Controller methods
            'Inquiry': self._cmd_inquiry,
            'CancelInquiry': self._cmd_cancel_inquiry,
            'RemoteName': self._cmd_remote_name,
            'PeriodicInquiry': self._cmd_periodic_inquiry,
            'CancelPeriodic': self._cmd_cancel_periodic,
            'Authenticate': self._cmd_authenticate,
            'RoleSwitch': self._cmd_role_switch,
            'Connections': self._cmd_connections,
        }

    def _cmd_device_list(self):
        """Print every entry returned by the manager's DeviceList()."""
        for dev in self._call(self.manager.DeviceList):
            print(dev)

    def _cmd_default_device(self):
        """Print the result of the manager's DefaultDevice()."""
        print(self._call(self.manager.DefaultDevice))

    def _cmd_up(self):
        """Call Up() on the device."""
        self._call(self.dev.Up)

    def _cmd_down(self):
        """Call Down() on the device."""
        self._call(self.dev.Down)

    def _cmd_set_property(self):
        """Call SetProperty(name, value), wrapping the value by property type."""
        if len(self.cmd_args) < 2:
            print('Usage: %s -i <dev> SetProperty strPropName arg' % self.name)
            sys.exit(1)
        prop, value = self.cmd_args[:2]
        key = prop.lower()
        # NOTE(review): value is the raw command-line string; confirm that
        # dbus.Boolean / dbus.Byte interpret e.g. '0' and '1' as intended.
        if key in dev_setprop_bool:
            value = dbus.Boolean(value)
        elif key in dev_setprop_byte:
            value = dbus.Byte(value)
        self._call(self.dev.SetProperty, prop, value)

    def _cmd_get_property(self):
        """Print the result of GetProperty(name)."""
        if len(self.cmd_args) < 1:
            print('Usage: %s -i <dev> GetProperty strPropName' % self.name)
            sys.exit(1)
        print(self._call(self.dev.GetProperty, self.cmd_args[0]))

    def _cmd_inquiry(self):
        """Start an inquiry and wait for InquiryComplete.

        With exactly two arguments (length, lap) they are passed on; with any
        other argument count Inquiry() is called without arguments.
        """
        if len(self.cmd_args) != 2:
            self._call(self.ctl.Inquiry)
        else:
            length, lap = self.cmd_args
            # Base 0 lets the LAP be written in decimal or with a 0x prefix.
            self._call(self.ctl.Inquiry,
                       dbus.Byte(length), dbus.UInt32(int(lap, 0)))
        self.listen = True
        self._exit_on('InquiryComplete')
        # Cancel the running inquiry if the user interrupts the wait.
        self.at_interrupt = self.ctl.CancelInquiry

    def _cmd_cancel_inquiry(self):
        """Call CancelInquiry() on the controller."""
        self._call(self.ctl.CancelInquiry)

    def _cmd_remote_name(self):
        """Request a remote name and wait for RemoteName or RemoteNameFailed."""
        if len(self.cmd_args) < 1:
            print('Bluetooth address needed')
            sys.exit(1)
        self._call(self.ctl.RemoteName, self.cmd_args[0])
        self.listen = True
        self._exit_on('RemoteNameFailed', 'RemoteName')

    def _cmd_periodic_inquiry(self):
        """Start a periodic inquiry and keep listening.

        Fewer than three arguments selects the defaults (6, 20, 60); three
        arguments give length, min and max; a fourth is passed as the LAP.
        Further arguments are ignored.
        """
        args = self.cmd_args
        if len(args) < 3:
            length, min_period, max_period = 6, 20, 60
        else:
            length, min_period, max_period = args[:3]
        params = [dbus.Byte(length),
                  dbus.UInt16(min_period),
                  dbus.UInt16(max_period)]
        if len(args) > 3:
            params.append(dbus.UInt32(int(args[3], 0)))
        self._call(self.ctl.PeriodicInquiry, *params)
        self.listen = True

    def _cmd_cancel_periodic(self):
        """Call CancelPeriodic() on the controller."""
        self._call(self.ctl.CancelPeriodic)

    def _cmd_authenticate(self):
        """Request authentication and wait for AuthenticationComplete."""
        if len(self.cmd_args) < 1:
            print('Bluetooth address needed')
            sys.exit(1)
        self._call(self.ctl.Authenticate, self.cmd_args[0])
        self.listen = True
        self._exit_on('AuthenticationComplete')

    def _cmd_role_switch(self):
        """Call RoleSwitch(address, role); role must be '0' or '1'."""
        if len(self.cmd_args) < 2:
            print('Bluetooth address and role needed')
            sys.exit(1)
        # Slice so that surplus arguments cannot break the unpacking.
        bda, role = self.cmd_args[:2]
        if role not in ('0', '1'):
            print('Role should be 0 (master) or 1 (slave)')
            sys.exit(1)
        self._call(self.ctl.RoleSwitch, bda, dbus.Byte(role))

    def _cmd_connections(self):
        """Print every entry returned by the controller's Connections()."""
        for conn in self._call(self.ctl.Connections):
            print(conn)

    def run(self):
        """Execute the requested action.

        With -l no command is sent; the tool only listens.  Otherwise the
        command is dispatched (an unknown command exits with status 1).  If
        listening was requested, or the command set self.listen, SIGINT and
        SIGTERM handlers are installed and the GLib main loop runs until one
        of the handlers quits it.
        """
        if self.listen:
            print('Listening for events...')
        else:
            handler = self._command_table().get(self.cmd)
            if handler is None:
                print('Unknown command: %s' % self.cmd)
                sys.exit(1)
            handler()

        if self.listen:
            # Create the loop before installing the handlers: signal_cb
            # dereferences self.main_loop.
            self.main_loop = gobject.MainLoop()
            signal(SIGINT, self.signal_cb)
            signal(SIGTERM, self.signal_cb)
            self.main_loop.run()
if __name__ == '__main__':
    # Initialise thread support in GObject and the D-Bus GLib bindings
    # first, then build the tester from the command line and run it.
    gobject.threads_init()
    dbus.glib.init_threads()
    Tester(sys.argv).run()