summaryrefslogtreecommitdiff
path: root/bin/just-import-git.py
diff options
context:
space:
mode:
authorPaul Cristian Sarbu <paul.cristian.sarbu@huawei.com>2023-06-26 11:56:07 +0200
committerPaul Cristian Sarbu <paul.cristian.sarbu@huawei.com>2023-08-29 17:19:00 +0200
commit0d8aeb4655446d05ecbb3fc3336ebe02741e91bf (patch)
treee381bd76932c8b0f56e1efbba5d28e202619c772 /bin/just-import-git.py
parent0e17643a522756caa78c4ddf7226c77c66185c03 (diff)
downloadjustbuild-0d8aeb4655446d05ecbb3fc3336ebe02741e91bf.tar.gz
just-import-git.py: Add type hints and fix style
For maximum compatibility, we use the uppercase types from the typing package instead of the built-in types, therefore compliant with PEP 484 and PEP 526. As unfortunately there is no proper JSON typing option that requires many casts, we use a more lax typing for JSON inputs, but enforce return types in order to implicitly infer the actual format of an input JSON variable (dict, list, string etc.).
Diffstat (limited to 'bin/just-import-git.py')
-rwxr-xr-xbin/just-import-git.py230
1 files changed, 134 insertions, 96 deletions
diff --git a/bin/just-import-git.py b/bin/just-import-git.py
index 19b21539..217e8dc3 100755
--- a/bin/just-import-git.py
+++ b/bin/just-import-git.py
@@ -20,20 +20,28 @@ import shutil
import sys
import tempfile
-from argparse import ArgumentParser
+from argparse import ArgumentParser, Namespace
from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, cast
-def log(*args, **kwargs):
+# generic JSON type that avoids getter issues; proper use is being enforced by
+# return types of methods and typing vars holding return values of json getters
+Json = Dict[str, Any]
+
+
+def log(*args: str, **kwargs: Any) -> None:
print(*args, file=sys.stderr, **kwargs)
-def fail(s, exit_code=1):
+
+def fail(s: str, exit_code: int = 1):
log(f"Error: {s}")
sys.exit(exit_code)
-MARKERS = [".git", "ROOT", "WORKSPACE"]
-SYSTEM_ROOT = os.path.abspath(os.sep)
-DEFAULT_CONFIG_LOCATIONS = [{
+
+MARKERS: List[str] = [".git", "ROOT", "WORKSPACE"]
+SYSTEM_ROOT: str = os.path.abspath(os.sep)
+DEFAULT_CONFIG_LOCATIONS: List[Dict[str, str]] = [{
"root": "workspace",
"path": "repos.json"
}, {
@@ -47,23 +55,21 @@ DEFAULT_CONFIG_LOCATIONS = [{
"path": "etc/just-repos.json"
}]
-def run_cmd(cmd,
+
+def run_cmd(cmd: List[str],
*,
- env=None,
- stdout=subprocess.DEVNULL,
- stdin=None,
- cwd):
- result = subprocess.run(cmd,
- cwd=cwd,
- env=env,
- stdout=stdout,
- stdin=stdin)
+ env: Optional[Any] = None,
+ stdout: Optional[Any] = subprocess.DEVNULL,
+ stdin: Optional[Any] = None,
+ cwd: str):
+ result = subprocess.run(cmd, cwd=cwd, env=env, stdout=stdout, stdin=stdin)
if result.returncode != 0:
fail("Command %s in %s failed" % (cmd, cwd))
return result.stdout
-def find_workspace_root(path=None):
- def is_workspace_root(path):
+
+def find_workspace_root(path: Optional[str] = None) -> Optional[str]:
+ def is_workspace_root(path: str) -> bool:
for m in MARKERS:
if os.path.exists(os.path.join(path, m)):
return True
@@ -79,7 +85,7 @@ def find_workspace_root(path=None):
path = os.path.dirname(path)
-def read_location(location, root=None):
+def read_location(location: Dict[str, str], root: Optional[str] = None) -> str:
search_root = location.get("root", None)
search_path = location.get("path", None)
@@ -96,95 +102,113 @@ def read_location(location, root=None):
fs_root = SYSTEM_ROOT
if fs_root:
- return os.path.realpath(os.path.join(fs_root, search_path))
- return "/" # certainly not a file
+ return os.path.realpath(
+ os.path.join(cast(str, fs_root), cast(str, search_path)))
+ return "/" # certainly not a file
+
-def get_repository_config_file(root=None):
+def get_repository_config_file(root: Optional[str] = None) -> Optional[str]:
for location in DEFAULT_CONFIG_LOCATIONS:
path = read_location(location, root=root)
if path and os.path.isfile(path):
return path
-def get_base_config(repository_config):
+def get_base_config(repository_config: Optional[str]) -> Optional[Json]:
if repository_config == "-":
return json.load(sys.stdin)
if not repository_config:
repository_config = get_repository_config_file()
- with open(repository_config) as f:
- return json.load(f)
+ if (repository_config):
+ with open(repository_config) as f:
+ return json.load(f)
+ fail('Could not get base config')
-def clone(url, branch):
+
+def clone(url: str, branch: str) -> Tuple[str, Dict[str, str], str]:
# clone the given git repository, checkout the specified
# branch, and return the checkout location
- workdir = tempfile.mkdtemp()
+ workdir: str = tempfile.mkdtemp()
run_cmd(["git", "clone", "-b", branch, "--depth", "1", url, "src"],
cwd=workdir)
- srcdir = os.path.join(workdir, "src")
- commit = run_cmd(["git", "log", "-n", "1", "--pretty=%H"],
- cwd = srcdir, stdout=subprocess.PIPE).decode('utf-8').strip()
- log("Importing commit %s" % (commit,))
- repo = { "type": "git",
- "repository": url,
- "branch": branch,
- "commit": commit,
- }
+ srcdir: str = os.path.join(workdir, "src")
+ commit: str = run_cmd(["git", "log", "-n", "1", "--pretty=%H"],
+ cwd=srcdir,
+ stdout=subprocess.PIPE).decode('utf-8').strip()
+ log("Importing commit %s" % (commit, ))
+ repo: Dict[str, str] = {
+ "type": "git",
+ "repository": url,
+ "branch": branch,
+ "commit": commit,
+ }
return srcdir, repo, workdir
-def get_repo_to_import(config):
+
+def get_repo_to_import(config: Json) -> Optional[str]:
"""From a given repository config, take the main repository."""
if config.get("main") is not None:
- return config.get("main")
+ return cast(str, config.get("main"))
repos = config.get("repositories", {}).keys()
if repos:
return sorted(repos)[0]
fail("Config does not contain any repositories; unsure what to import")
-def repos_to_import(repos_config, entry, known=None):
+
+def repos_to_import(repos_config: Json,
+ entry: str,
+ known: Optional[Iterable[str]] = None) -> List[str]:
"""Compute the set of transitively reachable repositories"""
- visited = set()
- to_import = []
+ visited: Set[str] = set()
+ to_import: List[str] = []
if known:
visited = set(known)
- def visit(name):
+ def visit(name: str) -> None:
if name in visited:
return
to_import.append(name)
visited.add(name)
- for n in repos_config.get(name, {}).get("bindings", {}).values():
+ vals = cast(Dict[str, str],
+ repos_config.get(name, {}).get("bindings", {})).values()
+ for n in vals:
visit(n)
visit(entry)
return to_import
-def extra_layers_to_import(repos_config, repos):
+
+def extra_layers_to_import(repos_config: Json, repos: List[str]) -> List[str]:
"""Compute the collection of repositories additionally needed as they serve
as layers for the repositories to import."""
- extra_imports = set()
+ extra_imports: Set[str] = set()
for repo in repos:
if isinstance(repos_config[repo]["repository"], str):
extra_imports.add(repos_config[repo]["repository"])
for layer in ["target_root", "rule_root", "expression_root"]:
if layer in repos_config[repo]:
- extra = repos_config[repo][layer]
+ extra: str = repos_config[repo][layer]
if (extra not in repos) and (extra not in extra_imports):
extra_imports.add(extra)
return list(extra_imports)
-def name_imports(to_import, existing, base_name, main=None):
+
+def name_imports(to_import: List[str],
+ existing: Set[str],
+ base_name: str,
+ main: Optional[str] = None) -> Dict[str, str]:
"""Assign names to the repositories to import in such a way that
no conflicts arise."""
- assign = {}
+ assign: Dict[str, str] = {}
- def find_name(name):
- base = "%s/%s" % (base_name, name)
+ def find_name(name: str) -> str:
+ base: str = "%s/%s" % (base_name, name)
if (base not in existing) and (base not in assign):
return base
- count = 0
+ count: int = 0
while True:
count += 1
- candidate = base + " (%d)" % count
+ candidate: str = base + " (%d)" % count
if (candidate not in existing) and (candidate not in assign):
return candidate
@@ -195,8 +219,10 @@ def name_imports(to_import, existing, base_name, main=None):
assign[repo] = find_name(repo)
return assign
-def rewrite_repo(repo_spec, *, remote, assign):
- new_spec = {}
+
+def rewrite_repo(repo_spec: Json, *, remote: Dict[str, str],
+ assign: Json) -> Json:
+ new_spec: Json = {}
repo = repo_spec.get("repository", {})
if isinstance(repo, str):
repo = assign[repo]
@@ -207,7 +233,8 @@ def rewrite_repo(repo_spec, *, remote, assign):
changes["subdir"] = subdir
repo = dict(remote, **changes)
elif repo.get("type") == "distdir":
- new_repos = [assign[k] for k in repo.get("repositories", [])]
+ existing_repos: List[str] = repo.get("repositories", [])
+ new_repos = [assign[k] for k in existing_repos]
repo = dict(repo, **{"repositories": new_repos})
new_spec["repository"] = repo
for key in ["target_root", "rule_root", "expression_root"]:
@@ -224,89 +251,100 @@ def rewrite_repo(repo_spec, *, remote, assign):
new_spec["bindings"] = new_bindings
return new_spec
-def handle_import(args):
- base_config = get_base_config(args.repository_config)
- base_repos = base_config.get("repositories", {})
+
+def handle_import(args: Namespace) -> Json:
+ base_config: Json = cast(Json, get_base_config(args.repository_config))
+ base_repos: Json = base_config.get("repositories", {})
srcdir, remote, to_cleanup = clone(args.URL, args.branch)
if args.foreign_repository_config:
foreign_config_file = args.foreign_repository_config
else:
foreign_config_file = get_repository_config_file(srcdir)
+ foreign_config: Json = {}
if args.plain:
- foreign_config = { "main": "",
- "repositories": {"": {"repository":
- {"type": "file",
- "path": "." }}}}
+ foreign_config = {
+ "main": "",
+ "repositories": {
+ "": {
+ "repository": {
+ "type": "file",
+ "path": "."
+ }
+ }
+ }
+ }
else:
- with open(foreign_config_file) as f:
- foreign_config = json.load(f)
- foreign_repos = foreign_config.get("repositories", {})
+ if (foreign_config_file):
+ with open(foreign_config_file) as f:
+ foreign_config = json.load(f)
+ else:
+ fail('Could not get repository config file')
+ foreign_repos: Json = foreign_config.get("repositories", {})
+ foreign_name: Optional[str] = None
if args.foreign_repository_name:
foreign_name = args.foreign_repository_name
else:
foreign_name = get_repo_to_import(foreign_config)
- import_map = {}
+ import_map: Json = {}
for theirs, ours in args.import_map:
import_map[theirs] = ours
- main_repos = repos_to_import(foreign_repos, foreign_name, import_map.keys())
+ main_repos: List[str] = []
+ if (foreign_name):
+ main_repos = repos_to_import(foreign_repos, foreign_name,
+ import_map.keys())
extra_repos = sorted([x for x in main_repos if x != foreign_name])
extra_imports = sorted(extra_layers_to_import(foreign_repos, main_repos))
- ordered_imports = [foreign_name] + extra_repos + extra_imports
- import_name = foreign_name
+ ordered_imports: List[str] = [cast(str, foreign_name)
+ ] + extra_repos + extra_imports
+ import_name = cast(str, foreign_name)
if args.import_as is not None:
import_name = args.import_as
- assign = name_imports(
+ assign: Dict[str, str] = name_imports(
ordered_imports,
set(base_repos.keys()),
import_name,
- main = foreign_name,
+ main=foreign_name,
)
log("Importing %r as %r" % (foreign_name, import_name))
- log("Transitive dependencies to import: %r" % (extra_repos,))
- log("Repositories imported as layers: %r" % (extra_imports,))
+ log("Transitive dependencies to import: %r" % (extra_repos, ))
+ log("Repositories imported as layers: %r" % (extra_imports, ))
total_assign = dict(assign, **import_map)
for repo in ordered_imports:
base_repos[assign[repo]] = rewrite_repo(
foreign_repos[repo],
- remote = remote,
- assign = total_assign,
+ remote=remote,
+ assign=total_assign,
)
base_config["repositories"] = base_repos
shutil.rmtree(to_cleanup)
return base_config
+
def main():
parser = ArgumentParser(
- prog = "just-import-deps",
- description =
- "Import a dependency transitively into a given"
- + " multi-repository configuration"
- )
- parser.add_argument(
- "-C",
- dest="repository_config",
- help="Repository-description file to import into",
- metavar="FILE"
- )
+ prog="just-import-deps",
+ description="Import a dependency transitively into a given" +
+ " multi-repository configuration")
+ parser.add_argument("-C",
+ dest="repository_config",
+ help="Repository-description file to import into",
+ metavar="FILE")
parser.add_argument(
"-b",
dest="branch",
help="The branch of the remote repository to import and follow",
metavar="branch",
- default="master"
- )
+ default="master")
parser.add_argument(
"-R",
dest="foreign_repository_config",
help="Repository-description file in the repository to import",
- metavar="relative-path"
- )
+ metavar="relative-path")
parser.add_argument(
"--plain",
action="store_true",
- help=
- "Pretend the remote repository description is the canonical"
- + " single-repository one",
+ help="Pretend the remote repository description is the canonical" +
+ " single-repository one",
)
parser.add_argument(
"--as",
@@ -318,10 +356,10 @@ def main():
"--map",
nargs=2,
dest="import_map",
- help="Map the specified foreign repository to the specified existing repository",
+ help=
+ "Map the specified foreign repository to the specified existing repository",
action="append",
- default=[]
- )
+ default=[])
parser.add_argument('URL')
parser.add_argument('foreign_repository_name', nargs='?')
args = parser.parse_args()