split the import phase to a yaml stream
authorHelmut Grohne <helmut@subdivi.de>
Mon, 10 Jun 2013 16:22:29 +0000 (18:22 +0200)
committerHelmut Grohne <helmut@subdivi.de>
Mon, 10 Jun 2013 16:22:29 +0000 (18:22 +0200)
importpkg.py now emits a yaml stream instead of updating the database.
The acutual updating now happens in readyaml.py. In this process
autoimport.py was significantly reworked to import packages in parallel.

README
autoimport.py
importpkg.py
readyaml.py [new file with mode: 0755]

diff --git a/README b/README
index 2d362f9..a5ce9d7 100644 (file)
--- a/README
+++ b/README
@@ -1,7 +1,7 @@
 Required packages
 -----------------
 
-    aptitude install python python-debian python-lzma curl python-jinja2 python-werkzeug sqlite3 python-imaging
+    aptitude install python python-debian python-lzma curl python-jinja2 python-werkzeug sqlite3 python-imaging python-yaml python-concurrent.futures
 
 Create a database
 -----------------
@@ -15,13 +15,17 @@ permanent.
 
 Import packages
 ---------------
-Import individual packages by feeding them to importpkg.py:
+Import individual packages by feeding them to importpkg.py and readyaml.py:
 
-    ls -t /var/cache/apt/archives/*.deb | while read f; do echo $f; ./importpkg.py < $f || break; done
+    ./importpkg.py < somepkg.deb | ./readyaml.py
 
-Import a full mirror::
+You can import your local apt cache:
 
-    ./autoimport.py http://your.mirror.example/debian
+    ./autoimport.py /var/cache/apt/archives
+
+Import a full mirror (only http supported):
+
+    ./autoimport.py -n -p http://your.mirror.example/debian
 
 Viewing the results
 -------------------
index 453a839..f30b36a 100755 (executable)
 #!/usr/bin/python
+"""This scrip takes a directory or a http base url to a mirror and imports all
+packages contained. It has rather strong assumptions on the working directory.
+"""
 
 import gzip
 import io
+import multiprocessing
+import optparse
+import os
 import sqlite3
 import subprocess
-import sys
 import urllib
 
+import concurrent.futures
 from debian import deb822
 from debian.debian_support import version_compare
 
+def process_http(pkgs, url):
+    pkglist = urllib.urlopen(url + "/dists/sid/main/binary-amd64/Packages.gz").read()
+    pkglist = gzip.GzipFile(fileobj=io.BytesIO(pkglist)).read()
+    pkglist = io.BytesIO(pkglist)
+    pkglist = deb822.Packages.iter_paragraphs(pkglist)
+    for pkg in pkglist:
+        name = pkg["Package"]
+        if name in pkgs and \
+                version_compare(pkgs[name]["version"], pkg["Version"]) > 0:
+            continue
+        pkgs[name] = dict(version=pkg["Version"],
+                          filename="%s/%s" % (url, pkg["Filename"]))
+
+def process_dir(pkgs, d):
+    for entry in os.listdir(d):
+        if not entry.endswith(".deb"):
+            continue
+        parts = entry.split("_")
+        if len(parts) != 3:
+            continue
+        name, version, _ = parts
+        version = urllib.unquote(version)
+        if name in pkgs and version_compare(pkgs[name]["version"], version) > 0:
+            continue
+        pkgs[name] = dict(version=version, filename=os.path.join(d, entry))
+
+def process_pkg(name, filename):
+    print("importing %s" % filename)
+    if filename.startswith("http://"):
+        with open(os.path.join("tmp", name), "w") as outp:
+            dl = subprocess.Popen(["curl", "-s", filename],
+                                  stdout=subprocess.PIPE, close_fds=True)
+            imp = subprocess.Popen(["python", "importpkg.py"], stdin=dl.stdout,
+                                   stdout=outp, close_fds=True)
+            if imp.wait():
+                raise ValueError("importpkg failed")
+            if dl.wait():
+                raise ValueError("curl failed")
+    else:
+        with open(filename) as inp:
+            with open(os.path.join("tmp", name), "w") as outp:
+                subprocess.check_call(["python", "importpkg.py"], stdin=inp,
+                                      stdout=outp, close_fds=True)
+    print("preprocessed %s" % name)
+
 def main():
-    urlbase = sys.argv[1]
+    parser = optparse.OptionParser()
+    parser.add_option("-n", "--new", action="store_true",
+                      help="avoid reimporting same versions")
+    parser.add_option("-p", "--prune", action="store_true",
+                      help="prune packages old packages")
+    options, args = parser.parse_args()
+    subprocess.check_call(["mkdir", "-p", "tmp"])
     db = sqlite3.connect("test.sqlite3")
     cur = db.cursor()
     cur.execute("PRAGMA foreign_keys = ON;")
+    e = concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count())
+    pkgs = {}
+    for d in args:
+        print("processing %s" % d)
+        if d.startswith("http://"):
+            process_http(pkgs, d)
+        else:
+            process_dir(pkgs, d)
+
+    print("reading database")
     cur.execute("SELECT package, version FROM package;")
     knownpkgs = dict((row[0], row[1]) for row in cur.fetchall())
+    distpkgs = set(pkgs.keys())
+    if options.new:
+        for name in distpkgs:
+            if name in knownpkgs and version_compare(pkgs[name]["version"],
+                    knownpkgs[name]) <= 0:
+                del pkgs[name]
+    knownpkgs = set(knownpkgs)
 
-    pkglist = urllib.urlopen(urlbase + "/dists/sid/main/binary-amd64/Packages.gz").read()
-    pkglist = gzip.GzipFile(fileobj=io.BytesIO(pkglist)).read()
-    distpkgs = set()
-    for pkg in deb822.Packages.iter_paragraphs(io.BytesIO(pkglist)):
-        name = pkg["Package"]
-        distpkgs.add(name)
-        if name in knownpkgs and \
-                version_compare(pkg["Version"], knownpkgs[name]) <= 0:
-            continue
-        pkgurl = "%s/%s" % (urlbase, pkg["Filename"])
-        print("importing %s" % name)
-        dl = subprocess.Popen(["curl", "-s", pkgurl], stdout=subprocess.PIPE)
-        imp = subprocess.Popen("./importpkg.py", stdin=dl.stdout)
-        if imp.wait():
-            print("import failed")
-        if dl.wait():
-            print("curl failed")
-    
-    delpkgs = set(knownpkgs) - distpkgs
-    print("clearing packages %s" % " ".join(delpkgs))
-    cur.executemany("DELETE FROM package WHERE package = ?;",
-                    ((pkg,) for pkg in delpkgs))
-    # Tables content, dependency and sharing will also be pruned
-    # due to ON DELETE CASCADE clauses.
-    db.commit()
+    with e:
+        fs = {}
+        for name, pkg in pkgs.items():
+            fs[e.submit(process_pkg, name, pkg["filename"])] = name
+
+        for f in concurrent.futures.as_completed(fs.keys()):
+            name = fs[f]
+            if f.exception():
+                print("%s failed to import: %r" % (name, f.exception()))
+                continue
+            inf = os.path.join("tmp", name)
+            print("sqlimporting %s" % name)
+            with open(inf) as inp:
+                try:
+                    subprocess.check_call(["python", "readyaml.py"], stdin=inp)
+                except subprocess.CalledProcessError:
+                    print("%s failed sql" % name)
+                else:
+                    os.unlink(inf)
+
+    if options.prune:
+        delpkgs = knownpkgs - distpkgs
+        print("clearing packages %s" % " ".join(delpkgs))
+        cur.executemany("DELETE FROM package WHERE package = ?;",
+                        ((pkg,) for pkg in delpkgs))
+        # Tables content, dependency and sharing will also be pruned
+        # due to ON DELETE CASCADE clauses.
+        db.commit()
 
 if __name__ == "__main__":
     main()
index e0160e6..6e22b54 100755 (executable)
@@ -1,14 +1,18 @@
 #!/usr/bin/python
+"""This tool reads a debian package from stdin and emits a yaml stream on
+stdout.  It does not access a database. Therefore it can be run in parallel and
+on multiple machines. The generated yaml conatins multiple documents. The first
+document contains package metadata. Then a document is emitted for each file.
+And finally a document consisting of the string "commit" is emitted."""
 
 import hashlib
-import sqlite3
 import sys
 import tarfile
 import zlib
 
-from debian.debian_support import version_compare
 from debian import deb822
 import lzma
+import yaml
 
 from dedup.arreader import ArReader
 from dedup.hashing import HashBlacklist, DecompressedHash, SuppressingHash, hash_file
@@ -57,17 +61,16 @@ def get_hashes(tar):
                 hashes[hashobj.name] = hashvalue
         yield (elem.name, elem.size, hashes)
 
-def process_package(db, filelike):
-    cur = db.cursor()
-    cur.execute("PRAGMA foreign_keys = ON;")
+def process_package(filelike):
     af = ArReader(filelike)
     af.read_magic()
     state = "start"
-    while True:
+    while state not in ("finished", "skipped"):
         try:
             name = af.read_entry()
         except EOFError:
-            break
+            if state != "finished":
+                raise ValueError("data.tar not found")
         if name == "control.tar.gz":
             if state != "start":
                 raise ValueError("unexpected control.tar.gz")
@@ -89,23 +92,11 @@ def process_package(db, filelike):
                 version = control["version"].encode("ascii")
                 architecture = control["architecture"].encode("ascii")
 
-                cur.execute("SELECT version FROM package WHERE package = ?;",
-                            (package,))
-                row = cur.fetchone()
-                if row and version_compare(row[0], version) > 0:
-                    return # already seen a newer package
-
-                cur.execute("DELETE FROM content WHERE package = ?;",
-                            (package,))
-                cur.execute("INSERT OR REPLACE INTO package (package, version, architecture, source) VALUES (?, ?, ?, ?);",
-                            (package, version, architecture, source))
                 depends = control.relations.get("depends", [])
                 depends = set(dep[0]["name"].encode("ascii")
                               for dep in depends if len(dep) == 1)
-                cur.execute("DELETE FROM dependency WHERE package = ?;",
-                            (package,))
-                cur.executemany("INSERT INTO dependency (package, required) VALUES (?, ?);",
-                                ((package, dep) for dep in depends))
+                yield dict(package=package, source=source, version=version,
+                           architecture=architecture, depends=depends)
                 break
             continue
         elif name == "data.tar.gz":
@@ -125,18 +116,12 @@ def process_package(db, filelike):
             except UnicodeDecodeError:
                 print("warning: skipping filename with encoding error")
                 continue # skip files with non-utf8 encoding for now
-            cur.execute("INSERT INTO content (package, filename, size) VALUES (?, ?, ?);",
-                        (package, name, size))
-            cid = cur.lastrowid
-            cur.executemany("INSERT INTO hash (cid, function, hash) VALUES (?, ?, ?);",
-                            ((cid, func, hexhash) for func, hexhash in hashes.items()))
-        db.commit()
-        return
-    raise ValueError("data.tar not found")
+            yield dict(name=name, size=size, hashes=hashes)
+        state = "finished"
+        yield "commit"
 
 def main():
-    db = sqlite3.connect("test.sqlite3")
-    process_package(db, sys.stdin)
+    yaml.safe_dump_all(process_package(sys.stdin), sys.stdout)
 
 if __name__ == "__main__":
     main()
diff --git a/readyaml.py b/readyaml.py
new file mode 100755 (executable)
index 0000000..b66c7f3
--- /dev/null
@@ -0,0 +1,48 @@
+#!/usr/bin/python
+"""This tool reads a yaml file as generated by importpkg.py on stdin and
+updates the database with the contents."""
+
+import sqlite3
+import sys
+
+from debian.debian_support import version_compare
+import yaml
+
+def main():
+    db = sqlite3.connect("test.sqlite3")
+    cur = db.cursor()
+    cur.execute("PRAGMA foreign_keys = ON;")
+    gen = yaml.safe_load_all(sys.stdin)
+    metadata = next(gen)
+    package = metadata["package"]
+    cur.execute("SELECT version FROM package WHERE package = ?;",
+                    (package,))
+    row = cur.fetchone()
+    if row and version_compare(row[0], metadata["version"]) > 0:
+        return
+
+    cur.execute("BEGIN;")
+    cur.execute("DELETE FROM content WHERE package = ?;",
+                (package,))
+    cur.execute("INSERT OR REPLACE INTO package (package, version, architecture, source) VALUES (?, ?, ?, ?);",
+                (package, metadata["version"], metadata["architecture"],
+                 metadata["source"]))
+    cur.execute("DELETE FROM dependency WHERE package = ?;",
+                (package,))
+    cur.executemany("INSERT INTO dependency (package, required) VALUES (?, ?);",
+                    ((package, dep) for dep in metadata["depends"]))
+    for entry in gen:
+        if entry == "commit":
+            db.commit()
+            return
+
+        cur.execute("INSERT INTO content (package, filename, size) VALUES (?, ?, ?);",
+                    (package, entry["name"], entry["size"]))
+        cid = cur.lastrowid
+        cur.executemany("INSERT INTO hash (cid, function, hash) VALUES (?, ?, ?);",
+                        ((cid, func, hexhash)
+                         for func, hexhash in entry["hashes"].items()))
+    raise ValueError("missing commit block")
+
+if __name__ == "__main__":
+    main()