8 from dbus.mainloop.glib import DBusGMainLoop
9 from gi.repository import GObject
# Module-level logger used by OnoffControl for its debug/info messages.
logger = logging.getLogger("dbus_service")
def dbus_socket_pair():
    """Create a socket pair where the latter end is suitable for dbus.

    The second end is wrapped in a dbus.types.UnixFd so that it can be
    returned from a D-Bus method whose out-signature contains "h".

    @rtype: (socket, dbus.types.UnixFd)
    @returns: the local socket and the UnixFd wrapping the peer end
    """
    s1, s2 = socket.socketpair()
    try:
        s3 = dbus.types.UnixFd(s2)
    except Exception:
        # Wrapping failed: do not leak the local end either.
        s1.close()
        raise
    finally:
        # UnixFd keeps its own dup() of the descriptor; the original socket
        # object stays our responsibility and would otherwise leak.
        s2.close()
    return s1, s3
class OnoffControl(dbus.service.Object):
    """D-Bus object exposing a use-counted on/off device.

    The object is exported under C{path}/<name> on the bus name C{domain}.
    Clients request activation for a period of time; the device is activated
    when the first user arrives and deactivated when the last one leaves.
    """
    # NOTE(review): the listing this class was recovered from had dropped
    # several lines (the headers of state/use/unuse and of the nested
    # callback, the attribute initialisation in __init__, and some return
    # statements). They are restored here from how the surviving code uses
    # them -- confirm against the upstream file.

    domain = "de.subdivi.onoff0"
    path = "/de/subdivi/onoff0"

    def __init__(self, bus, name, device):
        """
        @param bus: the D-Bus connection to claim C{domain} on
        @param name: last component of the exported object path
        @param device: object providing C{state}, C{activate()},
            C{deactivate()} and a C{notify} collection of callbacks
        """
        busname = dbus.service.BusName(self.domain, bus=bus)
        dbus.service.Object.__init__(self, busname, "%s/%s" % (self.path, name))
        self.device = device
        # Number of outstanding activation requests.
        self.usecount = 0
        # Calling the signal method emits it, so registering it as a
        # callback forwards device state changes onto the bus.
        device.notify.add(self.changestate)

    @dbus.service.signal(domain, signature="q")
    def changestate(self, st):
        """Signal emitted with the new device state C{st}."""
        logger.debug("emitting state %d", st)

    @dbus.service.method(domain, out_signature="q")
    def state(self):
        """Return the current device state."""
        return self.device.state

    @dbus.service.method(domain, in_signature="q", out_signature="q")
    def activatetime(self, duration):
        """Use the device for a fixed period.

        @param duration: passed to GObject.timeout_add multiplied by 1000
            (i.e. seconds, since timeout_add takes milliseconds)
        @returns: the device state after registering the use
        """
        logger.info("activatetime %d", duration)
        GObject.timeout_add(duration * 1000, self.unuse)
        return self.use()

    @dbus.service.method(domain, in_signature="q", out_signature="qh")
    def activatefd(self, duration):
        """Use the device until the returned file descriptor is closed.

        One end of a socket pair is handed to the caller. Once a hangup or
        error is seen on the local end, the use is released C{duration}
        seconds later.

        @param duration: delay between the hangup and the release
        @returns: (device state, file descriptor for the caller)
        """
        logger.info("activatefd duration %d", duration)
        notifyfd, retfd = dbus_socket_pair()

        def callback(fd, _condition):
            logger.info("fd %d completed", fd.fileno())
            fd.close()
            GObject.timeout_add(duration * 1000, self.unuse)
            # Returning False removes the watch; the socket is closed now.
            return False

        GObject.io_add_watch(notifyfd, GObject.IO_HUP | GObject.IO_ERR, callback)
        return (self.use(), retfd)

    def use(self):
        """Register one more user, activating the device for the first one.

        @returns: the device state
        """
        self.usecount += 1
        if self.usecount <= 1:
            self.device.activate()
        return self.device.state

    def unuse(self):
        """Drop one user, deactivating the device when none are left.

        @returns: False, so that a GObject timeout calling this method is
            not rescheduled
        """
        self.usecount -= 1
        if self.usecount <= 0:
            self.device.deactivate()
        else:
            logger.debug("%d users left", self.usecount)
        return False
def main():
    """Export a redshift on/off control on the session bus and serve it.

    Runs the GObject main loop and therefore does not return on its own.
    """
    # NOTE(review): the "def main():" header and the call under the
    # __main__ guard were missing from the recovered listing and have been
    # restored -- confirm against the upstream file.
    #
    # Without a handler on the root logger, records below WARNING are
    # dropped, which would make the DEBUG level set below pointless.
    # basicConfig() is a no-op if handlers were already configured.
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    # Must be set before the bus connection is created so that the
    # connection is attached to the GLib main loop.
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    dev = onoff.process.OnoffProcess(["redshift"], 3)
    dev = onoff.common.InvertedDevice(dev)
    OnoffControl(bus, "redshift", dev)
    GObject.MainLoop().run()


if __name__ == "__main__":
    main()