extend functionality of DecompressedStream
author Helmut Grohne <helmut@subdivi.de>
Thu, 28 Apr 2016 18:28:11 +0000 (20:28 +0200)
committer Helmut Grohne <helmut@subdivi.de>
Thu, 28 Apr 2016 18:28:11 +0000 (20:28 +0200)
It now supports:
 * tell()
 * seek(absolute_position), forward only
 * close()
 * closed

This is sufficient for putting it as a fileobj into tarfile.TarFile. By
doing so we can decouple decompression from tar processing, which eases
papering over the Python 2.x vs Python 3.x differences.

dedup/compression.py

index 4fd7320..52917e3 100644 (file)
@@ -88,8 +88,8 @@ class GzipDecompressor(object):
         return new
 
 class DecompressedStream(object):
-    """Turn a readable file-like into a decompressed file-like. Te only part
-    of being file-like consists of the read(size) method in both cases."""
+    """Turn a readable file-like into a decompressed file-like. It supports
+    read(optional length), tell, seek(forward only) and close."""
     blocksize = 65536
 
     def __init__(self, fileobj, decompressor):
@@ -102,20 +102,52 @@ class DecompressedStream(object):
         self.fileobj = fileobj
         self.decompressor = decompressor
         self.buff = b""
+        self.pos = 0
+        self.closed = False
 
     def read(self, length=None):
+        assert not self.closed
         data = True
         while True:
             if length is not None and len(self.buff) >= length:
                 ret = self.buff[:length]
                 self.buff = self.buff[length:]
-                return ret
+                break
             elif not data: # read EOF in last iteration
                 ret = self.buff
                 self.buff = b""
-                return ret
+                break
             data = self.fileobj.read(self.blocksize)
             if data:
                 self.buff += self.decompressor.decompress(data)
             else:
                 self.buff += self.decompressor.flush()
+        self.pos += len(ret)
+        return ret
+
+    def tell(self):
+        assert not self.closed
+        return self.pos
+
+    def seek(self, pos):
+        """Forward seeks by absolute position only."""
+        assert not self.closed
+        if pos < self.pos:
+            raise ValueError("negative seek not allowed on decompressed stream")
+        while True:
+            left = pos - self.pos
+            # Reading self.buff entirely avoids string concatenation.
+            size = len(self.buff) or self.blocksize
+            if left > size:
+                self.read(size)
+            else:
+                self.read(left)
+                return
+
+    def close(self):
+        if not self.closed:
+            self.fileobj.close()
+            self.fileobj = None
+            self.decompressor = None
+            self.buff = b""
+            self.closed = True