#!/usr/bin/env python
"""
-A simpe client for an onoff device. If no parameters are given, the device
-is activated for 10 seconds. When a command is given, the device is activated,
-then the command is run. 10 seconds after the command finishes, the device is
+A simpe client for an onoff device. If no command is given, the device is
+temporarily activated. When a command is given, the device is activated, then
+the command is run. Some time after the command finishes, the device is
released.
"""
import argparse
import os
+import xml.parsers.expat
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GObject
from onoff.common import ST_ACTIVE
import onoff.dbusutils
-def wait_for_signal(proxy, signal):
+def wait_for_signal(proxy, signal, desired_state):
loop = GObject.MainLoop()
- state = []
def callback(st):
- state.append(st)
- loop.quit()
+ if st == desired_state:
+ loop.quit()
proxy.connect_to_signal(signal, callback)
loop.run()
- return state[0]
+
+def parse_introspection(xmlstring):
+ parser = xml.parsers.expat.ParserCreate()
+ nodes = []
+ def start_element(name, attrs):
+ if name != "node":
+ return
+ try:
+ value = attrs["name"]
+ except KeyError:
+ return
+ nodes.append(value)
+ parser.StartElementHandler = start_element
+ parser.Parse(xmlstring)
+ return nodes
def main():
parser = argparse.ArgumentParser(parents=[onoff.dbusutils.dbus_options])
parser.add_argument("--duration", type=int, default=10,
help="how long to activate the device in seconds " +
"(default: %(default)d")
- parser.add_argument("command", nargs=argparse.REMAINDER)
+ parser.add_argument("command", nargs=argparse.REMAINDER,
+ help="a command to be executed with the device being" +
+ "activated for the duration of the execution")
+ parser.add_argument("--list", action="store_true",
+ help="list available devices and exit")
args = parser.parse_args()
DBusGMainLoop(set_as_default=True)
- proxy = onoff.dbusutils.get_dbus_proxy(args)
- if args.command:
- st, fd = proxy.activatefd(args.duration)
+ if args.list:
+ bus = onoff.dbusutils.get_dbus(args)
+ proxy = bus.get_object(args.busname, onoff.dbusutils.object_prefix)
+ for elem in parse_introspection(proxy.Introspect()):
+ print(elem)
+ elif args.command:
+ proxy = onoff.dbusutils.get_dbus_proxy(args)
+ st, fd = proxy.activatefd()
fd = fd.take()
os.dup2(fd, 254)
os.close(fd)
if st != ST_ACTIVE:
print("state is %d waiting for signal" % st)
- st = wait_for_signal(proxy, "changestate")
- print("new state is %d" % st)
+ wait_for_signal(proxy, "changestate", ST_ACTIVE)
+ print("new state is actived")
os.execvp(args.command[0], args.command)
else:
+ proxy = onoff.dbusutils.get_dbus_proxy(args)
st = proxy.activatetime(args.duration)
if st != ST_ACTIVE:
print("state is %d waiting for signal" % st)
- st = wait_for_signal(proxy, "changestate")
- print("new state is %d" % st)
+ wait_for_signal(proxy, "changestate", ST_ACTIVE)
+ print("new state is active")
if __name__ == "__main__":
main()