drop support for Python 2.x
[~helmut/debian-dedup.git] / dedup / arreader.py
1 import struct
2
3 class ArReader:
4     """Streaming AR file reader. After constructing an object, you usually
5     call read_magic once. Then you call read_entry in a loop and use the
6     ArReader object as file-like only providing read() to read the respective
7     file contents until you get EOFError from read_entry.
8     """
9     global_magic = b"!<arch>\n"
10     file_magic = b"`\n"
11
12     def __init__(self, fileobj):
13         """
14         @param fileobj: a file-like object providing nothing but read(length)
15         """
16         self.fileobj = fileobj
17         self.remaining = None
18         self.padding = 0
19
20     def read_magic(self):
21         """Consume the AR magic marker at the beginning of an AR file. You
22         must not call any other method before calling this method.
23         @raises ValueError: if the magic is not found
24         """
25         data = self.fileobj.read(len(self.global_magic))
26         if data != self.global_magic:
27             raise ValueError("ar global header not found")
28         self.remaining = 0
29
30     def read_entry(self):
31         """Read the next file header, return the filename and record the
32         length of the next file, so that the read method can be used to
33         exhaustively read the current file.
34         @rtype: bytes
35         @returns: the name of the next file
36         @raises ValueError: if the data format is wrong
37         @raises EOFError: when the end f the stream is reached
38         """
39         self.skip_current_entry()
40         if self.padding:
41             if self.fileobj.read(1) != b'\n':
42                 raise ValueError("missing ar padding")
43             self.padding = 0
44         file_header = self.fileobj.read(60)
45         if not file_header:
46             raise EOFError("end of archive found")
47         parts = struct.unpack("16s 12s 6s 6s 8s 10s 2s", file_header)
48         parts = [p.rstrip(b"/ ") for p in parts]
49         if parts.pop() != self.file_magic:
50             raise ValueError("ar file header not found")
51         self.remaining = int(parts[5])
52         self.padding = self.remaining % 2
53         return parts[0] # name
54
55     def skip_current_entry(self):
56         """Skip the remainder of the current file. This method must not be
57         called before calling read_entry.
58         @raises ValueError: if the archive appears truncated
59         """
60         while self.remaining:
61             data = self.fileobj.read(min(4096, self.remaining))
62             if not data:
63                 raise ValueError("archive truncated")
64             self.remaining -= len(data)
65
66     def read(self, length=None):
67         """
68         @type length: int or None
69         @param length: number of bytes to read from the current file
70         @rtype: bytes
71         @returns: length or fewer bytes from the current file
72         """
73         if length is None:
74             length = self.remaining
75         else:
76             length = min(self.remaining, length)
77         data = self.fileobj.read(length)
78         self.remaining -= len(data)
79         return data