diff options
author | Paul Cristian Sarbu <paul.cristian.sarbu@huawei.com> | 2023-06-26 08:44:26 +0200 |
---|---|---|
committer | Paul Cristian Sarbu <paul.cristian.sarbu@huawei.com> | 2023-08-29 17:18:25 +0200 |
commit | 82e01709b6dc9e457001b15730b5014b4184c106 (patch) | |
tree | 4091b29127f3bbb61bd060b3b42658c805424971 /bin/just-mr.py | |
parent | 53ba5581070bc8914a35a5e16af901c562923e61 (diff) | |
download | justbuild-82e01709b6dc9e457001b15730b5014b4184c106.tar.gz |
just-mr.py: Add type hints, fix style, and improve readability
For maximum compatibility, we use the uppercase types from the
typing package instead of the built-in types, therefore compliant
with PEP 484 and PEP 526.
As unfortunately there is no proper JSON typing option that
requires many casts, we use a more lax typing for JSON inputs, but
enforce return types in order to implicitly infer the actual
format of an input JSON variable (dict, list, string etc.).
Diffstat (limited to 'bin/just-mr.py')
-rwxr-xr-x | bin/just-mr.py | 438 |
1 files changed, 230 insertions, 208 deletions
diff --git a/bin/just-mr.py b/bin/just-mr.py index b97e8dc7..be845f83 100755 --- a/bin/just-mr.py +++ b/bin/just-mr.py @@ -21,38 +21,42 @@ import subprocess import sys import tempfile import time -from typing import Optional +from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast from argparse import ArgumentParser from pathlib import Path -JUST = "just" -JUST_ARGS = {} -ROOT = "/justroot" -SYSTEM_ROOT = os.path.abspath(os.sep) -WORKSPACE_ROOT = None -SETUP_ROOT = None -DISTDIR = [] -MARKERS = [".git", "ROOT", "WORKSPACE"] +# generic JSON type that avoids getter issues; proper use is being enforced by +# return types of methods and typing vars holding return values of json getters +Json = Dict[str, Any] -ALWAYS_FILE = False +g_JUST: str = "just" +g_JUST_ARGS: Dict[str, str] = {} +g_ROOT: str = "/justroot" +SYSTEM_ROOT: str = os.path.abspath(os.sep) +g_WORKSPACE_ROOT: Optional[str] = None +g_SETUP_ROOT: Optional[str] = None +g_DISTDIR: List[str] = [] +MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"] -GIT_CHECKOUT_LOCATIONS_FILE = None -GIT_CHECKOUT_LOCATIONS = {} +g_ALWAYS_FILE: bool = False -TAKE_OVER = [ +g_GIT_CHECKOUT_LOCATIONS_FILE: Optional[str] = None +g_GIT_CHECKOUT_LOCATIONS: Dict[str, str] = {} + +TAKE_OVER: List[str] = [ "bindings", "target_file_name", "rule_file_name", "expression_file_name", ] -ALT_DIRS = [ +ALT_DIRS: List[str] = [ "target_root", "rule_root", "expression_root", ] -GIT_NOBODY_ENV = { +GIT_NOBODY_ENV: Dict[str, str] = { "GIT_AUTHOR_DATE": "1970-01-01T00:00Z", "GIT_AUTHOR_NAME": "Nobody", "GIT_AUTHOR_EMAIL": "nobody@example.org", @@ -63,7 +67,7 @@ GIT_NOBODY_ENV = { "GIT_CONFIG_SYSTEM": "/dev/null", } -KNOWN_JUST_SUBCOMMANDS = { +KNOWN_JUST_SUBCOMMANDS: Json = { "version": { "config": False, "build root": False @@ -97,15 +101,17 @@ KNOWN_JUST_SUBCOMMANDS = { "build root": True } } -CURRENT_SUBCOMMAND = None -LOCATION_TYPES = ["workspace", "home", "system"] +g_CURRENT_SUBCOMMAND: Optional[str] = None + +LOCATION_TYPES: List[str] = ["workspace", "home", "system"] -DEFAULT_RC_PATH = os.path.join(Path.home(), ".just-mrrc") -DEFAULT_BUILD_ROOT = os.path.join(Path.home(), ".cache/just") -DEFAULT_CHECKOUT_LOCATIONS_FILE = os.path.join(Path.home(), ".just-local.json") -DEFAULT_DISTDIRS = [os.path.join(Path.home(), ".distfiles")] -DEFAULT_CONFIG_LOCATIONS = [{ +DEFAULT_RC_PATH: str = os.path.join(Path.home(), ".just-mrrc") +DEFAULT_BUILD_ROOT: str = os.path.join(Path.home(), ".cache/just") +DEFAULT_CHECKOUT_LOCATIONS_FILE: str = os.path.join(Path.home(), + ".just-local.json") +DEFAULT_DISTDIRS: List[str] = [os.path.join(Path.home(), ".distfiles")] +DEFAULT_CONFIG_LOCATIONS: List[Json] = [{ "root": "workspace", "path": "repos.json" }, { @@ -120,16 +126,16 @@ DEFAULT_CONFIG_LOCATIONS = [{ }] -def log(*args, **kwargs): +def log(*args: str, **kwargs: Any) -> None: print(*args, file=sys.stderr, **kwargs) -def fail(s, exit_code=65): +def fail(s: str, exit_code: int = 65) -> None: log(f"Error: {s}") sys.exit(exit_code) -def try_rmtree(tree): +def try_rmtree(tree: str) -> None: for _ in range(10): try: shutil.rmtree(tree) @@ -139,7 +145,7 @@ def try_rmtree(tree): fail("Failed to remove %s" % (tree, )) -def move_to_place(src, dst): +def move_to_place(src: str, dst: str) -> None: os.makedirs(os.path.dirname(dst), exist_ok=True) try: os.rename(src, dst) @@ -150,24 +156,20 @@ def move_to_place(src, dst): fail("%s not present after move" % (dst, )) -def run_cmd(cmd, +def run_cmd(cmd: List[str], *, - env=None, - stdout=subprocess.DEVNULL, - stdin=None, - cwd, - attempts=1): + env: Any = None, + stdout: Any = subprocess.DEVNULL, + stdin: Any = None, + cwd: str, + attempts: int = 1) -> None: + attempts = max(attempts, 1) # at least one attempt for _ in range(attempts): - result = subprocess.run(cmd, - cwd=cwd, - env=env, - stdout=stdout, - stdin=stdin) - if result.returncode == 0: + if subprocess.run(cmd, cwd=cwd, env=env, stdout=stdout, + stdin=stdin).returncode == 0: return time.sleep(1.0) - if result.returncode != 0: - fail("Command %s in %s failed" % (cmd, cwd)) + fail("Command %s in %s failed" % (cmd, cwd)) def find_workspace_root() -> Optional[str]: @@ -177,7 +179,7 @@ def find_workspace_root() -> Optional[str]: return True return False - path = os.getcwd() + path: str = os.getcwd() while True: if is_workspace_root(path): return path @@ -186,26 +188,26 @@ def find_workspace_root() -> Optional[str]: path = os.path.dirname(path) -def read_config(configfile): +def read_config(configfile: str) -> Json: with open(configfile) as f: return json.load(f) -def git_root(*, upstream): - if upstream in GIT_CHECKOUT_LOCATIONS: - return GIT_CHECKOUT_LOCATIONS[upstream] +def git_root(*, upstream: Optional[str]) -> str: + if upstream and upstream in g_GIT_CHECKOUT_LOCATIONS: + return g_GIT_CHECKOUT_LOCATIONS[upstream] elif upstream and os.path.isabs(upstream) and os.path.isdir(upstream): return upstream else: - return os.path.join(ROOT, "git") + return os.path.join(g_ROOT, "git") -def is_cache_git_root(upstream): +def is_cache_git_root(upstream: Optional[str]) -> bool: """return true if upstream is the default git root in the cache directory""" return git_root(upstream=upstream) == git_root(upstream=None) -def git_keep(commit, *, upstream): +def git_keep(commit: str, *, upstream: Optional[str]) -> None: if not is_cache_git_root(upstream): # for those, we assume the referenced commit is kept by # some branch anyway @@ -219,22 +221,22 @@ def git_keep(commit, *, upstream): attempts=3) -def git_init_options(*, upstream): - if upstream in GIT_CHECKOUT_LOCATIONS: +def git_init_options(*, upstream: Optional[str]) -> List[str]: + if upstream in g_GIT_CHECKOUT_LOCATIONS: return [] else: return ["--bare"] -def ensure_git(*, upstream): - root = git_root(upstream=upstream) +def ensure_git(*, upstream: Optional[str]) -> None: + root: str = git_root(upstream=upstream) if os.path.exists(root): return os.makedirs(root) run_cmd(["git", "init"] + git_init_options(upstream=upstream), cwd=root) -def git_commit_present(commit, *, upstream): +def git_commit_present(commit: str, *, upstream: str) -> bool: result = subprocess.run(["git", "show", "--oneline", commit], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, @@ -242,24 +244,25 @@ def git_commit_present(commit, *, upstream): return result.returncode == 0 -def git_url_is_path(url): +def git_url_is_path(url: str) -> bool: for prefix in ["ssh://", "http://", "https://"]: if url.startswith(prefix): return False return True -def git_fetch(*, repo, branch): +def git_fetch(*, repo: str, branch: str) -> None: if git_url_is_path(repo): repo = os.path.abspath(repo) run_cmd(["git", "fetch", repo, branch], cwd=git_root(upstream=repo)) -def subdir_path(checkout, desc): - return os.path.normpath(os.path.join(checkout, desc.get("subdir", "."))) +def subdir_path(checkout: str, desc: Json) -> str: + return os.path.normpath( + os.path.join(checkout, cast(str, desc.get("subdir", ".")))) -def git_tree(*, commit, subdir, upstream): +def git_tree(*, commit: str, subdir: str, upstream: str) -> str: tree = subprocess.run( ["git", "log", "-n", "1", "--format=%T", commit], stdout=subprocess.PIPE, @@ -267,7 +270,7 @@ def git_tree(*, commit, subdir, upstream): return git_subtree(tree=tree, subdir=subdir, upstream=upstream) -def git_subtree(*, tree, subdir, upstream): +def git_subtree(*, tree: str, subdir: str, upstream: Optional[str]) -> str: if subdir == ".": return tree return subprocess.Popen( @@ -279,29 +282,29 @@ def git_subtree(*, tree, subdir, upstream): (tree, subdir)).encode())[0].decode('utf-8').strip() -def git_checkout_dir(commit): - return os.path.join(ROOT, "workspaces", "git", commit) +def git_checkout_dir(commit: str) -> str: + return os.path.join(g_ROOT, "workspaces", "git", commit) -def git_checkout(desc): - commit = desc["commit"] - target = git_checkout_dir(commit) - if ALWAYS_FILE and os.path.exists(target): +def git_checkout(desc: Json) -> List[str]: + commit: str = desc["commit"] + target: str = git_checkout_dir(commit) + if g_ALWAYS_FILE and os.path.exists(target): return ["file", subdir_path(target, desc)] - repo = desc["repository"] + repo: str = desc["repository"] if git_url_is_path(repo): repo = os.path.abspath(repo) - root = git_root(upstream=repo) + root: str = git_root(upstream=repo) ensure_git(upstream=repo) if not git_commit_present(commit, upstream=repo): - branch = desc["branch"] + branch: str = desc["branch"] log("Fetching %s from %s (in %s)" % (branch, repo, root)) git_fetch(repo=repo, branch=branch) if not git_commit_present(commit, upstream=repo): fail("Fetching %s from %s failed to fetch %s" % (branch, repo, commit)) git_keep(commit, upstream=repo) - if ALWAYS_FILE: + if g_ALWAYS_FILE: if os.path.exists(target): try_rmtree(target) os.makedirs(target) @@ -310,21 +313,21 @@ def git_checkout(desc): f.seek(0) run_cmd(["tar", "x"], cwd=target, stdin=f) return ["file", subdir_path(target, desc)] - tree = git_tree(commit=commit, - subdir=desc.get("subdir", "."), - upstream=repo) + tree: str = git_tree(commit=commit, + subdir=desc.get("subdir", "."), + upstream=repo) return ["git tree", tree, root] -def update_git(desc): - repo = desc["repository"] - branch = desc["branch"] +def update_git(desc: Json) -> None: + repo: str = desc["repository"] + branch: str = desc["branch"] lsremote = subprocess.run(["git", "ls-remote", repo, branch], stdout=subprocess.PIPE).stdout desc["commit"] = lsremote.decode('utf-8').split('\t')[0] -def git_hash(content): +def git_hash(content: bytes) -> str: header = "blob {}\0".format(len(content)).encode('utf-8') h = hashlib.sha1() h.update(header) @@ -332,12 +335,12 @@ def git_hash(content): return h.hexdigest() -def add_to_cas(data): +def add_to_cas(data: Union[str, bytes]) -> str: if isinstance(data, str): data = data.encode('utf-8') h = git_hash(data) cas_root = os.path.join( - ROOT, f"protocol-dependent/generation-0/git-sha1/casf/{h[0:2]}") + g_ROOT, f"protocol-dependent/generation-0/git-sha1/casf/{h[0:2]}") basename = h[2:] target = os.path.join(cas_root, basename) tempname = os.path.join(cas_root, "%s.%d" % (basename, os.getpid())) @@ -356,50 +359,51 @@ def add_to_cas(data): return target -def cas_path(h): +def cas_path(h: str) -> str: return os.path.join( - ROOT, f"protocol-dependent/generation-0/git-sha1/casf/{h[0:2]}", h[2:]) + g_ROOT, f"protocol-dependent/generation-0/git-sha1/casf/{h[0:2]}", + h[2:]) -def is_in_cas(h): +def is_in_cas(h: str) -> bool: return os.path.exists(cas_path(h)) -def add_file_to_cas(filename): +def add_file_to_cas(filename: str) -> None: # TODO: avoid going through memory with open(filename, "rb") as f: data = f.read() - add_to_cas(data) + add_to_cas(data) -def add_distfile_to_cas(distfile): - for d in DISTDIR: +def add_distfile_to_cas(distfile: str) -> None: + for d in g_DISTDIR: candidate = os.path.join(d, distfile) if os.path.exists(candidate): add_file_to_cas(candidate) -def archive_checkout_dir(content, repo_type): - return os.path.join(ROOT, "workspaces", repo_type, content) +def archive_checkout_dir(content: str, repo_type: str) -> str: + return os.path.join(g_ROOT, "workspaces", repo_type, content) -def archive_tmp_checkout_dir(content, repo_type): - return os.path.join(ROOT, "tmp-workspaces", repo_type, +def archive_tmp_checkout_dir(content: str, repo_type: str) -> str: + return os.path.join(g_ROOT, "tmp-workspaces", repo_type, "%d-%s" % (os.getpid(), content)) -def archive_tree_id_file(content, repo_type): - return os.path.join(ROOT, "tree-map", repo_type, content) +def archive_tree_id_file(content: str, repo_type: str) -> str: + return os.path.join(g_ROOT, "tree-map", repo_type, content) -def get_distfile(desc): +def get_distfile(desc: Json) -> str: distfile = desc.get("distfile") if not distfile: - distfile = os.path.basename(desc.get("fetch")) + distfile = os.path.basename(cast(str, desc.get("fetch"))) return distfile -def archive_fetch(desc, content): +def archive_fetch(desc: Json, content: str) -> None: """ Makes sure archive is available and accounted for in cas """ if not is_in_cas(content): @@ -407,7 +411,7 @@ def archive_fetch(desc, content): if distfile: add_distfile_to_cas(distfile) if not is_in_cas(content): - url = desc["fetch"] + url: str = desc["fetch"] data = subprocess.run(["wget", "-O", "-", url], stdout=subprocess.PIPE).stdout if "sha256" in desc: @@ -425,13 +429,13 @@ def archive_fetch(desc, content): fail("Failed to fetch a file with id %s from %s" % (content, url)) -def archive_checkout(desc, repo_type="archive"): - content_id = desc["content"] - target = archive_checkout_dir(content_id, repo_type=repo_type) - if ALWAYS_FILE and os.path.exists(target): +def archive_checkout(desc: Json, repo_type: str = "archive") -> List[str]: + content_id: str = desc["content"] + target: str = archive_checkout_dir(content_id, repo_type=repo_type) + if g_ALWAYS_FILE and os.path.exists(target): return ["file", subdir_path(target, desc)] - tree_id_file = archive_tree_id_file(content_id, repo_type=repo_type) - if (not ALWAYS_FILE) and os.path.exists(tree_id_file): + tree_id_file: str = archive_tree_id_file(content_id, repo_type=repo_type) + if (not g_ALWAYS_FILE) and os.path.exists(tree_id_file): with open(tree_id_file) as f: archive_tree_id = f.read() # ensure git cache exists @@ -444,7 +448,7 @@ def archive_checkout(desc, repo_type="archive"): git_root(upstream=None), ] archive_fetch(desc, content=content_id) - target_tmp = archive_tmp_checkout_dir(content_id, repo_type=repo_type) + target_tmp: str = archive_tmp_checkout_dir(content_id, repo_type=repo_type) if os.path.exists(target_tmp): try_rmtree(target_tmp) os.makedirs(target_tmp) @@ -464,10 +468,10 @@ def archive_checkout(desc, repo_type="archive"): except: print("Failed to extract tarball %s" % (cas_path(content_id), )) sys.exit(1) - if ALWAYS_FILE: + if g_ALWAYS_FILE: move_to_place(target_tmp, target) return ["file", subdir_path(target, desc)] - tree = import_to_git(target_tmp, repo_type, content_id) + tree: str = import_to_git(target_tmp, repo_type, content_id) try_rmtree(target_tmp) os.makedirs(os.path.dirname(tree_id_file), exist_ok=True) with open(tree_id_file, "w") as f: @@ -479,7 +483,7 @@ def archive_checkout(desc, repo_type="archive"): ] -def import_to_git(target, repo_type, content_id): +def import_to_git(target: str, repo_type: str, content_id: str) -> str: run_cmd( ["git", "init"], cwd=target, @@ -508,7 +512,7 @@ def import_to_git(target, repo_type, content_id): cwd=target).stdout.decode('utf-8').strip() -def file_as_git(fpath): +def file_as_git(fpath: str) -> List[str]: root_result = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=fpath, stdout=subprocess.PIPE, @@ -517,10 +521,10 @@ def file_as_git(fpath): # TODO: consider also doing this for pending changes # copy non-git paths to tmp-workspace and import to git fpath = os.path.realpath(fpath) - if CURRENT_SUBCOMMAND: + if g_CURRENT_SUBCOMMAND: log(f"""\ Warning: Inefficient Git import of file path '{fpath}'. - Please consider using 'just-mr setup' and 'just {CURRENT_SUBCOMMAND}' + Please consider using 'just-mr setup' and 'just {g_CURRENT_SUBCOMMAND}' separately to cache the output.""") target = archive_tmp_checkout_dir(os.path.relpath(fpath, "/"), repo_type="file") @@ -541,14 +545,18 @@ Warning: Inefficient Git import of file path '{fpath}'. ] -def file_checkout(desc): +def file_checkout(desc: Json) -> List[str]: fpath = os.path.abspath(desc["path"]) - if desc.get("pragma", {}).get("to_git") and not ALWAYS_FILE: + if desc.get("pragma", {}).get("to_git") and not g_ALWAYS_FILE: return file_as_git(fpath) return ["file", os.path.abspath(fpath)] -def resolve_repo(desc, *, seen=None, repos): +# strongly imposing typing on this method is complicated, so resort to Any +def resolve_repo(desc: Any, + *, + seen: Optional[List[str]] = None, + repos: Json) -> Json: seen = seen or [] if not isinstance(desc, str): return desc @@ -559,37 +567,37 @@ def resolve_repo(desc, *, seen=None, repos): repos=repos) -def distdir_repo_dir(content): - return os.path.join(ROOT, "distdir", content) +def distdir_repo_dir(content: str) -> str: + return os.path.join(g_ROOT, "distdir", content) -def distdir_tmp_dir(content): - return os.path.join(ROOT, "tmp-workspaces", "distdir", +def distdir_tmp_dir(content: str) -> str: + return os.path.join(g_ROOT, "tmp-workspaces", "distdir", "%d-%s" % (os.getpid(), content)) -def distdir_tree_id_file(content): - return os.path.join(ROOT, "distfiles-tree-map", content) +def distdir_tree_id_file(content: str) -> str: + return os.path.join(g_ROOT, "distfiles-tree-map", content) -def distdir_checkout(desc, repos): +def distdir_checkout(desc: Json, repos: Json): """ Logic for processing the distdir repo type. """ - content = {} + content: Dict[str, str] = {} # All repos in distdir list are considered, irrespective of reachability - distdir_repos = desc.get("repositories", []) + distdir_repos: List[str] = desc.get("repositories", []) for repo in distdir_repos: # If repo does not exist, fail if repo not in repos: print("No configuration for repository %s found" % (repo, )) sys.exit(1) - repo_desc = repos[repo].get("repository", {}) + repo_desc: Json = repos[repo].get("repository", {}) repo_desc_type = repo_desc.get("type") # Only do work for archived types if repo_desc_type in ["archive", "zip"]: # fetch repo - content_id = repo_desc["content"] + content_id: str = repo_desc["content"] # Store the relevant info in the map content[get_distfile(repo_desc)] = content_id @@ -600,10 +608,10 @@ def distdir_checkout(desc, repos): target_distdir_dir = distdir_repo_dir(distdir_content_id) # Check if content already exists - if ALWAYS_FILE and os.path.exists(target_distdir_dir): + if g_ALWAYS_FILE and os.path.exists(target_distdir_dir): return ["file", target_distdir_dir] tree_id_file = distdir_tree_id_file(distdir_content_id) - if (not ALWAYS_FILE) and os.path.exists(tree_id_file): + if (not g_ALWAYS_FILE) and os.path.exists(tree_id_file): with open(tree_id_file) as f: distdir_tree_id = f.read() # ensure git cache exists @@ -632,7 +640,7 @@ def distdir_checkout(desc, repos): target = os.path.join(target_tmp_dir, name) os.link(cas_path(content_id), target) - if (ALWAYS_FILE): + if g_ALWAYS_FILE: # Return with path to distdir repo move_to_place(target_tmp_dir, target_distdir_dir) return ["file", target_distdir_dir] @@ -654,13 +662,13 @@ def distdir_checkout(desc, repos): ] -def checkout(desc, *, name, repos): +def checkout(desc: Json, *, name: str, repos: Json) -> Optional[List[str]]: repo_desc = resolve_repo(desc, repos=repos) repo_type = repo_desc.get("type") if repo_type == "git": return git_checkout(repo_desc) if repo_type in ["archive", "zip"]: - return archive_checkout(repo_desc, repo_type=repo_type) + return archive_checkout(repo_desc, repo_type=cast(str, repo_type)) if repo_type == "file": return file_checkout(repo_desc) if repo_type == "distdir": @@ -668,11 +676,12 @@ def checkout(desc, *, name, repos): fail("Unknown repository type %s for %s" % (repo_type, name)) -def reachable_repositories(repo, *, repos): +def reachable_repositories(repo: str, *, + repos: Json) -> Tuple[Set[str], Set[str]]: # First compute the set of repositories transitively reachable via bindings - reachable = set() + reachable: Set[str] = set() - def traverse(x): + def traverse(x: str) -> None: nonlocal reachable if x in reachable: return @@ -695,14 +704,18 @@ def reachable_repositories(repo, *, repos): return reachable, to_fetch -def setup(*, config, main, setup_all=False, interactive=False): - if SETUP_ROOT: - cwd = os.getcwd() - os.chdir(SETUP_ROOT) - repos = config.get("repositories", {}) +def setup(*, + config: Json, + main: Optional[str], + setup_all: bool = False, + interactive: bool = False) -> str: + cwd: str = os.getcwd() + if g_SETUP_ROOT: + os.chdir(g_SETUP_ROOT) + repos: Json = config.get("repositories", {}) repos_to_setup = repos.keys() repos_to_include = repos.keys() - mr_config = {} + mr_config: Json = {} if main == None: main = config.setdefault("main", None) @@ -720,7 +733,7 @@ def setup(*, config, main, setup_all=False, interactive=False): repos_to_include, repos_to_setup = reachable_repositories(main, repos=repos) - mr_repos = {} + mr_repos: Json = {} for repo in repos_to_setup: desc = repos[repo] if repo == main and interactive: @@ -749,49 +762,52 @@ def setup(*, config, main, setup_all=False, interactive=False): if val == main and interactive: continue mr_repos[repo][key] = mr_repos[val]["workspace_root"] - mr_repos_actual = {} + mr_repos_actual: Json = {} for repo in repos_to_include: mr_repos_actual[repo] = mr_repos[repo] mr_config["repositories"] = mr_repos_actual - if SETUP_ROOT: + if g_SETUP_ROOT: os.chdir(cwd) return add_to_cas(json.dumps(mr_config, indent=2, sort_keys=True)) -def call_just(*, config, main, args): +def call_just(*, config: Json, main: Optional[str], args: List[str]) -> None: subcommand = args[0] if len(args) > 0 else None args = args[1:] - use_config = False - use_build_root = False - if subcommand in KNOWN_JUST_SUBCOMMANDS: + use_config: bool = False + use_build_root: bool = False + setup_config = config # default to given config path + if subcommand and subcommand in KNOWN_JUST_SUBCOMMANDS: if KNOWN_JUST_SUBCOMMANDS[subcommand]["config"]: use_config = True - global CURRENT_SUBCOMMAND - CURRENT_SUBCOMMAND = subcommand - config = setup(config=config, main=main) - CURRENT_SUBCOMMAND = None + global g_CURRENT_SUBCOMMAND + g_CURRENT_SUBCOMMAND = subcommand + setup_config = setup(config=config, main=main) + g_CURRENT_SUBCOMMAND = None use_build_root = KNOWN_JUST_SUBCOMMANDS[subcommand]["build root"] - cmd = [JUST] + if not setup_config: + fail("Setup failed") + cmd: List[str] = [g_JUST] cmd += [subcommand] if subcommand else [] - cmd += ["-C", config] if use_config else [] - cmd += ["--local-build-root", ROOT] if use_build_root else [] - if subcommand in JUST_ARGS: - cmd += JUST_ARGS[subcommand] + cmd += ["-C", cast(str, setup_config)] if use_config else [] + cmd += ["--local-build-root", g_ROOT] if use_build_root else [] + if subcommand and subcommand in g_JUST_ARGS: + cmd += g_JUST_ARGS[subcommand] cmd += args log("Setup finished, exec %s" % (cmd, )) try: - os.execvp(JUST, cmd) + os.execvp(g_JUST, cmd) except Exception as e: - log(e) + log(str(e)) finally: fail(f"exec failed", 64) -def update(*, config, repos): +def update(*, config: Json, repos: Json) -> None: for repo in repos: - desc = config["repositories"][repo]["repository"] + desc: Json = config["repositories"][repo]["repository"] desc = resolve_repo(desc, repos=config["repositories"]) repo_type = desc.get("type") if repo_type == "git": @@ -802,14 +818,18 @@ def update(*, config, repos): sys.exit(0) -def fetch(*, config, fetch_dir, main, fetch_all=False): +def fetch(*, + config: Json, + fetch_dir: Optional[str], + main: Optional[str], + fetch_all: bool = False) -> None: if not fetch_dir: - for d in DISTDIR: + for d in g_DISTDIR: if os.path.isdir(d): fetch_dir = os.path.abspath(d) break if not fetch_dir: - fail("No directory found to fetch to, considered %r" % (DISTDIR, )) + fail("No directory found to fetch to, considered %r" % (g_DISTDIR, )) repos = config["repositories"] repos_to_fetch = repos.keys() @@ -820,19 +840,20 @@ def fetch(*, config, fetch_dir, main, fetch_all=False): if main != None: repos_to_fetch = reachable_repositories(main, repos=repos)[0] - def is_subpath(path, base): + def is_subpath(path: str, base: str) -> bool: return os.path.commonpath([path, base]) == base # warn if fetch_dir is in invocation workspace - if WORKSPACE_ROOT and is_subpath(fetch_dir, WORKSPACE_ROOT): + if g_WORKSPACE_ROOT and is_subpath(cast(str, fetch_dir), + g_WORKSPACE_ROOT): repo = repos.get(main, {}).get("repository", {}) repo_path = repo.get("path", None) if repo_path != None and repo.get("type", None) == "file": if not os.path.isabs(repo_path): repo_path = os.path.realpath( - os.path.join(SETUP_ROOT, repo_path)) + os.path.join(cast(str, g_SETUP_ROOT), repo_path)) # only warn if repo workspace differs to invocation workspace - if not is_subpath(repo_path, WORKSPACE_ROOT): + if not is_subpath(repo_path, g_WORKSPACE_ROOT): log(f"""\ Warning: Writing distribution files to workspace location '{fetch_dir}', which is different to the workspace of the requested main repository @@ -851,12 +872,12 @@ Warning: Writing distribution files to workspace location '{fetch_dir}', print("%r --> %r (content: %s)" % (repo, distfile, content)) archive_fetch(repo_desc, content) shutil.copyfile(cas_path(content), - os.path.join(fetch_dir, distfile)) + os.path.join(cast(str, fetch_dir), distfile)) sys.exit(0) -def read_location(location): +def read_location(location: Json) -> Optional[Tuple[str, str]]: root = location.setdefault("root", None) path = location.setdefault("path", None) base = location.setdefault("base", ".") @@ -865,23 +886,23 @@ def read_location(location): fail(f"malformed location object: {location}") if root == "workspace": - if not WORKSPACE_ROOT: + if not g_WORKSPACE_ROOT: log(f"Warning: Not in workspace root, ignoring location %s." % (location, )) return None - root = WORKSPACE_ROOT + root = g_WORKSPACE_ROOT if root == "home": root = Path.home() if root == "system": root = SYSTEM_ROOT - return os.path.realpath(os.path.join(root, path)), os.path.realpath( - os.path.join(root, base)) + return os.path.realpath(os.path.join(cast(str, root), cast( + str, path))), os.path.realpath(os.path.join(cast(str, root), base)) # read settings from just-mrrc and return config path -def read_justmrrc(rcpath, no_rc=False): - rc = {} +def read_justmrrc(rcpath: str, no_rc: bool = False) -> Optional[str]: + rc: Json = {} if not no_rc: if not rcpath: rcpath = DEFAULT_RC_PATH @@ -891,46 +912,46 @@ def read_justmrrc(rcpath, no_rc=False): with open(rcpath) as f: rc = json.load(f) - location = rc.get("local build root", None) + location: Optional[Json] = rc.get("local build root", None) build_root = read_location(location) if location else None if build_root: - global ROOT - ROOT = build_root[0] + global g_ROOT + g_ROOT = build_root[0] location = rc.get("checkout locations", None) checkout = read_location(location) if location else None if checkout: - global GIT_CHECKOUT_LOCATIONS_FILE + global g_GIT_CHECKOUT_LOCATIONS_FILE if not os.path.isfile(checkout[0]): fail(f"cannot find checkout locations files {checkout[0]}.") - GIT_CHECKOUT_LOCATIONS_FILE = checkout[0] + g_GIT_CHECKOUT_LOCATIONS_FILE = checkout[0] location = rc.get("distdirs", None) if location: - global DISTDIR - DISTDIR = [] + global g_DISTDIR + g_DISTDIR = [] for l in location: - paths = read_location(l) + paths = read_location(cast(Json, l)) if paths: if os.path.isdir(paths[0]): - DISTDIR.append(paths[0]) + g_DISTDIR.append(paths[0]) else: log(f"Warning: Ignoring non-existing distdir {paths[0]}.") location = rc.get("just", None) just = read_location(location) if location else None if just: - global JUST - JUST = just[0] + global g_JUST + g_JUST = just[0] - global JUST_ARGS - JUST_ARGS = rc.get("just args", {}) + global g_JUST_ARGS + g_JUST_ARGS = rc.get("just args", {}) for location in rc.get("config lookup order", DEFAULT_CONFIG_LOCATIONS): - paths = read_location(location) + paths = read_location(cast(Json, location)) if paths and os.path.isfile(paths[0]): - global SETUP_ROOT - SETUP_ROOT = paths[1] + global g_SETUP_ROOT + g_SETUP_ROOT = paths[1] return paths[0] return None @@ -1033,13 +1054,13 @@ def main(): (options, args) = parser.parse_known_args() - global ROOT, GIT_CHECKOUT_LOCATIONS_FILE, DISTDIR, SETUP_ROOT, WORKSPACE_ROOT - ROOT = DEFAULT_BUILD_ROOT + global g_ROOT, g_GIT_CHECKOUT_LOCATIONS_FILE, g_DISTDIR, g_SETUP_ROOT, g_WORKSPACE_ROOT + g_ROOT = DEFAULT_BUILD_ROOT if os.path.isfile(DEFAULT_CHECKOUT_LOCATIONS_FILE): - GIT_CHECKOUT_LOCATIONS_FILE = DEFAULT_CHECKOUT_LOCATIONS_FILE - DISTDIR = DEFAULT_DISTDIRS - SETUP_ROOT = os.path.curdir - WORKSPACE_ROOT = find_workspace_root() + g_GIT_CHECKOUT_LOCATIONS_FILE = DEFAULT_CHECKOUT_LOCATIONS_FILE + g_DISTDIR = DEFAULT_DISTDIRS + g_SETUP_ROOT = os.path.curdir + g_WORKSPACE_ROOT = find_workspace_root() config_file = read_justmrrc(options.rcfile, options.norc) if options.repository_config: @@ -1047,31 +1068,32 @@ def main(): if not config_file: fail("cannot find repository configuration.") - config = read_config(config_file) + config = read_config(cast(str, config_file)) if options.local_build_root: - ROOT = os.path.abspath(options.local_build_root) + g_ROOT = os.path.abspath(options.local_build_root) if options.checkout_location: if not os.path.isfile(options.checkout_location): fail("cannot find checkout locations file %s" % (options.checkout_location, )) - GIT_CHECKOUT_LOCATIONS_FILE = os.path.abspath(options.checkout_location) + g_GIT_CHECKOUT_LOCATIONS_FILE = os.path.abspath( + options.checkout_location) - if GIT_CHECKOUT_LOCATIONS_FILE: - with open(GIT_CHECKOUT_LOCATIONS_FILE) as f: - global GIT_CHECKOUT_LOCATIONS - GIT_CHECKOUT_LOCATIONS = json.load(f).get("checkouts", - {}).get("git", {}) + if g_GIT_CHECKOUT_LOCATIONS_FILE: + with open(g_GIT_CHECKOUT_LOCATIONS_FILE) as f: + global g_GIT_CHECKOUT_LOCATIONS + g_GIT_CHECKOUT_LOCATIONS = json.load(f).get("checkouts", + {}).get("git", {}) if options.distdir: - DISTDIR += options.distdir + g_DISTDIR += options.distdir - global JUST + global g_JUST if options.just: - JUST = options.just - global ALWAYS_FILE - ALWAYS_FILE = options.always_file + g_JUST = options.just + global g_ALWAYS_FILE + g_ALWAYS_FILE = options.always_file sub_main = options.sub_main if hasattr(options, "sub_main") else None sub_all = options.sub_all if hasattr(options, "sub_all") else None @@ -1079,7 +1101,7 @@ def main(): print( "Warning: conflicting options for main repository, selecting '%s'." % (sub_main, )) - main = sub_main or options.main + main: Optional[str] = sub_main or options.main if options.subcommand in KNOWN_JUST_SUBCOMMANDS: call_just(config=config, main=main, args=[options.subcommand] + args) |