diff options
author | Paul Cristian Sarbu <paul.cristian.sarbu@huawei.com> | 2023-12-18 10:57:26 +0100 |
---|---|---|
committer | Paul Cristian Sarbu <paul.cristian.sarbu@huawei.com> | 2023-12-19 16:01:56 +0100 |
commit | 87e19a3ad2df7ec3195cac1a38f8ee23121330fe (patch) | |
tree | d0e73dbb6d97f672befc10cdf1a42a5274b82500 | |
parent | 1b4027fc4c6363ec869b51f0198679720ea2aded (diff) | |
download | justbuild-87e19a3ad2df7ec3195cac1a38f8ee23121330fe.tar.gz |
just-mr fetch content: Check for blob also in Git cache
We now also look for the content blob in the local Git cache, not
just in the local CAS. If found, the blob read from the Git cache is
stored in the local CAS and the fetch continues as usual.
-rw-r--r-- | src/other_tools/just_mr/fetch.cpp | 5 | ||||
-rw-r--r-- | src/other_tools/just_mr/setup.cpp | 1 | ||||
-rw-r--r-- | src/other_tools/ops_maps/TARGETS | 4 | ||||
-rw-r--r-- | src/other_tools/ops_maps/content_cas_map.cpp | 356 | ||||
-rw-r--r-- | src/other_tools/ops_maps/content_cas_map.hpp | 17 |
5 files changed, 258 insertions, 125 deletions
diff --git a/src/other_tools/just_mr/fetch.cpp b/src/other_tools/just_mr/fetch.cpp index 56c5fdec..d5e72f28 100644 --- a/src/other_tools/just_mr/fetch.cpp +++ b/src/other_tools/just_mr/fetch.cpp @@ -415,10 +415,13 @@ auto MultiRepoFetch(std::shared_ptr<Configuration> const& config, common_args.remote_serve_address, auth_args); // create async maps + auto crit_git_op_ptr = std::make_shared<CriticalGitOpGuard>(); + auto critical_git_op_map = CreateCriticalGitOpMap(crit_git_op_ptr); auto content_cas_map = CreateContentCASMap(common_args.just_mr_paths, common_args.alternative_mirrors, common_args.ca_info, + &critical_git_op_map, serve_api_exists, local_api ? &(*local_api) : nullptr, remote_api ? &(*remote_api) : nullptr, @@ -429,8 +432,6 @@ auto MultiRepoFetch(std::shared_ptr<Configuration> const& config, (fetch_args.backup_to_remote and local_api) ? &(*local_api) : nullptr, (fetch_args.backup_to_remote and remote_api) ? &(*remote_api) : nullptr, common_args.jobs); - auto crit_git_op_ptr = std::make_shared<CriticalGitOpGuard>(); - auto critical_git_op_map = CreateCriticalGitOpMap(crit_git_op_ptr); auto import_to_git_map = CreateImportToGitMap(&critical_git_op_map, common_args.git_path->string(), diff --git a/src/other_tools/just_mr/setup.cpp b/src/other_tools/just_mr/setup.cpp index 814aa1e4..dedf9ecb 100644 --- a/src/other_tools/just_mr/setup.cpp +++ b/src/other_tools/just_mr/setup.cpp @@ -108,6 +108,7 @@ auto MultiRepoSetup(std::shared_ptr<Configuration> const& config, CreateContentCASMap(common_args.just_mr_paths, common_args.alternative_mirrors, common_args.ca_info, + &critical_git_op_map, serve_api_exists, local_api ? &(*local_api) : nullptr, remote_api ? 
&(*remote_api) : nullptr, diff --git a/src/other_tools/ops_maps/TARGETS b/src/other_tools/ops_maps/TARGETS index d7af943d..66a43e8a 100644 --- a/src/other_tools/ops_maps/TARGETS +++ b/src/other_tools/ops_maps/TARGETS @@ -64,6 +64,7 @@ , ["src/buildtool/execution_api/common", "common"] , ["src/buildtool/multithreading", "async_map_consumer"] , ["src/other_tools/just_mr", "mirrors"] + , ["src/other_tools/ops_maps", "critical_git_op_map"] , ["src/utils/cpp", "hash_combine"] ] , "stage": ["src", "other_tools", "ops_maps"] @@ -73,7 +74,10 @@ , ["src/buildtool/execution_api/local", "local"] , ["src/buildtool/file_system", "file_storage"] , ["src/buildtool/serve_api/remote", "serve_api"] + , ["src/buildtool/storage", "config"] , ["src/buildtool/storage", "fs_utils"] + , ["src/buildtool/storage", "storage"] + , ["src/other_tools/git_operations", "git_repo_remote"] , ["src/other_tools/just_mr/progress_reporting", "statistics"] , ["src/other_tools/just_mr/progress_reporting", "progress"] ] diff --git a/src/other_tools/ops_maps/content_cas_map.cpp b/src/other_tools/ops_maps/content_cas_map.cpp index 4a486836..65449d97 100644 --- a/src/other_tools/ops_maps/content_cas_map.cpp +++ b/src/other_tools/ops_maps/content_cas_map.cpp @@ -17,147 +17,271 @@ #include "fmt/core.h" #include "src/buildtool/file_system/file_storage.hpp" #include "src/buildtool/serve_api/remote/serve_api.hpp" +#include "src/buildtool/storage/config.hpp" #include "src/buildtool/storage/fs_utils.hpp" #include "src/buildtool/storage/storage.hpp" +#include "src/other_tools/git_operations/git_repo_remote.hpp" #include "src/other_tools/just_mr/progress_reporting/progress.hpp" #include "src/other_tools/just_mr/progress_reporting/statistics.hpp" #include "src/other_tools/utils/content.hpp" #include "src/other_tools/utils/curl_url_handle.hpp" -auto CreateContentCASMap(LocalPathsPtr const& just_mr_paths, - MirrorsPtr const& additional_mirrors, - CAInfoPtr const& ca_info, - bool serve_api_exists, - IExecutionApi* 
local_api, - IExecutionApi* remote_api, - std::size_t jobs) -> ContentCASMap { +namespace { + +void CheckRemoteAndFetchFromNetwork(ArchiveContent const& key, + ArtifactDigest const& digest, + MirrorsPtr const& additional_mirrors, + CAInfoPtr const& ca_info, + IExecutionApi* local_api, + IExecutionApi* remote_api, + ContentCASMap::SetterPtr const& setter, + ContentCASMap::LoggerPtr const& logger) { + // check if content is in remote CAS, if a remote is given + if (remote_api != nullptr and local_api != nullptr and + remote_api->IsAvailable(digest) and + remote_api->RetrieveToCas( + {Artifact::ObjectInfo{.digest = digest, .type = ObjectType::File}}, + local_api)) { + JustMRProgress::Instance().TaskTracker().Stop(key.origin); + (*setter)(nullptr); + return; + } + // archive needs network fetching; + // first, check that mandatory fields are provided + if (key.fetch_url.empty()) { + (*logger)("Failed to provide archive fetch url!", + /*fatal=*/true); + return; + } + // now do the actual fetch + auto res = NetworkFetchWithMirrors( + key.fetch_url, key.mirrors, ca_info, additional_mirrors); + auto* data = + std::get_if<1>(&res); // get pointer to fetched data, or nullptr + if (data == nullptr) { + (*logger)(fmt::format("Failed to fetch a file with id {} from provided " + "remotes:{}", + key.content, + std::get<0>(res)), + /*fatal=*/true); + return; + } + // check content wrt checksums + if (key.sha256) { + auto actual_sha256 = GetContentHash<Hasher::HashType::SHA256>(*data); + if (actual_sha256 != key.sha256.value()) { + (*logger)(fmt::format("SHA256 mismatch for {}: expected {}, got {}", + key.fetch_url, + key.sha256.value(), + actual_sha256), + /*fatal=*/true); + return; + } + } + if (key.sha512) { + auto actual_sha512 = GetContentHash<Hasher::HashType::SHA512>(*data); + if (actual_sha512 != key.sha512.value()) { + (*logger)(fmt::format("SHA512 mismatch for {}: expected {}, got {}", + key.fetch_url, + key.sha512.value(), + actual_sha512), + /*fatal=*/true); + return; + 
} + } + // add the fetched data to CAS + auto path = StorageUtils::AddToCAS(*data); + // check one last time if content is in CAS now + if (not path) { + (*logger)(fmt::format("Failed to store fetched content from {}", + key.fetch_url), + /*fatal=*/true); + return; + } + // check that the data we stored actually produces the requested digest + auto const& cas = Storage::Instance().CAS(); + if (not cas.BlobPath(digest, /*is_executable=*/false)) { + (*logger)( + fmt::format("Content {} was not found at given fetch location {}", + key.content, + key.fetch_url), + /*fatal=*/true); + return; + } + if (key.fetch_only) { + JustMRProgress::Instance().TaskTracker().Stop(key.origin); + } + // success! + (*setter)(nullptr); +} + +} // namespace + +auto CreateContentCASMap( + LocalPathsPtr const& just_mr_paths, + MirrorsPtr const& additional_mirrors, + CAInfoPtr const& ca_info, + gsl::not_null<CriticalGitOpMap*> const& critical_git_op_map, + bool serve_api_exists, + IExecutionApi* local_api, + IExecutionApi* remote_api, + std::size_t jobs) -> ContentCASMap { auto ensure_in_cas = [just_mr_paths, additional_mirrors, ca_info, + critical_git_op_map, serve_api_exists, local_api, - remote_api](auto /*unused*/, + remote_api](auto ts, auto setter, auto logger, auto /*unused*/, auto const& key) { - auto const& cas = Storage::Instance().CAS(); auto digest = ArtifactDigest(key.content, 0, false); // separate logic if we need a pure fetch if (key.fetch_only) { + auto const& cas = Storage::Instance().CAS(); if (cas.BlobPath(digest, /*is_executable=*/false)) { (*setter)(nullptr); return; } - JustMRProgress::Instance().TaskTracker().Start(key.origin); - // add distfile to CAS - auto repo_distfile = - (key.distfile ? 
key.distfile.value() - : std::filesystem::path(key.fetch_url) - .filename() - .string()); - StorageUtils::AddDistfileToCAS(repo_distfile, just_mr_paths); - // check if content is in CAS now - if (cas.BlobPath(digest, /*is_executable=*/false)) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - (*setter)(nullptr); - return; - } - // check if content is known to remote serve service - if (serve_api_exists and - ServeApi::ContentInRemoteCAS(key.content)) { - // try to get content from remote CAS - if (remote_api != nullptr and local_api != nullptr and - remote_api->RetrieveToCas( - {Artifact::ObjectInfo{.digest = digest, - .type = ObjectType::File}}, - local_api)) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - (*setter)(nullptr); - return; - } - } - } - // check if content is in remote CAS, if a remote is given - if (remote_api and local_api and remote_api->IsAvailable(digest) and - remote_api->RetrieveToCas( - {Artifact::ObjectInfo{.digest = digest, - .type = ObjectType::File}}, - local_api)) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - (*setter)(nullptr); - return; - } - // archive needs network fetching; - // first, check that mandatory fields are provided - if (key.fetch_url.empty()) { - (*logger)("Failed to provide archive fetch url!", - /*fatal=*/true); - return; - } - // now do the actual fetch - auto res = NetworkFetchWithMirrors( - key.fetch_url, key.mirrors, ca_info, additional_mirrors); - auto data = - std::get_if<1>(&res); // get pointer to fetched data, or nullptr - if (not data) { - (*logger)(fmt::format("Failed to fetch a file with id {} from " - "provided remotes:{}", - key.content, - std::get<0>(res)), - /*fatal=*/true); - return; - } - // check content wrt checksums - if (key.sha256) { - auto actual_sha256 = - GetContentHash<Hasher::HashType::SHA256>(*data); - if (actual_sha256 != key.sha256.value()) { - (*logger)( - fmt::format("SHA256 mismatch for {}: expected {}, got {}", - key.fetch_url, - 
key.sha256.value(), - actual_sha256), - /*fatal=*/true); - return; - } - } - if (key.sha512) { - auto actual_sha512 = - GetContentHash<Hasher::HashType::SHA512>(*data); - if (actual_sha512 != key.sha512.value()) { - (*logger)( - fmt::format("SHA512 mismatch for {}: expected {}, got {}", - key.fetch_url, - key.sha512.value(), - actual_sha512), - /*fatal=*/true); - return; - } - } - // add the fetched data to CAS - auto path = StorageUtils::AddToCAS(*data); - // check one last time if content is in CAS now - if (not path) { - (*logger)(fmt::format("Failed to store fetched content from {}", - key.fetch_url), - /*fatal=*/true); + // check if content is in Git cache; + // ensure Git cache + GitOpKey op_key = {.params = + { + StorageConfig::GitRoot(), // target_path + "", // git_hash + "", // branch + std::nullopt, // message + true // init_bare + }, + .op_type = GitOpType::ENSURE_INIT}; + critical_git_op_map->ConsumeAfterKeysReady( + ts, + {std::move(op_key)}, + [key, + digest, + just_mr_paths, + additional_mirrors, + ca_info, + serve_api_exists, + local_api, + remote_api, + setter, + logger](auto const& values) { + GitOpValue op_result = *values[0]; + // check flag + if (not op_result.result) { + (*logger)("Git init failed", + /*fatal=*/true); + return; + } + auto const just_git_cas = op_result.git_cas; + // open fake repo wrap for GitCAS + auto just_git_repo = GitRepoRemote::Open(just_git_cas); + if (not just_git_repo) { + (*logger)("Could not open Git cache repository!", + /*fatal=*/true); + return; + } + // verify if local Git knows content blob + auto wrapped_logger = + std::make_shared<AsyncMapConsumerLogger>( + [&logger, blob = key.content](auto const& msg, + bool fatal) { + (*logger)( + fmt::format("While verifying presence of " + "blob {}:\n{}", + blob, + msg), + fatal); + }); + auto res = + just_git_repo->TryReadBlob(key.content, wrapped_logger); + if (not res.first) { + // blob check failed + return; + } + auto const& cas = Storage::Instance().CAS(); + if 
(res.second) { + // blob found; add it to CAS + if (not cas.StoreBlob(*res.second, + /*is_executable=*/false)) { + (*logger)(fmt::format("Failed to store content {} " + "to local CAS", + key.content), + /*fatal=*/true); + return; + } + // content stored to CAS + (*setter)(nullptr); + return; + } + // blob not found in Git cache + JustMRProgress::Instance().TaskTracker().Start(key.origin); + // add distfile to CAS + auto repo_distfile = + (key.distfile ? key.distfile.value() + : std::filesystem::path(key.fetch_url) + .filename() + .string()); + StorageUtils::AddDistfileToCAS(repo_distfile, + just_mr_paths); + // check if content is in CAS now + if (cas.BlobPath(digest, /*is_executable=*/false)) { + JustMRProgress::Instance().TaskTracker().Stop( + key.origin); + (*setter)(nullptr); + return; + } + // check if content is known to remote serve service + if (serve_api_exists and + ServeApi::ContentInRemoteCAS(key.content)) { + // try to get content from remote CAS + if (remote_api != nullptr and local_api != nullptr and + remote_api->RetrieveToCas( + {Artifact::ObjectInfo{ + .digest = digest, + .type = ObjectType::File}}, + local_api)) { + JustMRProgress::Instance().TaskTracker().Stop( + key.origin); + (*setter)(nullptr); + return; + } + } + // check remote execution endpoint and if not found revert + // to network fetch + CheckRemoteAndFetchFromNetwork(key, + digest, + additional_mirrors, + ca_info, + local_api, + remote_api, + setter, + logger); + }, + [logger, target_path = StorageConfig::GitRoot()]( + auto const& msg, bool fatal) { + (*logger)(fmt::format("While running critical Git op " + "ENSURE_INIT for target {}:\n{}", + target_path.string(), + msg), + fatal); + }); + // done! 
return; } - // check that the data we stored actually produces the requested digest - if (not cas.BlobPath(digest, /*is_executable=*/false)) { - (*logger)(fmt::format( - "Content {} was not found at given fetch location {}", - key.content, - key.fetch_url), - /*fatal=*/true); - return; - } - if (key.fetch_only) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - } - // success! - (*setter)(nullptr); + // if not fetch only, then only check remote execution endpoint and + // revert to network fetch as last resort + CheckRemoteAndFetchFromNetwork(key, + digest, + additional_mirrors, + ca_info, + local_api, + remote_api, + setter, + logger); }; return AsyncMapConsumer<ArchiveContent, std::nullptr_t>(ensure_in_cas, jobs); diff --git a/src/other_tools/ops_maps/content_cas_map.hpp b/src/other_tools/ops_maps/content_cas_map.hpp index d32cfbd2..13c20387 100644 --- a/src/other_tools/ops_maps/content_cas_map.hpp +++ b/src/other_tools/ops_maps/content_cas_map.hpp @@ -24,6 +24,7 @@ #include "src/buildtool/file_system/symlinks_map/pragma_special.hpp" #include "src/buildtool/multithreading/async_map_consumer.hpp" #include "src/other_tools/just_mr/mirrors.hpp" +#include "src/other_tools/ops_maps/critical_git_op_map.hpp" #include "src/utils/cpp/hash_combine.hpp" struct ArchiveContent { @@ -65,13 +66,15 @@ struct ArchiveRepoInfo { /// the map fails or not. 
using ContentCASMap = AsyncMapConsumer<ArchiveContent, std::nullptr_t>; -[[nodiscard]] auto CreateContentCASMap(LocalPathsPtr const& just_mr_paths, - MirrorsPtr const& additional_mirrors, - CAInfoPtr const& ca_info, - bool serve_api_exists, - IExecutionApi* local_api, - IExecutionApi* remote_api, - std::size_t jobs) -> ContentCASMap; +[[nodiscard]] auto CreateContentCASMap( + LocalPathsPtr const& just_mr_paths, + MirrorsPtr const& additional_mirrors, + CAInfoPtr const& ca_info, + gsl::not_null<CriticalGitOpMap*> const& critical_git_op_map, + bool serve_api_exists, + IExecutionApi* local_api, + IExecutionApi* remote_api, + std::size_t jobs) -> ContentCASMap; namespace std { template <> |