JFIF$        dd7 

Viewing File: /usr/lib/python3.9/site-packages/tuned/utils/nettool.py

# Public API of this module: only the ethcard() factory is exported.
__all__ = ["ethcard"]

import tuned.logs
from subprocess import *
import re

# Module-wide logger obtained from tuned's logging helper; used by Nettool
# for debug and warning messages.
log = tuned.logs.get()

class Nettool:
	"""Query and adjust one network interface through the ``ethtool`` command.

	Instance attributes (all reset by ``_clean_status``):
	  speed               -- current speed as an int (1000 if unparsable)
	  full_duplex         -- True when ethtool reports "Duplex: Full"
	  autoneg             -- True when auto-negotiation is "on"/"yes"
	  link                -- True when "Link detected: yes"
	  supported_modes     -- list of (speed, is_full_duplex) tuples
	  supported_autoneg   -- True when "Supports auto-negotiation: Yes"
	  advertised_modes    -- list of (speed, is_full_duplex) tuples
	  advertised_autoneg  -- True when "Advertised auto-negotiation: Yes"

	Once ``ethtool`` reports anything on stderr the object marks itself as
	disabled: ``update`` becomes a no-op and the setters return False.
	"""

	# Bit masks passed to "ethtool -s <if> advertise", keyed by speed.
	# Every numeric entry is [ half duplex bit, full duplex bit ]; 0 means
	# this table defines no bit for that combination. "auto" is the OR of
	# all 10/100/1000 half and full duplex bits.
	_advertise_values = { # [ half, full ]
		10 : [ 0x001, 0x002 ],
		100 : [ 0x004, 0x008 ],
		1000 : [ 0x010, 0x020 ],
		2500 : [ 0, 0x8000 ],
		10000 : [ 0, 0x1000 ],
		"auto" : 0x03F
	}

	# Mode assumed whenever a link mode token cannot be parsed.
	_fallback_mode = (1000, True)

	# Maps an ethtool section header (without the trailing colon) to the
	# parser state that consumes the following value line(s).
	_section_states = {
		"Speed": "speed",
		"Duplex": "duplex",
		"Auto-negotiation": "autoneg",
		"Link detected": "link",
		"Supported link modes": "supported_modes",
		"Supports auto-negotiation": "supported_autoneg",
		"Advertised link modes": "advertised_modes",
		"Advertised auto-negotiation": "advertised_autoneg",
	}

	_re_speed = re.compile(r"(\d+)")
	_re_mode = re.compile(r"(\d+)baseT/(Half|Full)")
	_re_colon = re.compile(r":\s*")

	_disabled = False

	def __init__(self, interface):
		"""Remember the interface name and read its current status."""
		self._interface = interface
		self.update()

		log.debug("%s: speed %s, full duplex %s, autoneg %s, link %s" % (interface, self.speed, self.full_duplex, self.autoneg, self.link))
		log.debug("%s: supports: autoneg %s, modes %s" % (interface, self.supported_autoneg, self.supported_modes))
		log.debug("%s: advertises: autoneg %s, modes %s" % (interface, self.advertised_autoneg, self.advertised_modes))

#	def __del__(self):
#		if self.supported_autoneg:
#			self._set_advertise(self._advertise_values["auto"])

	def _clean_status(self):
		"""Reset every status attribute to its "nothing known" value."""
		self.speed = 0
		self.full_duplex = False
		self.autoneg = False
		self.link = False

		self.supported_modes = []
		self.supported_autoneg = False

		self.advertised_modes = []
		self.advertised_autoneg = False

	def _calculate_mode(self, modes):
		"""Return the advertise bit mask covering the given modes.

		modes is an iterable of (speed, is_full_duplex) pairs. Bits are
		OR-ed together so that repeated entries do not corrupt the mask,
		and speeds missing from ``_advertise_values`` are ignored.
		"""
		mode = 0
		for m in modes:
			bits = self._advertise_values.get(m[0])
			if not isinstance(bits, list):
				# speed not present in the table (e.g. 5000), or the
				# scalar "auto" entry
				continue
			mode |= bits[1 if m[1] else 0]

		return mode

	def _set_autonegotiation(self, enable):
		"""Switch auto-negotiation on or off.

		Returns True when the requested state is already active or when
		ethtool exits with status 0; False when the interface does not
		support auto-negotiation or ethtool fails.
		"""
		if self.autoneg == enable:
			return True

		if not self.supported_autoneg:
			return False

		return 0 == call(["ethtool", "-s", self._interface, "autoneg", "on" if enable else "off"], close_fds=True)

	def _set_advertise(self, value):
		"""Enable auto-negotiation and advertise the given bit mask.

		Returns True only when both ethtool steps succeed.
		"""
		if not self._set_autonegotiation(True):
			return False

		return 0 == call(["ethtool", "-s", self._interface, "advertise", "0x%03x" % value], close_fds=True)

	def get_max_speed(self):
		"""Return the highest supported speed, or 1000 when none is known."""
		fastest = max((mode[0] for mode in self.supported_modes), default=0)
		return fastest if fastest > 0 else 1000

	def set_max_speed(self):
		"""Advertise the "auto" mask and refresh the status.

		Returns False when the object is disabled, auto-negotiation is not
		supported, or ethtool fails; True otherwise.
		"""
		if self._disabled or not self.supported_autoneg:
			return False

		# The fixed "auto" mask is advertised here, not a mask derived
		# from supported_modes.
		if self._set_advertise(self._advertise_values["auto"]):
			self.update()
			return True
		return False

	def set_speed(self, speed):
		"""Advertise every supported mode whose speed is <= speed.

		Returns False when the object is disabled, auto-negotiation is not
		supported, or ethtool fails; True otherwise (status is refreshed).
		"""
		if self._disabled or not self.supported_autoneg:
			return False

		mode = 0
		for am, bits in self._advertise_values.items():
			if am == "auto": continue
			if am <= speed:
				mode |= bits[0] | bits[1]

		# keep only the bits the hardware reports as supported
		effective_mode = mode & self._calculate_mode(self.supported_modes)

		log.debug("%s: set_speed(%d) - effective_mode 0x%03x" % (self._interface, speed, effective_mode))

		if self._set_advertise(effective_mode):
			self.update()
			return True
		return False

	def update(self):
		"""Run ``ethtool <interface>`` and refresh the status attributes.

		Does nothing when the object is disabled. Any stderr output from
		ethtool is logged, the status is reset and the object is disabled.
		"""
		if self._disabled:
			return

		p_ethtool = Popen(["ethtool", self._interface],
				stdout=PIPE, stderr=PIPE, close_fds=True,
				universal_newlines = True)
		# a single communicate() drains both pipes and reaps the process
		output, errors = p_ethtool.communicate()

		if errors != "":
			log.warning("%s: some errors were reported by 'ethtool'" % self._interface)
			log.debug("%s: %s" % (self._interface, errors.replace("\n", r"\n")))
			self._clean_status()
			self._disabled = True
			return

		self._parse_status(self._preprocess(output))

	@classmethod
	def _preprocess(cls, raw):
		"""Split raw ethtool output into parser lines.

		Leading whitespace is removed from every line and each colon
		(plus the whitespace after it) ends a line, so "Key: value"
		becomes the two lines "Key:" and "value".
		"""
		lines = []
		for raw_line in raw.split("\n"):
			lines.extend(cls._re_colon.sub(":\n", raw_line.lstrip()).split("\n"))
		return lines

	def _append_modes(self, target, line):
		"""Append the link modes listed on line to target.

		Parsing stops at the first token that is not "<n>baseT/Half|Full";
		the 1000 Mbit full duplex fallback is appended in its place.
		A line without tokens adds nothing.
		"""
		for token in line.split():
			found = self._re_mode.match(token)
			if found is None:
				target.append(self._fallback_mode)
				break
			target.append((int(found.group(1)), found.group(2) == "Full"))

	def _parse_status(self, lines):
		"""Reset the status and fill it from preprocessed ethtool lines.

		Works as a small state machine: a line ending with ":" selects the
		state, the following line(s) carry the value. Single-value states
		fall back to "wait" after one line; the link mode states stay
		active until the next header because modes span several lines.
		"""
		self._clean_status()

		state = "wait"

		for line in lines:

			if line.endswith(":"):
				state = self._section_states.get(line[:-1], "wait")

			elif state == "speed":
				# If the speed cannot be determined, assume 1gbit ethernet
				found = self._re_speed.match(line)
				self.speed = int(found.group(1)) if found else 1000
				state = "wait"

			elif state == "duplex":
				self.full_duplex = line == "Full"
				state = "wait"

			elif state == "autoneg":
				self.autoneg = (line == "yes" or line == "on")
				state = "wait"

			elif state == "link":
				self.link = line == "yes"
				state = "wait"

			elif state == "supported_modes":
				self._append_modes(self.supported_modes, line)

			elif state == "supported_autoneg":
				self.supported_autoneg = line == "Yes"
				state = "wait"

			elif state == "advertised_modes":
				if line != "Not reported":
					self._append_modes(self.advertised_modes, line)

			elif state == "advertised_autoneg":
				self.advertised_autoneg = line == "Yes"
				state = "wait"

def ethcard(interface):
	"""Return the shared Nettool object for interface.

	Objects are kept in the ethcard.list dictionary, keyed by interface
	name; a new Nettool is created and stored on first request.
	"""
	registry = ethcard.list
	if interface in registry:
		return registry[interface]

	card = Nettool(interface)
	registry[interface] = card
	return card

# registry of already created Nettool objects, keyed by interface name
ethcard.list = {}

Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg