diff options
Diffstat (limited to 'bin/just-import-git.py')
-rwxr-xr-x | bin/just-import-git.py | 230 |
1 files changed, 134 insertions, 96 deletions
diff --git a/bin/just-import-git.py b/bin/just-import-git.py index 19b21539..217e8dc3 100755 --- a/bin/just-import-git.py +++ b/bin/just-import-git.py @@ -20,20 +20,28 @@ import shutil import sys import tempfile -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, cast -def log(*args, **kwargs): +# generic JSON type that avoids getter issues; proper use is being enforced by +# return types of methods and typing vars holding return values of json getters +Json = Dict[str, Any] + + +def log(*args: str, **kwargs: Any) -> None: print(*args, file=sys.stderr, **kwargs) -def fail(s, exit_code=1): + +def fail(s: str, exit_code: int = 1): log(f"Error: {s}") sys.exit(exit_code) -MARKERS = [".git", "ROOT", "WORKSPACE"] -SYSTEM_ROOT = os.path.abspath(os.sep) -DEFAULT_CONFIG_LOCATIONS = [{ + +MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"] +SYSTEM_ROOT: str = os.path.abspath(os.sep) +DEFAULT_CONFIG_LOCATIONS: List[Dict[str, str]] = [{ "root": "workspace", "path": "repos.json" }, { @@ -47,23 +55,21 @@ DEFAULT_CONFIG_LOCATIONS = [{ "path": "etc/just-repos.json" }] -def run_cmd(cmd, + +def run_cmd(cmd: List[str], *, - env=None, - stdout=subprocess.DEVNULL, - stdin=None, - cwd): - result = subprocess.run(cmd, - cwd=cwd, - env=env, - stdout=stdout, - stdin=stdin) + env: Optional[Any] = None, + stdout: Optional[Any] = subprocess.DEVNULL, + stdin: Optional[Any] = None, + cwd: str): + result = subprocess.run(cmd, cwd=cwd, env=env, stdout=stdout, stdin=stdin) if result.returncode != 0: fail("Command %s in %s failed" % (cmd, cwd)) return result.stdout -def find_workspace_root(path=None): - def is_workspace_root(path): + +def find_workspace_root(path: Optional[str] = None) -> Optional[str]: + def is_workspace_root(path: str) -> bool: for m in MARKERS: if os.path.exists(os.path.join(path, m)): return True @@ -79,7 +85,7 @@ def find_workspace_root(path=None): path = os.path.dirname(path) -def read_location(location, root=None): +def read_location(location: Dict[str, str], root: Optional[str] = None) -> str: search_root = location.get("root", None) search_path = location.get("path", None) @@ -96,95 +102,113 @@ def read_location(location, root=None): fs_root = SYSTEM_ROOT if fs_root: - return os.path.realpath(os.path.join(fs_root, search_path)) - return "/" # certainly not a file + return os.path.realpath( + os.path.join(cast(str, fs_root), cast(str, search_path))) + return "/" # certainly not a file + -def get_repository_config_file(root=None): +def get_repository_config_file(root: Optional[str] = None) -> Optional[str]: for location in DEFAULT_CONFIG_LOCATIONS: path = read_location(location, root=root) if path and os.path.isfile(path): return path -def get_base_config(repository_config): +def get_base_config(repository_config: Optional[str]) -> Optional[Json]: if repository_config == "-": return json.load(sys.stdin) if not repository_config: repository_config = get_repository_config_file() - with open(repository_config) as f: - return json.load(f) + if (repository_config): + with open(repository_config) as f: + return json.load(f) + fail('Could not get base config') -def clone(url, branch): + +def clone(url: str, branch: str) -> Tuple[str, Dict[str, str], str]: # clone the given git repository, checkout the specified # branch, and return the checkout location - workdir = tempfile.mkdtemp() + workdir: str = tempfile.mkdtemp() run_cmd(["git", "clone", "-b", branch, "--depth", "1", url, "src"], cwd=workdir) - srcdir = os.path.join(workdir, "src") - commit = run_cmd(["git", "log", "-n", "1", "--pretty=%H"], - cwd = srcdir, stdout=subprocess.PIPE).decode('utf-8').strip() - log("Importing commit %s" % (commit,)) - repo = { "type": "git", - "repository": url, - "branch": branch, - "commit": commit, - } + srcdir: str = os.path.join(workdir, "src") + commit: str = run_cmd(["git", "log", "-n", "1", "--pretty=%H"], + cwd=srcdir, + stdout=subprocess.PIPE).decode('utf-8').strip() + log("Importing commit %s" % (commit, )) + repo: Dict[str, str] = { + "type": "git", + "repository": url, + "branch": branch, + "commit": commit, + } return srcdir, repo, workdir -def get_repo_to_import(config): + +def get_repo_to_import(config: Json) -> Optional[str]: """From a given repository config, take the main repository.""" if config.get("main") is not None: - return config.get("main") + return cast(str, config.get("main")) repos = config.get("repositories", {}).keys() if repos: return sorted(repos)[0] fail("Config does not contain any repositories; unsure what to import") -def repos_to_import(repos_config, entry, known=None): + +def repos_to_import(repos_config: Json, + entry: str, + known: Optional[Iterable[str]] = None) -> List[str]: """Compute the set of transitively reachable repositories""" - visited = set() - to_import = [] + visited: Set[str] = set() + to_import: List[str] = [] if known: visited = set(known) - def visit(name): + def visit(name: str) -> None: if name in visited: return to_import.append(name) visited.add(name) - for n in repos_config.get(name, {}).get("bindings", {}).values(): + vals = cast(Dict[str, str], + repos_config.get(name, {}).get("bindings", {})).values() + for n in vals: visit(n) visit(entry) return to_import -def extra_layers_to_import(repos_config, repos): + +def extra_layers_to_import(repos_config: Json, repos: List[str]) -> List[str]: """Compute the collection of repositories additionally needed as they serve as layers for the repositories to import.""" - extra_imports = set() + extra_imports: Set[str] = set() for repo in repos: if isinstance(repos_config[repo]["repository"], str): extra_imports.add(repos_config[repo]["repository"]) for layer in ["target_root", "rule_root", "expression_root"]: if layer in repos_config[repo]: - extra = repos_config[repo][layer] + extra: str = repos_config[repo][layer] if (extra not in repos) and (extra not in extra_imports): extra_imports.add(extra) return list(extra_imports) -def name_imports(to_import, existing, base_name, main=None): + +def name_imports(to_import: List[str], + existing: Set[str], + base_name: str, + main: Optional[str] = None) -> Dict[str, str]: """Assign names to the repositories to import in such a way that no conflicts arise.""" - assign = {} + assign: Dict[str, str] = {} - def find_name(name): - base = "%s/%s" % (base_name, name) + def find_name(name: str) -> str: + base: str = "%s/%s" % (base_name, name) if (base not in existing) and (base not in assign): return base - count = 0 + count: int = 0 while True: count += 1 - candidate = base + " (%d)" % count + candidate: str = base + " (%d)" % count if (candidate not in existing) and (candidate not in assign): return candidate @@ -195,8 +219,10 @@ def name_imports(to_import, existing, base_name, main=None): assign[repo] = find_name(repo) return assign -def rewrite_repo(repo_spec, *, remote, assign): - new_spec = {} + +def rewrite_repo(repo_spec: Json, *, remote: Dict[str, str], + assign: Json) -> Json: + new_spec: Json = {} repo = repo_spec.get("repository", {}) if isinstance(repo, str): repo = assign[repo] @@ -207,7 +233,8 @@ def rewrite_repo(repo_spec, *, remote, assign): changes["subdir"] = subdir repo = dict(remote, **changes) elif repo.get("type") == "distdir": - new_repos = [assign[k] for k in repo.get("repositories", [])] + existing_repos: List[str] = repo.get("repositories", []) + new_repos = [assign[k] for k in existing_repos] repo = dict(repo, **{"repositories": new_repos}) new_spec["repository"] = repo for key in ["target_root", "rule_root", "expression_root"]: @@ -224,89 +251,100 @@ def rewrite_repo(repo_spec, *, remote, assign): new_spec["bindings"] = new_bindings return new_spec -def handle_import(args): - base_config = get_base_config(args.repository_config) - base_repos = base_config.get("repositories", {}) + +def handle_import(args: Namespace) -> Json: + base_config: Json = cast(Json, get_base_config(args.repository_config)) + base_repos: Json = base_config.get("repositories", {}) srcdir, remote, to_cleanup = clone(args.URL, args.branch) if args.foreign_repository_config: foreign_config_file = args.foreign_repository_config else: foreign_config_file = get_repository_config_file(srcdir) + foreign_config: Json = {} if args.plain: - foreign_config = { "main": "", - "repositories": {"": {"repository": - {"type": "file", - "path": "." }}}} + foreign_config = { + "main": "", + "repositories": { + "": { + "repository": { + "type": "file", + "path": "." + } + } + } + } else: - with open(foreign_config_file) as f: - foreign_config = json.load(f) - foreign_repos = foreign_config.get("repositories", {}) + if (foreign_config_file): + with open(foreign_config_file) as f: + foreign_config = json.load(f) + else: + fail('Could not get repository config file') + foreign_repos: Json = foreign_config.get("repositories", {}) + foreign_name: Optional[str] = None if args.foreign_repository_name: foreign_name = args.foreign_repository_name else: foreign_name = get_repo_to_import(foreign_config) - import_map = {} + import_map: Json = {} for theirs, ours in args.import_map: import_map[theirs] = ours - main_repos = repos_to_import(foreign_repos, foreign_name, import_map.keys()) + main_repos: List[str] = [] + if (foreign_name): + main_repos = repos_to_import(foreign_repos, foreign_name, + import_map.keys()) extra_repos = sorted([x for x in main_repos if x != foreign_name]) extra_imports = sorted(extra_layers_to_import(foreign_repos, main_repos)) - ordered_imports = [foreign_name] + extra_repos + extra_imports - import_name = foreign_name + ordered_imports: List[str] = [cast(str, foreign_name) + ] + extra_repos + extra_imports + import_name = cast(str, foreign_name) if args.import_as is not None: import_name = args.import_as - assign = name_imports( + assign: Dict[str, str] = name_imports( ordered_imports, set(base_repos.keys()), import_name, - main = foreign_name, + main=foreign_name, ) log("Importing %r as %r" % (foreign_name, import_name)) - log("Transitive dependencies to import: %r" % (extra_repos,)) - log("Repositories imported as layers: %r" % (extra_imports,)) + log("Transitive dependencies to import: %r" % (extra_repos, )) + log("Repositories imported as layers: %r" % (extra_imports, )) total_assign = dict(assign, **import_map) for repo in ordered_imports: base_repos[assign[repo]] = rewrite_repo( foreign_repos[repo], - remote = remote, - assign = total_assign, + remote=remote, + assign=total_assign, ) base_config["repositories"] = base_repos shutil.rmtree(to_cleanup) return base_config + def main(): parser = ArgumentParser( - prog = "just-import-deps", - description = - "Import a dependency transitively into a given" - + " multi-repository configuration" - ) - parser.add_argument( - "-C", - dest="repository_config", - help="Repository-description file to import into", - metavar="FILE" - ) + prog="just-import-deps", + description="Import a dependency transitively into a given" + + " multi-repository configuration") + parser.add_argument("-C", + dest="repository_config", + help="Repository-description file to import into", + metavar="FILE") parser.add_argument( "-b", dest="branch", help="The branch of the remote repository to import and follow", metavar="branch", - default="master" - ) + default="master") parser.add_argument( "-R", dest="foreign_repository_config", help="Repository-description file in the repository to import", - metavar="relative-path" - ) + metavar="relative-path") parser.add_argument( "--plain", action="store_true", - help= - "Pretend the remote repository description is the canonical" - + " single-repository one", + help="Pretend the remote repository description is the canonical" + + " single-repository one", ) parser.add_argument( "--as", @@ -318,10 +356,10 @@ def main(): "--map", nargs=2, dest="import_map", - help="Map the specified foreign repository to the specified existing repository", + help= + "Map the specified foreign repository to the specified existing repository", action="append", - default=[] - ) + default=[]) parser.add_argument('URL') parser.add_argument('foreign_repository_name', nargs='?') args = parser.parse_args() |