普通文本  |  358行  |  12.58 KB

#!/usr/bin/python2
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit testing payload_info.py."""

from __future__ import print_function

import StringIO
import collections
import mock
import sys
import unittest

import payload_info
import update_payload

from contextlib import contextmanager

from update_payload import update_metadata_pb2

class FakePayloadError(Exception):
  """Error raised by FakePayload when a test makes an invalid request."""
  pass

class FakeOption(object):
  """Stand-in for the options object that the tests pass to PayloadCommand.

  The flags `list_ops`, `stats` and `signatures` default to False and
  `payload_file` defaults to None. Every keyword argument is stored as an
  attribute of the same name and overrides the corresponding default.
  """

  def __init__(self, **kwargs):
    """Build the options object.

    Args:
      **kwargs: attribute name / value pairs to set on the instance.
    """
    self.list_ops = False
    self.stats = False
    self.signatures = False
    self.payload_file = None
    # items() is available on both Python 2 and Python 3 dictionaries, whereas
    # iteritems() only exists on Python 2.
    for key, val in kwargs.items():
      setattr(self, key, val)

class FakeOp(object):
  """Fake manifest install operation for testing.

  Holds the source/destination extents and the operation type; any optional
  field (the tests use data_offset, data_length, src_length and dst_length)
  is supplied as a keyword argument and stored as an attribute.
  """

  def __init__(self, src_extents, dst_extents, op_type, **kwargs):
    """Build the fake operation.

    Args:
      src_extents: list of extents the operation reads from.
      dst_extents: list of extents the operation writes to.
      op_type: operation type value, exposed as the `type` attribute.
      **kwargs: optional fields to set as attributes on the instance.
    """
    self.src_extents = src_extents
    self.dst_extents = dst_extents
    self.type = op_type
    # items() works on both Python 2 and Python 3; iteritems() is Python 2
    # only.
    for key, val in kwargs.items():
      setattr(self, key, val)

  def HasField(self, field):
    """Return whether this object has an attribute named `field`."""
    return hasattr(self, field)

class FakePartition(object):
  """Minimal substitute for a PartitionUpdate entry of the manifest.

  Exposes just the two attributes set here: the partition's name and the list
  of install operations that belong to it.
  """

  def __init__(self, partition_name, operations):
    self.partition_name, self.operations = partition_name, operations

class FakeManifest(object):
  """Fake manifest for testing.

  Always describes one REPLACE_BZ rootfs operation and one SOURCE_COPY kernel
  operation. Where those operations are exposed depends on the major version
  passed to the constructor.
  """

  def __init__(self, major_version):
    """Populate the fake manifest.

    Args:
      major_version: payload major version. When it equals
        payload_info.MAJOR_PAYLOAD_VERSION_BRILLO the operations are exposed
        through `partitions` and the two legacy operation lists are emptied;
        otherwise they stay in `install_operations` and
        `kernel_install_operations`.
    """
    FakeExtent = collections.namedtuple('FakeExtent',
                                        ['start_block', 'num_blocks'])
    # One rootfs operation writing two extents (1 + 2 = 3 blocks).
    self.install_operations = [FakeOp([],
                                      [FakeExtent(1, 1), FakeExtent(2, 2)],
                                      update_payload.common.OpType.REPLACE_BZ,
                                      dst_length=3*4096,
                                      data_offset=1,
                                      data_length=1)]
    # One kernel operation writing 20 extents of 0..19 blocks (190 blocks in
    # total). range() is used instead of xrange() so the fake also works on
    # Python 3; the resulting list is the same on Python 2.
    self.kernel_install_operations = [FakeOp(
        [FakeExtent(1, 1)],
        [FakeExtent(x, x) for x in range(20)],
        update_payload.common.OpType.SOURCE_COPY,
        src_length=4096)]
    if major_version == payload_info.MAJOR_PAYLOAD_VERSION_BRILLO:
      self.partitions = [FakePartition('root', self.install_operations),
                         FakePartition('kernel',
                                       self.kernel_install_operations)]
      # The operations now live in `partitions`; both legacy attributes are
      # rebound to one shared empty list.
      self.install_operations = self.kernel_install_operations = []
    self.block_size = 4096
    self.minor_version = 4
    FakePartInfo = collections.namedtuple('FakePartInfo', ['size'])
    self.old_rootfs_info = FakePartInfo(1 * 4096)
    self.old_kernel_info = FakePartInfo(2 * 4096)
    self.new_rootfs_info = FakePartInfo(3 * 4096)
    self.new_kernel_info = FakePartInfo(4 * 4096)
    # None means "field not set"; see HasField().
    self.signatures_offset = None
    self.signatures_size = None

  def HasField(self, field_name):
    """Fake HasField method based on the python members.

    Args:
      field_name: name of the attribute to look up.

    Returns:
      True if the attribute exists on this object and is not None.
    """
    return hasattr(self, field_name) and getattr(self, field_name) is not None

class FakeHeader(object):
  """Stand-in for a payload header, carrying only the fields set here."""

  def __init__(self, version, manifest_len, metadata_signature_len):
    """Record the header fields.

    Args:
      version: payload major version.
      manifest_len: length of the manifest.
      metadata_signature_len: length of the metadata signature blob.
    """
    self.metadata_signature_len = metadata_signature_len
    self.manifest_len = manifest_len
    self.version = version

  @property
  def size(self):
    """Header size: 20 for the ChromeOS major version, 24 for any other."""
    if self.version == payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS:
      return 20
    return 24

class FakePayload(object):
  """Fake payload for testing.

  The header and manifest are built in the constructor but only become
  visible through `header` and `manifest` after Init() is called. Signature
  blobs are kept in an in-memory dictionary keyed by offset and served by
  ReadDataBlob().
  """

  def __init__(self, major_version):
    """Build the fake payload.

    Args:
      major_version: payload major version, forwarded to both the fake header
        and the fake manifest.
    """
    self._header = FakeHeader(major_version, 222, 0)
    self.header = None
    self._manifest = FakeManifest(major_version)
    self.manifest = None

    # Maps blob offset -> serialized blob, as served by ReadDataBlob().
    self._blobs = {}
    self._payload_signatures = update_metadata_pb2.Signatures()
    self._metadata_signatures = update_metadata_pb2.Signatures()

  def Init(self):
    """Fake Init that sets header and manifest.

    Failing to call Init() will not make header and manifest available to the
    test.
    """
    self.header = self._header
    self.manifest = self._manifest

  def ReadDataBlob(self, offset, length):
    """Return the blob that should be present at the offset location.

    Args:
      offset: key under which the blob was registered.
      length: number of bytes the caller asks for; it must equal the size of
        the stored blob.

    Returns:
      The stored blob.

    Raises:
      FakePayloadError: if no blob is registered at `offset`, or if `length`
        differs from the stored blob's length.
    """
    if offset not in self._blobs:
      raise FakePayloadError('Requested blob at unknown offset %d' % offset)
    blob = self._blobs[offset]
    if len(blob) != length:
      raise FakePayloadError('Read blob with the wrong length (expect: %d, '
                             'actual: %d)' % (len(blob), length))
    return blob

  @staticmethod
  def _AddSignatureToProto(proto, **kwargs):
    """Add a new Signature element to the passed proto.

    Args:
      proto: object whose `signatures` member provides an add() method.
      **kwargs: field name / value pairs to set on the new element.
    """
    new_signature = proto.signatures.add()
    # items() works on both Python 2 and Python 3; iteritems() is Python 2
    # only.
    for key, val in kwargs.items():
      setattr(new_signature, key, val)

  def AddPayloadSignature(self, **kwargs):
    """Append a payload signature and re-register the serialized blob.

    The blob is stored at the fixed offset 1234, and the manifest's
    signatures_offset / signatures_size are updated to match.

    Args:
      **kwargs: fields of the new signature element.
    """
    self._AddSignatureToProto(self._payload_signatures, **kwargs)
    blob = self._payload_signatures.SerializeToString()
    self._manifest.signatures_offset = 1234
    self._manifest.signatures_size = len(blob)
    self._blobs[self._manifest.signatures_offset] = blob

  def AddMetadataSignature(self, **kwargs):
    """Append a metadata signature and re-register the serialized blob.

    The blob is stored under the negative of its own length, and the header's
    metadata_signature_len is updated to that length.

    Args:
      **kwargs: fields of the new signature element.
    """
    self._AddSignatureToProto(self._metadata_signatures, **kwargs)
    # The key depends on the blob length, so drop the entry registered by a
    # previous call before storing the new, longer blob.
    if self._header.metadata_signature_len:
      del self._blobs[-self._header.metadata_signature_len]
    blob = self._metadata_signatures.SerializeToString()
    self._header.metadata_signature_len = len(blob)
    self._blobs[-len(blob)] = blob

class PayloadCommandTest(unittest.TestCase):
  """Test class for our PayloadCommand class."""

  @contextmanager
  def OutputCapturer(self):
    """A tool for capturing the sys.stdout.

    Yields:
      The StringIO object that replaces sys.stdout inside the `with` body. The
      original sys.stdout is restored on exit, even if the body raises.
    """
    stdout = sys.stdout
    try:
      sys.stdout = StringIO.StringIO()
      yield sys.stdout
    finally:
      sys.stdout = stdout

  def TestCommand(self, payload_cmd, payload, expected_out):
    """A tool for testing a payload command.

    It tests that a payload command which runs with a given payload produces a
    correct output.

    Args:
      payload_cmd: the command object whose Run() method is exercised.
      payload: the object returned in place of update_payload.Payload while
        the command runs.
      expected_out: the exact text the command is expected to print.
    """
    with mock.patch.object(update_payload, 'Payload', return_value=payload), \
         self.OutputCapturer() as output:
      payload_cmd.Run()
    # assertEqual is used because the assertEquals alias is deprecated and
    # was removed in Python 3.12.
    self.assertEqual(output.getvalue(), expected_out)

  def testDisplayValue(self):
    """Verify that DisplayValue prints what we expect."""
    with self.OutputCapturer() as output:
      payload_info.DisplayValue('key', 'value')
    self.assertEqual(output.getvalue(), 'key:                         value\n')

  def testRun(self):
    """Verify that Run parses and displays the payload like we expect."""
    payload_cmd = payload_info.PayloadCommand(FakeOption(action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version:             1
Manifest length:             222
Number of operations:        1
Number of kernel ops:        1
Block size:                  4096
Minor version:               4
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testListOpsOnVersion1(self):
    """Verify that the --list_ops option gives the correct output."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(list_ops=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version:             1
Manifest length:             222
Number of operations:        1
Number of kernel ops:        1
Block size:                  4096
Minor version:               4

Install operations:
  0: REPLACE_BZ
    Data offset: 1
    Data length: 1
    Destination: 2 extents (3 blocks)
      (1,1) (2,2)
Kernel install operations:
  0: SOURCE_COPY
    Source: 1 extent (1 block)
      (1,1)
    Destination: 20 extents (190 blocks)
      (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
      (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testListOpsOnVersion2(self):
    """Verify that the --list_ops option gives the correct output."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(list_ops=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    expected_out = """Payload version:             2
Manifest length:             222
Number of partitions:        2
  Number of "root" ops:      1
  Number of "kernel" ops:    1
Block size:                  4096
Minor version:               4

root install operations:
  0: REPLACE_BZ
    Data offset: 1
    Data length: 1
    Destination: 2 extents (3 blocks)
      (1,1) (2,2)
kernel install operations:
  0: SOURCE_COPY
    Source: 1 extent (1 block)
      (1,1)
    Destination: 20 extents (190 blocks)
      (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
      (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testStatsOnVersion1(self):
    """Verify that the --stats option works correctly."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(stats=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version:             1
Manifest length:             222
Number of operations:        1
Number of kernel ops:        1
Block size:                  4096
Minor version:               4
Blocks read:                 11
Blocks written:              193
Seeks when writing:          18
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testStatsOnVersion2(self):
    """Verify that the --stats option works correctly on version 2."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(stats=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    expected_out = """Payload version:             2
Manifest length:             222
Number of partitions:        2
  Number of "root" ops:      1
  Number of "kernel" ops:    1
Block size:                  4096
Minor version:               4
Blocks read:                 11
Blocks written:              193
Seeks when writing:          18
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testEmptySignatures(self):
    """Verify that the --signatures option works with unsigned payloads."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(action='show', signatures=True))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version:             1
Manifest length:             222
Number of operations:        1
Number of kernel ops:        1
Block size:                  4096
Minor version:               4
No metadata signatures stored in the payload
No payload signatures stored in the payload
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testSignatures(self):
    """Verify that the --signatures option shows the present signatures."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(action='show', signatures=True))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    payload.AddPayloadSignature(version=1,
                                data='12345678abcdefgh\x00\x01\x02\x03')
    payload.AddPayloadSignature(data='I am a signature so access is yes.')
    payload.AddMetadataSignature(data='\x00\x0a\x0c')
    expected_out = """Payload version:             2
Manifest length:             222
Number of partitions:        2
  Number of "root" ops:      1
  Number of "kernel" ops:    1
Block size:                  4096
Minor version:               4
Metadata signatures blob:    file_offset=246 (7 bytes)
Metadata signatures: (1 entries)
  version=None, hex_data: (3 bytes)
    00 0a 0c                                        | ...
Payload signatures blob:     blob_offset=1234 (64 bytes)
Payload signatures: (2 entries)
  version=1, hex_data: (20 bytes)
    31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
    00 01 02 03                                     | ....
  version=None, hex_data: (34 bytes)
    49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
    20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 |  so access is ye
    73 2e                                           | s.
"""
    self.TestCommand(payload_cmd, payload, expected_out)

# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()