diff options
-rwxr-xr-x | bin/just-lock.py | 232 |
1 files changed, 228 insertions, 4 deletions
diff --git a/bin/just-lock.py b/bin/just-lock.py index 753cc75b..a17a11c5 100755 --- a/bin/just-lock.py +++ b/bin/just-lock.py @@ -24,7 +24,7 @@ import tempfile import time import zlib -from argparse import ArgumentParser, ArgumentError +from argparse import ArgumentParser, ArgumentError, RawTextHelpFormatter from pathlib import Path from typing import Any, Dict, List, NoReturn, Optional, Set, TextIO, Tuple, Union, cast from enum import Enum @@ -39,8 +39,13 @@ Json = Dict[str, Any] MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"] SYSTEM_ROOT: str = os.path.abspath(os.sep) + ALT_DIRS: List[str] = ["target_root", "rule_root", "expression_root"] REPO_ROOTS: List[str] = ["repository"] + ALT_DIRS +REPO_KEYS_TO_KEEP: List[str] = [ + "target_file_name", "rule_file_name", "expression_file_name", "bindings" +] + ALT_DIRS + DEFAULT_BUILD_ROOT: str = os.path.join(Path.home(), ".cache/just") DEFAULT_GIT_BIN: str = "git" # to be taken from PATH DEFAULT_LAUNCHER: List[str] = ["env", "--"] @@ -84,6 +89,8 @@ g_GIT: str = DEFAULT_GIT_BIN """Git binary to use""" g_LAUNCHER: List[str] = DEFAULT_LAUNCHER """Local launcher to use for commands provided in imports""" +g_CLONE_MAP: Dict[str, Tuple[str, List[str]]] = {} +"""Mapping from local path to pair of repository name and bindings chain for cloning""" ### # System utils @@ -1763,6 +1770,192 @@ def import_from_git_tree(core_repos: Json, imports_entry: Json) -> Json: ### +# Cloning logic +## + + +def rewrite_cloned_repo(repos: Json, *, clone_to: str, target_repo: str, + ws_root_repo: str) -> Json: + """Rewrite description of a locally-cloned repository.""" + # Set workspace root description + new_spec: Json = { + "repository": { + "type": "file", + "path": os.path.abspath(clone_to) + } + } + + # Keep any existing "special" or "to_git" pragmas from the workspace root + # repository + pragma: Json = {} + existing: Json = repos[ws_root_repo]["repository"].get("pragma", {}) + special = existing.get("special", None) + if special: + pragma["special"] = special + to_git = existing.get("to_git", False) + if to_git: + pragma["to_git"] = True + if pragma: + new_spec["repository"]["pragma"] = pragma + + # Keep bindings, roots, and root files from the target repository to be able + # to build against it + layer_desc: Json = repos[target_repo] + for key in REPO_KEYS_TO_KEEP: + if key in layer_desc: + new_spec[key] = layer_desc[key] + + return new_spec + + +def clone_repo(repos: Json, known_repo: str, deps_chain: List[str], + clone_to: str) -> Optional[Tuple[str, str]]: + """Clone the workspace root of a Git repository into a given directory path. + The repository is described by a dependency chain from a given start + repository. Returns the names of the target repository and of the repository + providing its workspace root (which can be the same) on success, None if no + cloning can be done.""" + # Set granular logging message + fail_context: str = "While processing clone entry %r:\n" % (json.dumps( + {clone_to: [known_repo, deps_chain]}), ) + + # Check cloning path + clone_to = os.path.abspath(clone_to) + if os.path.exists(clone_to): + if not os.path.isdir(clone_to): + fail(fail_context + "Clone path %r exists and is not a directory!" % + (json.dumps(clone_to), )) + if os.listdir(clone_to): + warn(fail_context + + "Clone directory %r exists and is not empty! Skipping" % + (json.dumps(clone_to), )) + return None + + ## Helper functions + + def process_file(repository: Json, *, clone_to: str, + fail_context: str) -> str: + """Process a file repository and return its path.""" + # Parse fields + fpath: str = repository.get("path", None) + if not isinstance(fpath, str): + fail(fail_context + + "Expected field \"path\" to be a string, but found:\n%r" % + (json.dumps(fpath, indent=2), )) + # Simply copy directory tree in the new location + try: + shutil.copytree(fpath, clone_to, symlinks=True, dirs_exist_ok=True) + except Exception as ex: + fail(fail_context + "Copying file path %s failed with:\n%r" % + (fpath, ex)) + return fpath + + def follow_binding(repos: Json, *, repo_name: str, dep_name: str, + fail_context: str) -> str: + """Follow a named binding.""" + if repo_name not in repos.keys(): + fail(fail_context + "Failed to find repository %r" % + (json.dumps(repo_name), )) + if "bindings" not in repos[repo_name].keys(): + fail(fail_context + "Repository %r does not have bindings!" % + (json.dumps(repo_name), )) + if dep_name not in repos[repo_name]["bindings"].keys(): + fail(fail_context + "Repository %r does not have binding %r" % + (json.dumps(repo_name), json.dumps(dep_name))) + return repos[repo_name]["bindings"][dep_name] + + ## Main logic + + # Follow the bindings + target_repo: str = known_repo + for dep in deps_chain: + target_repo = follow_binding(repos, + repo_name=target_repo, + dep_name=dep, + fail_context=fail_context) + + if target_repo not in repos.keys(): + fail(fail_context + "Failed to find repository %r" % + (json.dumps(target_repo), )) + + # Get the workspace root of the target repository + repo_to_clone: str = target_repo + while isinstance(repos[repo_to_clone].get("repository", None), str): + repo_to_clone = repos[repo_to_clone]["repository"] + if repo_to_clone not in repos.keys(): + fail(fail_context + "Failed to find repository %r" % + (json.dumps(repo_to_clone), )) + repo_desc: Json = repos[repo_to_clone] + repository: Json = repo_desc["repository"] + repo_type: str = repository.get("type", None) + if not isinstance(repo_type, str): + fail(fail_context + + "Expected field \"type\" to be a string, but found:\n%r" % + (json.dumps(repo_type, indent=2), )) + + # Clone the repository locally, based on type; lock exclusively for writing + lockfile = lock_acquire(os.path.join(Path(clone_to).parent, "clone.lock")) + report("Cloning workspace root of repository %r to %s" % + (json.dumps(target_repo), clone_to)) + + result: Optional[Tuple[str, str]] = (target_repo, repo_to_clone) + if repo_type == "file": + fpath = process_file(repository, + clone_to=clone_to, + fail_context=fail_context) + report("\tCloned file path %s to %s" % (fpath, clone_to)) + + elif repo_type == "git": + #TODO(psarbu): Implement git cloning + warn(fail_context + + "Cloning from \"%s\" repositories not yet implemented!" % + (repo_type, )) + result = None + + elif repo_type in ["archive", "zip"]: + #TODO(psarbu): Implement archives cloning + warn(fail_context + + "Cloning from \"%s\" repositories not yet implemented!" % + (repo_type, )) + result = None + + elif repo_type == "foreign file": + #TODO(psarbu): Implement foreign file cloning + warn(fail_context + + "Cloning from \"%s\" repositories not yet implemented!" % + (repo_type, )) + result = None + + elif repo_type == "distdir": + #TODO(psarbu): Implement distdir cloning + warn(fail_context + + "Cloning from \"%s\" repositories not yet implemented!" % + (repo_type, )) + result = None + + elif repo_type == "git tree": + #TODO(psarbu): Implement git tree cloning + warn(fail_context + + "Cloning from \"%s\" repositories not yet implemented!" % + (repo_type, )) + result = None + + elif repo_type in ["computed", "tree structure"]: + warn(fail_context + "Cloning not supported for type %r. Skipping" % + (json.dumps(repo_type), )) + result = None + + else: + warn(fail_context + "Found unknown type %r. Skipping" % + (json.dumps(repo_type), )) + result = None + + # Release lock and return keep list + lock_release(lockfile) + return result + + +### # Deduplication logic ## @@ -2064,7 +2257,7 @@ def lock_config(input_file: str) -> Json: fail("Expected field \"imports\" to be a list, but found:\n%r" % (json.dumps(imports, indent=2), )) - keep: List[Any] = input_config.get("keep", []) + keep: List[str] = input_config.get("keep", []) if not isinstance(keep, list): fail("Expected field \"keep\" to be a list, but found:\n%r" % (json.dumps(keep, indent=2), )) @@ -2110,6 +2303,25 @@ def lock_config(input_file: str) -> Json: fail("Unknown source for import entry \n%r" % (json.dumps(entry, indent=2), )) + # Clone specified Git repositories locally + rewritten_repos: Json = {} + for clone_to, (known_repo, deps_chain) in g_CLONE_MAP.items(): + # Find target repository and clone its workspace root + result = clone_repo(core_config["repositories"], known_repo, deps_chain, + clone_to) + if result is not None: + target_repo, cloned_repo = result + # Rewrite description of target repo to point to clone location + rewritten_repos[target_repo] = rewrite_cloned_repo( + core_config["repositories"], + clone_to=clone_to, + target_repo=target_repo, + ws_root_repo=cloned_repo) + # Add start and target repos to 'keep' list + keep += [known_repo, target_repo] + + core_config["repositories"].update(rewritten_repos) + # Release garbage collector locks lock_release(storage_gc_lock) lock_release(git_gc_lock) @@ -2123,6 +2335,7 @@ def lock_config(input_file: str) -> Json: def main(): parser = ArgumentParser( prog="just-lock", + formatter_class=RawTextHelpFormatter, description="Generate or update a multi-repository configuration file", exit_on_error=False, # catch parsing errors ourselves ) @@ -2150,6 +2363,15 @@ def main(): dest="launcher", help="Local launcher to use for commands in imports", metavar="JSON") + parser.add_argument( + "--clone", + dest="clone", + help="\n".join([ + "Mapping from filesystem path to pair of repository name and list of bindings.", + "Clone at path the workspace root of a repository found by following the bindings from named repository.", + "IMPORTANT: The output configuration will point to the cloned repositories!" + ]), + metavar="JSON") try: args = parser.parse_args() @@ -2182,15 +2404,17 @@ def main(): os.path.join(parent_path, DEFAULT_JUSTMR_CONFIG_NAME)) # Process the rest of the command line; use globals for simplicity - global g_ROOT, g_JUST, g_GIT, g_LAUNCHER + global g_ROOT, g_JUST, g_GIT, g_LAUNCHER, g_CLONE_MAP if args.local_build_root: g_ROOT = os.path.abspath(args.local_build_root) if args.just_bin: g_JUST = args.just_bin if args.git_bin: - g_GIT = args.git_bin + g_GIT = cast(str, args.git_bin) if args.launcher: g_LAUNCHER = json.loads(args.launcher) + if args.clone: + g_CLONE_MAP = json.loads(args.clone) out_config = lock_config(input_file) with open(output_file, "w") as f: |