3 A simple client for an onoff device. If no command is given, the device is
4 temporarily activated. When a command is given, the device is activated, then
5 the command is run. Some time after the command finishes, the device is
11 import xml.parsers.expat
13 from dbus.mainloop.glib import DBusGMainLoop
14 from gi.repository import GObject
16 from onoff.common import ST_ACTIVE
17 import onoff.dbusutils
19 def wait_for_signal(proxy, signal, desired_state):
20 loop = GObject.MainLoop()
22 if st == desired_state:
24 proxy.connect_to_signal(signal, callback)
27 def parse_introspection(xmlstring):
28 parser = xml.parsers.expat.ParserCreate()
30 def start_element(name, attrs):
38 parser.StartElementHandler = start_element
39 parser.Parse(xmlstring)
43 parser = argparse.ArgumentParser(parents=[onoff.dbusutils.dbus_options])
44 parser.add_argument("--duration", type=int, default=10,
45 help="how long to activate the device in seconds " +
46 "(default: %(default)d")
47 parser.add_argument("command", nargs=argparse.REMAINDER,
48 help="a command to be executed with the device being" +
49 "activated for the duration of the execution")
50 parser.add_argument("--list", action="store_true",
51 help="list available devices and exit")
52 args = parser.parse_args()
53 DBusGMainLoop(set_as_default=True)
55 bus = onoff.dbusutils.get_dbus(args)
56 proxy = bus.get_object(args.busname, onoff.dbusutils.object_prefix)
57 for elem in parse_introspection(proxy.Introspect()):
60 proxy = onoff.dbusutils.get_dbus_proxy(args)
61 st, fd = proxy.activatefd()
66 print("state is %d waiting for signal" % st)
67 wait_for_signal(proxy, "changestate", ST_ACTIVE)
68 print("new state is actived")
69 os.execvp(args.command[0], args.command)
71 proxy = onoff.dbusutils.get_dbus_proxy(args)
72 st = proxy.activatetime(args.duration)
74 print("state is %d waiting for signal" % st)
75 wait_for_signal(proxy, "changestate", ST_ACTIVE)
76 print("new state is active")
78 if __name__ == "__main__":