#!/usr/bin/python2
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit testing payload_info.py."""
from __future__ import print_function
import StringIO
import collections
import mock
import sys
import unittest
import payload_info
import update_payload
from contextlib import contextmanager
from update_payload import update_metadata_pb2
class FakePayloadError(Exception):
  """Generic failure signalled by the FakePayload test double."""
class FakeOption(object):
  """Fake options object for testing.

  Instances are handed to payload_info.PayloadCommand by the tests in this
  file. ``list_ops``, ``stats`` and ``signatures`` default to False and
  ``payload_file`` defaults to None; any of them can be overridden through
  keyword arguments.
  """

  def __init__(self, **kwargs):
    """Set the defaults, then apply every keyword argument as an attribute.

    Args:
      **kwargs: attribute names mapped to their values. They override the
          defaults above and may introduce additional attributes (the tests
          pass ``action`` this way).
    """
    self.list_ops = False
    self.stats = False
    self.signatures = False
    self.payload_file = None
    # items() instead of the Python 2-only iteritems(): it behaves the same
    # here and keeps the fake usable under Python 3 as well.
    for key, val in kwargs.items():
      setattr(self, key, val)
class FakeOp(object):
  """Fake manifest operation for testing."""

  def __init__(self, src_extents, dst_extents, op_type, **kwargs):
    """Store the extents and type, plus any optional fields.

    Args:
      src_extents: list of source extents of the operation.
      dst_extents: list of destination extents of the operation.
      op_type: operation type, stored as the ``type`` attribute.
      **kwargs: optional fields (the tests use e.g. ``data_offset``,
          ``data_length``, ``src_length``, ``dst_length``); each one becomes
          an attribute and is then reported as present by HasField().
    """
    self.src_extents = src_extents
    self.dst_extents = dst_extents
    self.type = op_type
    # items() instead of the Python 2-only iteritems() so the fake also works
    # under Python 3.
    for key, val in kwargs.items():
      setattr(self, key, val)

  def HasField(self, field):
    """Return True if an attribute named |field| was set on this fake."""
    return hasattr(self, field)
class FakePartition(object):
  """Test stand-in for a PartitionUpdate field.

  Attributes are plain values: ``partition_name`` and ``operations`` hold
  exactly what was passed to the constructor.
  """

  def __init__(self, partition_name, operations):
    """Remember the partition's name and its list of operations."""
    self.partition_name, self.operations = partition_name, operations
class FakeManifest(object):
  """Fake manifest for testing.

  Holds one REPLACE_BZ operation for the rootfs and one SOURCE_COPY operation
  for the kernel, a block size of 4096, minor version 4, fake partition info
  sizes and (initially unset) payload signature offset/size.
  """

  def __init__(self, major_version):
    """Populate the fake manifest for the given major payload version.

    Args:
      major_version: major payload version being faked. When it equals
          payload_info.MAJOR_PAYLOAD_VERSION_BRILLO the operations are exposed
          through ``partitions`` ('root' and 'kernel') and the two legacy
          operation lists are emptied.
    """
    FakeExtent = collections.namedtuple('FakeExtent',
                                        ['start_block', 'num_blocks'])
    self.install_operations = [FakeOp([],
                                      [FakeExtent(1, 1), FakeExtent(2, 2)],
                                      update_payload.common.OpType.REPLACE_BZ,
                                      dst_length=3*4096,
                                      data_offset=1,
                                      data_length=1)]
    self.kernel_install_operations = [FakeOp(
        [FakeExtent(1, 1)],
        # range() instead of the Python 2-only xrange(); for 20 items the
        # difference is irrelevant and the fake then also works on Python 3.
        [FakeExtent(x, x) for x in range(20)],
        update_payload.common.OpType.SOURCE_COPY,
        src_length=4096)]
    if major_version == payload_info.MAJOR_PAYLOAD_VERSION_BRILLO:
      # The partitions keep references to the operation lists built above;
      # the legacy attributes are then rebound to an empty list.
      self.partitions = [FakePartition('root', self.install_operations),
                         FakePartition('kernel',
                                       self.kernel_install_operations)]
      self.install_operations = self.kernel_install_operations = []
    self.block_size = 4096
    self.minor_version = 4
    FakePartInfo = collections.namedtuple('FakePartInfo', ['size'])
    self.old_rootfs_info = FakePartInfo(1 * 4096)
    self.old_kernel_info = FakePartInfo(2 * 4096)
    self.new_rootfs_info = FakePartInfo(3 * 4096)
    self.new_kernel_info = FakePartInfo(4 * 4096)
    # None means "not present" for HasField(); FakePayload.AddPayloadSignature
    # fills these in.
    self.signatures_offset = None
    self.signatures_size = None

  def HasField(self, field_name):
    """Fake HasField method based on the python members.

    Returns:
      True only if the attribute exists and its value is not None.
    """
    return hasattr(self, field_name) and getattr(self, field_name) is not None
class FakeHeader(object):
  """Test stand-in for a payload header."""

  def __init__(self, version, manifest_len, metadata_signature_len):
    """Record the three header fields verbatim."""
    self.metadata_signature_len = metadata_signature_len
    self.manifest_len = manifest_len
    self.version = version

  @property
  def size(self):
    """Header size: 20 for the CHROMEOS major version, 24 for any other."""
    if self.version == payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS:
      return 20
    return 24
class FakePayload(object):
  """Fake payload for testing.

  Blobs are kept in a dict keyed by offset. The payload signatures blob is
  stored at offset 1234 and the metadata signatures blob at the negative
  offset ``-len(blob)``.
  """

  def __init__(self, major_version):
    """Build the fake header and manifest for |major_version|.

    ``header`` and ``manifest`` stay None until Init() is called.
    """
    self._header = FakeHeader(major_version, 222, 0)
    self.header = None
    self._manifest = FakeManifest(major_version)
    self.manifest = None
    self._blobs = {}
    self._payload_signatures = update_metadata_pb2.Signatures()
    self._metadata_signatures = update_metadata_pb2.Signatures()

  def Init(self):
    """Fake Init that sets header and manifest.

    Failing to call Init() will not make header and manifest available to the
    test.
    """
    self.header = self._header
    self.manifest = self._manifest

  def ReadDataBlob(self, offset, length):
    """Return the blob that should be present at the offset location.

    Args:
      offset: key under which the blob was registered.
      length: number of bytes the caller asks for; must equal the stored
          blob's length.

    Raises:
      FakePayloadError: if no blob is registered at |offset| or |length| does
          not match the stored blob.
    """
    if offset not in self._blobs:
      raise FakePayloadError('Requested blob at unknown offset %d' % offset)
    blob = self._blobs[offset]
    if len(blob) != length:
      raise FakePayloadError('Read blob with the wrong length (expect: %d, '
                             'actual: %d)' % (len(blob), length))
    return blob

  @staticmethod
  def _AddSignatureToProto(proto, **kwargs):
    """Add a new Signature element to the passed proto.

    Args:
      proto: object whose ``signatures.add()`` returns the new element.
      **kwargs: field names and values to set on the new element.
    """
    new_signature = proto.signatures.add()
    # items() instead of the Python 2-only iteritems() so the fake also works
    # under Python 3.
    for key, val in kwargs.items():
      setattr(new_signature, key, val)

  def AddPayloadSignature(self, **kwargs):
    """Append a payload signature and refresh the serialized blob.

    The re-serialized signatures are stored at blob offset 1234 and the
    manifest's signatures_offset/signatures_size are updated to match.
    """
    self._AddSignatureToProto(self._payload_signatures, **kwargs)
    blob = self._payload_signatures.SerializeToString()
    self._manifest.signatures_offset = 1234
    self._manifest.signatures_size = len(blob)
    self._blobs[self._manifest.signatures_offset] = blob

  def AddMetadataSignature(self, **kwargs):
    """Append a metadata signature and refresh the serialized blob.

    The blob is keyed by the negative of its length, so the entry stored for
    the previous length (if any) is removed before the new one is added. The
    header's metadata_signature_len is updated to the new blob length.
    """
    self._AddSignatureToProto(self._metadata_signatures, **kwargs)
    if self._header.metadata_signature_len:
      del self._blobs[-self._header.metadata_signature_len]
    blob = self._metadata_signatures.SerializeToString()
    self._header.metadata_signature_len = len(blob)
    self._blobs[-len(blob)] = blob
class PayloadCommandTest(unittest.TestCase):
  """Test class for our PayloadCommand class."""

  @contextmanager
  def OutputCapturer(self):
    """A tool for capturing the sys.stdout.

    Yields:
      The StringIO object that replaces sys.stdout inside the ``with`` block.
      The real sys.stdout is restored on exit, even if the body raises.
    """
    stdout = sys.stdout
    try:
      sys.stdout = StringIO.StringIO()
      yield sys.stdout
    finally:
      sys.stdout = stdout

  def TestCommand(self, payload_cmd, payload, expected_out):
    """A tool for testing a payload command.

    It tests that a payload command which runs with a given payload produces a
    correct output.

    Args:
      payload_cmd: the payload_info.PayloadCommand to run.
      payload: object returned in place of update_payload.Payload(...) while
          the command runs.
      expected_out: exact text the command is expected to print.
    """
    with mock.patch.object(update_payload, 'Payload', return_value=payload), \
         self.OutputCapturer() as output:
      payload_cmd.Run()
    # assertEqual: assertEquals is a deprecated alias that newer Python
    # versions no longer provide.
    self.assertEqual(output.getvalue(), expected_out)

  def testDisplayValue(self):
    """Verify that DisplayValue prints what we expect."""
    with self.OutputCapturer() as output:
      payload_info.DisplayValue('key', 'value')
    self.assertEqual(output.getvalue(), 'key: value\n')

  def testRun(self):
    """Verify that Run parses and displays the payload like we expect."""
    payload_cmd = payload_info.PayloadCommand(FakeOption(action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version: 1
Manifest length: 222
Number of operations: 1
Number of kernel ops: 1
Block size: 4096
Minor version: 4
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testListOpsOnVersion1(self):
    """Verify that the --list_ops option gives the correct output."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(list_ops=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version: 1
Manifest length: 222
Number of operations: 1
Number of kernel ops: 1
Block size: 4096
Minor version: 4
Install operations:
0: REPLACE_BZ
Data offset: 1
Data length: 1
Destination: 2 extents (3 blocks)
(1,1) (2,2)
Kernel install operations:
0: SOURCE_COPY
Source: 1 extent (1 block)
(1,1)
Destination: 20 extents (190 blocks)
(0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
(11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testListOpsOnVersion2(self):
    """Verify that the --list_ops option gives the correct output."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(list_ops=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    expected_out = """Payload version: 2
Manifest length: 222
Number of partitions: 2
Number of "root" ops: 1
Number of "kernel" ops: 1
Block size: 4096
Minor version: 4
root install operations:
0: REPLACE_BZ
Data offset: 1
Data length: 1
Destination: 2 extents (3 blocks)
(1,1) (2,2)
kernel install operations:
0: SOURCE_COPY
Source: 1 extent (1 block)
(1,1)
Destination: 20 extents (190 blocks)
(0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10)
(11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19)
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testStatsOnVersion1(self):
    """Verify that the --stats option works correctly."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(stats=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version: 1
Manifest length: 222
Number of operations: 1
Number of kernel ops: 1
Block size: 4096
Minor version: 4
Blocks read: 11
Blocks written: 193
Seeks when writing: 18
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testStatsOnVersion2(self):
    """Verify that the --stats option works correctly on version 2."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(stats=True, action='show'))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    expected_out = """Payload version: 2
Manifest length: 222
Number of partitions: 2
Number of "root" ops: 1
Number of "kernel" ops: 1
Block size: 4096
Minor version: 4
Blocks read: 11
Blocks written: 193
Seeks when writing: 18
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testEmptySignatures(self):
    """Verify that the --signatures option works with unsigned payloads."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(action='show', signatures=True))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS)
    expected_out = """Payload version: 1
Manifest length: 222
Number of operations: 1
Number of kernel ops: 1
Block size: 4096
Minor version: 4
No metadata signatures stored in the payload
No payload signatures stored in the payload
"""
    self.TestCommand(payload_cmd, payload, expected_out)

  def testSignatures(self):
    """Verify that the --signatures option shows the present signatures."""
    payload_cmd = payload_info.PayloadCommand(
        FakeOption(action='show', signatures=True))
    payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO)
    payload.AddPayloadSignature(version=1,
                                data='12345678abcdefgh\x00\x01\x02\x03')
    payload.AddPayloadSignature(data='I am a signature so access is yes.')
    payload.AddMetadataSignature(data='\x00\x0a\x0c')
    expected_out = """Payload version: 2
Manifest length: 222
Number of partitions: 2
Number of "root" ops: 1
Number of "kernel" ops: 1
Block size: 4096
Minor version: 4
Metadata signatures blob: file_offset=246 (7 bytes)
Metadata signatures: (1 entries)
version=None, hex_data: (3 bytes)
00 0a 0c | ...
Payload signatures blob: blob_offset=1234 (64 bytes)
Payload signatures: (2 entries)
version=1, hex_data: (20 bytes)
31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh
00 01 02 03 | ....
version=None, hex_data: (34 bytes)
49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature
20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye
73 2e | s.
"""
    self.TestCommand(payload_cmd, payload, expected_out)
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()