summaryrefslogtreecommitdiff
path: root/bin/just-lock.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/just-lock.py')
-rwxr-xr-xbin/just-lock.py943
1 files changed, 943 insertions, 0 deletions
diff --git a/bin/just-lock.py b/bin/just-lock.py
new file mode 100755
index 00000000..7cd47105
--- /dev/null
+++ b/bin/just-lock.py
@@ -0,0 +1,943 @@
+#!/usr/bin/env python3
+# Copyright 2024 Huawei Cloud Computing Technology Co., Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+
+from argparse import ArgumentParser
+from pathlib import Path
+from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union, cast
+
# generic JSON type that avoids getter issues; proper use is being enforced by
# return types of methods and typing vars holding return values of json getters
Json = Dict[str, Any]

###
# Constants
##

# Presence of any of these entries marks a directory as a workspace root
MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"]
SYSTEM_ROOT: str = os.path.abspath(os.sep)
# Names of the alternative roots a repository description may set
ALT_DIRS: List[str] = ["target_root", "rule_root", "expression_root"]
# All roots, with the mandatory workspace root first
REPO_ROOTS: List[str] = ["repository"] + ALT_DIRS
DEFAULT_BUILD_ROOT: str = os.path.join(Path.home(), ".cache/just")
DEFAULT_GIT_BIN: str = "git"  # to be taken from PATH
DEFAULT_LAUNCHER: List[str] = ["env", "--"]

# Fallback configuration: the current directory as a single file repository
DEFAULT_REPO: Json = {"": {"repository": {"type": "file", "path": "."}}}

DEFAULT_INPUT_CONFIG_NAME: str = "repos.in.json"
DEFAULT_JUSTMR_CONFIG_NAME: str = "repos.json"
DEFAULT_CONFIG_DIRS: List[str] = [".", "./etc"]
"""Directories where to look for configuration file inside a root"""

###
# Global vars
##

g_ROOT: str = DEFAULT_BUILD_ROOT
"""The configured local build root"""
g_GIT: str = DEFAULT_GIT_BIN
"""Git binary to use"""
g_LAUNCHER: List[str] = DEFAULT_LAUNCHER
"""Local launcher to use for commands provided in imports"""
+
+###
+# System utils
+##
+
+
def log(*args: str, **kwargs: Any) -> None:
    """Print a message to standard error."""
    print(*args, file=sys.stderr, **kwargs)
+
+
def fail(s: str, exit_code: int = 1) -> NoReturn:
    """Log an error message and terminate the process with the given code."""
    log(f"ERROR: {s}")
    sys.exit(exit_code)
+
+
def report(s: Optional[str]) -> None:
    """Log an informational message; a None argument emits a bare newline,
    usable as a visual separator."""
    message = f"INFO: {s}" if s is not None else ""
    log(message)
+
+
def run_cmd(cmd: List[str],
            *,
            env: Optional[Any] = None,
            stdout: Optional[Any] = subprocess.DEVNULL,
            stdin: Optional[Any] = None,
            cwd: str,
            fail_context: Optional[str] = None) -> bytes:
    """Run a specific command. Use fail_context for context-specific logging.

    Args:
        cmd: command line to execute
        env: environment for the subprocess; None inherits the current one
        stdout: stdout redirection; pass subprocess.PIPE to capture output
        stdin: stdin redirection
        cwd: working directory for the subprocess (keyword-only, mandatory)
        fail_context: extra context prepended to the error message

    Returns:
        The captured stdout as bytes; empty unless stdout is
        subprocess.PIPE. Exits the process via fail() on non-zero exit.
    """
    result = subprocess.run(cmd, cwd=cwd, env=env, stdout=stdout, stdin=stdin)
    if result.returncode != 0:
        s = "" if fail_context is None else "%s\n" % (fail_context, )
        fail("%sCommand %s in %s failed" % (s, cmd, cwd))
    # result.stdout is None when output was not captured (e.g. the default
    # DEVNULL); normalize to bytes so the declared return type always holds
    return result.stdout if result.stdout is not None else b""
+
+
+###
+# Config utils
+##
+
+
def find_workspace_root() -> Optional[str]:
    """Find the workspace root of the current working directory.

    Walks upwards from the current working directory until a directory
    containing one of the known workspace markers is found; returns None
    if the filesystem root is reached without finding one.
    """
    def has_marker(candidate: str) -> bool:
        return any(
            os.path.exists(os.path.join(candidate, marker))
            for marker in MARKERS)

    current: str = os.getcwd()
    while not has_marker(current):
        if current == SYSTEM_ROOT:
            return None
        current = os.path.dirname(current)
    return current
+
+
def get_repository_config_file(filename: str,
                               root: Optional[str] = None) -> Optional[str]:
    """Get a named configuration file relative to a root.

    Args:
        filename: name of the configuration file to look for
        root: directory to search in; if unset, the workspace root of the
            current working directory is used

    Returns:
        The real path of the first matching regular file found in the
        default configuration directories, or None if nothing was found
        (or no workspace root could be determined).
    """
    if not root:
        root = find_workspace_root()
    if not root:
        return None
    # avoid shadowing the builtin 'dir'
    for config_dir in DEFAULT_CONFIG_DIRS:
        path: str = os.path.realpath(os.path.join(root, config_dir, filename))
        if os.path.isfile(path):
            return path
    return None  # make the implicit fall-through explicit
+
+
+###
+# Imports utils
+##
+
+
def get_repo_to_import(config: Json) -> str:
    """From a given repository config, take the main repository."""
    declared_main = config.get("main")
    if isinstance(declared_main, str):
        return declared_main
    if declared_main is not None:
        fail("Foreign config contains malformed \"main\" field:\n%r" %
             (json.dumps(declared_main, indent=2), ))
    repo_names = config.get("repositories", {})
    if isinstance(repo_names, dict) and repo_names:
        # take main repo as first named lexicographically
        return min(repo_names.keys())
    fail("Config does not contain any repositories; unsure what to import")
+
+
def get_base_repo_if_computed(repo: Json) -> Optional[str]:
    """If repository is computed, return the base repository name."""
    if repo.get("type") != "computed":
        return None
    return cast(str, repo.get("repo"))
+
+
def repos_to_import(repos_config: Json, entry: str,
                    known: Set[str]) -> Tuple[List[str], List[str]]:
    """Compute the set of transitively reachable repositories and the collection
    of repositories additionally needed as they serve as layers for the
    repositories to import.

    Args:
        repos_config: the "repositories" map of the foreign configuration
        entry: name of the repository to start the traversal from
        known: names the caller already has mappings for; these (and what
            is reachable only through them) are not imported again

    Returns:
        A pair (to_import, extra_imports): repositories to be imported
        fully (including their bindings) and repositories needed only as
        layers (their bindings are dropped on import).
    """
    to_import: Set[str] = set()
    extra_imports: Set[str] = set()

    def visit(name: str) -> None:
        # skip any existing or already visited repositories
        if name in known or name in to_import:
            return
        repo_desc: Json = repos_config.get(name, {})

        # if proper import, visit bindings, which are fully imported
        if name not in extra_imports:
            to_import.add(name)

            vals = cast(Dict[str, str], repo_desc.get("bindings", {})).values()
            for n in vals:
                # a binding target is promoted from layer-only to full import
                extra_imports.discard(n)
                visit(n)

        repo = repo_desc.get("repository")
        if isinstance(repo, str):
            # visit referred repository, but skip bindings
            if repo not in known and repo not in to_import:
                extra_imports.add(repo)
                visit(repo)
        else:
            # if computed, visit the referred repository
            repo_base = get_base_repo_if_computed(cast(Json, repo))
            if repo_base is not None:
                extra_imports.discard(repo_base)
                visit(repo_base)

        # add layers as extra imports, but referred repositories of computed
        # layers need to be fully imported
        for layer in ALT_DIRS:
            if layer in repo_desc:
                extra: str = repo_desc[layer]
                if extra not in known and extra not in to_import:
                    extra_imports.add(extra)
                    extra_repo_base = get_base_repo_if_computed(
                        cast(Json,
                             repos_config.get(extra, {}).get("repository", {})))
                    if extra_repo_base is not None:
                        extra_imports.discard(extra_repo_base)
                        visit(extra_repo_base)

    visit(entry)
    return list(to_import), list(extra_imports)
+
+
def name_imports(to_import: List[str],
                 extra_imports: List[str],
                 existing: Set[str],
                 base_name: str,
                 main: Optional[str] = None) -> Dict[str, str]:
    """Assign names to the repositories to import in such a way that
    no conflicts arise.

    Each import gets the preferred name "<base_name>/<original>"; if that
    collides, an increasing " (N)" counter is appended until a free name
    is found. If given, the main repository claims the bare base_name
    when it is still available.
    """
    assign: Dict[str, str] = {}

    def unique_name(original: str) -> str:
        preferred: str = "%s/%s" % (base_name, original)
        if preferred not in existing and preferred not in assign:
            return preferred
        suffix: int = 0
        while True:
            suffix += 1
            attempt: str = preferred + " (%d)" % suffix
            if attempt not in existing and attempt not in assign:
                return attempt

    if main is not None and (base_name not in existing):
        # the main import takes the bare base name; drop it from the lists
        assign[main] = base_name
        to_import = [name for name in to_import if name != main]
        extra_imports = [name for name in extra_imports if name != main]
    for name in to_import + extra_imports:
        assign[name] = unique_name(name)
    return assign
+
+
def rewrite_file_repo(repo: Json, remote_type: str, remote: Dict[str,
                                                                 Any]) -> Json:
    """Rewrite \"file\"-type descriptions based on remote type."""
    if remote_type != "git":
        fail("Unsupported remote type!")
    # for imports from Git, file repos become type git with subdir
    overrides: Dict[str, Any] = {}
    subdir = repo.get("path", ".")
    if subdir not in ["", "."]:
        overrides["subdir"] = subdir
    return {**remote, **overrides}
+
+
def rewrite_repo(repo_spec: Json,
                 *,
                 remote_type: str,
                 remote: Dict[str, Any],
                 assign: Json,
                 absent: bool,
                 as_layer: bool = False) -> Json:
    """Rewrite description of imported repositories.

    Renames all referenced repositories according to the 'assign' map,
    rewrites "file"-type workspace roots relative to the given remote,
    and optionally marks the resulting repository as absent.

    Args:
        repo_spec: foreign description of the repository to import
        remote_type: type of the remote the import comes from (e.g. "git")
        remote: description of the remote, used for rewriting "file" roots
        assign: map from foreign repository names to their new local names
        absent: whether to set the "absent" pragma on the result
        as_layer: if True, the repository is imported only as a layer, so
            its bindings are dropped instead of being rewritten

    Returns:
        The rewritten repository description.
    """
    new_spec: Json = {}
    repo = repo_spec.get("repository", {})
    if isinstance(repo, str):
        # workspace root given by reference; simply rename it
        repo = assign[repo]
    elif repo.get("type") == "file":
        # "file"-type repositories need to be rewritten based on remote type
        repo = rewrite_file_repo(repo, remote_type, remote)
    elif repo.get("type") == "distdir":
        # rename the repositories the distdir is composed of
        existing_repos: List[str] = repo.get("repositories", [])
        new_repos = [assign[k] for k in existing_repos]
        repo = dict(repo, **{"repositories": new_repos})
    elif repo.get("type") == "computed":
        # rename the base repository of the computed root
        # NOTE(review): a missing "repo" field would make target None and
        # raise KeyError in assign[target]; presumably validated upstream
        # — confirm
        target: str = repo.get("repo", None)
        repo = dict(repo, **{"repo": assign[target]})
    if absent and isinstance(repo, dict):
        repo["pragma"] = dict(repo.get("pragma", {}), **{"absent": True})
    new_spec["repository"] = repo
    # alternative roots are given by reference, so just rename them
    for key in ["target_root", "rule_root", "expression_root"]:
        if key in repo_spec:
            new_spec[key] = assign[repo_spec[key]]
    for key in ["target_file_name", "rule_file_name", "expression_file_name"]:
        if key in repo_spec:
            new_spec[key] = repo_spec[key]
    # rewrite bindings, if actually needed to be imported
    if not as_layer:
        bindings = repo_spec.get("bindings", {})
        new_bindings = {}
        for k, v in bindings.items():
            new_bindings[k] = assign[v]
        if new_bindings:
            new_spec["bindings"] = new_bindings
    return new_spec
+
+
def handle_import(remote_type: str, remote: Dict[str, Any], repo_desc: Json,
                  core_repos: Json, foreign_config: Json,
                  fail_context: str) -> Json:
    """General handling of repository import from a foreign config.

    Args:
        remote_type: type of the remote to import from (e.g. "git")
        remote: description of the remote, used when rewriting roots
        repo_desc: one "repos" entry describing what to import and how
        core_repos: the "repositories" map being extended; updated in place
        foreign_config: configuration of the remote to import from
        fail_context: prefix for context-specific error messages

    Returns:
        The extended core_repos map.
    """

    # parse input description
    import_as: Optional[str] = repo_desc.get("alias", None)
    if import_as is not None and not isinstance(import_as, str):
        # error message must name the field actually read ("alias")
        fail(
            fail_context +
            "Expected \"repos\" entry subfield \"alias\" to be a string, but found:\n%r"
            % (json.dumps(import_as, indent=2), ))

    foreign_name: Optional[str] = repo_desc.get("repo", None)
    if foreign_name is not None and not isinstance(foreign_name, str):
        fail(
            fail_context +
            "Expected \"repos\" entry subfield \"repo\" to be a string, but found:\n%r"
            % (json.dumps(foreign_name, indent=2), ))
    if foreign_name is None:
        # if not provided, get main repository from source config
        foreign_name = get_repo_to_import(foreign_config)

    import_map: Json = repo_desc.get("map", None)
    if import_map is None:
        import_map = {}
    elif not isinstance(import_map, dict):
        fail(
            fail_context +
            "Expected \"repos\" entry subfield \"map\" to be a map, but found:\n%r"
            % (json.dumps(import_map, indent=2), ))

    pragma: Json = repo_desc.get("pragma", None)
    if pragma is not None and not isinstance(pragma, dict):
        fail(
            fail_context +
            "Expected \"repos\" entry subfield \"pragma\" to be a map, but found:\n%r"
            % (json.dumps(pragma, indent=2), ))
    absent: bool = False if pragma is None else pragma.get("absent", False)
    if absent is None:
        absent = False
    elif not isinstance(absent, bool):
        fail(
            fail_context +
            "Expected \"repos\" entry pragma \"absent\" to be a bool, but found:\n%r"
            % (json.dumps(absent, indent=2), ))

    # Handle import with renaming
    foreign_repos: Json = foreign_config.get("repositories", {})
    if foreign_repos is None or not isinstance(foreign_repos, dict):
        fail(
            fail_context +
            "Found empty or malformed \"repositories\" field in source configuration file"
        )
    # compute what is reachable from the entry point; import the entry
    # point first, the remaining full imports in lexicographic order
    main_repos, extra_imports = repos_to_import(foreign_repos, foreign_name,
                                                set(import_map.keys()))
    extra_repos = sorted([x for x in main_repos if x != foreign_name])
    ordered_imports: List[str] = [foreign_name] + extra_repos
    extra_imports = sorted(extra_imports)

    # assign conflict-free local names to everything to be imported
    import_name = import_as if import_as is not None else foreign_name
    assign: Dict[str, str] = name_imports(ordered_imports,
                                          extra_imports,
                                          set(core_repos.keys()),
                                          import_name,
                                          main=foreign_name)

    # Report progress
    report("Importing %r as %r" % (foreign_name, import_name))
    report("\tTransitive dependencies to import: %r" % (extra_repos, ))
    report("\tRepositories imported as layers: %r" % (extra_imports, ))
    report(None)  # adds newline

    # user-provided mappings take precedence over computed assignments
    total_assign = dict(assign, **import_map)
    for repo in ordered_imports:
        core_repos[assign[repo]] = rewrite_repo(foreign_repos[repo],
                                                remote_type=remote_type,
                                                remote=remote,
                                                assign=total_assign,
                                                absent=absent)
    for repo in extra_imports:
        core_repos[assign[repo]] = rewrite_repo(foreign_repos[repo],
                                                remote_type=remote_type,
                                                remote=remote,
                                                assign=total_assign,
                                                absent=absent,
                                                as_layer=True)

    return core_repos
+
+
+###
+# Import from Git
+##
+
+
def git_checkout(url: str, branch: str, *, commit: Optional[str],
                 mirrors: List[str],
                 inherit_env: List[str]) -> Tuple[str, Dict[str, Any], str]:
    """Clone a given remote Git repository and checkout a specified branch.

    Args:
        url: URL of the remote Git repository
        branch: branch to check out
        commit: if given, the commit to reset the branch to; otherwise the
            branch HEAD is used (and logged)
        mirrors: alternative locations, recorded in the description as-is
        inherit_env: environment variables to inherit, recorded as-is

    Returns:
        A triple (srcdir, repo, workdir): the checkout location, the
        resulting "git"-type repository description, and the temporary
        directory the caller is responsible for cleaning up.
    """
    workdir: str = tempfile.mkdtemp()
    srcdir: str = os.path.join(workdir, "src")

    fail_context: str = "While checking out branch %r of %r:" % (branch, url)
    if commit is None:
        # If no commit given, do shallow clone and get HEAD commit
        run_cmd(g_LAUNCHER +
                [g_GIT, "clone", "-b", branch, "--depth", "1", url, "src"],
                cwd=workdir,
                fail_context=fail_context)
        commit = run_cmd(g_LAUNCHER + [g_GIT, "log", "-n", "1", "--pretty=%H"],
                         cwd=srcdir,
                         stdout=subprocess.PIPE,
                         fail_context=fail_context).decode('utf-8').strip()
        log("Importing remote Git commit %s" % (commit, ))
    else:
        # To get a specified commit, clone the specified branch fully and reset
        run_cmd(g_LAUNCHER + [g_GIT, "clone", "-b", branch, url, "src"],
                cwd=workdir,
                fail_context=fail_context)
        run_cmd(g_LAUNCHER + [g_GIT, "reset", "--hard", commit],
                cwd=srcdir,
                fail_context=fail_context)
    # description of the remote, with the commit pinned
    repo: Dict[str, Any] = {
        "type": "git",
        "repository": url,
        "branch": branch,
        "commit": commit,
    }
    if mirrors:
        repo = dict(repo, **{"mirrors": mirrors})
    if inherit_env:
        repo = dict(repo, **{"inherit env": inherit_env})
    return srcdir, repo, workdir
+
+
def import_from_git(core_repos: Json, imports_entry: Json) -> Json:
    """Handles imports from Git repositories.

    Validates the fields of the import entry, fetches the remote Git
    repository, reads the foreign configuration, and imports the listed
    repositories into core_repos.

    Args:
        core_repos: the "repositories" map to extend; updated in place
        imports_entry: an "imports" entry with source "git"

    Returns:
        The extended core_repos map.
    """
    # Check if anything is to be done
    if not imports_entry.get("repos", []):
        return core_repos

    # Set granular logging message
    fail_context: str = "While importing from source \"git\":\n"

    # Get the repositories list
    repos: List[Any] = imports_entry.get("repos", [])
    if not isinstance(repos, list):
        fail(fail_context +
             "Expected field \"repos\" to be a list, but found:\n%r" %
             (json.dumps(repos, indent=2), ))

    # Parse source config fields
    url: str = imports_entry.get("url", None)
    if not isinstance(url, str):
        fail(fail_context +
             "Expected field \"url\" to be a string, but found:\n%r" %
             (json.dumps(url, indent=2), ))

    branch: str = imports_entry.get("branch", None)
    if not isinstance(branch, str):
        fail(fail_context +
             "Expected field \"branch\" to be a string, but found:\n%r" %
             (json.dumps(branch, indent=2), ))

    commit: Optional[str] = imports_entry.get("commit", None)
    if commit is not None and not isinstance(commit, str):
        fail(fail_context +
             "Expected field \"commit\" to be a string, but found:\n%r" %
             (json.dumps(commit, indent=2), ))

    mirrors: Optional[List[str]] = imports_entry.get("mirrors", [])
    if mirrors is not None and not isinstance(mirrors, list):
        fail(fail_context +
             "Expected field \"mirrors\" to be a list, but found:\n%r" %
             (json.dumps(mirrors, indent=2), ))

    inherit_env: Optional[List[str]] = imports_entry.get("inherit_env", [])
    if inherit_env is not None and not isinstance(inherit_env, list):
        fail(fail_context +
             "Expected field \"inherit_env\" to be a list, but found:\n%r" %
             (json.dumps(inherit_env, indent=2), ))

    as_plain: Optional[bool] = imports_entry.get("as_plain", False)
    if as_plain is not None and not isinstance(as_plain, bool):
        fail(fail_context +
             "Expected field \"as_plain\" to be a bool, but found:\n%r" %
             (json.dumps(as_plain, indent=2), ))

    foreign_config_file: Optional[str] = imports_entry.get("config", None)
    if foreign_config_file is not None and not isinstance(
            foreign_config_file, str):
        fail(fail_context +
             "Expected field \"config\" to be a string, but found:\n%r" %
             (json.dumps(foreign_config_file, indent=2), ))

    # Fetch the source Git repository
    srcdir, remote, to_clean_up = git_checkout(url,
                                               branch,
                                               commit=commit,
                                               mirrors=mirrors,
                                               inherit_env=inherit_env)

    # Read in the foreign config file
    if foreign_config_file:
        foreign_config_file = os.path.join(srcdir, foreign_config_file)
    else:
        foreign_config_file = get_repository_config_file(
            DEFAULT_JUSTMR_CONFIG_NAME, srcdir)
    foreign_config: Json = {}
    if as_plain:
        # treat the remote as a single plain repository with default layout
        foreign_config = {"main": "", **DEFAULT_REPO}
    else:
        if (foreign_config_file):
            try:
                with open(foreign_config_file) as f:
                    foreign_config = json.load(f)
            except OSError:
                fail(fail_context + "Failed to open foreign config file %s" %
                     (foreign_config_file, ))
            except Exception as ex:
                fail(fail_context +
                     "Reading foreign config file failed with:\n%r" % (ex, ))
        else:
            fail(fail_context +
                 "Failed to find the repository configuration file!")

    # Process the imported repositories, in order
    for repo_entry in repos:
        if not isinstance(repo_entry, dict):
            fail(fail_context +
                 "Expected \"repos\" entries to be objects, but found:\n%r" %
                 (json.dumps(repo_entry, indent=2), ))
        repo_entry = cast(Json, repo_entry)

        core_repos = handle_import("git", remote, repo_entry, core_repos,
                                   foreign_config, fail_context)

    # Clean up local fetch
    # NOTE(review): on any fail() above the temporary checkout is not
    # removed; fail() exits the process, so cleanup is left to the OS
    shutil.rmtree(to_clean_up)
    return core_repos
+
+
+###
+# Deduplication logic
+##
+
+
def bisimilar_repos(repos: Json) -> List[List[str]]:
    """Compute the maximal bisimulation between the repositories
    and return the bisimilarity classes.

    Pairs of repositories are conservatively assumed equal; a pair is
    marked different once a witness (differing root, file name, or
    binding) is found, and pending equalities that depended on the pair
    are invalidated transitively via recorded dependencies.
    """
    # comparison state per (ordered) name pair:
    #   "different": the pair is known to be non-bisimilar
    #   "different_if": pairs that become different if this pair does
    bisim: Dict[Tuple[str, str], Json] = {}

    def is_different(name_a: str, name_b: str) -> bool:
        # pairs are keyed with the lexicographically smaller name first
        pos = (name_a, name_b) if name_a < name_b else (name_b, name_a)
        return bisim.get(pos, {}).get("different", False)

    def mark_as_different(name_a: str, name_b: str) -> None:
        # mark a pair as different and propagate to all pairs whose
        # equality was pending on this one
        nonlocal bisim
        pos = (name_a, name_b) if name_a < name_b else (name_b, name_a)
        entry = bisim.get(pos, {})
        if entry.get("different"):
            return
        bisim[pos] = dict(entry, **{"different": True})
        also_different: List[Tuple[str, str]] = entry.get("different_if", [])
        for a, b in also_different:
            mark_as_different(a, b)

    def register_dependency(name_a: str, name_b: str, dep_a: str,
                            dep_b: str) -> None:
        # record that (dep_a, dep_b) can only stay equal if (name_a, name_b)
        # stays equal
        pos = (name_a, name_b) if name_a < name_b else (name_b, name_a)
        entry = bisim.get(pos, {})
        deps: List[Tuple[str, str]] = entry.get("different_if", [])
        deps.append((dep_a, dep_b))
        bisim[pos] = dict(entry, **{"different_if": deps})

    def roots_equal(a: Json, b: Json, name_a: str, name_b: str) -> bool:
        """Get equality condition between roots."""
        if a["type"] != b["type"]:
            return False
        if a["type"] == "file":
            return a["path"] == b["path"]
        elif a["type"] in ["archive", "zip"]:
            # content-addressed: only content and subdir matter
            return (a["content"] == b["content"]
                    and a.get("subdir", ".") == b.get("subdir", "."))
        elif a["type"] == "git":
            # the commit pins the content; URL and branch are irrelevant
            return (a["commit"] == b["commit"]
                    and a.get("subdir", ".") == b.get("subdir", "."))
        elif a["type"] == "computed":
            if (a.get("config", {}) != b.get("config", {})
                    or a["target"] != b["target"]):
                return False
            if a["repo"] == b["repo"]:
                return True
            elif is_different(a["repo"], b["repo"]):
                return False
            else:
                # equality pending target repo equality
                register_dependency(a["repo"], b["repo"], name_a, name_b)
                return True
        else:
            # unknown repository type, the only safe way is to test
            # for full equality
            return a == b

    def get_root(repos: Json,
                 name: str,
                 *,
                 root_name: str = "repository",
                 default_root: Optional[Json] = None) -> Json:
        """Get description of a given root, resolving name references."""
        root = repos[name].get(root_name)
        if root is None:
            if default_root is not None:
                return default_root
            else:
                # NOTE(review): the message reports the repository name,
                # not which root was missing
                fail("Did not find mandatory root %s" % (name, ))
        if isinstance(root, str):
            # root given by reference to another repository's workspace root
            return get_root(repos, root)
        return root

    def repo_roots_equal(repos: Json, name_a: str, name_b: str) -> bool:
        """Equality condition between repositories with respect to roots."""
        if name_a == name_b:
            return True
        root_a = None
        root_b = None
        # an absent alternative root falls back to the previously resolved
        # root ("repository" is always first in REPO_ROOTS)
        for root_name in REPO_ROOTS:
            root_a = get_root(repos,
                              name_a,
                              root_name=root_name,
                              default_root=root_a)
            root_b = get_root(repos,
                              name_b,
                              root_name=root_name,
                              default_root=root_b)
            if not roots_equal(root_a, root_b, name_a, name_b):
                return False
        for file_name, default_name in [("target_file_name", "TARGETS"),
                                        ("rule_file_name", "RULES"),
                                        ("expression_file_name", "EXPRESSIONS")
                                        ]:
            fname_a = repos[name_a].get(file_name, default_name)
            fname_b = repos[name_b].get(file_name, default_name)
            if fname_a != fname_b:
                return False
        return True

    # pairwise comparison of all repositories (i < j)
    names = sorted(repos.keys())
    for j in range(len(names)):
        b = names[j]
        for i in range(j):
            a = names[i]
            if is_different(a, b):
                continue
            # Check equality between roots
            if not repo_roots_equal(repos, a, b):
                mark_as_different(a, b)
                continue
            # Check equality between bindings
            links_a = repos[a].get("bindings", {})
            links_b = repos[b].get("bindings", {})
            if set(links_a.keys()) != set(links_b.keys()):
                mark_as_different(a, b)
                continue
            for link in links_a.keys():
                next_a = links_a[link]
                next_b = links_b[link]
                if next_a != next_b:
                    if is_different(next_a, next_b):
                        # NOTE(review): this continue applies to the inner
                        # loop over links, so remaining links are still
                        # inspected after the pair is marked different;
                        # harmless but presumably unintended — confirm
                        mark_as_different(a, b)
                        continue
                    else:
                        # equality pending binding equality
                        register_dependency(next_a, next_b, a, b)

    # collect the classes: walk names in reverse; each not-yet-grouped name
    # collects every earlier name not marked different from it
    classes: List[List[str]] = []
    done: Dict[str, bool] = {}
    for j in reversed(range(len(names))):
        name_j = names[j]
        if done.get(name_j):
            continue
        c = [name_j]
        for i in range(j):
            name_i = names[i]
            # (name_i, name_j) is already in key order since names is sorted
            if not bisim.get((name_i, name_j), {}).get("different"):
                c.append(name_i)
                done[name_i] = True
        classes.append(c)
    return classes
+
+
def deduplicate(repos: Json, user_keep: List[str]) -> Json:
    """Deduplicate repository names in a configuration object,
    keeping the main repository and names provided explicitly by the user.

    Args:
        repos: a full configuration object (with "repositories" and,
            optionally, "main")
        user_keep: repository names that must survive deduplication

    Returns:
        A new configuration object in which each bisimilarity class is
        collapsed onto a chosen representative, with all bindings and
        root references redirected accordingly.
    """
    keep = set(user_keep)
    main = repos.get("main")
    if isinstance(main, str):
        keep.add(main)

    def choose_representative(c: List[str]) -> str:
        """Out of a bisimilarity class choose a main representative."""
        candidates = c
        # Keep a repository with a proper root, if any of those has a root.
        # In this way, we're not losing actual roots.
        with_root = [
            n for n in candidates
            if isinstance(repos["repositories"][n]["repository"], dict)
        ]
        if with_root:
            candidates = with_root

        # Prefer to choose a repository we have to keep anyway
        keep_entries = set(candidates) & keep
        if keep_entries:
            candidates = list(keep_entries)

        # deterministic tie-break: fewest path separators, then shortest,
        # then lexicographically smallest
        return sorted(candidates, key=lambda s: (s.count("/"), len(s), s))[0]

    def merge_pragma(rep: str, merged: List[str]) -> Json:
        """Merge the pragma fields in a sensible manner."""
        desc = cast(Union[Any, Json], repos["repositories"][rep]["repository"])
        if not isinstance(desc, dict):
            # root given by reference; nothing to merge
            return desc
        pragma = desc.get("pragma", {})
        # Clear pragma absent unless all merged repos that are not references
        # have the pragma
        absent: bool = pragma.get("absent", False)
        for c in merged:
            alt_desc = cast(Union[Any, Json],
                            repos["repositories"][c]["repository"])
            if (isinstance(alt_desc, dict)):
                absent = \
                    absent and alt_desc.get("pragma", {}).get("absent", False)
        pragma = dict(pragma, **{"absent": absent})
        if not absent:
            del pragma["absent"]
        # Add pragma to_git if at least one of the merged repos requires it
        to_git = pragma.get("to_git", False)
        for c in merged:
            alt_desc = cast(Union[Any, Json],
                            repos["repositories"][c]["repository"])
            if (isinstance(alt_desc, dict)):
                to_git = \
                    to_git or alt_desc.get("pragma", {}).get("to_git", False)
        pragma = dict(pragma, **{"to_git": to_git})
        if not to_git:
            del pragma["to_git"]
        # Update the pragma
        desc = dict(desc, **{"pragma": pragma})
        if not pragma:
            del desc["pragma"]
        return desc

    # compute the equivalence classes and decide which names get renamed
    # onto which representative
    bisim = bisimilar_repos(repos["repositories"])
    renaming: Dict[str, str] = {}
    updated_repos: Json = {}
    for c in bisim:
        if len(c) == 1:
            continue
        rep = choose_representative(c)
        updated_repos[rep] = merge_pragma(rep, c)
        for repo in c:
            if ((repo not in keep) and (repo != rep)):
                renaming[repo] = rep

    def final_root_reference(name: str) -> str:
        """For a given repository name, return a name that can be used
        to name root in the final repository configuration."""
        root: Any = repos["repositories"][name]["repository"]
        if isinstance(root, dict):
            # actual root; can still be merged into a different one, but only
            # one with a proper root as well.
            return renaming.get(name, name)
        if isinstance(root, str):
            # reference; follow it until an actual root is reached
            return final_root_reference(root)
        fail("Invalid root found for %r: %r" % (name, root))

    # rewrite the configuration: drop renamed repositories and redirect
    # all remaining references (bindings and named roots)
    new_repos: Json = {}
    for name in repos["repositories"].keys():
        if name not in renaming:
            desc: Json = repos["repositories"][name]
            if name in updated_repos:
                desc = dict(desc, **{"repository": updated_repos[name]})
            if "bindings" in desc:
                bindings = desc["bindings"]
                new_bindings = {}
                for k, v in bindings.items():
                    if v in renaming:
                        new_bindings[k] = renaming[v]
                    else:
                        new_bindings[k] = v
                desc = dict(desc, **{"bindings": new_bindings})
            new_roots: Json = {}
            for root in ["repository", "target_root", "rule_root"]:
                root_val = desc.get(root)
                if isinstance(root_val, str) and (root_val in renaming):
                    new_roots[root] = final_root_reference(root_val)
            desc = dict(desc, **new_roots)
            new_repos[name] = desc
    return dict(repos, **{"repositories": new_repos})
+
+
+###
+# Core logic
+##
+
+
def lock_config(input_file: str) -> Json:
    """Main logic of just-lock.

    Reads the input configuration, processes all import entries in
    order, deduplicates the resulting repositories, and returns the
    final multi-repository configuration object.

    Args:
        input_file: path of the input configuration file

    Returns:
        The generated configuration object.
    """
    input_config: Json = {}
    try:
        with open(input_file) as f:
            input_config = json.load(f)
    except OSError:
        fail("Failed to open input file %s" % (input_file, ))
    except Exception as ex:
        fail("Reading input file %s failed with:\n%r" % (input_file, ex))

    # Get the known fields
    main: Optional[str] = None
    if input_config.get("main") is not None:
        main = input_config.get("main")
        if not isinstance(main, str):
            fail("Expected field \"main\" to be a string, but found:\n%r" %
                 (json.dumps(main, indent=2), ))

    repositories: Json = input_config.get("repositories", DEFAULT_REPO)
    if not repositories:  # if empty, set it as default
        repositories = DEFAULT_REPO

    imports: List[Any] = input_config.get("imports", [])
    if not isinstance(imports, list):
        fail("Expected field \"imports\" to be a list, but found:\n%r" %
             (json.dumps(imports, indent=2), ))

    keep: List[Any] = input_config.get("keep", [])
    if not isinstance(keep, list):
        fail("Expected field \"keep\" to be a list, but found:\n%r" %
             (json.dumps(keep, indent=2), ))

    # Initialize the core config, which will be extended with imports
    core_config: Json = {}
    if main is not None:
        core_config = dict(core_config, **{"main": main})
    core_config = dict(core_config, **{"repositories": repositories})

    # Handle imports
    for entry in imports:
        if not isinstance(entry, dict):
            fail("Expected import entries to be objects, but found:\n%r" %
                 (json.dumps(entry, indent=2), ))
        entry = cast(Json, entry)

        source = entry.get("source")
        if not isinstance(source, str):
            fail("Expected field \"source\" to be a string, but found:\n%r" %
                 (json.dumps(source, indent=2), ))

        if source == "git":
            # import_from_git extends the passed-in map in place and returns
            # it, so later entries see repositories added by earlier ones
            core_config["repositories"] = import_from_git(repositories, entry)
        elif source == "file":
            # TODO(psarbu): Implement source "file"
            print("Import from source \"file\" not yet implemented!")
        elif source == "archive":
            # TODO(psarbu): Implement source "archive"
            print("Import from source \"archive\" not yet implemented!")
        elif source == "git-tree":
            # TODO(psarbu): Implement source "git-tree"
            print("Import from source \"git-tree\" not yet implemented!")
        elif source == "generic":
            # TODO(psarbu): Implement source "generic"
            print("Import from source \"generic\" not yet implemented!")
        else:
            fail("Unknown source for import entry \n%r" %
                 (json.dumps(entry, indent=2), ))

    # Deduplicate output config
    core_config = deduplicate(core_config, keep)

    return core_config
+
+
def main() -> None:
    """Parse the command line, resolve input and output paths, and write
    the locked multi-repository configuration file."""
    parser = ArgumentParser(
        prog="just-lock",
        description="Generate or update a multi-repository configuration file")
    parser.add_argument("-C",
                        dest="input_file",
                        help="Input configuration file",
                        metavar="FILE")
    parser.add_argument("-o",
                        dest="output_file",
                        help="Output multi-repository configuration file",
                        metavar="FILE")
    parser.add_argument("--local-build-root",
                        dest="local_build_root",
                        help="Root for CAS, repository space, etc",
                        metavar="PATH")
    parser.add_argument("--git",
                        dest="git_bin",
                        help="Git binary to use",
                        metavar="PATH")
    parser.add_argument("--launcher",
                        dest="launcher",
                        help="Local launcher to use for commands in imports",
                        metavar="JSON")
    args = parser.parse_args()

    # Get the path of the input file
    if args.input_file:
        input_file = cast(str, args.input_file)
        # if explicitly provided, it must be an existing file
        if not os.path.isfile(input_file):
            fail("Provided input path '%s' is not a file!" % (input_file, ))
    else:
        # search usual paths in workspace root
        input_file = get_repository_config_file(DEFAULT_INPUT_CONFIG_NAME)
        if not input_file:
            fail("Failed to find an input file in the workspace")

    # Get the path of the output file
    if args.output_file:
        output_file = cast(str, args.output_file)
    else:
        # search usual paths in workspace root
        output_file = get_repository_config_file(DEFAULT_JUSTMR_CONFIG_NAME)
        if not output_file:
            # set it next to the input file
            parent_path = Path(input_file).parent
            output_file = os.path.realpath(
                os.path.join(parent_path, DEFAULT_JUSTMR_CONFIG_NAME))

    # Process the rest of the command line; use globals for simplicity
    global g_ROOT, g_GIT, g_LAUNCHER
    if args.local_build_root:
        g_ROOT = os.path.abspath(args.local_build_root)
    if args.git_bin:
        g_GIT = args.git_bin
    if args.launcher:
        # the launcher is expected as a JSON list of strings
        g_LAUNCHER = json.loads(args.launcher)

    out_config = lock_config(input_file)
    with open(output_file, "w") as f:
        json.dump(out_config, f, indent=2)


if __name__ == "__main__":
    main()