split the import phase to a yaml stream
[~helmut/debian-dedup.git] / autoimport.py
1 #!/usr/bin/python
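"""This script takes a local directory or an HTTP base URL of a mirror and
imports all packages it contains. It makes rather strong assumptions about the
working directory: it uses importpkg.py, readyaml.py and test.sqlite3 from
there and writes intermediate files to tmp/.
"""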
5
6 import gzip
7 import io
8 import multiprocessing
9 import optparse
10 import os
11 import sqlite3
12 import subprocess
13 import urllib
14
15 import concurrent.futures
16 from debian import deb822
17 from debian.debian_support import version_compare
18
def process_http(pkgs, url, suite="sid", component="main", arch="amd64"):
    """Record the newest package versions listed in a mirror's Packages.gz.

    Downloads ``<url>/dists/<suite>/<component>/binary-<arch>/Packages.gz``
    (sid/main/amd64 by default) and updates ``pkgs`` in place.  For every
    package, ``pkgs[name]`` is set to a dict with ``version`` and
    ``filename`` (``<url>/<Filename field>``).  The exception is a package
    whose already recorded version is strictly newer; that entry is kept.
    """
    resp = urllib.urlopen("%s/dists/%s/%s/binary-%s/Packages.gz" %
                          (url, suite, component, arch))
    try:
        compressed = resp.read()
    finally:
        # Release the connection promptly instead of relying on GC.
        resp.close()
    # Decompress from an in-memory (seekable) buffer, not the raw HTTP stream.
    pkglist = gzip.GzipFile(fileobj=io.BytesIO(compressed)).read()
    for pkg in deb822.Packages.iter_paragraphs(io.BytesIO(pkglist)):
        name = pkg["Package"]
        if name in pkgs and \
                version_compare(pkgs[name]["version"], pkg["Version"]) > 0:
            continue
        pkgs[name] = dict(version=pkg["Version"],
                          filename="%s/%s" % (url, pkg["Filename"]))
31
def process_dir(pkgs, d):
    """Record the newest .deb found in directory ``d`` for each package.

    Only entries named ``<name>_<version>_<arch>.deb`` (exactly three
    underscore-separated fields) are considered.  The version field is
    percent-decoded with ``urllib.unquote``.

    ``pkgs`` is updated in place.  Each package name maps to a dict with
    ``version`` and ``filename`` (the full path of the .deb).  An existing
    entry is kept only when its version is strictly newer.
    """
    for fname in os.listdir(d):
        if not fname.endswith(".deb"):
            continue
        fields = fname.split("_")
        if len(fields) != 3:
            continue
        pkgname = fields[0]
        pkgversion = urllib.unquote(fields[1])
        newer_known = pkgname in pkgs and \
            version_compare(pkgs[pkgname]["version"], pkgversion) > 0
        if newer_known:
            continue
        pkgs[pkgname] = dict(version=pkgversion,
                             filename=os.path.join(d, fname))
44
def process_pkg(name, filename):
    """Run importpkg.py on one .deb and store its output in tmp/<name>.

    ``filename`` is either an ``http://`` URL or a local path.  A URL is
    streamed through ``curl -s`` into importpkg.py; a local file is fed to
    importpkg.py on stdin.  The working directory must contain
    importpkg.py and a ``tmp`` directory.

    Raises:
        ValueError: importpkg.py or curl exited non-zero (URL case).
        subprocess.CalledProcessError: importpkg.py failed (local case).
    """
    print("importing %s" % filename)
    outname = os.path.join("tmp", name)
    if filename.startswith("http://"):
        with open(outname, "w") as outp:
            dl = subprocess.Popen(["curl", "-s", filename],
                                  stdout=subprocess.PIPE, close_fds=True)
            imp = subprocess.Popen(["python", "importpkg.py"], stdin=dl.stdout,
                                   stdout=outp, close_fds=True)
            # Drop the parent's copy of the pipe's read end.  Otherwise curl
            # never sees a closed pipe if importpkg.py exits early and may
            # block forever once the pipe buffer is full.
            dl.stdout.close()
            if imp.wait():
                # The download is useless now; stop and reap curl so no
                # child process is left behind.
                dl.kill()
                dl.wait()
                raise ValueError("importpkg failed")
            if dl.wait():
                raise ValueError("curl failed")
    else:
        with open(filename) as inp:
            with open(outname, "w") as outp:
                subprocess.check_call(["python", "importpkg.py"], stdin=inp,
                                      stdout=outp, close_fds=True)
    print("preprocessed %s" % name)
63
def main():
    """Import packages from mirrors/directories into test.sqlite3.

    Each positional argument is either an ``http://`` mirror base URL or a
    local directory of .deb files.  The newest version of each package
    found across all arguments is handled in two stages:

    * importpkg.py runs on it in a thread pool (one worker per CPU), writing
      tmp/<name>.
    * As each of those jobs completes, tmp/<name> is fed to readyaml.py,
      one package at a time.  tmp/<name> is removed only after readyaml.py
      succeeds.

    Options:
      -n/--new    skip packages whose version is not newer than the one
                  already recorded in the database.
      -p/--prune  delete packages recorded in the database that none of
                  the arguments provide.
    """
    parser = optparse.OptionParser()
    parser.add_option("-n", "--new", action="store_true",
                      help="avoid reimporting same versions")
    parser.add_option("-p", "--prune", action="store_true",
                      help="prune old packages")
    options, args = parser.parse_args()
    subprocess.check_call(["mkdir", "-p", "tmp"])
    db = sqlite3.connect("test.sqlite3")
    try:
        cur = db.cursor()
        # SQLite only honours ON DELETE CASCADE when foreign keys are enabled
        # on the connection; the prune step below relies on that.
        cur.execute("PRAGMA foreign_keys = ON;")

        # Collect the newest version of every package from all sources.
        pkgs = {}
        for d in args:
            print("processing %s" % d)
            if d.startswith("http://"):
                process_http(pkgs, d)
            else:
                process_dir(pkgs, d)

        print("reading database")
        cur.execute("SELECT package, version FROM package;")
        knownpkgs = dict(cur.fetchall())
        # Snapshot the names before --new removes entries from pkgs, so
        # that skipped packages still count as present for --prune.
        distpkgs = set(pkgs)
        if options.new:
            for name in distpkgs:
                if name in knownpkgs and version_compare(
                        pkgs[name]["version"], knownpkgs[name]) <= 0:
                    del pkgs[name]

        with concurrent.futures.ThreadPoolExecutor(
                multiprocessing.cpu_count()) as e:
            fs = dict((e.submit(process_pkg, name, pkg["filename"]), name)
                      for name, pkg in pkgs.items())
            # importpkg.py runs in parallel; readyaml.py runs sequentially
            # in this thread as each preprocessing job completes.
            for f in concurrent.futures.as_completed(fs):
                name = fs[f]
                exc = f.exception()
                if exc is not None:
                    print("%s failed to import: %r" % (name, exc))
                    continue
                inf = os.path.join("tmp", name)
                print("sqlimporting %s" % name)
                with open(inf) as inp:
                    try:
                        subprocess.check_call(["python", "readyaml.py"],
                                              stdin=inp)
                    except subprocess.CalledProcessError:
                        print("%s failed sql" % name)
                    else:
                        os.unlink(inf)

        if options.prune:
            delpkgs = set(knownpkgs) - distpkgs
            print("clearing packages %s" % " ".join(delpkgs))
            cur.executemany("DELETE FROM package WHERE package = ?;",
                            ((pkg,) for pkg in delpkgs))
            # Tables content, dependency and sharing will also be pruned
            # due to ON DELETE CASCADE clauses.
            db.commit()
    finally:
        db.close()
123
if __name__ == "__main__":
    # Run the importer only when executed as a script, not on import.
    main()