diff options
Diffstat (limited to 'bin/just-deduplicate-repos.py')
-rwxr-xr-x | bin/just-deduplicate-repos.py | 222 |
1 files changed, 222 insertions, 0 deletions
#!/usr/bin/env python3
# Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Merge indistinguishable repositories of a repository configuration.

The configuration is read as JSON from stdin and the deduplicated
configuration is written as JSON to stdout. Command-line arguments are
names of repositories that must be kept under their own name.
"""

import json
import sys

from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple

# generic JSON type
Json = Any

# Root entries of a repository description. The order matters: when an
# entry is absent, it defaults to the value of the entry preceding it.
_ROOT_NAMES = ("repository", "target_root", "rule_root", "expression_root")

# File-name entries of a repository description, with their defaults.
_FILE_NAMES = (("target_file_name", "TARGETS"),
               ("rule_file_name", "RULES"),
               ("expression_file_name", "EXPRESSIONS"))


def log(*args: str, **kwargs: Any) -> None:
    """Print the arguments to stderr; keyword arguments go to print()."""
    print(*args, file=sys.stderr, **kwargs)


def fail(s: str, exit_code: int = 1) -> NoReturn:
    """Log the error message `s` and terminate with `exit_code`."""
    log(f"Error: {s}")
    sys.exit(exit_code)


def roots_equal(a: Json, b: Json) -> bool:
    """Decide whether two (resolved) root descriptions denote the same root.

    For the known root types only the fields relevant for the content are
    compared; for any other type, the descriptions have to be fully equal.
    """
    if a["type"] != b["type"]:
        return False
    if a["type"] == "file":
        return a["path"] == b["path"]
    if a["type"] in ["archive", "zip"]:
        return (a["content"] == b["content"]
                and a.get("subdir", ".") == b.get("subdir", "."))
    if a["type"] == "git":
        return (a["commit"] == b["commit"]
                and a.get("subdir", ".") == b.get("subdir", "."))
    # unknown repository type, the only safe way is to test
    # for full equality
    return a == b


def get_root(repos: Json, name: str, *, root_name: str = "repository",
             default_root: Optional[Json] = None) -> Json:
    """Return the resolved description of root `root_name` of `name`.

    If the entry is absent, `default_root` is returned if given. A string
    entry names another repository whose "repository" root is meant; such
    references are followed until an actual description is found.

    Terminates the program (via `fail`) if a mandatory root is missing
    or if the references form a cycle.
    """
    root = repos[name].get(root_name)
    if root is None:
        if default_root is not None:
            return default_root
        fail("Did not find mandatory root %s" % (name,))
    # Only the "repository" entry of `name` is part of the reference chain;
    # e.g., a target root may legitimately refer to its own repository.
    seen: Set[str] = {name} if root_name == "repository" else set()
    while isinstance(root, str):
        ref = root
        if ref in seen:
            fail("Cyclic root reference involving %s" % (ref,))
        seen.add(ref)
        root = repos[ref].get("repository")
        if root is None:
            fail("Did not find mandatory root %s" % (ref,))
    return root


def local_repos_equal(repos: Json, name_a: str, name_b: str) -> bool:
    """Decide whether two repositories agree on everything but the
    targets of their bindings, i.e., on all roots, on all file names,
    and on the set of names bound."""
    if name_a == name_b:
        return True
    desc_a = repos[name_a]
    desc_b = repos[name_b]
    root_a = None
    root_b = None
    for root_name in _ROOT_NAMES:
        # An absent root defaults to the previous (resolved) one.
        root_a = get_root(repos, name_a, root_name=root_name,
                          default_root=root_a)
        root_b = get_root(repos, name_b, root_name=root_name,
                          default_root=root_b)
        if not roots_equal(root_a, root_b):
            return False
    for file_name, default_name in _FILE_NAMES:
        if (desc_a.get(file_name, default_name)
                != desc_b.get(file_name, default_name)):
            return False
    return (set(desc_a.get("bindings", {}))
            == set(desc_b.get("bindings", {})))


def bisimilar_repos(repos: Json) -> List[List[str]]:
    """Compute the maximal bisimulation between the repositories
    and return the bisimilarity classes.

    Each class is a list of repository names, starting with the
    lexicographically largest one, followed by the others in ascending
    order.
    """
    Pair = Tuple[str, str]
    # Pairs known to be distinguishable; always stored as (smaller, larger).
    different: Set[Pair] = set()
    # For a pair not (yet) known to be different, the pairs that become
    # different as soon as that pair turns out to be different.
    dependents: Dict[Pair, List[Pair]] = {}

    def ordered(name_a: str, name_b: str) -> Pair:
        return (name_a, name_b) if name_a < name_b else (name_b, name_a)

    def is_different(name_a: str, name_b: str) -> bool:
        # Bindings can refer to repositories in any order, so the pair
        # has to be normalized before the lookup.
        return ordered(name_a, name_b) in different

    def mark_as_different(name_a: str, name_b: str) -> None:
        # Propagate with an explicit work list, so that long dependency
        # chains cannot exhaust the recursion limit.
        pending = [ordered(name_a, name_b)]
        while pending:
            pair = pending.pop()
            if pair in different:
                continue
            different.add(pair)
            pending.extend(dependents.pop(pair, []))

    def register_dependency(name_a: str, name_b: str,
                            dep_a: str, dep_b: str) -> None:
        dependents.setdefault(ordered(name_a, name_b), []).append(
            ordered(dep_a, dep_b))

    names = sorted(repos)
    for j, b in enumerate(names):
        for a in names[:j]:
            if is_different(a, b):
                continue
            if not local_repos_equal(repos, a, b):
                mark_as_different(a, b)
                continue
            links_a = repos[a].get("bindings", {})
            links_b = repos[b].get("bindings", {})
            # local_repos_equal guarantees that both bind the same names
            for link, next_a in links_a.items():
                next_b = links_b[link]
                if next_a == next_b:
                    continue
                if next_a not in repos or next_b not in repos:
                    # Binding to a name not described in the configuration;
                    # such a pair is never inspected, so we have to be
                    # conservative and consider the names different.
                    mark_as_different(a, b)
                    break
                if is_different(next_a, next_b):
                    mark_as_different(a, b)
                    break
                register_dependency(next_a, next_b, a, b)

    classes: List[List[str]] = []
    done: Set[str] = set()
    for j in reversed(range(len(names))):
        name_j = names[j]
        if name_j in done:
            continue
        c = [name_j]
        for name_i in names[:j]:
            if not is_different(name_i, name_j):
                c.append(name_i)
                done.add(name_i)
        classes.append(c)
    return classes


def dedup(repos: Json, user_keep: List[str]) -> Json:
    """Return a copy of the configuration `repos` with bisimilar
    repositories merged.

    The repositories named in `user_keep`, as well as the main repository
    (if specified as a string), are never removed. All bindings and all
    roots given by reference are redirected to the surviving repositories.
    The input is not modified.
    """
    keep = set(user_keep)
    main = repos.get("main")
    if isinstance(main, str):
        keep.add(main)
    all_repos = repos["repositories"]

    def choose_representative(c: List[str]) -> str:
        """Out of a bisimilarity class choose a main representative"""
        # Keep a repository with a proper root, if any of those has a root.
        # In this way, we're not losing actual roots.
        with_root = [n for n in c
                     if isinstance(all_repos[n]["repository"], dict)]
        candidates = with_root or c

        # Prefer to choose a repository we have to keep anyway
        keep_entries = set(candidates) & keep
        if keep_entries:
            candidates = list(keep_entries)

        # Prefer names with few path components, then short names; the
        # name itself is the final tie breaker, so the choice is unique.
        return min(candidates, key=lambda s: (s.count("/"), len(s), s))

    renaming: Dict[str, str] = {}
    for c in bisimilar_repos(all_repos):
        if len(c) == 1:
            continue
        rep = choose_representative(c)
        for repo in c:
            if repo not in keep and repo != rep:
                renaming[repo] = rep

    def final_root_reference(name: str) -> str:
        """For a given repository name, return a name that can be used
        to name the root in the final repository configuration."""
        root: Json = all_repos[name]["repository"]
        while isinstance(root, str):
            name = root
            root = all_repos[name]["repository"]
        if not isinstance(root, dict):
            fail("Invalid root found for %r: %r" % (name, root))
        # actual root; can still be merged into a different one, but only
        # one with a proper root as well.
        return renaming.get(name, name)

    new_repos: Json = {}
    for name, desc in all_repos.items():
        if name in renaming:
            continue
        updates: Json = {}
        if "bindings" in desc:
            updates["bindings"] = {k: renaming.get(v, v)
                                   for k, v in desc["bindings"].items()}
        # All roots, including the expression root, can be given by
        # reference to a repository that is about to be removed.
        for root in _ROOT_NAMES:
            root_val: Json = desc.get(root)
            if isinstance(root_val, str) and root_val in renaming:
                updates[root] = final_root_reference(root_val)
        new_repos[name] = dict(desc, **updates)
    return dict(repos, **{"repositories": new_repos})


def main() -> None:
    """Read a configuration from stdin, deduplicate it while keeping the
    repositories named on the command line, and print it to stdout."""
    orig = json.load(sys.stdin)
    final = dedup(orig, sys.argv[1:])
    print(json.dumps(final))


if __name__ == "__main__":
    main()