JFIF$        dd7 

Viewing File: /usr/lib/python3.9/site-packages/tuned/exports/dbus_exporter_with_properties.py

from inspect import ismethod
from dbus.service import method, signal
from dbus import PROPERTIES_IFACE
from dbus.exceptions import DBusException
from tuned.exports.dbus_exporter import DBusExporter


class DBusExporterWithProperties(DBusExporter):
    def __init__(self, bus_name, interface_name, object_name, namespace):
        super(DBusExporterWithProperties, self).__init__(bus_name, interface_name, object_name, namespace)
        self._property_setters = {}
        self._property_getters = {}

        def Get(_, interface_name, property_name, caller):
            if interface_name != self._interface_name:
                raise DBusException("Unknown interface: %s" % interface_name)
            if property_name not in self._property_getters:
                raise DBusException("No such property: %s" % property_name)
            getter = self._property_getters[property_name]
            return getter(caller)

        def Set(_, interface_name, property_name, value, caller):
            if interface_name != self._interface_name:
                raise DBusException("Unknown interface: %s" % interface_name)
            if property_name not in self._property_setters:
                raise DBusException("No such property: %s" % property_name)
            setter = self._property_setters[property_name]
            setter(value, caller)

        def GetAll(_, interface_name, caller):
            if interface_name != self._interface_name:
                raise DBusException("Unknown interface: %s" % interface_name)
            return {name: getter(caller) for name, getter in self._property_getters.items()}

        def PropertiesChanged(_, interface_name, changed_properties, invalidated_properties):
            if interface_name != self._interface_name:
                raise DBusException("Unknown interface: %s" % interface_name)

        self._dbus_methods["Get"] = method(PROPERTIES_IFACE, in_signature="ss", out_signature="v", sender_keyword="caller")(Get)
        self._dbus_methods["Set"] = method(PROPERTIES_IFACE, in_signature="ssv", sender_keyword="caller")(Set)
        self._dbus_methods["GetAll"] = method(PROPERTIES_IFACE, in_signature="s", out_signature="a{sv}", sender_keyword="caller")(GetAll)
        self._dbus_methods["PropertiesChanged"] = signal(PROPERTIES_IFACE, signature="sa{sv}as")(PropertiesChanged)
        self._signals.add("PropertiesChanged")

    def _auth_wrapper(self, method, action_name):
        def wrapper(*args, **kwargs):
            new_args = self._polkit_auth(action_name, *args)
            if new_args[-1] == "":
                raise DBusException("Unauthorized")
            return method(*new_args, **kwargs)
        return wrapper

    def property_changed(self, property_name, value):
        self.send_signal("PropertiesChanged", self._interface_name, {property_name: value}, {})

    def property_getter(self, method, property_name, action_name=None):
        if not ismethod(method):
            raise Exception("Only bound methods can be exported.")
        if property_name in self._property_getters:
            raise Exception("A getter for this property is already registered.")
        if action_name is not None:
            method = self._auth_wrapper(method, action_name)
        self._property_getters[property_name] = method

    def property_setter(self, method, property_name, action_name=None):
        if not ismethod(method):
            raise Exception("Only bound methods can be exported.")
        if property_name in self._property_setters:
            raise Exception("A setter for this property is already registered.")
        if action_name is not None:
            method = self._auth_wrapper(method, action_name)
        self._property_setters[property_name] = method
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg