drop support for Python 2.x
[~helmut/debian-dedup.git] / dedup / compression.py
1 import bz2
2 import struct
3 import zlib
4
5 import lzma
6
class GzipDecompressor:
    """An interface to gzip which is similar to bz2.BZ2Decompressor and
    lzma.LZMADecompressor.

    Compressed bytes are passed to decompress() in chunks of any size. The
    gzip header is parsed by hand, the deflate payload is handed to a raw
    zlib decompression object, and whatever follows the payload (normally
    the 8 byte CRC32/ISIZE trailer) is collected in inbuffer and evaluated
    by the unused_data property.
    """

    def __init__(self):
        # True once a complete gzip header has been parsed.
        self.sawheader = False
        # Bytes not yet consumed: a partial header before the payload, the
        # trailer (and anything after it) once the payload has ended.
        self.inbuffer = b""
        # Raw deflate decompressor; None while no payload is being decoded.
        self.decompressor = None
        # Running CRC32 and byte count of everything decompressed so far.
        self.crc = 0
        self.size = 0

    def decompress(self, data):
        """Feed the next chunk of the gzip stream.

        @param data: compressed bytes
        @returns: the bytes that could be decompressed from the input seen
            so far; b"" while the header is still incomplete
        @raises ValueError: if no gzip magic is found
        @raises zlib.error: from zlib invocations
        """
        while True:
            if self.decompressor:
                data = self.decompressor.decompress(data)
                self.crc = zlib.crc32(data, self.crc)
                self.size += len(data)
                unused_data = self.decompressor.unused_data
                if not unused_data:
                    return data
                # The deflate payload ended; the leftover bytes are run
                # through the header branch below, where they are buffered.
                # NOTE(review): once ten or more bytes have piled up there
                # they must start with the gzip magic, otherwise ValueError
                # is raised -- so data following the trailer is rejected.
                self.decompressor = None
                return data + self.decompress(unused_data)
            self.inbuffer += data
            skip = 10  # size of the fixed part of the gzip header
            if len(self.inbuffer) < skip:
                return b""
            if not self.inbuffer.startswith(b"\037\213\010"):
                raise ValueError("gzip magic not found")
            flag = ord(self.inbuffer[3:4])
            if flag & 4:
                # Extra field (FEXTRA in RFC 1952): 2 byte little endian
                # length followed by that many bytes.
                if len(self.inbuffer) < skip + 2:
                    return b""
                length, = struct.unpack("<H", self.inbuffer[skip:skip+2])
                skip += 2 + length
            for field in (8, 16):
                # Zero terminated strings (FNAME and FCOMMENT in RFC 1952).
                if flag & field:
                    length = self.inbuffer.find(b"\0", skip)
                    if length < 0:
                        return b""
                    skip = length + 1
            if flag & 2:
                # 2 byte header checksum (FHCRC in RFC 1952); not verified.
                skip += 2
            if len(self.inbuffer) < skip:
                return b""
            # Header complete: the remainder is deflate payload, which the
            # next loop iteration feeds to the fresh decompressor.
            data = self.inbuffer[skip:]
            self.inbuffer = b""
            self.sawheader = True
            self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)

    @property
    def unused_data(self):
        """Bytes that were fed in but are not accounted for.

        While a payload is being decoded this is the unused_data of the
        zlib object. Before a header has been seen it is the buffered
        input. Afterwards it is b"" if the buffered bytes are exactly the
        expected CRC32/ISIZE trailer optionally followed by zero bytes, and
        the buffered bytes themselves otherwise.
        """
        if self.decompressor:
            return self.decompressor.unused_data
        if not self.sawheader:
            return self.inbuffer
        # Both trailer fields are 32 bit wide; ISIZE holds the size modulo
        # 2**32, so the counters are masked to keep struct.pack from
        # failing once more than 4 GiB have been decompressed.
        expect = struct.pack("<LL", self.crc & 0xffffffff,
                             self.size & 0xffffffff)
        if self.inbuffer.startswith(expect) and \
                self.inbuffer[len(expect):].replace(b"\0", b"") == b"":
            return b""
        return self.inbuffer

    def flush(self):
        """Return what the zlib object still holds back, if one is active.

        @raises zlib.error: from zlib invocations
        """
        if not self.decompressor:
            return b""
        return self.decompressor.flush()

    def copy(self):
        """Return an independent GzipDecompressor in the same state."""
        new = GzipDecompressor()
        new.inbuffer = self.inbuffer
        if self.decompressor:
            new.decompressor = self.decompressor.copy()
        new.sawheader = self.sawheader
        new.crc = self.crc
        new.size = self.size
        return new
89
class DecompressedStream:
    """Turn a readable file-like into a decompressed file-like. It supports
    read(optional length), readline, iteration over lines, tell,
    seek(forward only) and close."""
    # Number of compressed bytes requested from fileobj per read.
    blocksize = 65536

    def __init__(self, fileobj, decompressor):
        """
        @param fileobj: a file-like object providing read(size) and close()
        @param decompressor: a bz2.BZ2Decompressor or lzma.LZMADecompressor
            like object providing a decompress method; if it also has a
            flush method, that is called when fileobj is exhausted
        """
        self.fileobj = fileobj
        self.decompressor = decompressor
        # Decompressed bytes that have not been handed out yet.
        self.buff = b""
        # Number of decompressed bytes handed out so far.
        self.pos = 0
        self.closed = False

    def _fill_buff_until(self, predicate):
        """Decompress more input until predicate(self.buff) holds or the
        underlying file is exhausted."""
        assert not self.closed
        data = True
        while data and not predicate(self.buff):
            data = self.fileobj.read(self.blocksize)
            if data:
                self.buff += self.decompressor.decompress(data)
            elif hasattr(self.decompressor, "flush"):
                self.buff += self.decompressor.flush()

    def _read_from_buff(self, length):
        """Remove and return up to length bytes from the front of buff."""
        ret = self.buff[:length]
        self.buff = self.buff[length:]
        # Count what was actually delivered: at end of file fewer than
        # length bytes may be available and tell() must not overshoot.
        self.pos += len(ret)
        return ret

    def read(self, length=None):
        """Read decompressed bytes.

        @param length: maximum number of bytes to return; None or a
            negative value reads everything up to the end of the stream
        @returns: bytes; shorter than length only at the end of the stream
        """
        if length is None or length < 0:
            self._fill_buff_until(lambda _: False)
            length = len(self.buff)
        else:
            self._fill_buff_until(lambda b, l=length: len(b) >= l)
        return self._read_from_buff(length)

    def readline(self):
        """Read up to and including the next newline, or up to the end of
        the stream if there is no further newline."""
        self._fill_buff_until(lambda b: b'\n' in b)
        try:
            length = self.buff.index(b'\n') + 1
        except ValueError:
            length = len(self.buff)
        return self._read_from_buff(length)

    def __iter__(self):
        """Iterate over lines until readline returns b''."""
        return iter(self.readline, b'')

    def tell(self):
        """Return the number of decompressed bytes consumed so far."""
        assert not self.closed
        return self.pos

    def seek(self, pos):
        """Forward seeks by absolute position only.

        Seeking is implemented by reading and discarding data. A position
        beyond the end of the stream leaves the stream at its end.

        @raises ValueError: if pos lies before the current position
        """
        assert not self.closed
        if pos < self.pos:
            raise ValueError("negative seek not allowed on decompressed stream")
        while self.pos < pos:
            left = pos - self.pos
            # Reading self.buff entirely avoids string concatenation.
            size = len(self.buff) or self.blocksize
            if not self.read(min(left, size)):
                # End of stream reached before pos; nothing left to skip.
                return

    def close(self):
        """Close the underlying file and drop all state. Further calls are
        no-ops."""
        if not self.closed:
            self.fileobj.close()
            self.fileobj = None
            self.decompressor = None
            self.buff = b""
            self.closed = True
171
# Maps a file name extension to the class that decompress() instantiates to
# obtain a decompressor for that format. ".lzma" and ".xz" share one class.
decompressors = {
    ".gz": GzipDecompressor,
    ".bz2": bz2.BZ2Decompressor,
    ".lzma": lzma.LZMADecompressor,
    ".xz": lzma.LZMADecompressor,
}
178
def decompress(filelike, extension):
    """Wrap a byte stream in the decompressor selected by an extension.

    @param filelike: a read-only byte-stream. It must support read(size)
                     and close().
    @param extension: permitted values are "", ".gz", ".bz2", ".lzma", and
                      ".xz"; an empty value means "not compressed"
    @type extension: str
    @returns: filelike itself if extension is empty, otherwise a
              DecompressedStream yielding the decompressed contents of
              filelike
    @raises ValueError: on unknown extensions
    """
    if extension:
        try:
            factory = decompressors[extension]
        except KeyError:
            raise ValueError("unknown compression format with extension %r" %
                             extension)
        return DecompressedStream(filelike, factory())
    # Nothing to decompress: hand the stream back untouched.
    return filelike