普通文本  |  368行  |  10.77 KB

#
# Copyright (c) 2011 Thomas Graf <tgraf@suug.ch>
#

"""Module providing access to network addresses
"""

from __future__ import absolute_import


__version__ = '1.0'
__all__ = [
    'AddressCache',
    'Address']

import datetime
from .. import core as netlink
from .  import capi as capi
from .  import link as Link
from .. import util as util

class AddressCache(netlink.Cache):
    """Cache containing network addresses"""

    def __init__(self, cache=None):
        if not cache:
            cache = self._alloc_cache_name('route/addr')

        self._protocol = netlink.NETLINK_ROUTE
        self._nl_cache = cache

    def __getitem__(self, key):
        # Using ifindex=0 here implies that the local address itself
        # is unique, otherwise the first occurence is returned.
        return self.lookup(0, key)

    def lookup(self, ifindex, local):
        if type(local) is str:
            local = netlink.AbstractAddress(local)

        addr = capi.rtnl_addr_get(self._nl_cache, ifindex,
                      local._nl_addr)
        if addr is None:
            raise KeyError()

        return Address._from_capi(addr)

    @staticmethod
    def _new_object(obj):
        return Address(obj)

    @staticmethod
    def _new_cache(cache):
        return AddressCache(cache=cache)

class Address(netlink.Object):
    """Network address"""

    def __init__(self, obj=None):
        netlink.Object.__init__(self, 'route/addr', 'address', obj)
        self._rtnl_addr = self._obj2type(self._nl_object)

    @classmethod
    def _from_capi(cls, obj):
        return cls(capi.addr2obj(obj))

    @staticmethod
    def _obj2type(obj):
        return capi.obj2addr(obj)

    def __cmp__(self, other):
        # sort by:
        #    1. network link
        #    2. address family
        #    3. local address (including prefixlen)
        diff = self.ifindex - other.ifindex

        if diff == 0:
            diff = self.family - other.family
            if diff == 0:
                diff = capi.nl_addr_cmp(self.local, other.local)

        return diff

    @staticmethod
    def _new_instance(obj):
        return Address(obj)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def ifindex(self):
        """interface index"""
        return capi.rtnl_addr_get_ifindex(self._rtnl_addr)

    @ifindex.setter
    def ifindex(self, value):
        link = Link.resolve(value)
        if not link:
            raise ValueError()

        self.link = link

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def link(self):
        link = capi.rtnl_addr_get_link(self._rtnl_addr)
        if not link:
            return None

        return Link.Link.from_capi(link)

    @link.setter
    def link(self, value):
        if type(value) is str:
            try:
                value = Link.resolve(value)
            except KeyError:
                raise ValueError()

        capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)

        # ifindex is immutable but we assume that if _orig does not
        # have an ifindex specified, it was meant to be given here
        if capi.rtnl_addr_get_ifindex(self._orig) == 0:
            capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def label(self):
        """address label"""
        return capi.rtnl_addr_get_label(self._rtnl_addr)

    @label.setter
    def label(self, value):
        capi.rtnl_addr_set_label(self._rtnl_addr, value)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def flags(self):
        """Flags

        Setting this property will *Not* reset flags to value you supply in

        Examples:
        addr.flags = '+xxx' # add xxx flag
        addr.flags = 'xxx'  # exactly the same
        addr.flags = '-xxx' # remove xxx flag
        addr.flags = [ '+xxx', '-yyy' ] # list operation
        """
        flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
        return capi.rtnl_addr_flags2str(flags, 256)[0].split(',')

    def _set_flag(self, flag):
        if flag.startswith('-'):
            i = capi.rtnl_addr_str2flags(flag[1:])
            capi.rtnl_addr_unset_flags(self._rtnl_addr, i)
        elif flag.startswith('+'):
            i = capi.rtnl_addr_str2flags(flag[1:])
            capi.rtnl_addr_set_flags(self._rtnl_addr, i)
        else:
            i = capi.rtnl_addr_str2flags(flag)
            capi.rtnl_addr_set_flags(self._rtnl_addr, i)

    @flags.setter
    def flags(self, value):
        if type(value) is list:
            for flag in value:
                self._set_flag(flag)
        else:
            self._set_flag(value)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def family(self):
        """Address family"""
        fam = capi.rtnl_addr_get_family(self._rtnl_addr)
        return netlink.AddressFamily(fam)

    @family.setter
    def family(self, value):
        if not isinstance(value, netlink.AddressFamily):
            value = netlink.AddressFamily(value)

        capi.rtnl_addr_set_family(self._rtnl_addr, int(value))

    @property
    @netlink.nlattr(type=int, fmt=util.num)
    def scope(self):
        """Address scope"""
        scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
        return capi.rtnl_scope2str(scope, 32)[0]

    @scope.setter
    def scope(self, value):
        if type(value) is str:
            value = capi.rtnl_str2scope(value)
        capi.rtnl_addr_set_scope(self._rtnl_addr, value)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.addr)
    def local(self):
        """Local address"""
        a = capi.rtnl_addr_get_local(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @local.setter
    def local(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)

        # local is immutable but we assume that if _orig does not
        # have a local address specified, it was meant to be given here
        if capi.rtnl_addr_get_local(self._orig) is None:
            capi.rtnl_addr_set_local(self._orig, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def peer(self):
        """Peer address"""
        a = capi.rtnl_addr_get_peer(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @peer.setter
    def peer(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def broadcast(self):
        """Broadcast address"""
        a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @broadcast.setter
    def broadcast(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def multicast(self):
        """multicast address"""
        a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @multicast.setter
    def multicast(self, value):
        try:
            a = netlink.AbstractAddress(value)
        except ValueError as err:
            raise AttributeError('multicast', err)

        capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def anycast(self):
        """anycast address"""
        a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @anycast.setter
    def anycast(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def valid_lifetime(self):
        """Valid lifetime"""
        msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
        if msecs == 0xFFFFFFFF:
            return None
        else:
            return datetime.timedelta(seconds=msecs)

    @valid_lifetime.setter
    def valid_lifetime(self, value):
        capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value))

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def preferred_lifetime(self):
        """Preferred lifetime"""
        msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
        if msecs == 0xFFFFFFFF:
            return None
        else:
            return datetime.timedelta(seconds=msecs)

    @preferred_lifetime.setter
    def preferred_lifetime(self, value):
        capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value))

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def create_time(self):
        """Creation time"""
        hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
        return datetime.timedelta(milliseconds=10*hsec)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def last_update(self):
        """Last update"""
        hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
        return datetime.timedelta(milliseconds=10*hsec)

    def add(self, socket=None, flags=None):
        if not socket:
            socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        if not flags:
            flags = netlink.NLM_F_CREATE

        ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    def delete(self, socket, flags=0):
        """Attempt to delete this address in the kernel"""
        ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    ###################################################################
    # private properties
    #
    # Used for formatting output. USE AT OWN RISK
    @property
    def _flags(self):
        return ','.join(self.flags)

    def format(self, details=False, stats=False, nodev=False, indent=''):
        """Return address as formatted text"""
        fmt = util.MyFormatter(self, indent)

        buf = fmt.format('{a|local!b}')

        if not nodev:
            buf += fmt.format(' {a|ifindex}')

        buf += fmt.format(' {a|scope}')

        if self.label:
            buf += fmt.format(' "{a|label}"')

        buf += fmt.format(' <{a|_flags}>')

        if details:
            buf += fmt.nl('\t{t|broadcast} {t|multicast}') \
                 + fmt.nl('\t{t|peer} {t|anycast}')

            if self.valid_lifetime:
                buf += fmt.nl('\t{s|valid-lifetime!k} '\
                       '{a|valid_lifetime}')

            if self.preferred_lifetime:
                buf += fmt.nl('\t{s|preferred-lifetime!k} '\
                       '{a|preferred_lifetime}')

        if stats and (self.create_time or self.last_update):
            buf += self.nl('\t{s|created!k} {a|create_time}'\
                   ' {s|last-updated!k} {a|last_update}')

        return buf