普通文本  |  275行  |  9.1 KB

# -*- coding: iso-8859-15 -*-
"""IP4 address range set implementation.

Implements an IPv4-range type.

Copyright (C) 2006, Heiko Wundram.
Released under the MIT-license.
"""

# Version information
# -------------------

__author__ = "Heiko Wundram <me@modelnine.org>"
__version__ = "0.2"
__revision__ = "3"
__date__ = "2006-01-20"


# Imports
# -------

from paste.util import intset
import socket
import six


# IP4Range class
# --------------

class IP4Range(intset.IntSet):
    """IP4 address range class with efficient storage of address ranges.
    Supports all set operations."""

    _MINIP4 = 0
    _MAXIP4 = (1<<32) - 1
    _UNITYTRANS = "".join([chr(n) for n in range(256)])
    _IPREMOVE = "0123456789."

    def __init__(self,*args):
        """Initialize an ip4range class. The constructor accepts an unlimited
        number of arguments that may either be tuples in the form (start,stop),
        integers, longs or strings, where start and stop in a tuple may
        also be of the form integer, long or string.

        Passing an integer or long means passing an IPv4-address that's already
        been converted to integer notation, whereas passing a string specifies
        an address where this conversion still has to be done. A string
        address may be in the following formats:

        - 1.2.3.4    - a plain address, interpreted as a single address
        - 1.2.3      - a set of addresses, interpreted as 1.2.3.0-1.2.3.255
        - localhost  - hostname to look up, interpreted as single address
        - 1.2.3<->5  - a set of addresses, interpreted as 1.2.3.0-1.2.5.255
        - 1.2.0.0/16 - a set of addresses, interpreted as 1.2.0.0-1.2.255.255

        Only the first three notations are valid if you use a string address in
        a tuple, whereby notation 2 is interpreted as 1.2.3.0 if specified as
        lower bound and 1.2.3.255 if specified as upper bound, not as a range
        of addresses.

        Specifying a range is done with the <-> operator. This is necessary
        because '-' might be present in a hostname. '<->' shouldn't be, ever.
        """

        # Special case copy constructor.
        if len(args) == 1 and isinstance(args[0],IP4Range):
            super(IP4Range,self).__init__(args[0])
            return

        # Convert arguments to tuple syntax.
        args = list(args)
        for i in range(len(args)):
            argval = args[i]
            if isinstance(argval,str):
                if "<->" in argval:
                    # Type 4 address.
                    args[i] = self._parseRange(*argval.split("<->",1))
                    continue
                elif "/" in argval:
                    # Type 5 address.
                    args[i] = self._parseMask(*argval.split("/",1))
                else:
                    # Type 1, 2 or 3.
                    args[i] = self._parseAddrRange(argval)
            elif isinstance(argval,tuple):
                if len(tuple) != 2:
                    raise ValueError("Tuple is of invalid length.")
                addr1, addr2 = argval
                if isinstance(addr1,str):
                    addr1 = self._parseAddrRange(addr1)[0]
                elif not isinstance(addr1, six.integer_types):
                    raise TypeError("Invalid argument.")
                if isinstance(addr2,str):
                    addr2 = self._parseAddrRange(addr2)[1]
                elif not isinstance(addr2, six.integer_types):
                    raise TypeError("Invalid argument.")
                args[i] = (addr1,addr2)
            elif not isinstance(argval, six.integer_types):
                raise TypeError("Invalid argument.")

        # Initialize the integer set.
        super(IP4Range,self).__init__(min=self._MINIP4,max=self._MAXIP4,*args)

    # Parsing functions
    # -----------------

    def _parseRange(self,addr1,addr2):
        naddr1, naddr1len = _parseAddr(addr1)
        naddr2, naddr2len = _parseAddr(addr2)
        if naddr2len < naddr1len:
            naddr2 += naddr1&(((1<<((naddr1len-naddr2len)*8))-1)<<
                              (naddr2len*8))
            naddr2len = naddr1len
        elif naddr2len > naddr1len:
            raise ValueError("Range has more dots than address.")
        naddr1 <<= (4-naddr1len)*8
        naddr2 <<= (4-naddr2len)*8
        naddr2 += (1<<((4-naddr2len)*8))-1
        return (naddr1,naddr2)

    def _parseMask(self,addr,mask):
        naddr, naddrlen = _parseAddr(addr)
        naddr <<= (4-naddrlen)*8
        try:
            if not mask:
                masklen = 0
            else:
                masklen = int(mask)
            if not 0 <= masklen <= 32:
                raise ValueError
        except ValueError:
            try:
                mask = _parseAddr(mask,False)
            except ValueError:
                raise ValueError("Mask isn't parseable.")
            remaining = 0
            masklen = 0
            if not mask:
                masklen = 0
            else:
                while not (mask&1):
                    remaining += 1
                while (mask&1):
                    mask >>= 1
                    masklen += 1
                if remaining+masklen != 32:
                    raise ValueError("Mask isn't a proper host mask.")
        naddr1 = naddr & (((1<<masklen)-1)<<(32-masklen))
        naddr2 = naddr1 + (1<<(32-masklen)) - 1
        return (naddr1,naddr2)

    def _parseAddrRange(self,addr):
        naddr, naddrlen = _parseAddr(addr)
        naddr1 = naddr<<((4-naddrlen)*8)
        naddr2 = ( (naddr<<((4-naddrlen)*8)) +
                   (1<<((4-naddrlen)*8)) - 1 )
        return (naddr1,naddr2)

    # Utility functions
    # -----------------

    def _int2ip(self,num):
        rv = []
        for i in range(4):
            rv.append(str(num&255))
            num >>= 8
        return ".".join(reversed(rv))

    # Iterating
    # ---------

    def iteraddresses(self):
        """Returns an iterator which iterates over ips in this iprange. An
        IP is returned in string form (e.g. '1.2.3.4')."""

        for v in super(IP4Range,self).__iter__():
            yield self._int2ip(v)

    def iterranges(self):
        """Returns an iterator which iterates over ip-ip ranges which build
        this iprange if combined. An ip-ip pair is returned in string form
        (e.g. '1.2.3.4-2.3.4.5')."""

        for r in self._ranges:
            if r[1]-r[0] == 1:
                yield self._int2ip(r[0])
            else:
                yield '%s-%s' % (self._int2ip(r[0]),self._int2ip(r[1]-1))

    def itermasks(self):
        """Returns an iterator which iterates over ip/mask pairs which build
        this iprange if combined. An IP/Mask pair is returned in string form
        (e.g. '1.2.3.0/24')."""

        for r in self._ranges:
            for v in self._itermasks(r):
                yield v

    def _itermasks(self,r):
        ranges = [r]
        while ranges:
            cur = ranges.pop()
            curmask = 0
            while True:
                curmasklen = 1<<(32-curmask)
                start = (cur[0]+curmasklen-1)&(((1<<curmask)-1)<<(32-curmask))
                if start >= cur[0] and start+curmasklen <= cur[1]:
                    break
                else:
                    curmask += 1
            yield "%s/%s" % (self._int2ip(start),curmask)
            if cur[0] < start:
                ranges.append((cur[0],start))
            if cur[1] > start+curmasklen:
                ranges.append((start+curmasklen,cur[1]))

    __iter__ = iteraddresses

    # Printing
    # --------

    def __repr__(self):
        """Returns a string which can be used to reconstruct this iprange."""

        rv = []
        for start, stop in self._ranges:
            if stop-start == 1:
                rv.append("%r" % (self._int2ip(start),))
            else:
                rv.append("(%r,%r)" % (self._int2ip(start),
                                       self._int2ip(stop-1)))
        return "%s(%s)" % (self.__class__.__name__,",".join(rv))

def _parseAddr(addr,lookup=True):
    if lookup and any(ch not in IP4Range._IPREMOVE for ch in addr):
        try:
            addr = socket.gethostbyname(addr)
        except socket.error:
            raise ValueError("Invalid Hostname as argument.")
    naddr = 0
    for naddrpos, part in enumerate(addr.split(".")):
        if naddrpos >= 4:
            raise ValueError("Address contains more than four parts.")
        try:
            if not part:
                part = 0
            else:
                part = int(part)
            if not 0 <= part < 256:
                raise ValueError
        except ValueError:
            raise ValueError("Address part out of range.")
        naddr <<= 8
        naddr += part
    return naddr, naddrpos+1

def ip2int(addr, lookup=True):
    return _parseAddr(addr, lookup=lookup)[0]

if __name__ == "__main__":
    # Little test script.
    x = IP4Range("172.22.162.250/24")
    y = IP4Range("172.22.162.250","172.22.163.250","172.22.163.253<->255")
    print(x)
    for val in x.itermasks():
        print(val)
    for val in y.itermasks():
        print(val)
    for val in (x|y).itermasks():
        print(val)
    for val in (x^y).iterranges():
        print(val)
    for val in x:
        print(val)