extend functionality of DecompressedStream
[~helmut/debian-dedup.git] / dedup / compression.py
1 import struct
2 import sys
3 import zlib
4
# zlib.crc32 returns a signed int on Python 2 and an unsigned one on
# Python 3; select the struct format code that can pack that value.
if sys.version_info.major < 3:
    crc32_type = "l"
else:
    crc32_type = "L"
6
class GzipDecompressor(object):
    """An interface to gzip which is similar to bz2.BZ2Decompressor and
    lzma.LZMADecompressor.

    A single gzip member is decoded. Once its deflate stream ends, the
    8-byte trailer (CRC32 and ISIZE) plus any NUL padding after it are
    collected and checked lazily by the unused_data property."""

    def __init__(self):
        # True once a complete gzip header has been parsed.
        self.sawheader = False
        # Held-back input: a partial header before decompression starts, or
        # the trailer (plus whatever follows it) after the member has ended.
        self.inbuffer = b""
        # Raw-deflate zlib decompressor; only set inside the member body.
        self.decompressor = None
        # Running CRC32 and byte count of the decompressed output.
        self.crc = 0
        self.size = 0

    def decompress(self, data):
        """Feed compressed bytes and return the output they produce.

        @raises ValueError: if no gzip magic is found, or if non-NUL data
            follows the end of the gzip member
        @raises zlib.error: from zlib invocations
        """
        while True:
            if self.decompressor:
                data = self.decompressor.decompress(data)
                self.crc = zlib.crc32(data, self.crc)
                self.size += len(data)
                unused_data = self.decompressor.unused_data
                if not unused_data:
                    return data
                # The deflate stream has ended; what remains is trailer data.
                self.decompressor = None
                return data + self.decompress(unused_data)
            self.inbuffer += data
            if self.sawheader:
                # Past the end of the member: keep collecting the trailer so
                # unused_data can verify it. NUL padding is tolerated (as
                # unused_data does); any other data, e.g. a further gzip
                # member, is rejected. Never parse the trailer as a header.
                if len(self.inbuffer) >= 10 and \
                        self.inbuffer[8:].replace(b"\0", b""):
                    raise ValueError("gzip magic not found")
                return b""
            skip = 10  # fixed header: ID1 ID2 CM FLG MTIME(4) XFL OS
            if len(self.inbuffer) < skip:
                return b""
            if not self.inbuffer.startswith(b"\037\213\010"):
                raise ValueError("gzip magic not found")
            flag = ord(self.inbuffer[3:4])
            if flag & 4:  # FEXTRA: 2-byte little-endian length + payload
                if len(self.inbuffer) < skip + 2:
                    return b""
                length, = struct.unpack("<H", self.inbuffer[skip:skip+2])
                skip += 2 + length
            for field in (8, 16):  # FNAME, FCOMMENT: NUL-terminated strings
                if flag & field:
                    length = self.inbuffer.find(b"\0", skip)
                    if length < 0:
                        return b""
                    skip = length + 1
            if flag & 2:  # FHCRC: 2-byte header CRC, skipped unverified
                skip += 2
            if len(self.inbuffer) < skip:
                return b""
            data = self.inbuffer[skip:]
            self.inbuffer = b""
            self.sawheader = True
            # Negative wbits: raw deflate data without zlib header/trailer.
            self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)

    @property
    def unused_data(self):
        """Input bytes that were not consumed as part of the gzip member.

        After the member has ended this is b"" when the buffered bytes start
        with a trailer matching the produced output (CRC32 and size modulo
        2**32) followed only by NUL bytes; otherwise the buffered bytes are
        returned. Before a header is complete, the partial header is
        returned. NOTE(review): while the deflate decompressor is still
        active this is the decompressor's unused_data, i.e. b"" even if the
        trailer has not arrived yet.
        """
        if self.decompressor:
            return self.decompressor.unused_data
        elif not self.sawheader:
            return self.inbuffer
        else:
            # Both trailer fields are unsigned little-endian 32-bit values.
            # Masking makes Python 2's signed crc32 packable and keeps ISIZE
            # valid (modulo 2**32) for outputs of 4 GiB or more.
            expect = struct.pack("<LL", self.crc & 0xffffffff,
                                 self.size & 0xffffffff)
            if self.inbuffer.startswith(expect) and \
                    not self.inbuffer[len(expect):].replace(b"\0", b""):
                return b""
            return self.inbuffer

    def flush(self):
        """Return any output still buffered inside the deflate decompressor.

        @raises zlib.error: from zlib invocations
        """
        if not self.decompressor:
            return b""
        data = self.decompressor.flush()
        # Account for flushed output so the trailer check stays accurate.
        self.crc = zlib.crc32(data, self.crc)
        self.size += len(data)
        return data

    def copy(self):
        """Return an independent copy of the current decompression state."""
        new = GzipDecompressor()
        new.inbuffer = self.inbuffer
        if self.decompressor:
            new.decompressor = self.decompressor.copy()
        new.sawheader = self.sawheader
        new.crc = self.crc
        new.size = self.size
        return new
89
class DecompressedStream(object):
    """Turn a readable file-like into a decompressed file-like. It supports
    read(optional length), tell, seek(forward only) and close."""
    blocksize = 65536

    def __init__(self, fileobj, decompressor):
        """
        @param fileobj: a file-like object providing read(size)
        @param decompressor: a bz2.BZ2Decompressor or lzma.LZMADecompressor
            like object providing methods decompress and flush and an
            attribute unused_data
        """
        self.fileobj = fileobj
        self.decompressor = decompressor
        # Decompressed bytes not yet returned by read(). A bytearray lets
        # read() drop a consumed prefix without copying the remainder on
        # every call, which made many small reads quadratic.
        self.buff = bytearray()
        self.pos = 0
        self.closed = False

    def _check_open(self):
        """Raise ValueError (like the io module) if the stream is closed."""
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def read(self, length=None):
        """Return up to length decompressed bytes.

        A length of None or a negative length reads until end of stream.
        Returns b"" once the stream is exhausted.
        @raises ValueError: if the stream is closed
        """
        self._check_open()
        if length is not None and length < 0:
            length = None
        while length is None or len(self.buff) < length:
            data = self.fileobj.read(self.blocksize)
            if not data:
                # End of compressed input: drain the decompressor.
                self.buff += self.decompressor.flush()
                break
            self.buff += self.decompressor.decompress(data)
        if length is None:
            length = len(self.buff)
        ret = bytes(self.buff[:length])
        del self.buff[:length]
        self.pos += len(ret)
        return ret

    def tell(self):
        """Return the number of decompressed bytes consumed so far."""
        self._check_open()
        return self.pos

    def seek(self, pos):
        """Forward seeks by absolute position only.

        Seeking past the end of the stream stops at the end.
        @returns: the new position
        @raises ValueError: on a backward seek or if the stream is closed
        """
        self._check_open()
        if pos < self.pos:
            raise ValueError("negative seek not allowed on decompressed stream")
        while self.pos < pos:
            # Skip at most the buffered data (or one block) per step so a
            # large skip never materializes as one huge bytes object.
            step = min(pos - self.pos, len(self.buff) or self.blocksize)
            if not self.read(step):
                break  # end of stream: nothing left to skip over
        return self.pos

    def close(self):
        """Close the underlying file object and release buffers."""
        if not self.closed:
            self.fileobj.close()
            self.fileobj = None
            self.decompressor = None
            self.buff = bytearray()
            self.closed = True