# # Copyright (c) 2011 Thomas Graf <tgraf@suug.ch> # """Module providing access to network links This module provides an interface to view configured network links, modify them and to add and delete virtual network links. The following is a basic example: import netlink.core as netlink import netlink.route.link as link sock = netlink.Socket() sock.connect(netlink.NETLINK_ROUTE) cache = link.LinkCache() # create new empty link cache cache.refill(sock) # fill cache with all configured links eth0 = cache['eth0'] # lookup link "eth0" print eth0 # print basic configuration The module contains the following public classes: - Link -- Represents a network link. Instances can be created directly via the constructor (empty link objects) or via the refill() method of a LinkCache. - LinkCache -- Derived from netlink.Cache, holds any number of network links (Link instances). Main purpose is to keep a local list of all network links configured in the kernel. The following public functions exist: - get_from_kernel(socket, name) """ from __future__ import absolute_import __version__ = '0.1' __all__ = [ 'LinkCache', 'Link', 'get_from_kernel', ] import socket from .. import core as netlink from .. import capi as core_capi from . import capi as capi from .links import inet as inet from .. import util as util # Link statistics definitions RX_PACKETS = 0 TX_PACKETS = 1 RX_BYTES = 2 TX_BYTES = 3 RX_ERRORS = 4 TX_ERRORS = 5 RX_DROPPED = 6 TX_DROPPED = 7 RX_COMPRESSED = 8 TX_COMPRESSED = 9 RX_FIFO_ERR = 10 TX_FIFO_ERR = 11 RX_LEN_ERR = 12 RX_OVER_ERR = 13 RX_CRC_ERR = 14 RX_FRAME_ERR = 15 RX_MISSED_ERR = 16 TX_ABORT_ERR = 17 TX_CARRIER_ERR = 18 TX_HBEAT_ERR = 19 TX_WIN_ERR = 20 COLLISIONS = 21 MULTICAST = 22 IP6_INPKTS = 23 IP6_INHDRERRORS = 24 IP6_INTOOBIGERRORS = 25 IP6_INNOROUTES = 26 IP6_INADDRERRORS = 27 IP6_INUNKNOWNPROTOS = 28 IP6_INTRUNCATEDPKTS = 29 IP6_INDISCARDS = 30 IP6_INDELIVERS = 31 IP6_OUTFORWDATAGRAMS = 32 IP6_OUTPKTS = 33 IP6_OUTDISCARDS = 34 IP6_OUTNOROUTES = 35 IP6_REASMTIMEOUT = 36 IP6_REASMREQDS = 37 IP6_REASMOKS = 38 IP6_REASMFAILS = 39 IP6_FRAGOKS = 40 IP6_FRAGFAILS = 41 IP6_FRAGCREATES = 42 IP6_INMCASTPKTS = 43 IP6_OUTMCASTPKTS = 44 IP6_INBCASTPKTS = 45 IP6_OUTBCASTPKTS = 46 IP6_INOCTETS = 47 IP6_OUTOCTETS = 48 IP6_INMCASTOCTETS = 49 IP6_OUTMCASTOCTETS = 50 IP6_INBCASTOCTETS = 51 IP6_OUTBCASTOCTETS = 52 ICMP6_INMSGS = 53 ICMP6_INERRORS = 54 ICMP6_OUTMSGS = 55 ICMP6_OUTERRORS = 56 class LinkCache(netlink.Cache): """Cache of network links""" def __init__(self, family=socket.AF_UNSPEC, cache=None): if not cache: cache = self._alloc_cache_name('route/link') self._info_module = None self._protocol = netlink.NETLINK_ROUTE self._nl_cache = cache self._set_arg1(family) def __getitem__(self, key): if type(key) is int: link = capi.rtnl_link_get(self._nl_cache, key) else: link = capi.rtnl_link_get_by_name(self._nl_cache, key) if link is None: raise KeyError() else: return Link.from_capi(link) @staticmethod def _new_object(obj): return Link(obj) def _new_cache(self, cache): return LinkCache(family=self.arg1, cache=cache) class Link(netlink.Object): """Network link""" def __init__(self, obj=None): netlink.Object.__init__(self, 'route/link', 'link', obj) self._rtnl_link = self._obj2type(self._nl_object) if self.type: self._module_lookup('netlink.route.links.' + self.type) self.inet = inet.InetLink(self) self.af = {'inet' : self.inet } def __enter__(self): return self def __exit__(self, exc_type, exc_value, tb): if exc_type is None: self.change() else: return false @classmethod def from_capi(cls, obj): return cls(capi.link2obj(obj)) @staticmethod def _obj2type(obj): return capi.obj2link(obj) def __cmp__(self, other): return self.ifindex - other.ifindex @staticmethod def _new_instance(obj): if not obj: raise ValueError() return Link(obj) @property @netlink.nlattr(type=int, immutable=True, fmt=util.num) def ifindex(self): """interface index""" return capi.rtnl_link_get_ifindex(self._rtnl_link) @ifindex.setter def ifindex(self, value): capi.rtnl_link_set_ifindex(self._rtnl_link, int(value)) # ifindex is immutable but we assume that if _orig does not # have an ifindex specified, it was meant to be given here if capi.rtnl_link_get_ifindex(self._orig) == 0: capi.rtnl_link_set_ifindex(self._orig, int(value)) @property @netlink.nlattr(type=str, fmt=util.bold) def name(self): """Name of link""" return capi.rtnl_link_get_name(self._rtnl_link) @name.setter def name(self, value): capi.rtnl_link_set_name(self._rtnl_link, value) # name is the secondary identifier, if _orig does not have # the name specified yet, assume it was meant to be specified # here. ifindex will always take priority, therefore if ifindex # is specified as well, this will be ignored automatically. if capi.rtnl_link_get_name(self._orig) is None: capi.rtnl_link_set_name(self._orig, value) @property @netlink.nlattr(type=str, fmt=util.string) def flags(self): """Flags Setting this property will *Not* reset flags to value you supply in Examples: link.flags = '+xxx' # add xxx flag link.flags = 'xxx' # exactly the same link.flags = '-xxx' # remove xxx flag link.flags = [ '+xxx', '-yyy' ] # list operation """ flags = capi.rtnl_link_get_flags(self._rtnl_link) return capi.rtnl_link_flags2str(flags, 256)[0].split(',') def _set_flag(self, flag): if flag.startswith('-'): i = capi.rtnl_link_str2flags(flag[1:]) capi.rtnl_link_unset_flags(self._rtnl_link, i) elif flag.startswith('+'): i = capi.rtnl_link_str2flags(flag[1:]) capi.rtnl_link_set_flags(self._rtnl_link, i) else: i = capi.rtnl_link_str2flags(flag) capi.rtnl_link_set_flags(self._rtnl_link, i) @flags.setter def flags(self, value): if not (type(value) is str): for flag in value: self._set_flag(flag) else: self._set_flag(value) @property @netlink.nlattr(type=int, fmt=util.num) def mtu(self): """Maximum Transmission Unit""" return capi.rtnl_link_get_mtu(self._rtnl_link) @mtu.setter def mtu(self, value): capi.rtnl_link_set_mtu(self._rtnl_link, int(value)) @property @netlink.nlattr(type=int, immutable=True, fmt=util.num) def family(self): """Address family""" return capi.rtnl_link_get_family(self._rtnl_link) @family.setter def family(self, value): capi.rtnl_link_set_family(self._rtnl_link, value) @property @netlink.nlattr(type=str, fmt=util.addr) def address(self): """Hardware address (MAC address)""" a = capi.rtnl_link_get_addr(self._rtnl_link) return netlink.AbstractAddress(a) @address.setter def address(self, value): capi.rtnl_link_set_addr(self._rtnl_link, value._addr) @property @netlink.nlattr(type=str, fmt=util.addr) def broadcast(self): """Hardware broadcast address""" a = capi.rtnl_link_get_broadcast(self._rtnl_link) return netlink.AbstractAddress(a) @broadcast.setter def broadcast(self, value): capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr) @property @netlink.nlattr(type=str, immutable=True, fmt=util.string) def qdisc(self): """Name of qdisc (cannot be changed)""" return capi.rtnl_link_get_qdisc(self._rtnl_link) @qdisc.setter def qdisc(self, value): capi.rtnl_link_set_qdisc(self._rtnl_link, value) @property @netlink.nlattr(type=int, fmt=util.num) def txqlen(self): """Length of transmit queue""" return capi.rtnl_link_get_txqlen(self._rtnl_link) @txqlen.setter def txqlen(self, value): capi.rtnl_link_set_txqlen(self._rtnl_link, int(value)) @property @netlink.nlattr(type=str, immutable=True, fmt=util.string) def arptype(self): """Type of link (cannot be changed)""" type_ = capi.rtnl_link_get_arptype(self._rtnl_link) return core_capi.nl_llproto2str(type_, 64)[0] @arptype.setter def arptype(self, value): i = core_capi.nl_str2llproto(value) capi.rtnl_link_set_arptype(self._rtnl_link, i) @property @netlink.nlattr(type=str, immutable=True, fmt=util.string, title='state') def operstate(self): """Operational status""" operstate = capi.rtnl_link_get_operstate(self._rtnl_link) return capi.rtnl_link_operstate2str(operstate, 32)[0] @operstate.setter def operstate(self, value): i = capi.rtnl_link_str2operstate(value) capi.rtnl_link_set_operstate(self._rtnl_link, i) @property @netlink.nlattr(type=str, immutable=True, fmt=util.string) def mode(self): """Link mode""" mode = capi.rtnl_link_get_linkmode(self._rtnl_link) return capi.rtnl_link_mode2str(mode, 32)[0] @mode.setter def mode(self, value): i = capi.rtnl_link_str2mode(value) capi.rtnl_link_set_linkmode(self._rtnl_link, i) @property @netlink.nlattr(type=str, fmt=util.string) def alias(self): """Interface alias (SNMP)""" return capi.rtnl_link_get_ifalias(self._rtnl_link) @alias.setter def alias(self, value): capi.rtnl_link_set_ifalias(self._rtnl_link, value) @property @netlink.nlattr(type=str, fmt=util.string) def type(self): """Link type""" return capi.rtnl_link_get_type(self._rtnl_link) @type.setter def type(self, value): if capi.rtnl_link_set_type(self._rtnl_link, value) < 0: raise NameError('unknown info type') self._module_lookup('netlink.route.links.' + value) def get_stat(self, stat): """Retrieve statistical information""" if type(stat) is str: stat = capi.rtnl_link_str2stat(stat) if stat < 0: raise NameError('unknown name of statistic') return capi.rtnl_link_get_stat(self._rtnl_link, stat) def enslave(self, slave, sock=None): if not sock: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) return capi.rtnl_link_enslave(sock._sock, self._rtnl_link, slave._rtnl_link) def release(self, slave, sock=None): if not sock: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) return capi.rtnl_link_release(sock._sock, self._rtnl_link, slave._rtnl_link) def add(self, sock=None, flags=None): if not sock: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) if not flags: flags = netlink.NLM_F_CREATE ret = capi.rtnl_link_add(sock._sock, self._rtnl_link, flags) if ret < 0: raise netlink.KernelError(ret) def change(self, sock=None, flags=0): """Commit changes made to the link object""" if sock is None: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) if not self._orig: raise netlink.NetlinkError('Original link not available') ret = capi.rtnl_link_change(sock._sock, self._orig, self._rtnl_link, flags) if ret < 0: raise netlink.KernelError(ret) def delete(self, sock=None): """Attempt to delete this link in the kernel""" if sock is None: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) ret = capi.rtnl_link_delete(sock._sock, self._rtnl_link) if ret < 0: raise netlink.KernelError(ret) ################################################################### # private properties # # Used for formatting output. USE AT OWN RISK @property def _state(self): if 'up' in self.flags: buf = util.good('up') if 'lowerup' not in self.flags: buf += ' ' + util.bad('no-carrier') else: buf = util.bad('down') return buf @property def _brief(self): return self._module_brief() + self._foreach_af('brief') @property def _flags(self): ignore = [ 'up', 'running', 'lowerup', ] return ','.join([flag for flag in self.flags if flag not in ignore]) def _foreach_af(self, name, args=None): buf = '' for af in self.af: try: func = getattr(self.af[af], name) s = str(func(args)) if len(s) > 0: buf += ' ' + s except AttributeError: pass return buf def format(self, details=False, stats=False, indent=''): """Return link as formatted text""" fmt = util.MyFormatter(self, indent) buf = fmt.format('{a|ifindex} {a|name} {a|arptype} {a|address} '\ '{a|_state} <{a|_flags}> {a|_brief}') if details: buf += fmt.nl('\t{t|mtu} {t|txqlen} {t|weight} '\ '{t|qdisc} {t|operstate}') buf += fmt.nl('\t{t|broadcast} {t|alias}') buf += self._foreach_af('details', fmt) if stats: l = [['Packets', RX_PACKETS, TX_PACKETS], ['Bytes', RX_BYTES, TX_BYTES], ['Errors', RX_ERRORS, TX_ERRORS], ['Dropped', RX_DROPPED, TX_DROPPED], ['Compressed', RX_COMPRESSED, TX_COMPRESSED], ['FIFO Errors', RX_FIFO_ERR, TX_FIFO_ERR], ['Length Errors', RX_LEN_ERR, None], ['Over Errors', RX_OVER_ERR, None], ['CRC Errors', RX_CRC_ERR, None], ['Frame Errors', RX_FRAME_ERR, None], ['Missed Errors', RX_MISSED_ERR, None], ['Abort Errors', None, TX_ABORT_ERR], ['Carrier Errors', None, TX_CARRIER_ERR], ['Heartbeat Errors', None, TX_HBEAT_ERR], ['Window Errors', None, TX_WIN_ERR], ['Collisions', None, COLLISIONS], ['Multicast', None, MULTICAST], ['', None, None], ['Ipv6:', None, None], ['Packets', IP6_INPKTS, IP6_OUTPKTS], ['Bytes', IP6_INOCTETS, IP6_OUTOCTETS], ['Discards', IP6_INDISCARDS, IP6_OUTDISCARDS], ['Multicast Packets', IP6_INMCASTPKTS, IP6_OUTMCASTPKTS], ['Multicast Bytes', IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS], ['Broadcast Packets', IP6_INBCASTPKTS, IP6_OUTBCASTPKTS], ['Broadcast Bytes', IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS], ['Delivers', IP6_INDELIVERS, None], ['Forwarded', None, IP6_OUTFORWDATAGRAMS], ['No Routes', IP6_INNOROUTES, IP6_OUTNOROUTES], ['Header Errors', IP6_INHDRERRORS, None], ['Too Big Errors', IP6_INTOOBIGERRORS, None], ['Address Errors', IP6_INADDRERRORS, None], ['Unknown Protocol', IP6_INUNKNOWNPROTOS, None], ['Truncated Packets', IP6_INTRUNCATEDPKTS, None], ['Reasm Timeouts', IP6_REASMTIMEOUT, None], ['Reasm Requests', IP6_REASMREQDS, None], ['Reasm Failures', IP6_REASMFAILS, None], ['Reasm OK', IP6_REASMOKS, None], ['Frag Created', None, IP6_FRAGCREATES], ['Frag Failures', None, IP6_FRAGFAILS], ['Frag OK', None, IP6_FRAGOKS], ['', None, None], ['ICMPv6:', None, None], ['Messages', ICMP6_INMSGS, ICMP6_OUTMSGS], ['Errors', ICMP6_INERRORS, ICMP6_OUTERRORS]] buf += '\n\t%s%s%s%s\n' % (33 * ' ', util.title('RX'), 15 * ' ', util.title('TX')) for row in l: row[0] = util.kw(row[0]) row[1] = self.get_stat(row[1]) if row[1] else '' row[2] = self.get_stat(row[2]) if row[2] else '' buf += '\t{0[0]:27} {0[1]:>16} {0[2]:>16}\n'.format(row) buf += self._foreach_af('stats') return buf def get(name, sock=None): """Lookup Link object directly from kernel""" if not name: raise ValueError() if not sock: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) link = capi.get_from_kernel(sock._sock, 0, name) if not link: return None return Link.from_capi(link) _link_cache = LinkCache() def resolve(name): _link_cache.refill() return _link_cache[name]