summaryrefslogtreecommitdiff
path: root/bin/parallel-bootstrap-traverser.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/parallel-bootstrap-traverser.py')
-rwxr-xr-xbin/parallel-bootstrap-traverser.py255
1 files changed, 176 insertions, 79 deletions
diff --git a/bin/parallel-bootstrap-traverser.py b/bin/parallel-bootstrap-traverser.py
index b9eb2d36..86f50e6b 100755
--- a/bin/parallel-bootstrap-traverser.py
+++ b/bin/parallel-bootstrap-traverser.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import hashlib
import json
import multiprocessing
@@ -23,14 +22,23 @@ import sys
import threading
from enum import Enum
from argparse import ArgumentParser
-from typing import Any, Callable, Dict, List, Optional, Tuple
+from typing import Any, Callable, Dict, List, Optional, Tuple, cast
+
+# generic JSON type that avoids getter issues; proper use is being enforced by
+# return types of methods and typing vars holding return values of json getters
+Json = Dict[str, Any]
+
class AtomicInt:
- def __init__(self, init: int = 0):
+ # types of attributes
+ __value: int
+ __cv: threading.Condition
+
+ def __init__(self, init: int = 0) -> None:
self.__value = init
self.__cv = threading.Condition()
- def __notify(self):
+ def __notify(self) -> None:
"""Must be called with acquired cv."""
if self.__value == 0: self.__cv.notify_all()
@@ -40,7 +48,7 @@ class AtomicInt:
return self.__value
@value.setter
- def value(self, to: int):
+ def value(self, to: int) -> None:
with self.__cv:
self.__value = to
self.__notify()
@@ -57,7 +65,7 @@ class AtomicInt:
"""Post-decrement"""
return self.fetch_inc(-by)
- def wait_for_zero(self):
+ def wait_for_zero(self) -> None:
with self.__cv:
self.__cv.wait_for(lambda: self.__value == 0)
@@ -67,11 +75,21 @@ class TaskSystem:
Task = Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]
class Queue:
- def __init__(self):
+ cv: threading.Condition
+
+ def __init__(self) -> None:
self.cv = threading.Condition()
self.tasks: List[TaskSystem.Task] = []
- def __init__(self, max_workers: int = multiprocessing.cpu_count()):
+ # types of attributes
+ __shutdown: bool
+ __num_workers: int
+ __current_idx: AtomicInt
+ __qs: List[Queue]
+ __total_work: AtomicInt
+ __workers: List[threading.Thread]
+
+ def __init__(self, max_workers: int = multiprocessing.cpu_count()) -> None:
"""Creates the task system with `max_workers` many threads."""
self.__shutdown = False
self.__num_workers = max(1, max_workers)
@@ -111,13 +129,14 @@ class TaskSystem:
def __enter__(self):
return self
- def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any):
+ def __exit__(self, exc_type: Any, exc_value: Any,
+ exc_traceback: Any) -> None:
try:
self.finish()
finally:
self.shutdown()
- def add(self, fn: Callable[..., None], *args: Any, **kw: Any):
+ def add(self, fn: Callable[..., None], *args: Any, **kw: Any) -> None:
"""Add task to task queue, might block."""
if not self.__shutdown:
q = self.__qs[self.__current_idx.fetch_inc() % self.__num_workers]
@@ -126,11 +145,11 @@ class TaskSystem:
q.tasks.append((fn, args, kw))
q.cv.notify_all()
- def finish(self):
+ def finish(self) -> None:
"""Wait for queued tasks and active threads to become zero."""
self.__total_work.wait_for_zero()
- def shutdown(self):
+ def shutdown(self) -> None:
"""Initiate shutdown of task system and wait for all threads to stop."""
self.__shutdown = True # signal shutdown
for q in self.__qs: # notify everyone about shutdown
@@ -139,61 +158,75 @@ class TaskSystem:
for w in self.__workers: # wait for workers to shutdown
w.join()
+
class AtomicListMap:
class Entry(Enum):
CREATED = 0
INSERTED = 1
CLEARED = 2
- def __init__(self):
+ # types of attributes
+ __map: Dict[str, Optional[List[Any]]]
+ __lock: threading.Lock
+
+ def __init__(self) -> None:
self.__map = dict()
self.__lock = threading.Lock()
- def add(self, key, val):
+ def add(self, key: str, val: Any):
with self.__lock:
if key not in self.__map:
self.__map[key] = [val]
return AtomicListMap.Entry.CREATED
elif self.__map[key] != None:
- self.__map[key].append(val)
+ cast(List[Any], self.__map[key]).append(val)
return AtomicListMap.Entry.INSERTED
else:
return AtomicListMap.Entry.CLEARED
- def fetch_clear(self, key):
+ def fetch_clear(self, key: str) -> Optional[List[Any]]:
with self.__lock:
vals = self.__map[key]
self.__map[key] = None
return vals
-CALLBACKS_PER_ID = AtomicListMap()
-def log(*args, **kwargs):
+g_CALLBACKS_PER_ID = AtomicListMap()
+
+
+def log(*args: str, **kwargs: Any):
print(*args, file=sys.stderr, **kwargs)
-def fail(s):
+
+def fail(s: str):
log(s)
sys.exit(1)
-def git_hash(content):
- header = "blob {}\0".format(len(content)).encode('utf-8')
- h = hashlib.sha1()
- h.update(header)
- h.update(content)
- return h.hexdigest()
-def create_blobs(blobs, *, root, ts):
+def git_hash(content: bytes) -> str:
+ header = "blob {}\0".format(len(content)).encode('utf-8')
+ h = hashlib.sha1()
+ h.update(header)
+ h.update(content)
+ return h.hexdigest()
+
+
+def create_blobs(blobs: List[str], *, root: str, ts: TaskSystem) -> None:
os.makedirs(os.path.join(root, "KNOWN"))
- def write_blob(blob_bin):
+
+ def write_blob(blob_bin: bytes) -> None:
with open(os.path.join(root, "KNOWN", git_hash(blob_bin)), "wb") as f:
f.write(blob_bin)
+
for blob in blobs:
ts.add(write_blob, blob.encode('utf-8'))
-def build_known(desc, *, root):
+
+def build_known(desc: Json, *, root: str) -> str:
return os.path.join(root, "KNOWN", desc["data"]["id"])
-def link(src, dest):
+
+def link(src: str, dest: str) -> None:
dest = os.path.normpath(dest)
os.makedirs(os.path.dirname(dest), exist_ok=True)
try:
@@ -201,105 +234,166 @@ def link(src, dest):
except:
os.symlink(src, dest)
-def build_local(desc, *, root, config):
- repo_name = desc["data"]["repository"]
- repo = config["repositories"][repo_name]["workspace_root"]
- rel_path = desc["data"]["path"]
+
+def build_local(desc: Json, *, root: str, config: Json) -> Optional[str]:
+ repo_name: str = desc["data"]["repository"]
+ repo: List[str] = config["repositories"][repo_name]["workspace_root"]
+ rel_path: str = desc["data"]["path"]
if repo[0] == "file":
return os.path.join(repo[1], rel_path)
- fail("Unsupported repository root %r" % (repo,))
+ fail("Unsupported repository root %r" % (repo, ))
+
-def build_tree(desc, *, config, root, graph, ts, callback):
+def build_tree(desc: Json, *, config: Json, root: str, graph: Json,
+ ts: TaskSystem, callback: Callable[..., None]):
tree_id = desc["data"]["id"]
tree_dir = os.path.normpath(os.path.join(root, "TREE", tree_id))
- state = CALLBACKS_PER_ID.add(f"TREE/{tree_id}", callback)
- if state != AtomicListMap.Entry.CREATED: # we are not first
- if state != AtomicListMap.Entry.INSERTED: # tree ready, run callback
+ state: AtomicListMap.Entry = g_CALLBACKS_PER_ID.add(f"TREE/{tree_id}",
+ callback)
+ if state != AtomicListMap.Entry.CREATED: # we are not first
+ if state != AtomicListMap.Entry.INSERTED: # tree ready, run callback
callback(tree_dir)
return
- tree_desc = graph["trees"][tree_id]
+ tree_desc: Json = graph["trees"][tree_id]
num_entries = AtomicInt(len(tree_desc.items()))
- def run_callbacks():
+
+ def run_callbacks() -> None:
if num_entries.fetch_dec() <= 1:
# correctly handle the empty tree
os.makedirs(tree_dir, exist_ok=True)
- for cb in CALLBACKS_PER_ID.fetch_clear(f"TREE/{tree_id}"): # mark ready
- ts.add(cb, tree_dir)
+ vals = g_CALLBACKS_PER_ID.fetch_clear(f"TREE/{tree_id}")
+ if vals:
+ for cb in vals: # mark ready
+ ts.add(cb, tree_dir)
if num_entries.value == 0:
run_callbacks()
for location, desc in tree_desc.items():
- def create_link(location):
- def do_link(path):
+
+ def create_link(location: str) -> Callable[..., None]:
+ def do_link(path: str) -> None:
link(path, os.path.join(tree_dir, location))
run_callbacks()
+
return do_link
- ts.add(build, desc, config=config, root=root, graph=graph, ts=ts, callback=create_link(location)),
-def run_action(action_id, *, config, root, graph, ts, callback):
- action_dir = os.path.normpath(os.path.join(root, "ACTION", action_id))
- state = CALLBACKS_PER_ID.add(f"ACTION/{action_id}", callback)
- if state != AtomicListMap.Entry.CREATED: # we are not first
- if state != AtomicListMap.Entry.INSERTED: # action ready, run callback
+ ts.add(build,
+ desc,
+ config=config,
+ root=root,
+ graph=graph,
+ ts=ts,
+ callback=create_link(location))
+
+
+def run_action(action_id: str, *, config: Json, root: str, graph: Json,
+ ts: TaskSystem, callback: Callable[..., None]) -> None:
+ action_dir: str = os.path.normpath(os.path.join(root, "ACTION", action_id))
+ state: AtomicListMap.Entry = g_CALLBACKS_PER_ID.add(f"ACTION/{action_id}",
+ callback)
+ if state != AtomicListMap.Entry.CREATED: # we are not first
+ if state != AtomicListMap.Entry.INSERTED: # action ready, run callback
callback(action_dir)
return
os.makedirs(action_dir)
- action_desc = graph["actions"][action_id]
+ action_desc: Json = graph["actions"][action_id]
num_deps = AtomicInt(len(action_desc.get("input", {}).items()))
- def run_command_and_callbacks():
+
+ def run_command_and_callbacks() -> None:
if num_deps.fetch_dec() <= 1:
- cmd = action_desc["command"]
+ cmd: List[str] = action_desc["command"]
env = action_desc.get("env")
- log("Running %r with env %r for action %r"
- % (cmd, env, action_id))
+ log("Running %r with env %r for action %r" % (cmd, env, action_id))
for out in action_desc["output"]:
os.makedirs(os.path.join(action_dir, os.path.dirname(out)),
exist_ok=True)
subprocess.run(cmd, env=env, cwd=action_dir, check=True)
- for cb in CALLBACKS_PER_ID.fetch_clear(f"ACTION/{action_id}"): # mark ready
- ts.add(cb, action_dir)
+ vals = g_CALLBACKS_PER_ID.fetch_clear(f"ACTION/{action_id}")
+ if vals:
+ for cb in vals: # mark ready
+ ts.add(cb, action_dir)
if num_deps.value == 0:
run_command_and_callbacks()
for location, desc in action_desc.get("input", {}).items():
- def create_link(location):
- def do_link(path):
+
+ def create_link(location: str) -> Callable[..., None]:
+ def do_link(path: str) -> None:
link(path, os.path.join(action_dir, location))
run_command_and_callbacks()
+
return do_link
- ts.add(build, desc, config=config, root=root, graph=graph, ts=ts, callback=create_link(location))
-def build_action(desc, *, config, root, graph, ts, callback):
- def link_output(action_dir):
+ ts.add(build,
+ desc,
+ config=config,
+ root=root,
+ graph=graph,
+ ts=ts,
+ callback=create_link(location))
+
+
+def build_action(desc: Json, *, config: Json, root: str, graph: Json,
+ ts: TaskSystem, callback: Callable[..., None]) -> None:
+ def link_output(action_dir: str) -> None:
callback(os.path.join(action_dir, desc["data"]["path"]))
- run_action(desc["data"]["id"], config=config, root=root, graph=graph, ts=ts, callback=link_output)
-def build(desc, *, config, root, graph, ts, callback):
+ run_action(desc["data"]["id"],
+ config=config,
+ root=root,
+ graph=graph,
+ ts=ts,
+ callback=link_output)
+
+
+def build(desc: Json, *, config: Json, root: str, graph: Json, ts: TaskSystem,
+ callback: Callable[..., None]) -> None:
if desc["type"] == "TREE":
- build_tree(desc, config=config, root=root, graph=graph, ts=ts, callback=callback)
+ build_tree(desc,
+ config=config,
+ root=root,
+ graph=graph,
+ ts=ts,
+ callback=callback)
elif desc["type"] == "ACTION":
- build_action(desc, config=config, root=root, graph=graph, ts=ts, callback=callback)
+ build_action(desc,
+ config=config,
+ root=root,
+ graph=graph,
+ ts=ts,
+ callback=callback)
elif desc["type"] == "KNOWN":
callback(build_known(desc, root=root))
elif desc["type"] == "LOCAL":
callback(build_local(desc, root=root, config=config))
else:
- fail("Don't know how to build artifact %r" % (desc,))
+ fail("Don't know how to build artifact %r" % (desc, ))
-def traverse(*, graph, to_build, out, root, config):
+
+def traverse(*, graph: Json, to_build: Json, out: str, root: str,
+ config: Json) -> None:
os.makedirs(out, exist_ok=True)
os.makedirs(root, exist_ok=True)
with TaskSystem() as ts:
create_blobs(graph["blobs"], root=root, ts=ts)
ts.finish()
for location, artifact in to_build.items():
- def create_link(location):
+
+ def create_link(location: str) -> Callable[..., None]:
return lambda path: link(path, os.path.join(out, location))
- ts.add(build, artifact, config=config, root=root, graph=graph, ts=ts, callback=create_link(location)),
+
+ ts.add(build,
+ artifact,
+ config=config,
+ root=root,
+ graph=graph,
+ ts=ts,
+ callback=create_link(location))
+
def main():
parser = ArgumentParser()
@@ -321,22 +415,25 @@ def main():
(options, args) = parser.parse_known_args()
if len(args) != 2:
- fail("usage: %r <graph> <targets_to_build>"
- % (sys.argv[0],))
+ fail("usage: %r <graph> <targets_to_build>" % (sys.argv[0], ))
with open(args[0]) as f:
- graph = json.load(f)
+ graph: Json = json.load(f)
with open(args[1]) as f:
- to_build = json.load(f)
- out = os.path.abspath(options.output_directory or "out-boot")
- root = os.path.abspath(options.local_build_root or ".just-boot")
+ to_build: Json = json.load(f)
+ out: str = os.path.abspath(cast(str, options.output_directory
+ or "out-boot"))
+ root: str = os.path.abspath(
+ cast(str, options.local_build_root or ".just-boot"))
with open(options.repository_config or "repo-conf.json") as f:
config = json.load(f)
if options.default_workspace:
- ws_root = os.path.abspath(options.default_workspace)
- repos = config.get("repositories", {}).keys()
+ ws_root: str = os.path.abspath(options.default_workspace)
+ repos: List[str] = config.get("repositories", {}).keys()
for repo in repos:
if not "workspace_root" in config["repositories"][repo]:
- config["repositories"][repo]["workspace_root"] = ["file", ws_root]
+ config["repositories"][repo]["workspace_root"] = [
+ "file", ws_root
+ ]
traverse(graph=graph, to_build=to_build, out=out, root=root, config=config)