#!/usr/bin/env python # Copyright 2011 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Apply gzip/deflate to separate chunks of data.""" import struct import zlib GZIP_HEADER = ( '\037\213' # magic header '\010' # compression method '\000' # flags (none) '\000\000\000\000' # packed time (use zero) '\002' '\377') def compress_chunks(uncompressed_chunks, use_gzip): """Compress a list of data with gzip or deflate. The returned chunks may be used with HTTP chunked encoding. Args: uncompressed_chunks: a list of strings (e.g. ["this is the first chunk", "and the second"]) use_gzip: if True, compress with gzip. Otherwise, use deflate. Returns: [compressed_chunk_1, compressed_chunk_2, ...] """ if use_gzip: size = 0 crc = zlib.crc32("") & 0xffffffffL compressor = zlib.compressobj( 6, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) else: compressor = zlib.compressobj() compressed_chunks = [] last_index = len(uncompressed_chunks) - 1 for index, data in enumerate(uncompressed_chunks): chunk = '' if use_gzip: size += len(data) crc = zlib.crc32(data, crc) & 0xffffffffL if index == 0: chunk += GZIP_HEADER chunk += compressor.compress(data) if index < last_index: chunk += compressor.flush(zlib.Z_SYNC_FLUSH) else: chunk += (compressor.flush(zlib.Z_FULL_FLUSH) + compressor.flush()) if use_gzip: chunk += (struct.pack("<L", long(crc)) + struct.pack("<L", long(size))) compressed_chunks.append(chunk) return compressed_chunks def uncompress_chunks(compressed_chunks, use_gzip): """Uncompress a list of data compressed with gzip or deflate. Args: compressed_chunks: a list of compressed data use_gzip: if True, uncompress with gzip. Otherwise, use deflate. Returns: [uncompressed_chunk_1, uncompressed_chunk_2, ...] """ if use_gzip: decompress = zlib.decompressobj(16 + zlib.MAX_WBITS).decompress else: decompress = zlib.decompressobj(-zlib.MAX_WBITS).decompress return [decompress(c) for c in compressed_chunks]