author | Paul Cristian Sarbu <paul.cristian.sarbu@huawei.com> | 2023-06-26 11:56:07 +0200 |
---|---|---|
committer | Paul Cristian Sarbu <paul.cristian.sarbu@huawei.com> | 2023-08-29 17:19:00 +0200 |
commit | 0d8aeb4655446d05ecbb3fc3336ebe02741e91bf | |
tree | e381bd76932c8b0f56e1efbba5d28e202619c772 /bin/just-import-git.py | |
parent | 0e17643a522756caa78c4ddf7226c77c66185c03 | |
download | justbuild-0d8aeb4655446d05ecbb3fc3336ebe02741e91bf.tar.gz | |
just-import-git.py: Add type hints and fix style
For maximum compatibility, we use the uppercase types from the
typing package instead of the built-in types, thereby staying
compliant with PEP 484 and PEP 526.
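For context: parameterized built-in types such as `dict[str, str]` are only evaluated at runtime on Python 3.9+ (PEP 585), whereas the uppercase `typing` aliases are understood by any interpreter that supports PEP 484/526 annotations. A small illustration, reusing constant names from the patch below:

```python
from typing import Dict, List

# typing-module generics keep the annotations compatible with older
# interpreters; dict[str, str] would require Python 3.9+ (PEP 585)
MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"]
DEFAULT_CONFIG_LOCATIONS: List[Dict[str, str]] = [{
    "root": "workspace",
    "path": "repos.json"
}]
```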
As unfortunately there is no proper JSON typing option that does
not require many casts, we use a more lax typing for JSON inputs,
but enforce return types in order to implicitly infer the actual
format of an input JSON variable (dict, list, string, etc.).
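To illustrate the trade-off described above (an illustrative sketch, not code from this commit): a fully precise recursive JSON type would force a cast at nearly every access, so the script instead aliases `Json = Dict[str, Any]` and lets annotated return types document the shape each helper produces, much like `get_repo_to_import` in the diff below. The helper names here are hypothetical:

```python
from typing import Any, Dict, List, Optional, cast

# lax alias as used by the script: values stay untyped, so getters
# do not require a cast at every access
Json = Dict[str, Any]


def repository_names(config: Json) -> List[str]:
    # hypothetical helper: the return annotation documents the shape
    # while the body works on the lax Json values
    repos: Json = config.get("repositories", {})
    return sorted(repos.keys())


def main_repository(config: Json) -> Optional[str]:
    # cast() narrows an Any value where a concrete type is needed
    if config.get("main") is not None:
        return cast(str, config.get("main"))
    names = repository_names(config)
    return names[0] if names else None
```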
Diffstat (limited to 'bin/just-import-git.py')
-rwxr-xr-x | bin/just-import-git.py | 230 |
1 file changed, 134 insertions, 96 deletions
```diff
diff --git a/bin/just-import-git.py b/bin/just-import-git.py
index 19b21539..217e8dc3 100755
--- a/bin/just-import-git.py
+++ b/bin/just-import-git.py
@@ -20,20 +20,28 @@
 import shutil
 import sys
 import tempfile
 
-from argparse import ArgumentParser
+from argparse import ArgumentParser, Namespace
 from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, cast
 
-def log(*args, **kwargs):
+# generic JSON type that avoids getter issues; proper use is being enforced by
+# return types of methods and typing vars holding return values of json getters
+Json = Dict[str, Any]
+
+
+def log(*args: str, **kwargs: Any) -> None:
     print(*args, file=sys.stderr, **kwargs)
 
-def fail(s, exit_code=1):
+
+def fail(s: str, exit_code: int = 1):
     log(f"Error: {s}")
     sys.exit(exit_code)
 
-MARKERS = [".git", "ROOT", "WORKSPACE"]
-SYSTEM_ROOT = os.path.abspath(os.sep)
-DEFAULT_CONFIG_LOCATIONS = [{
+
+MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"]
+SYSTEM_ROOT: str = os.path.abspath(os.sep)
+DEFAULT_CONFIG_LOCATIONS: List[Dict[str, str]] = [{
     "root": "workspace",
     "path": "repos.json"
 }, {
@@ -47,23 +55,21 @@ DEFAULT_CONFIG_LOCATIONS = [{
     "path": "etc/just-repos.json"
 }]
 
-def run_cmd(cmd,
+
+def run_cmd(cmd: List[str],
             *,
-            env=None,
-            stdout=subprocess.DEVNULL,
-            stdin=None,
-            cwd):
-    result = subprocess.run(cmd,
-                            cwd=cwd,
-                            env=env,
-                            stdout=stdout,
-                            stdin=stdin)
+            env: Optional[Any] = None,
+            stdout: Optional[Any] = subprocess.DEVNULL,
+            stdin: Optional[Any] = None,
+            cwd: str):
+    result = subprocess.run(cmd, cwd=cwd, env=env, stdout=stdout, stdin=stdin)
     if result.returncode != 0:
         fail("Command %s in %s failed" % (cmd, cwd))
     return result.stdout
 
-def find_workspace_root(path=None):
-    def is_workspace_root(path):
+
+def find_workspace_root(path: Optional[str] = None) -> Optional[str]:
+    def is_workspace_root(path: str) -> bool:
         for m in MARKERS:
             if os.path.exists(os.path.join(path, m)):
                 return True
@@ -79,7 +85,7 @@ def find_workspace_root(path=None):
         path = os.path.dirname(path)
 
 
-def read_location(location, root=None):
+def read_location(location: Dict[str, str], root: Optional[str] = None) -> str:
     search_root = location.get("root", None)
     search_path = location.get("path", None)
 
@@ -96,95 +102,113 @@ def read_location(location, root=None):
         fs_root = SYSTEM_ROOT
 
     if fs_root:
-        return os.path.realpath(os.path.join(fs_root, search_path))
-    return "/" # certainly not a file
+        return os.path.realpath(
+            os.path.join(cast(str, fs_root), cast(str, search_path)))
+    return "/"  # certainly not a file
+
 
-def get_repository_config_file(root=None):
+def get_repository_config_file(root: Optional[str] = None) -> Optional[str]:
     for location in DEFAULT_CONFIG_LOCATIONS:
         path = read_location(location, root=root)
         if path and os.path.isfile(path):
            return path
 
 
-def get_base_config(repository_config):
+def get_base_config(repository_config: Optional[str]) -> Optional[Json]:
     if repository_config == "-":
         return json.load(sys.stdin)
     if not repository_config:
         repository_config = get_repository_config_file()
-    with open(repository_config) as f:
-        return json.load(f)
+    if (repository_config):
+        with open(repository_config) as f:
+            return json.load(f)
+    fail('Could not get base config')
 
-def clone(url, branch):
+
+def clone(url: str, branch: str) -> Tuple[str, Dict[str, str], str]:
     # clone the given git repository, checkout the specified
     # branch, and return the checkout location
-    workdir = tempfile.mkdtemp()
+    workdir: str = tempfile.mkdtemp()
     run_cmd(["git", "clone", "-b", branch, "--depth", "1", url, "src"],
             cwd=workdir)
-    srcdir = os.path.join(workdir, "src")
-    commit = run_cmd(["git", "log", "-n", "1", "--pretty=%H"],
-                     cwd = srcdir, stdout=subprocess.PIPE).decode('utf-8').strip()
-    log("Importing commit %s" % (commit,))
-    repo = { "type": "git",
-             "repository": url,
-             "branch": branch,
-             "commit": commit,
-    }
+    srcdir: str = os.path.join(workdir, "src")
+    commit: str = run_cmd(["git", "log", "-n", "1", "--pretty=%H"],
+                          cwd=srcdir,
+                          stdout=subprocess.PIPE).decode('utf-8').strip()
+    log("Importing commit %s" % (commit, ))
+    repo: Dict[str, str] = {
+        "type": "git",
+        "repository": url,
+        "branch": branch,
+        "commit": commit,
+    }
     return srcdir, repo, workdir
 
-def get_repo_to_import(config):
+
+def get_repo_to_import(config: Json) -> Optional[str]:
     """From a given repository config, take the main repository."""
     if config.get("main") is not None:
-        return config.get("main")
+        return cast(str, config.get("main"))
     repos = config.get("repositories", {}).keys()
     if repos:
         return sorted(repos)[0]
     fail("Config does not contain any repositories; unsure what to import")
 
-def repos_to_import(repos_config, entry, known=None):
+
+def repos_to_import(repos_config: Json,
+                    entry: str,
+                    known: Optional[Iterable[str]] = None) -> List[str]:
     """Compute the set of transitively reachable repositories"""
-    visited = set()
-    to_import = []
+    visited: Set[str] = set()
+    to_import: List[str] = []
     if known:
         visited = set(known)
 
-    def visit(name):
+    def visit(name: str) -> None:
        if name in visited:
            return
        to_import.append(name)
        visited.add(name)
-        for n in repos_config.get(name, {}).get("bindings", {}).values():
+        vals = cast(Dict[str, str],
+                    repos_config.get(name, {}).get("bindings", {})).values()
+        for n in vals:
            visit(n)
 
     visit(entry)
     return to_import
 
-def extra_layers_to_import(repos_config, repos):
+
+def extra_layers_to_import(repos_config: Json, repos: List[str]) -> List[str]:
     """Compute the collection of repositories additionally needed as they serve
     as layers for the repositories to import."""
-    extra_imports = set()
+    extra_imports: Set[str] = set()
     for repo in repos:
         if isinstance(repos_config[repo]["repository"], str):
             extra_imports.add(repos_config[repo]["repository"])
         for layer in ["target_root", "rule_root", "expression_root"]:
             if layer in repos_config[repo]:
-                extra = repos_config[repo][layer]
+                extra: str = repos_config[repo][layer]
                 if (extra not in repos) and (extra not in extra_imports):
                     extra_imports.add(extra)
     return list(extra_imports)
 
-def name_imports(to_import, existing, base_name, main=None):
+
+def name_imports(to_import: List[str],
+                 existing: Set[str],
+                 base_name: str,
+                 main: Optional[str] = None) -> Dict[str, str]:
     """Assign names to the repositories to import in such a way that
     no conflicts arise."""
-    assign = {}
+    assign: Dict[str, str] = {}
 
-    def find_name(name):
-        base = "%s/%s" % (base_name, name)
+    def find_name(name: str) -> str:
+        base: str = "%s/%s" % (base_name, name)
         if (base not in existing) and (base not in assign):
             return base
-        count = 0
+        count: int = 0
         while True:
             count += 1
-            candidate = base + " (%d)" % count
+            candidate: str = base + " (%d)" % count
             if (candidate not in existing) and (candidate not in assign):
                 return candidate
@@ -195,8 +219,10 @@ def name_imports(to_import, existing, base_name, main=None):
         assign[repo] = find_name(repo)
     return assign
 
-def rewrite_repo(repo_spec, *, remote, assign):
-    new_spec = {}
+
+def rewrite_repo(repo_spec: Json, *, remote: Dict[str, str],
+                 assign: Json) -> Json:
+    new_spec: Json = {}
     repo = repo_spec.get("repository", {})
     if isinstance(repo, str):
         repo = assign[repo]
@@ -207,7 +233,8 @@ def rewrite_repo(repo_spec, *, remote, assign):
             changes["subdir"] = subdir
         repo = dict(remote, **changes)
     elif repo.get("type") == "distdir":
-        new_repos = [assign[k] for k in repo.get("repositories", [])]
+        existing_repos: List[str] = repo.get("repositories", [])
+        new_repos = [assign[k] for k in existing_repos]
         repo = dict(repo, **{"repositories": new_repos})
     new_spec["repository"] = repo
     for key in ["target_root", "rule_root", "expression_root"]:
@@ -224,89 +251,100 @@ def rewrite_repo(repo_spec, *, remote, assign):
     new_spec["bindings"] = new_bindings
     return new_spec
 
-def handle_import(args):
-    base_config = get_base_config(args.repository_config)
-    base_repos = base_config.get("repositories", {})
+
+def handle_import(args: Namespace) -> Json:
+    base_config: Json = cast(Json, get_base_config(args.repository_config))
+    base_repos: Json = base_config.get("repositories", {})
     srcdir, remote, to_cleanup = clone(args.URL, args.branch)
     if args.foreign_repository_config:
         foreign_config_file = args.foreign_repository_config
     else:
         foreign_config_file = get_repository_config_file(srcdir)
+    foreign_config: Json = {}
     if args.plain:
-        foreign_config = { "main": "",
-                           "repositories": {"": {"repository":
-                                                 {"type": "file",
-                                                  "path": "." }}}}
+        foreign_config = {
+            "main": "",
+            "repositories": {
+                "": {
+                    "repository": {
+                        "type": "file",
+                        "path": "."
+                    }
+                }
+            }
+        }
     else:
-        with open(foreign_config_file) as f:
-            foreign_config = json.load(f)
-    foreign_repos = foreign_config.get("repositories", {})
+        if (foreign_config_file):
+            with open(foreign_config_file) as f:
+                foreign_config = json.load(f)
+        else:
+            fail('Could not get repository config file')
+    foreign_repos: Json = foreign_config.get("repositories", {})
+    foreign_name: Optional[str] = None
     if args.foreign_repository_name:
         foreign_name = args.foreign_repository_name
     else:
         foreign_name = get_repo_to_import(foreign_config)
-    import_map = {}
+    import_map: Json = {}
     for theirs, ours in args.import_map:
         import_map[theirs] = ours
-    main_repos = repos_to_import(foreign_repos, foreign_name, import_map.keys())
+    main_repos: List[str] = []
+    if (foreign_name):
+        main_repos = repos_to_import(foreign_repos, foreign_name,
+                                     import_map.keys())
     extra_repos = sorted([x for x in main_repos if x != foreign_name])
     extra_imports = sorted(extra_layers_to_import(foreign_repos, main_repos))
-    ordered_imports = [foreign_name] + extra_repos + extra_imports
-    import_name = foreign_name
+    ordered_imports: List[str] = [cast(str, foreign_name)
+                                  ] + extra_repos + extra_imports
+    import_name = cast(str, foreign_name)
     if args.import_as is not None:
         import_name = args.import_as
-    assign = name_imports(
+    assign: Dict[str, str] = name_imports(
         ordered_imports,
         set(base_repos.keys()),
         import_name,
-        main = foreign_name,
+        main=foreign_name,
     )
     log("Importing %r as %r" % (foreign_name, import_name))
-    log("Transitive dependencies to import: %r" % (extra_repos,))
-    log("Repositories imported as layers: %r" % (extra_imports,))
+    log("Transitive dependencies to import: %r" % (extra_repos, ))
+    log("Repositories imported as layers: %r" % (extra_imports, ))
     total_assign = dict(assign, **import_map)
     for repo in ordered_imports:
         base_repos[assign[repo]] = rewrite_repo(
             foreign_repos[repo],
-            remote = remote,
-            assign = total_assign,
+            remote=remote,
+            assign=total_assign,
         )
     base_config["repositories"] = base_repos
     shutil.rmtree(to_cleanup)
     return base_config
 
+
 def main():
     parser = ArgumentParser(
-        prog = "just-import-deps",
-        description =
-        "Import a dependency transitively into a given"
-        + " multi-repository configuration"
-    )
-    parser.add_argument(
-        "-C",
-        dest="repository_config",
-        help="Repository-description file to import into",
-        metavar="FILE"
-    )
+        prog="just-import-deps",
+        description="Import a dependency transitively into a given" +
+        " multi-repository configuration")
+    parser.add_argument("-C",
+                        dest="repository_config",
+                        help="Repository-description file to import into",
+                        metavar="FILE")
     parser.add_argument(
         "-b",
         dest="branch",
         help="The branch of the remote repository to import and follow",
         metavar="branch",
-        default="master"
-    )
+        default="master")
     parser.add_argument(
         "-R",
         dest="foreign_repository_config",
         help="Repository-description file in the repository to import",
-        metavar="relative-path"
-    )
+        metavar="relative-path")
     parser.add_argument(
         "--plain",
         action="store_true",
-        help=
-        "Pretend the remote repository description is the canonical"
-        + " single-repository one",
+        help="Pretend the remote repository description is the canonical" +
+        " single-repository one",
     )
     parser.add_argument(
         "--as",
@@ -318,10 +356,10 @@ def main():
         "--map",
         nargs=2,
         dest="import_map",
-        help="Map the specified foreign repository to the specified existing repository",
+        help=
+        "Map the specified foreign repository to the specified existing repository",
         action="append",
-        default=[]
-    )
+        default=[])
     parser.add_argument('URL')
     parser.add_argument('foreign_repository_name', nargs='?')
     args = parser.parse_args()
```