import argparse
+import logging
+import socket
+import xml.parsers.expat
import dbus
+import dbus.service
+from gi.repository import GObject
+
+logger = logging.getLogger("onoff.dbusutils")
object_prefix = "/de/subdivi/onoff0"
+default_busname = "de.subdivi.onoff0"
dbus_options = argparse.ArgumentParser(add_help=False)
dbus_options.add_argument("--bus", default="session",
choices=("system", "session"),
help="which bus to use (default: %(default)s)")
-dbus_options.add_argument("--busname", type=str, required=True,
- help="which busname (i.e. client) to use")
-dbus_options.add_argument("--device", type=str, required=True,
+dbus_options.add_argument("--busname", type=str, default=default_busname,
+ help="which busname (i.e. client) to use " +
+ "(default: %(default)s)")
+dbus_options.add_argument("--device", type=str,
help="which device to control")
-def get_dbus_proxy(namespace):
+def get_dbus(namespace):
"""
@param namespace: a namespace returned from a dbus_options argument parser
- @returns: a dbus object proxy
+ @rtype: dbus.Bus
+ @returns: the requested bus
"""
if namespace.bus == "session":
- bus = dbus.SessionBus()
+ return dbus.SessionBus()
elif namespace.bus == "system":
- bus = dbus.SystemBus()
+ return dbus.SystemBus()
else:
raise AssertionError("namespace.bus %r is neither session nor system",
namespace.bus)
+
+def get_dbus_proxy(namespace):
+ """
+ @param namespace: a namespace returned from a dbus_options argument parser
+ @returns: a dbus object proxy
+ """
+ bus = get_dbus(namespace)
+ if not namespace.device:
+ raise ValueError("no --device given")
objname = "%s/%s" % (object_prefix, namespace.device)
return bus.get_object(namespace.busname, objname)
+
+def socketpair():
+ """Create a socket pair where the latter end is already wrapped for
+ transmission over dbus.
+
+ @rtype: (socket, dbus.types.UnixFd)
+ """
+ s1, s2 = socket.socketpair()
+ s3 = dbus.types.UnixFd(s2)
+ s2.close()
+ return s1, s3
+
+def list_objects(bus, busname, path=None):
+ """List objects on the given bus and busname starting with path. Only the
+ trailing components after the slash are returned.
+
+ @type bus: dbus.Bus
+ @type busname: str
+ @type path: None or str
+ @param path: prefix for searching objects. Defaults to
+ dbusutils.object_prefix.
+ @rtype: [str]
+ @returns: the trailing components of the objects found
+ """
+ if path is None:
+ path = object_prefix
+ xmlstring = bus.get_object(busname, path).Introspect()
+ parser = xml.parsers.expat.ParserCreate()
+ nodes = []
+ def start_element(name, attrs):
+ if name != "node":
+ return
+ try:
+ value = attrs["name"]
+ except KeyError:
+ return
+ nodes.append(value)
+ parser.StartElementHandler = start_element
+ parser.Parse(xmlstring)
+ return nodes
+
+class OnoffControl(dbus.service.Object):
+ domain = default_busname
+ path = object_prefix
+
+ def __init__(self, bus, name, device):
+ """
+ @type bus: dbus.Bus
+ @type name: str
+ @type device: OnoffDevice
+ """
+ busname = dbus.service.BusName(self.domain, bus=bus)
+ dbus.service.Object.__init__(self, busname, "%s/%s" % (self.path, name))
+ self.device = device
+ self.usecount = 0
+ device.notify.add(self.changestate)
+
+ @dbus.service.signal(domain, signature="q")
+ def changestate(self, state):
+ logger.debug("emitting state %d", state)
+
+ @dbus.service.method(domain, out_signature="q")
+ def state(self):
+ return self.device.state
+
+ @dbus.service.method(domain, in_signature="q", out_signature="q")
+ def activatetime(self, duration):
+ """Activate the device for a given number of seconds."""
+ logger.info("activatetime %d", duration)
+ GObject.timeout_add(duration * 1000, self.unuse)
+ return self.use()
+
+ @dbus.service.method(domain, in_signature="", out_signature="qh")
+ def activatefd(self):
+ """Activate a device until the returned file descriptor is closed."""
+ logger.info("activatefd")
+ notifyfd, retfd = socketpair()
+ def callback(fd, _):
+ logger.info("fd %d completed", fd.fileno())
+ fd.close()
+ self.unuse()
+ return False
+ GObject.io_add_watch(notifyfd, GObject.IO_HUP|GObject.IO_ERR, callback)
+ return (self.use(), retfd)
+
+ def use(self):
+ self.usecount += 1
+ if self.usecount <= 1:
+ self.device.activate()
+ return self.device.state
+
+ def unuse(self):
+ self.usecount -= 1
+ if not self.usecount:
+ self.device.deactivate()
+ else:
+ logger.debug("%d users left", self.usecount)
+ return False