| #!/usr/bin/python |
| |
| from __future__ import print_function |
| |
| import dbus |
| import dbus.exceptions |
| import dbus.mainloop.glib |
| import dbus.service |
| |
| import array |
| |
| try: |
| from gi.repository import GObject # python3 |
| except ImportError: |
| import gobject as GObject # python2 |
| |
| from random import randint |
| |
| mainloop = None |
| |
| BLUEZ_SERVICE_NAME = 'org.bluez' |
| LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' |
| DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' |
| DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' |
| |
| LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' |
| |
| |
| class InvalidArgsException(dbus.exceptions.DBusException): |
| _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' |
| |
| |
| class NotSupportedException(dbus.exceptions.DBusException): |
| _dbus_error_name = 'org.bluez.Error.NotSupported' |
| |
| |
| class NotPermittedException(dbus.exceptions.DBusException): |
| _dbus_error_name = 'org.bluez.Error.NotPermitted' |
| |
| |
| class InvalidValueLengthException(dbus.exceptions.DBusException): |
| _dbus_error_name = 'org.bluez.Error.InvalidValueLength' |
| |
| |
| class FailedException(dbus.exceptions.DBusException): |
| _dbus_error_name = 'org.bluez.Error.Failed' |
| |
| |
| class Advertisement(dbus.service.Object): |
| PATH_BASE = '/org/bluez/example/advertisement' |
| |
| def __init__(self, bus, index, advertising_type): |
| self.path = self.PATH_BASE + str(index) |
| self.bus = bus |
| self.ad_type = advertising_type |
| self.service_uuids = None |
| self.manufacturer_data = None |
| self.solicit_uuids = None |
| self.service_data = None |
| self.local_name = None |
| self.include_tx_power = None |
| self.data = None |
| dbus.service.Object.__init__(self, bus, self.path) |
| |
| def get_properties(self): |
| properties = dict() |
| properties['Type'] = self.ad_type |
| if self.service_uuids is not None: |
| properties['ServiceUUIDs'] = dbus.Array(self.service_uuids, |
| signature='s') |
| if self.solicit_uuids is not None: |
| properties['SolicitUUIDs'] = dbus.Array(self.solicit_uuids, |
| signature='s') |
| if self.manufacturer_data is not None: |
| properties['ManufacturerData'] = dbus.Dictionary( |
| self.manufacturer_data, signature='qv') |
| if self.service_data is not None: |
| properties['ServiceData'] = dbus.Dictionary(self.service_data, |
| signature='sv') |
| if self.local_name is not None: |
| properties['LocalName'] = dbus.String(self.local_name) |
| if self.include_tx_power is not None: |
| properties['IncludeTxPower'] = dbus.Boolean(self.include_tx_power) |
| |
| if self.data is not None: |
| properties['Data'] = dbus.Dictionary( |
| self.data, signature='yv') |
| return {LE_ADVERTISEMENT_IFACE: properties} |
| |
| def get_path(self): |
| return dbus.ObjectPath(self.path) |
| |
| def add_service_uuid(self, uuid): |
| if not self.service_uuids: |
| self.service_uuids = [] |
| self.service_uuids.append(uuid) |
| |
| def add_solicit_uuid(self, uuid): |
| if not self.solicit_uuids: |
| self.solicit_uuids = [] |
| self.solicit_uuids.append(uuid) |
| |
| def add_manufacturer_data(self, manuf_code, data): |
| if not self.manufacturer_data: |
| self.manufacturer_data = dbus.Dictionary({}, signature='qv') |
| self.manufacturer_data[manuf_code] = dbus.Array(data, signature='y') |
| |
| def add_service_data(self, uuid, data): |
| if not self.service_data: |
| self.service_data = dbus.Dictionary({}, signature='sv') |
| self.service_data[uuid] = dbus.Array(data, signature='y') |
| |
| def add_local_name(self, name): |
| if not self.local_name: |
| self.local_name = "" |
| self.local_name = dbus.String(name) |
| |
| def add_data(self, ad_type, data): |
| if not self.data: |
| self.data = dbus.Dictionary({}, signature='yv') |
| self.data[ad_type] = dbus.Array(data, signature='y') |
| |
| @dbus.service.method(DBUS_PROP_IFACE, |
| in_signature='s', |
| out_signature='a{sv}') |
| def GetAll(self, interface): |
| print('GetAll') |
| if interface != LE_ADVERTISEMENT_IFACE: |
| raise InvalidArgsException() |
| print('returning props') |
| return self.get_properties()[LE_ADVERTISEMENT_IFACE] |
| |
| @dbus.service.method(LE_ADVERTISEMENT_IFACE, |
| in_signature='', |
| out_signature='') |
| def Release(self): |
| print('%s: Released!' % self.path) |
| |
| class TestAdvertisement(Advertisement): |
| |
| def __init__(self, bus, index): |
| Advertisement.__init__(self, bus, index, 'peripheral') |
| self.add_service_uuid('180D') |
| self.add_service_uuid('180F') |
| self.add_manufacturer_data(0xffff, [0x00, 0x01, 0x02, 0x03, 0x04]) |
| self.add_service_data('9999', [0x00, 0x01, 0x02, 0x03, 0x04]) |
| self.add_local_name('TestAdvertisement') |
| self.include_tx_power = True |
| self.add_data(0x26, [0x01, 0x01, 0x00]) |
| |
| |
| def register_ad_cb(): |
| print('Advertisement registered') |
| |
| |
| def register_ad_error_cb(error): |
| print('Failed to register advertisement: ' + str(error)) |
| mainloop.quit() |
| |
| |
| def find_adapter(bus): |
| remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), |
| DBUS_OM_IFACE) |
| objects = remote_om.GetManagedObjects() |
| |
| for o, props in objects.items(): |
| if LE_ADVERTISING_MANAGER_IFACE in props: |
| return o |
| |
| return None |
| |
| |
| def main(): |
| global mainloop |
| |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| bus = dbus.SystemBus() |
| |
| adapter = find_adapter(bus) |
| if not adapter: |
| print('LEAdvertisingManager1 interface not found') |
| return |
| |
| adapter_props = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), |
| "org.freedesktop.DBus.Properties"); |
| |
| adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1)) |
| |
| ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), |
| LE_ADVERTISING_MANAGER_IFACE) |
| |
| test_advertisement = TestAdvertisement(bus, 0) |
| |
| mainloop = GObject.MainLoop() |
| |
| ad_manager.RegisterAdvertisement(test_advertisement.get_path(), {}, |
| reply_handler=register_ad_cb, |
| error_handler=register_ad_error_cb) |
| |
| mainloop.run() |
| |
| if __name__ == '__main__': |
| main() |