| #!/usr/bin/python |
| # -*- coding: utf-8 -*- |
| |
| import dbus |
| import dbus.service |
| import gobject |
| from dbus.mainloop.glib import DBusGMainLoop |
| import sys |
| |
| DBusGMainLoop(set_as_default=True) |
| loop = gobject.MainLoop() |
| |
| bus = dbus.SystemBus() |
| |
| def sig_received(*args, **kwargs): |
| if "member" not in kwargs: |
| return |
| if "path" not in kwargs: |
| return; |
| sig_name = kwargs["member"] |
| path = kwargs["path"] |
| print sig_name |
| print path |
| if sig_name == "PropertyChanged": |
| k, v = args |
| print k |
| print v |
| else: |
| ob = args[0] |
| print ob |
| |
| |
| def enter_mainloop(): |
| bus.add_signal_receiver(sig_received, bus_name="org.bluez", |
| dbus_interface = "org.bluez.HealthDevice", |
| path_keyword="path", |
| member_keyword="member", |
| interface_keyword="interface") |
| |
| try: |
| print "Entering main lopp, push Ctrl+C for finish" |
| |
| mainloop = gobject.MainLoop() |
| mainloop.run() |
| except KeyboardInterrupt: |
| pass |
| finally: |
| print "Exiting, bye" |
| |
| hdp_manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), |
| "org.bluez.HealthManager") |
| |
| role = None |
| while role == None: |
| print "Select 1. source or 2. sink: ", |
| try: |
| sel = int(sys.stdin.readline()) |
| if sel == 1: |
| role = "Source" |
| elif sel == 2: |
| role = "Sink" |
| else: |
| raise ValueError |
| except (TypeError, ValueError): |
| print "Wrong selection, try again: ", |
| except KeyboardInterrupt: |
| sys.exit() |
| |
| dtype = None |
| while dtype == None: |
| print "Select a data type: ", |
| try: |
| sel = int(sys.stdin.readline()) |
| if (sel < 0) or (sel > 65535): |
| raise ValueError |
| dtype = sel; |
| except (TypeError, ValueError): |
| print "Wrong selection, try again: ", |
| except KeyboardInterrupt: |
| sys.exit() |
| |
| pref = None |
| if role == "Source": |
| while pref == None: |
| try: |
| print "Select a preferred data channel type 1.", |
| print "reliable 2. streaming: ", |
| sel = int(sys.stdin.readline()) |
| if sel == 1: |
| pref = "Reliable" |
| elif sel == 2: |
| pref = "Streaming" |
| else: |
| raise ValueError |
| |
| except (TypeError, ValueError): |
| print "Wrong selection, try again" |
| except KeyboardInterrupt: |
| sys.exit() |
| |
| app_path = hdp_manager.CreateApplication({ |
| "DataType": dbus.types.UInt16(dtype), |
| "Role": role, |
| "Description": "Test Source", |
| "ChannelType": pref}) |
| else: |
| app_path = hdp_manager.CreateApplication({ |
| "DataType": dbus.types.UInt16(dtype), |
| "Description": "Test sink", |
| "Role": role}) |
| |
| print "New application created:", app_path |
| |
| con = None |
| while con == None: |
| try: |
| print "Connect to a remote device (y/n)? ", |
| sel = sys.stdin.readline() |
| if sel in ("y\n", "yes\n", "Y\n", "YES\n"): |
| con = True |
| elif sel in ("n\n", "no\n", "N\n", "NO\n"): |
| con = False |
| else: |
| print "Wrong selection, try again." |
| except KeyboardInterrupt: |
| sys.exit() |
| |
| if not con: |
| enter_mainloop() |
| sys.exit() |
| |
| manager = dbus.Interface(bus.get_object("org.bluez", "/"), |
| "org.bluez.Manager") |
| |
| adapters = manager.ListAdapters() |
| |
| i = 1 |
| for ad in adapters: |
| print "%d. %s" % (i, ad) |
| i = i + 1 |
| |
| print "Select an adapter: ", |
| select = None |
| while select == None: |
| try: |
| pos = int(sys.stdin.readline()) - 1 |
| if pos < 0: |
| raise TypeError |
| select = adapters[pos] |
| except (TypeError, IndexError, ValueError): |
| print "Wrong selection, try again: ", |
| except KeyboardInterrupt: |
| sys.exit() |
| |
| adapter = dbus.Interface(bus.get_object("org.bluez", select), |
| "org.bluez.Adapter") |
| |
| devices = adapter.ListDevices() |
| |
| if len(devices) == 0: |
| print "No devices available" |
| sys.exit() |
| |
| i = 1 |
| for dev in devices: |
| print "%d. %s" % (i, dev) |
| i = i + 1 |
| |
| print "Select a device: ", |
| select = None |
| while select == None: |
| try: |
| pos = int(sys.stdin.readline()) - 1 |
| if pos < 0: |
| raise TypeError |
| select = devices[pos] |
| except (TypeError, IndexError, ValueError): |
| print "Wrong selection, try again: ", |
| except KeyboardInterrupt: |
| sys.exit() |
| |
| device = dbus.Interface(bus.get_object("org.bluez", select), |
| "org.bluez.HealthDevice") |
| |
| echo = None |
| while echo == None: |
| try: |
| print "Perform an echo (y/n)? ", |
| sel = sys.stdin.readline() |
| if sel in ("y\n", "yes\n", "Y\n", "YES\n"): |
| echo = True |
| elif sel in ("n\n", "no\n", "N\n", "NO\n"): |
| echo = False |
| else: |
| print "Wrong selection, try again." |
| except KeyboardInterrupt: |
| sys.exit() |
| |
| if echo: |
| if device.Echo(): |
| print "Echo was ok" |
| else: |
| print "Echo war wrong, exiting" |
| sys.exit() |
| |
| print "Connecting to device %s" % (select) |
| |
| if role == "Source": |
| chan = device.CreateChannel(app_path, "Reliable") |
| else: |
| chan = device.CreateChannel(app_path, "Any") |
| |
| print chan |
| |
| enter_mainloop() |
| |
| hdp_manager.DestroyApplication(app_path) |