普通文本  |  299行  |  10.41 KB

#!/usr/bin/python
#
# Copyright 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import random
from socket import *  # pylint: disable=wildcard-import
import time
import unittest

from scapy import all as scapy

import csocket
import multinetwork_base
import net_test


RTMGRP_NEIGH = 4

NUD_INCOMPLETE = 0x01
NUD_REACHABLE = 0x02
NUD_STALE = 0x04
NUD_DELAY = 0x08
NUD_PROBE = 0x10
NUD_FAILED = 0x20
NUD_PERMANENT = 0x80


# TODO: Support IPv4.
class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):

  # Set a 500-ms retrans timer so we can test for ND retransmits without
  # waiting too long. Apparently this cannot go below 500ms.
  RETRANS_TIME_MS = 500

  # This can only be in seconds, so 1000 is the minimum.
  DELAY_TIME_MS = 1000

  # Unfortunately, this must be above the delay timer or the kernel ND code will
  # not behave correctly (e.g., go straight from REACHABLE into DELAY). This is
  # is fuzzed by the kernel from 0.5x to 1.5x of its value, so we need a value
  # that's 2x the delay timer.
  BASE_REACHABLE_TIME_MS = 2 * DELAY_TIME_MS
  MAX_REACHABLE_TIME_MS = 1.5 * BASE_REACHABLE_TIME_MS

  @classmethod
  def setUpClass(cls):
    super(NeighbourTest, cls).setUpClass()
    for netid in cls.tuns:
      iface = cls.GetInterfaceName(netid)
      # This can't be set in an RA.
      cls.SetSysctl(
          "/proc/sys/net/ipv6/neigh/%s/delay_first_probe_time" % iface,
          cls.DELAY_TIME_MS / 1000)

  def setUp(self):
    super(NeighbourTest, self).setUp()

    for netid in self.tuns:
      # Clear the ND cache entries for all routers, so each test starts with
      # the IPv6 default router in state STALE.
      addr = self._RouterAddress(netid, 6)
      ifindex = self.ifindices[netid]
      self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)

      # Configure IPv6 by sending an RA.
      self.SendRA(netid,
                  retranstimer=self.RETRANS_TIME_MS,
                  reachabletime=self.BASE_REACHABLE_TIME_MS)

    self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
    self.sock.bind((0, RTMGRP_NEIGH))
    net_test.SetNonBlocking(self.sock)

    self.netid = random.choice(self.tuns.keys())
    self.ifindex = self.ifindices[self.netid]

  def GetNeighbour(self, addr, ifindex):
    version = csocket.AddressVersion(addr)
    for msg, args in self.iproute.DumpNeighbours(version, ifindex):
      if args["NDA_DST"] == addr:
        return msg, args

  def GetNdEntry(self, addr):
    return self.GetNeighbour(addr, self.ifindex)

  def CheckNoNdEvents(self):
    self.assertRaisesErrno(errno.EAGAIN, self.sock.recvfrom, 4096, MSG_PEEK)

  def assertNeighbourState(self, state, addr):
    self.assertEquals(state, self.GetNdEntry(addr)[0].state)

  def assertNeighbourAttr(self, addr, name, value):
    self.assertEquals(value, self.GetNdEntry(addr)[1][name])

  def ExpectNeighbourNotification(self, addr, state, attrs=None):
    msg = self.sock.recv(4096)
    msg, actual_attrs = self.iproute.ParseNeighbourMessage(msg)
    self.assertEquals(addr, actual_attrs["NDA_DST"])
    self.assertEquals(state, msg.state)
    if attrs:
      for name in attrs:
        self.assertEquals(attrs[name], actual_attrs[name])

  def ExpectProbe(self, is_unicast, addr):
    version = csocket.AddressVersion(addr)
    if version == 6:
      llsrc = self.MyMacAddress(self.netid)
      if is_unicast:
        src = self.MyLinkLocalAddress(self.netid)
        dst = addr
      else:
        solicited = inet_pton(AF_INET6, addr)
        last3bytes = tuple([ord(b) for b in solicited[-3:]])
        dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
        src = self.MyAddress(6, self.netid)
      expected = (
          scapy.IPv6(src=src, dst=dst) /
          scapy.ICMPv6ND_NS(tgt=addr) /
          scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
      )
      msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
      self.ExpectPacketOn(self.netid, msg, expected)
    else:
      raise NotImplementedError

  def ExpectUnicastProbe(self, addr):
    self.ExpectProbe(True, addr)

  def ExpectMulticastNS(self, addr):
    self.ExpectProbe(False, addr)

  def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
                                  S=1, O=0, R=1):
    version = csocket.AddressVersion(addr)
    if srcaddr is None:
      srcaddr = addr
    if dstaddr is None:
      dstaddr = self.MyLinkLocalAddress(self.netid)
    if version == 6:
      packet = (
          scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
          scapy.IPv6(src=srcaddr, dst=dstaddr) /
          scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
          scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
      )
      self.ReceiveEtherPacketOn(self.netid, packet)
    else:
      raise NotImplementedError

  def MonitorSleepMs(self, interval, addr):
    slept = 0
    while slept < interval:
      sleep_ms = min(100, interval - slept)
      time.sleep(sleep_ms / 1000.0)
      slept += sleep_ms
      print self.GetNdEntry(addr)

  def MonitorSleep(self, intervalseconds, addr):
    self.MonitorSleepMs(intervalseconds * 1000, addr)

  def SleepMs(self, ms):
    time.sleep(ms / 1000.0)

  def testNotifications(self):
    """Tests neighbour notifications.

    Relevant kernel commits:
      upstream net-next:
        765c9c6 neigh: Better handling of transition to NUD_PROBE state
        53385d2 neigh: Netlink notification for administrative NUD state change
          (only checked on kernel v3.13+, not on v3.10)

      android-3.10:
        e4a6d6b neigh: Better handling of transition to NUD_PROBE state

      android-3.18:
        2011e72 neigh: Better handling of transition to NUD_PROBE state
    """
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_PERMANENT, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    # Send a packet and check that we go into DELAY.
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((net_test.IPV6_ADDR, 53))
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)

    # Wait for the probe interval, then check that we're in PROBE, and that the
    # kernel has notified us.
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectUnicastProbe(router6)

    # Respond to the NS and verify we're in REACHABLE again.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid))
    self.assertNeighbourState(NUD_REACHABLE, router6)
    if net_test.LINUX_VERSION >= (3, 13, 0):
      # commit 53385d2 (v3.13) "neigh: Netlink notification for administrative
      # NUD state change" produces notifications for NUD_REACHABLE, but these
      # are not generated on earlier kernels.
      self.ExpectNeighbourNotification(router6, NUD_REACHABLE)

    # Wait until the reachable time has passed, and verify we're in STALE.
    self.SleepMs(self.MAX_REACHABLE_TIME_MS * 1.2)
    self.assertNeighbourState(NUD_STALE, router6)
    self.ExpectNeighbourNotification(router6, NUD_STALE)

    # Send a packet, and verify we go into DELAY and then to PROBE.
    s.send(net_test.UDP_PAYLOAD)
    self.assertNeighbourState(NUD_DELAY, router6)
    self.SleepMs(self.DELAY_TIME_MS * 1.1)
    self.assertNeighbourState(NUD_PROBE, router6)
    self.ExpectNeighbourNotification(router6, NUD_PROBE)

    # Wait for the probes to time out, and expect a FAILED notification.
    self.assertNeighbourAttr(router6, "NDA_PROBES", 1)
    self.ExpectUnicastProbe(router6)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 2)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.ExpectUnicastProbe(router6)
    self.assertNeighbourAttr(router6, "NDA_PROBES", 3)

    self.SleepMs(self.RETRANS_TIME_MS)
    self.assertNeighbourState(NUD_FAILED, router6)
    self.ExpectNeighbourNotification(router6, NUD_FAILED, {"NDA_PROBES": 3})

  def testRepeatedProbes(self):
    router4 = self._RouterAddress(self.netid, 4)
    router6 = self._RouterAddress(self.netid, 6)
    routermac = self.RouterMacAddress(self.netid)
    self.assertNeighbourState(NUD_PERMANENT, router4)
    self.assertNeighbourState(NUD_STALE, router6)

    def ForceProbe(addr, mac):
      self.iproute.UpdateNeighbour(6, addr, None, self.ifindex, NUD_PROBE)
      self.assertNeighbourState(NUD_PROBE, addr)
      self.SleepMs(1)  # TODO: Why is this necessary?
      self.assertNeighbourState(NUD_PROBE, addr)
      self.ExpectUnicastProbe(addr)
      self.ReceiveUnicastAdvertisement(addr, mac)
      self.assertNeighbourState(NUD_REACHABLE, addr)

    for _ in xrange(5):
      ForceProbe(router6, routermac)

  def testIsRouterFlag(self):
    router6 = self._RouterAddress(self.netid, 6)
    self.assertNeighbourState(NUD_STALE, router6)

    # Get into FAILED.
    ifindex = self.ifindices[self.netid]
    self.iproute.UpdateNeighbour(6, router6, None, ifindex, NUD_FAILED)
    self.ExpectNeighbourNotification(router6, NUD_FAILED)
    self.assertNeighbourState(NUD_FAILED, router6)

    time.sleep(1)

    # Send another packet and expect a multicast NS.
    routing_mode = random.choice(["mark", "oif", "uid"])
    s = self.BuildSocket(6, net_test.UDPSocket, self.netid, routing_mode)
    s.connect((net_test.IPV6_ADDR, 53))
    s.send(net_test.UDP_PAYLOAD)
    self.ExpectMulticastNS(router6)

    # Receive a unicast NA with the R flag set to 0.
    self.ReceiveUnicastAdvertisement(router6, self.RouterMacAddress(self.netid),
                                     srcaddr=self._RouterAddress(self.netid, 6),
                                     dstaddr=self.MyAddress(6, self.netid),
                                     S=1, O=0, R=0)

    # Expect that this takes us to REACHABLE.
    self.ExpectNeighbourNotification(router6, NUD_REACHABLE)
    self.assertNeighbourState(NUD_REACHABLE, router6)


if __name__ == "__main__":
  unittest.main()