JFIF$        dd7 

Viewing File: /usr/lib/python3.9/site-packages/tuned/admin/dbus_controller.py

import dbus
import dbus.exceptions
import time
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib, GObject
from .exceptions import TunedAdminDBusException

__all__ = ["DBusController"]

class DBusController(object):
	"""Client-side helper for talking to the TuneD daemon over the system D-Bus.

	The D-Bus proxy and the GLib main loop are created lazily on first use
	(see _init_proxy). Most public methods are thin wrappers that invoke the
	identically named daemon method through _call. In addition, an action
	can be registered with set_action and driven repeatedly from the GLib
	main loop by run() until exit() is called.
	"""

	# Default timeout passed to every D-Bus method call (dbus-python
	# interprets this value in seconds).
	_DEFAULT_CALL_TIMEOUT = 40

	def __init__(self, bus_name, interface_name, object_name, debug = False):
		"""Store the connection parameters; no D-Bus traffic happens here.

		Parameters:
		bus_name -- bus name passed to bus.get_object()
		interface_name -- name of the D-Bus interface used for method calls
		object_name -- object path passed to bus.get_object()
		debug -- if True, D-Bus error details are appended to the messages
			of exceptions raised by _call
		"""
		self._bus_name = bus_name
		self._interface_name = interface_name
		self._object_name = object_name
		self._proxy = None
		self._interface = None
		self._debug = debug
		self._main_loop = None
		# Periodic action driven by _idle and its call arguments
		self._action = None
		self._action_args = ()
		self._action_kwargs = {}
		# Last value returned by the periodic action
		self._action_exit_code = None
		# Action invoked once, right before the main loop quits
		self._on_exit_action = None
		self._on_exit_action_args = ()
		self._on_exit_action_kwargs = {}
		# Value returned by run()
		self._ret = True
		# Set to True to make _idle stop the main loop
		self._exit = False
		# Exception captured in _idle and re-raised by run()
		self._exception = None

	def _init_proxy(self):
		"""Create the main loop, the D-Bus proxy and the interface on first use.

		Does nothing when the proxy already exists.

		Raises:
		TunedAdminDBusException -- if a DBusException occurs during the setup
		"""
		try:
			if self._proxy is None:
				DBusGMainLoop(set_as_default=True)
				self._main_loop = GLib.MainLoop()
				bus = dbus.SystemBus()
				self._proxy = bus.get_object(self._bus_name, self._object_name)
				self._interface = dbus.Interface(self._proxy, dbus_interface = self._interface_name)
		except dbus.exceptions.DBusException:
			raise TunedAdminDBusException("Cannot talk to TuneD daemon via DBus. Is TuneD daemon running?")

	def _idle(self):
		"""Idle callback registered with GLib by run().

		Invokes the registered action (if any). When the exit flag is set,
		runs the on-exit action, quits the main loop and returns False so
		that GLib removes the callback; otherwise sleeps for one second and
		returns True to be called again.
		"""
		if self._action is not None:
			# This may (and very probably will) run in child thread, so catch and pass exceptions to the main thread
			try:
				self._action_exit_code = self._action(*self._action_args, **self._action_kwargs)
			except TunedAdminDBusException as e:
				self._exception = e
				self._exit = True

		if self._exit:
			if self._on_exit_action is not None:
				self._on_exit_action(*self._on_exit_action_args,
						**self._on_exit_action_kwargs)
			self._main_loop.quit()
			return False
		else:
			# Throttle the polling of the action to roughly once per second
			time.sleep(1)
		return True

	def set_on_exit_action(self, action, *args, **kwargs):
		"""Register a callable invoked with args/kwargs just before the main loop quits."""
		self._on_exit_action = action
		self._on_exit_action_args = args
		self._on_exit_action_kwargs = kwargs

	def set_action(self, action, *args, **kwargs):
		"""Register a callable invoked with args/kwargs on every _idle iteration.

		Passing None as the action disables the periodic call.
		"""
		self._action = action
		self._action_args = args
		self._action_kwargs = kwargs

	def run(self):
		"""Run the GLib main loop until exit() is requested or the action fails.

		Return:
		the value previously stored by exit() (True by default)

		Raises:
		TunedAdminDBusException -- if the D-Bus setup fails, or if the
			registered action raised it while the loop was running
		"""
		self._exception = None
		# The main loop is created lazily together with the proxy; make sure
		# it exists even if no D-Bus call or signal handler was set up before.
		if self._main_loop is None:
			self._init_proxy()
		GLib.idle_add(self._idle)
		self._main_loop.run()
		# Pass exception happened in child thread to the caller
		if self._exception is not None:
			raise self._exception
		return self._ret

	def _call(self, method_name, *args, **kwargs):
		"""Invoke a method of the daemon's D-Bus interface and return its result.

		Parameters:
		method_name -- name of the D-Bus method to call
		args -- positional arguments passed to the D-Bus method
		kwargs -- keyword arguments passed to the dbus-python proxy method;
			"timeout" defaults to _DEFAULT_CALL_TIMEOUT when not given

		Raises:
		TunedAdminDBusException -- if the proxy cannot be set up or the call
			fails with a DBusException
		"""
		self._init_proxy()

		# Keep the historical default timeout, but let the caller override it
		kwargs.setdefault("timeout", self._DEFAULT_CALL_TIMEOUT)
		try:
			method = self._interface.get_dbus_method(method_name)
			return method(*args, **kwargs)
		except dbus.exceptions.DBusException as dbus_exception:
			err_str = "DBus call to TuneD daemon failed"
			if self._debug:
				err_str += " (%s)" % str(dbus_exception)
			raise TunedAdminDBusException(err_str)

	def set_signal_handler(self, signal, cb):
		"""Connect callback cb to the given signal of the daemon's D-Bus object."""
		self._init_proxy()
		self._proxy.connect_to_signal(signal, cb)

	def is_running(self):
		"""Return the result of the daemon's "is_running" D-Bus method."""
		return self._call("is_running")

	def start(self):
		"""Return the result of the daemon's "start" D-Bus method."""
		return self._call("start")

	def stop(self):
		"""Return the result of the daemon's "stop" D-Bus method."""
		return self._call("stop")

	def profiles(self):
		"""Return the result of the daemon's "profiles" D-Bus method."""
		return self._call("profiles")

	def profiles2(self):
		"""Return the result of the daemon's "profiles2" D-Bus method."""
		return self._call("profiles2")

	def profile_info(self, profile_name):
		"""Return the result of the daemon's "profile_info" D-Bus method for profile_name."""
		return self._call("profile_info", profile_name)

	def log_capture_start(self, log_level, timeout):
		"""Return the result of the daemon's "log_capture_start" D-Bus method.

		Note that timeout is an argument of the remote method, not the
		timeout of the D-Bus call itself.
		"""
		return self._call("log_capture_start", log_level, timeout)

	def log_capture_finish(self, token):
		"""Return the result of the daemon's "log_capture_finish" D-Bus method for token."""
		return self._call("log_capture_finish", token)

	def active_profile(self):
		"""Return the result of the daemon's "active_profile" D-Bus method."""
		return self._call("active_profile")

	def profile_mode(self):
		"""Return the result of the daemon's "profile_mode" D-Bus method."""
		return self._call("profile_mode")

	def post_loaded_profile(self):
		"""Return the result of the daemon's "post_loaded_profile" D-Bus method."""
		return self._call("post_loaded_profile")

	def switch_profile(self, new_profile):
		"""Ask the daemon to switch to new_profile.

		Return:
		(False, "No profile specified") without contacting the daemon when
		new_profile is empty or None, otherwise the result of the daemon's
		"switch_profile" D-Bus method
		"""
		# Reject None as well as "" so that no invalid value reaches D-Bus
		if not new_profile:
			return (False, "No profile specified")
		return self._call("switch_profile", new_profile)

	def auto_profile(self):
		"""Return the result of the daemon's "auto_profile" D-Bus method."""
		return self._call("auto_profile")

	def recommend_profile(self):
		"""Return the result of the daemon's "recommend_profile" D-Bus method."""
		return self._call("recommend_profile")

	def verify_profile(self):
		"""Return the result of the daemon's "verify_profile" D-Bus method."""
		return self._call("verify_profile")

	def verify_profile_ignore_missing(self):
		"""Return the result of the daemon's "verify_profile_ignore_missing" D-Bus method."""
		return self._call("verify_profile_ignore_missing")

	def off(self):
		"""Return the result of the daemon's "disable" D-Bus method."""
		return self._call("disable")

	def get_plugins(self):
		"""Return dict with plugin names and their hints

		Return:
		dictionary -- {plugin_name: {parameter_name: default_value}}
		"""
		return self._call("get_all_plugins")

	def get_plugin_documentation(self, plugin_name):
		"""Return docstring of plugin's class"""
		return self._call("get_plugin_documentation", plugin_name)

	def get_plugin_hints(self, plugin_name):
		"""Return dictionary with parameters of plugin and their hints

		Parameters:
		plugin_name -- name of plugin

		Return:
		dictionary -- {parameter_name: hint}
		"""
		return self._call("get_plugin_hints", plugin_name)

	def instance_acquire_devices(self, devices, instance):
		"""Return the result of the daemon's "instance_acquire_devices" D-Bus method."""
		return self._call("instance_acquire_devices", devices, instance)

	def get_instances(self, plugin_name):
		"""Return the result of the daemon's "get_instances" D-Bus method for plugin_name."""
		return self._call("get_instances", plugin_name)

	def instance_get_devices(self, instance):
		"""Return the result of the daemon's "instance_get_devices" D-Bus method for instance."""
		return self._call("instance_get_devices", instance)

	def exit(self, ret):
		"""Request termination of the main loop started by run().

		Clears the periodic action, stores ret as the value run() will
		return and sets the exit flag checked by _idle.

		Return:
		ret, unchanged
		"""
		self.set_action(None)
		self._ret = ret
		self._exit = True
		return ret
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg