diff options
Diffstat (limited to 'bin/just-import-git.py')
-rwxr-xr-x | bin/just-import-git.py | 133 |
1 files changed, 89 insertions, 44 deletions
diff --git a/bin/just-import-git.py b/bin/just-import-git.py index d2c25eef..d53e55db 100755 --- a/bin/just-import-git.py +++ b/bin/just-import-git.py @@ -23,7 +23,7 @@ import tempfile from argparse import ArgumentParser, Namespace from pathlib import Path -from typing import Any, Dict, Iterable, List, NoReturn, Optional, Set, Tuple, cast +from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, cast # generic JSON type that avoids getter issues; proper use is being enforced by # return types of methods and typing vars holding return values of json getters @@ -41,6 +41,7 @@ def fail(s: str, exit_code: int = 1) -> NoReturn: MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"] SYSTEM_ROOT: str = os.path.abspath(os.sep) +ALT_DIRS: List[str] = ["target_root", "rule_root", "expression_root"] DEFAULT_CONFIG_LOCATIONS: List[Dict[str, str]] = [{ "root": "workspace", "path": "repos.json" @@ -165,45 +166,69 @@ def get_repo_to_import(config: Json) -> str: fail("Config does not contain any repositories; unsure what to import") -def repos_to_import(repos_config: Json, - entry: str, - known: Optional[Iterable[str]] = None) -> List[str]: - """Compute the set of transitively reachable repositories""" - visited: Set[str] = set() - to_import: List[str] = [] - if known: - visited = set(known) +def get_target_if_computed_repo(repo: Json) -> Optional[str]: + """If repository is computed, return the target repository name.""" + if repo.get("type") == "computed": + return cast(str, repo.get("repo")) + return None + + +def repos_to_import(repos_config: Json, entry: str, + known: Set[str]) -> Tuple[List[str], List[str]]: + """Compute the set of transitively reachable repositories and the collection + of repositories additionally needed as they serve as layers for the + repositories to import.""" + to_import: Set[str] = set() + extra_imports: Set[str] = set() def visit(name: str) -> None: - if name in visited: + # skip any existing or already visited repositories + if name in known or name in to_import: return - to_import.append(name) - visited.add(name) - vals = cast(Dict[str, str], - repos_config.get(name, {}).get("bindings", {})).values() - for n in vals: - visit(n) + repo_desc: Json = repos_config.get(name, {}) + + # if proper import, visit bindings, which are fully imported + if name not in extra_imports: + to_import.add(name) + + vals = cast(Dict[str, str], repo_desc.get("bindings", {})).values() + for n in vals: + extra_imports.discard(n) + visit(n) + + repo = repo_desc.get("repository") + if isinstance(repo, str): + # visit referred repository, but skip bindings + if repo not in known and repo not in to_import: + extra_imports.add(repo) + visit(repo) + else: + # if computed, visit the referred repository + target = get_target_if_computed_repo(cast(Json, repo)) + if target is not None: + extra_imports.discard(target) + visit(target) + + # add layers as extra imports, but referred repositories of computed + # layers need to be fully imported + for layer in ALT_DIRS: + if layer in repo_desc: + extra: str = repo_desc[layer] + if extra not in known and extra not in to_import: + extra_imports.add(extra) + extra_target = get_target_if_computed_repo( + cast(Json, + repos_config.get(extra, {}).get("repository", {}))) + if extra_target is not None: + extra_imports.discard(extra_target) + visit(extra_target) visit(entry) - return to_import - - -def extra_layers_to_import(repos_config: Json, repos: List[str]) -> List[str]: - """Compute the collection of repositories additionally needed as they serve - as layers for the repositories to import.""" - extra_imports: Set[str] = set() - for repo in repos: - if isinstance(repos_config[repo]["repository"], str): - extra_imports.add(repos_config[repo]["repository"]) - for layer in ["target_root", "rule_root", "expression_root"]: - if layer in repos_config[repo]: - extra: str = repos_config[repo][layer] - if (extra not in repos) and (extra not in extra_imports): - extra_imports.add(extra) - return list(extra_imports) + return list(to_import), list(extra_imports) def name_imports(to_import: List[str], + extra_imports: List[str], existing: Set[str], base_name: str, main: Optional[str] = None) -> Dict[str, str]: @@ -225,13 +250,18 @@ def name_imports(to_import: List[str], if main is not None and (base_name not in existing): assign[main] = base_name to_import = [x for x in to_import if x != main] - for repo in to_import: + extra_imports = [x for x in extra_imports if x != main] + for repo in to_import + extra_imports: assign[repo] = find_name(repo) return assign -def rewrite_repo(repo_spec: Json, *, remote: Dict[str, Any], assign: Json, - absent: bool) -> Json: +def rewrite_repo(repo_spec: Json, + *, + remote: Dict[str, Any], + assign: Json, + absent: bool, + as_layer: bool = False) -> Json: new_spec: Json = {} repo = repo_spec.get("repository", {}) if isinstance(repo, str): @@ -246,6 +276,9 @@ def rewrite_repo(repo_spec: Json, *, remote: Dict[str, Any], assign: Json, existing_repos: List[str] = repo.get("repositories", []) new_repos = [assign[k] for k in existing_repos] repo = dict(repo, **{"repositories": new_repos}) + elif repo.get("type") == "computed": + target: str = repo.get("repo", None) + repo = dict(repo, **{"repo": assign[target]}) if absent and isinstance(repo, dict): repo["pragma"] = dict(repo.get("pragma", {}), **{"absent": True}) new_spec["repository"] = repo @@ -255,12 +288,14 @@ def rewrite_repo(repo_spec: Json, *, remote: Dict[str, Any], assign: Json, for key in ["target_file_name", "rule_file_name", "expression_file_name"]: if key in repo_spec: new_spec[key] = repo_spec[key] - bindings = repo_spec.get("bindings", {}) - new_bindings = {} - for k, v in bindings.items(): - new_bindings[k] = assign[v] - if new_bindings: - new_spec["bindings"] = new_bindings + # rewrite bindings, if actually needed to be imported + if not as_layer: + bindings = repo_spec.get("bindings", {}) + new_bindings = {} + for k, v in bindings.items(): + new_bindings[k] = assign[v] + if new_bindings: + new_spec["bindings"] = new_bindings return new_spec @@ -305,15 +340,17 @@ def handle_import(args: Namespace) -> Json: import_map: Json = {} for theirs, ours in args.import_map: import_map[theirs] = ours - main_repos = repos_to_import(foreign_repos, foreign_name, import_map.keys()) + main_repos, extra_imports = repos_to_import(foreign_repos, foreign_name, + set(import_map.keys())) extra_repos = sorted([x for x in main_repos if x != foreign_name]) - extra_imports = sorted(extra_layers_to_import(foreign_repos, main_repos)) - ordered_imports: List[str] = [foreign_name] + extra_repos + extra_imports + ordered_imports: List[str] = [foreign_name] + extra_repos + extra_imports = sorted(extra_imports) import_name = foreign_name if args.import_as is not None: import_name = args.import_as assign: Dict[str, str] = name_imports( ordered_imports, + extra_imports, set(base_repos.keys()), import_name, main=foreign_name, @@ -329,6 +366,14 @@ def handle_import(args: Namespace) -> Json: assign=total_assign, absent=args.absent, ) + for repo in extra_imports: + base_repos[assign[repo]] = rewrite_repo( + foreign_repos[repo], + remote=remote, + assign=total_assign, + absent=args.absent, + as_layer=True, + ) base_config["repositories"] = base_repos shutil.rmtree(to_cleanup) return base_config |