summaryrefslogtreecommitdiff
path: root/bin/just-import-git.py
diff options
context:
space:
mode:
Diffstat (limited to 'bin/just-import-git.py')
-rwxr-xr-xbin/just-import-git.py230
1 files changed, 134 insertions, 96 deletions
diff --git a/bin/just-import-git.py b/bin/just-import-git.py
index 19b21539..217e8dc3 100755
--- a/bin/just-import-git.py
+++ b/bin/just-import-git.py
@@ -20,20 +20,28 @@ import shutil
import sys
import tempfile
-from argparse import ArgumentParser
+from argparse import ArgumentParser, Namespace
from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, cast
-def log(*args, **kwargs):
+# generic JSON type that avoids getter issues; proper use is being enforced by
+# return types of methods and typing vars holding return values of json getters
+Json = Dict[str, Any]
+
+
+def log(*args: str, **kwargs: Any) -> None:
print(*args, file=sys.stderr, **kwargs)
-def fail(s, exit_code=1):
+
+def fail(s: str, exit_code: int = 1):
log(f"Error: {s}")
sys.exit(exit_code)
-MARKERS = [".git", "ROOT", "WORKSPACE"]
-SYSTEM_ROOT = os.path.abspath(os.sep)
-DEFAULT_CONFIG_LOCATIONS = [{
+
+MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"]
+SYSTEM_ROOT: str = os.path.abspath(os.sep)
+DEFAULT_CONFIG_LOCATIONS: List[Dict[str, str]] = [{
"root": "workspace",
"path": "repos.json"
}, {
@@ -47,23 +55,21 @@ DEFAULT_CONFIG_LOCATIONS = [{
"path": "etc/just-repos.json"
}]
-def run_cmd(cmd,
+
+def run_cmd(cmd: List[str],
*,
- env=None,
- stdout=subprocess.DEVNULL,
- stdin=None,
- cwd):
- result = subprocess.run(cmd,
- cwd=cwd,
- env=env,
- stdout=stdout,
- stdin=stdin)
+ env: Optional[Any] = None,
+ stdout: Optional[Any] = subprocess.DEVNULL,
+ stdin: Optional[Any] = None,
+ cwd: str):
+ result = subprocess.run(cmd, cwd=cwd, env=env, stdout=stdout, stdin=stdin)
if result.returncode != 0:
fail("Command %s in %s failed" % (cmd, cwd))
return result.stdout
-def find_workspace_root(path=None):
- def is_workspace_root(path):
+
+def find_workspace_root(path: Optional[str] = None) -> Optional[str]:
+ def is_workspace_root(path: str) -> bool:
for m in MARKERS:
if os.path.exists(os.path.join(path, m)):
return True
@@ -79,7 +85,7 @@ def find_workspace_root(path=None):
path = os.path.dirname(path)
-def read_location(location, root=None):
+def read_location(location: Dict[str, str], root: Optional[str] = None) -> str:
search_root = location.get("root", None)
search_path = location.get("path", None)
@@ -96,95 +102,113 @@ def read_location(location, root=None):
fs_root = SYSTEM_ROOT
if fs_root:
- return os.path.realpath(os.path.join(fs_root, search_path))
- return "/" # certainly not a file
+ return os.path.realpath(
+ os.path.join(cast(str, fs_root), cast(str, search_path)))
+ return "/" # certainly not a file
+
-def get_repository_config_file(root=None):
+def get_repository_config_file(root: Optional[str] = None) -> Optional[str]:
for location in DEFAULT_CONFIG_LOCATIONS:
path = read_location(location, root=root)
if path and os.path.isfile(path):
return path
-def get_base_config(repository_config):
+def get_base_config(repository_config: Optional[str]) -> Optional[Json]:
if repository_config == "-":
return json.load(sys.stdin)
if not repository_config:
repository_config = get_repository_config_file()
- with open(repository_config) as f:
- return json.load(f)
+ if (repository_config):
+ with open(repository_config) as f:
+ return json.load(f)
+ fail('Could not get base config')
-def clone(url, branch):
+
+def clone(url: str, branch: str) -> Tuple[str, Dict[str, str], str]:
# clone the given git repository, checkout the specified
# branch, and return the checkout location
- workdir = tempfile.mkdtemp()
+ workdir: str = tempfile.mkdtemp()
run_cmd(["git", "clone", "-b", branch, "--depth", "1", url, "src"],
cwd=workdir)
- srcdir = os.path.join(workdir, "src")
- commit = run_cmd(["git", "log", "-n", "1", "--pretty=%H"],
- cwd = srcdir, stdout=subprocess.PIPE).decode('utf-8').strip()
- log("Importing commit %s" % (commit,))
- repo = { "type": "git",
- "repository": url,
- "branch": branch,
- "commit": commit,
- }
+ srcdir: str = os.path.join(workdir, "src")
+ commit: str = run_cmd(["git", "log", "-n", "1", "--pretty=%H"],
+ cwd=srcdir,
+ stdout=subprocess.PIPE).decode('utf-8').strip()
+ log("Importing commit %s" % (commit, ))
+ repo: Dict[str, str] = {
+ "type": "git",
+ "repository": url,
+ "branch": branch,
+ "commit": commit,
+ }
return srcdir, repo, workdir
-def get_repo_to_import(config):
+
+def get_repo_to_import(config: Json) -> Optional[str]:
"""From a given repository config, take the main repository."""
if config.get("main") is not None:
- return config.get("main")
+ return cast(str, config.get("main"))
repos = config.get("repositories", {}).keys()
if repos:
return sorted(repos)[0]
fail("Config does not contain any repositories; unsure what to import")
-def repos_to_import(repos_config, entry, known=None):
+
+def repos_to_import(repos_config: Json,
+ entry: str,
+ known: Optional[Iterable[str]] = None) -> List[str]:
"""Compute the set of transitively reachable repositories"""
- visited = set()
- to_import = []
+ visited: Set[str] = set()
+ to_import: List[str] = []
if known:
visited = set(known)
- def visit(name):
+ def visit(name: str) -> None:
if name in visited:
return
to_import.append(name)
visited.add(name)
- for n in repos_config.get(name, {}).get("bindings", {}).values():
+ vals = cast(Dict[str, str],
+ repos_config.get(name, {}).get("bindings", {})).values()
+ for n in vals:
visit(n)
visit(entry)
return to_import
-def extra_layers_to_import(repos_config, repos):
+
+def extra_layers_to_import(repos_config: Json, repos: List[str]) -> List[str]:
"""Compute the collection of repositories additionally needed as they serve
as layers for the repositories to import."""
- extra_imports = set()
+ extra_imports: Set[str] = set()
for repo in repos:
if isinstance(repos_config[repo]["repository"], str):
extra_imports.add(repos_config[repo]["repository"])
for layer in ["target_root", "rule_root", "expression_root"]:
if layer in repos_config[repo]:
- extra = repos_config[repo][layer]
+ extra: str = repos_config[repo][layer]
if (extra not in repos) and (extra not in extra_imports):
extra_imports.add(extra)
return list(extra_imports)
-def name_imports(to_import, existing, base_name, main=None):
+
+def name_imports(to_import: List[str],
+ existing: Set[str],
+ base_name: str,
+ main: Optional[str] = None) -> Dict[str, str]:
"""Assign names to the repositories to import in such a way that
no conflicts arise."""
- assign = {}
+ assign: Dict[str, str] = {}
- def find_name(name):
- base = "%s/%s" % (base_name, name)
+ def find_name(name: str) -> str:
+ base: str = "%s/%s" % (base_name, name)
if (base not in existing) and (base not in assign):
return base
- count = 0
+ count: int = 0
while True:
count += 1
- candidate = base + " (%d)" % count
+ candidate: str = base + " (%d)" % count
if (candidate not in existing) and (candidate not in assign):
return candidate
@@ -195,8 +219,10 @@ def name_imports(to_import, existing, base_name, main=None):
assign[repo] = find_name(repo)
return assign
-def rewrite_repo(repo_spec, *, remote, assign):
- new_spec = {}
+
+def rewrite_repo(repo_spec: Json, *, remote: Dict[str, str],
+ assign: Json) -> Json:
+ new_spec: Json = {}
repo = repo_spec.get("repository", {})
if isinstance(repo, str):
repo = assign[repo]
@@ -207,7 +233,8 @@ def rewrite_repo(repo_spec, *, remote, assign):
changes["subdir"] = subdir
repo = dict(remote, **changes)
elif repo.get("type") == "distdir":
- new_repos = [assign[k] for k in repo.get("repositories", [])]
+ existing_repos: List[str] = repo.get("repositories", [])
+ new_repos = [assign[k] for k in existing_repos]
repo = dict(repo, **{"repositories": new_repos})
new_spec["repository"] = repo
for key in ["target_root", "rule_root", "expression_root"]:
@@ -224,89 +251,100 @@ def rewrite_repo(repo_spec, *, remote, assign):
new_spec["bindings"] = new_bindings
return new_spec
-def handle_import(args):
- base_config = get_base_config(args.repository_config)
- base_repos = base_config.get("repositories", {})
+
+def handle_import(args: Namespace) -> Json:
+ base_config: Json = cast(Json, get_base_config(args.repository_config))
+ base_repos: Json = base_config.get("repositories", {})
srcdir, remote, to_cleanup = clone(args.URL, args.branch)
if args.foreign_repository_config:
foreign_config_file = args.foreign_repository_config
else:
foreign_config_file = get_repository_config_file(srcdir)
+ foreign_config: Json = {}
if args.plain:
- foreign_config = { "main": "",
- "repositories": {"": {"repository":
- {"type": "file",
- "path": "." }}}}
+ foreign_config = {
+ "main": "",
+ "repositories": {
+ "": {
+ "repository": {
+ "type": "file",
+ "path": "."
+ }
+ }
+ }
+ }
else:
- with open(foreign_config_file) as f:
- foreign_config = json.load(f)
- foreign_repos = foreign_config.get("repositories", {})
+ if (foreign_config_file):
+ with open(foreign_config_file) as f:
+ foreign_config = json.load(f)
+ else:
+ fail('Could not get repository config file')
+ foreign_repos: Json = foreign_config.get("repositories", {})
+ foreign_name: Optional[str] = None
if args.foreign_repository_name:
foreign_name = args.foreign_repository_name
else:
foreign_name = get_repo_to_import(foreign_config)
- import_map = {}
+ import_map: Json = {}
for theirs, ours in args.import_map:
import_map[theirs] = ours
- main_repos = repos_to_import(foreign_repos, foreign_name, import_map.keys())
+ main_repos: List[str] = []
+ if (foreign_name):
+ main_repos = repos_to_import(foreign_repos, foreign_name,
+ import_map.keys())
extra_repos = sorted([x for x in main_repos if x != foreign_name])
extra_imports = sorted(extra_layers_to_import(foreign_repos, main_repos))
- ordered_imports = [foreign_name] + extra_repos + extra_imports
- import_name = foreign_name
+ ordered_imports: List[str] = [cast(str, foreign_name)
+ ] + extra_repos + extra_imports
+ import_name = cast(str, foreign_name)
if args.import_as is not None:
import_name = args.import_as
- assign = name_imports(
+ assign: Dict[str, str] = name_imports(
ordered_imports,
set(base_repos.keys()),
import_name,
- main = foreign_name,
+ main=foreign_name,
)
log("Importing %r as %r" % (foreign_name, import_name))
- log("Transitive dependencies to import: %r" % (extra_repos,))
- log("Repositories imported as layers: %r" % (extra_imports,))
+ log("Transitive dependencies to import: %r" % (extra_repos, ))
+ log("Repositories imported as layers: %r" % (extra_imports, ))
total_assign = dict(assign, **import_map)
for repo in ordered_imports:
base_repos[assign[repo]] = rewrite_repo(
foreign_repos[repo],
- remote = remote,
- assign = total_assign,
+ remote=remote,
+ assign=total_assign,
)
base_config["repositories"] = base_repos
shutil.rmtree(to_cleanup)
return base_config
+
def main():
parser = ArgumentParser(
- prog = "just-import-deps",
- description =
- "Import a dependency transitively into a given"
- + " multi-repository configuration"
- )
- parser.add_argument(
- "-C",
- dest="repository_config",
- help="Repository-description file to import into",
- metavar="FILE"
- )
+ prog="just-import-deps",
+ description="Import a dependency transitively into a given" +
+ " multi-repository configuration")
+ parser.add_argument("-C",
+ dest="repository_config",
+ help="Repository-description file to import into",
+ metavar="FILE")
parser.add_argument(
"-b",
dest="branch",
help="The branch of the remote repository to import and follow",
metavar="branch",
- default="master"
- )
+ default="master")
parser.add_argument(
"-R",
dest="foreign_repository_config",
help="Repository-description file in the repository to import",
- metavar="relative-path"
- )
+ metavar="relative-path")
parser.add_argument(
"--plain",
action="store_true",
- help=
- "Pretend the remote repository description is the canonical"
- + " single-repository one",
+ help="Pretend the remote repository description is the canonical" +
+ " single-repository one",
)
parser.add_argument(
"--as",
@@ -318,10 +356,10 @@ def main():
"--map",
nargs=2,
dest="import_map",
- help="Map the specified foreign repository to the specified existing repository",
+ help=
+ "Map the specified foreign repository to the specified existing repository",
action="append",
- default=[]
- )
+ default=[])
parser.add_argument('URL')
parser.add_argument('foreign_repository_name', nargs='?')
args = parser.parse_args()