#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

u"""A test module to verify root directory content.

This test checks the root directory content on a device is the same as the
following structure, or a subset of it. For symlinks, the target value it
points to is checked. For *.rc files, the file content is compared, except
init.environ.rc because it's *built* from init.environ.rc.in. For other
files (e.g., init, .png), we allow their existence and don't check the file
content.

Root Directory Content:
├── acct
├── adb_keys
├── bugreports -> /data/user_de/0/com.android.shell/files/bugreports
├── cache OR
├── cache -> /data/cache
├── charger -> /sbin/charger
├── config
├── d -> /sys/kernel/debug
├── data
├── default.prop -> system/etc/prop.default
├── dev
├── etc -> /system/etc
├── init
├── init.*.rc
├── lost+found
├── mnt
├── odm
├── oem
├── postinstall
├── proc
├── res
│   └── images
│       └── charger
│           ├── battery_fail.png
│           └── battery_scale.png
├── sbin
│   ├── charger
│   ├── ueventd -> ../init
│   └── watchdogd -> ../init
├── sdcard -> /storage/self/primary
├── storage
├── sys
├── system
├── ueventd.rc
├── vendor
├── verity_key
"""

import filecmp
import glob
import logging
import os
import shutil
import tempfile

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.file import target_file_utils

# The allowed directories in three types:
#     - mount_point: skip checking its content as it's just a mount point.
#     - skip_check: skip checking its content for special directories.
#     - check_content: check its content recursively.
_ALLOWED_DIRS = {
    "/acct": "mount_point",
    # /cache is created when BOARD_CACHEIMAGE_FILE_SYSTEM_TYPE is defined.
    "/cache": "mount_point",
    "/config": "mount_point",
    "/data": "mount_point",
    "/dev": "mount_point",
    "/lost+found": "skip_check",  # Skip checking this created by fsck.
    "/mnt": "mount_point",
    "/odm": "mount_point",
    "/oem": "mount_point",
    # The A/B updater uses a top-level /postinstall directory to mount the new
    # system before reboot.
    "/postinstall": "skip_check",
    "/proc": "mount_point",
    "/res": "check_content",
    "/res/images": "check_content",
    "/res/images/charger": "check_content",
    "/sbin": "check_content",
    "/storage": "mount_point",
    "/sys": "mount_point",
    # /system is a mount point in non-A/B but a directory in A/B.
    # No need to check its content in both cases.
    "/system": "skip_check",
    "/vendor": "mount_point",
}

# The allowed symlinks and the target it points to.
# The test will check the value of the symlink target.
_ALLOWED_SYMLINKS = {
    "/bugreports": "/data/user_de/0/com.android.shell/files/bugreports",
    "/cache": "/data/cache",
    "/charger": "/sbin/charger",
    "/d": "/sys/kernel/debug",
    "/default.prop": "system/etc/prop.default",
    "/etc": "/system/etc",
    "/sbin/ueventd": "../init",
    "/sbin/watchdogd": "../init",
    "/sdcard": "/storage/self/primary",
}

# The allowed files for existence, where file content won't be checked.
# Note that init.environ.rc is generated by replacing some build
# environment variables (e.g., BOOTCLASSPATH) from its source:
# init.environ.rc.in, therefore its content is also not checked.
_ALLOWED_EXISTING_FILES = set(["/adb_keys",
                               "/init",
                               "/init.environ.rc",
                               "/res/images/charger/battery_scale.png",
                               "/res/images/charger/battery_fail.png",
                               "/sbin/charger",
                               "/verity_key"])


class VtsKernelRootDirTest(base_test.BaseTestClass):
    """A test class to verify root directory content.

    Attributes:
        data_file_path: The path to VTS data directory.
    """

    _INIT_RC_FILE_PATH = "vts/testcases/kernel/api/rootdir/init_rc_files"

    def setUpClass(self):
        """Initializes data file path, device, shell and adb."""
        required_params = [keys.ConfigKeys.IKEY_DATA_FILE_PATH]
        self.getUserParams(required_params)

        self._dut = self.android_devices[0]
        self._shell = self._dut.shell
        self._adb = self._dut.adb

        self._dirs_to_check = self._TraverseRootDir()
        logging.info("Dirs to check: %r", self._dirs_to_check)

    def setUp(self):
        """Initializes the temp_dir for adb pull."""
        self._temp_dir = tempfile.mkdtemp()
        logging.info("Create %s", self._temp_dir)

    def tearDown(self):
        """Deletes the temporary directory."""
        logging.info("Delete %s", self._temp_dir)
        shutil.rmtree(self._temp_dir)

    def _ListDir(self, dir_path, file_type="all"):
        """Lists files in dir_path with specific file_type.

        Args:
            dir_path: The current directory to list content.
            file_type: The file type to list, can be one of "dir", "file",
                "symlink" or "all".

        Returns:
            A set of paths under current directory.
        """
        find_option = "-maxdepth 1"  # Only list current directory.
        find_types = {
            "dir": "d",
            "file": "f",
            "symlink": "l",
        }
        if file_type != "all":
            find_option += " -type %s" % find_types[file_type]

        # FindFiles will include dir_path if file_type is "all" or "dir".
        # Excludes dir_path before return.
        return set(target_file_utils.FindFiles(
            self._shell, dir_path, "*", find_option)) - set([dir_path])

    def _ReadLink(self, path):
        """Executes readlink on device."""
        result = self._shell.Execute("readlink %s" % path)
        asserts.assertEqual(result[const.EXIT_CODE][0], 0)
        return result[const.STDOUT][0].strip()

    def _TraverseRootDir(self, dir_path="/"):
        """Returns a list of dirs to check the content in it."""
        # dir_path is eligible to check when being invoked here.
        dirs = [dir_path]
        for d in self._ListDir(dir_path, "dir"):
            if d in _ALLOWED_DIRS and _ALLOWED_DIRS[d] == "check_content":
                dirs.extend(self._TraverseRootDir(d))
        return dirs

    def testRootDirs(self):
        """Checks the subdirs under root directory."""
        error_msg = []

        for dir_path in self._dirs_to_check:
            current_dirs = self._ListDir(dir_path, "dir")
            logging.info("Current dirs: %r", current_dirs)

            unexpected_dirs = current_dirs - set(_ALLOWED_DIRS)
            error_msg.extend("Unexpected dir: " + d for d in unexpected_dirs)

        if error_msg:
            asserts.fail("UNEXPECTED ROOT DIRS:\n%s" % "\n".join(error_msg))

    def testRootSymlinks(self):
        """Checks the symlinks under root directory."""
        error_msg = []

        def _CheckSymlinks(dir_path):
            """Checks the symlinks under dir_path."""
            current_symlinks = self._ListDir(dir_path, "symlink")
            logging.info("Current symlinks: %r", current_symlinks)

            unexpected_symlinks = current_symlinks - set(_ALLOWED_SYMLINKS)
            error_msg.extend(
                "Unexpected symlink: " + l for l in unexpected_symlinks)

            # Checks symlink target.
            error_msg.extend(
                "Invalid symlink target: %s -> %s (expected: %s)" % (
                    l, target, _ALLOWED_SYMLINKS[l]) for l, target in (
                        (l, self._ReadLink(l)) for l in (
                            current_symlinks - unexpected_symlinks))
                    if target != _ALLOWED_SYMLINKS[l])

        for dir_path in self._dirs_to_check:
            _CheckSymlinks(dir_path)

        if error_msg:
            asserts.fail("UNEXPECTED ROOT SYMLINKS:\n%s" % "\n".join(error_msg))

    def testRootFiles(self):
        """Checks the files under root directory."""
        error_msg = []

        init_rc_dir = os.path.join(self.data_file_path, self._INIT_RC_FILE_PATH)
        allowed_rc_files = set("/" + os.path.basename(rc_file) for rc_file in
                               glob.glob(os.path.join(init_rc_dir, "*.rc")))

        def _CheckFiles(dir_path):
            """Checks the files under dir_path."""
            current_files = self._ListDir(dir_path, "file")
            logging.info("Current files: %r", current_files)

            unexpected_files = (current_files -
                                allowed_rc_files -
                                _ALLOWED_EXISTING_FILES)
            error_msg.extend("Unexpected file: " + f for f in unexpected_files)

            # Checks file content in *.rc files.
            for f in current_files - unexpected_files:
                if f in allowed_rc_files:
                    # adb pull the .rc file from the device.
                    logging.info("adb pull %s %s", f, self._temp_dir)
                    pull_output = self._adb.pull(f, self._temp_dir)
                    logging.debug(pull_output)
                    # Compares the content and trim the leading "/" in f.
                    if not filecmp.cmp(os.path.join(init_rc_dir, f[1:]),
                                       os.path.join(self._temp_dir, f[1:])):
                        error_msg.append("Unexpected file content: %s" % f)

        for dir_path in self._dirs_to_check:
            _CheckFiles(dir_path)

        if error_msg:
            asserts.fail("UNEXPECTED ROOT FILES:\n%s" % "\n".join(error_msg))

    def testRootAllFileTypes(self):
        """Checks there is no path other than dirs, symlinks and files."""
        error_msg = []

        for dir_path in self._dirs_to_check:
            unknown_files = (self._ListDir(dir_path) -
                             self._ListDir(dir_path, "dir") -
                             self._ListDir(dir_path, "symlink") -
                             self._ListDir(dir_path, "file"))
            error_msg.extend("Unexpected path: " + p for p in unknown_files)

        if error_msg:
            asserts.fail("UNEXPECTED ROOT PATHS:\n%s" % "\n".join(error_msg))


if __name__ == "__main__":
    test_runner.main()