diff options
-rw-r--r-- | src/other_tools/just_mr/fetch.cpp | 5 | ||||
-rw-r--r-- | src/other_tools/just_mr/setup.cpp | 1 | ||||
-rw-r--r-- | src/other_tools/ops_maps/TARGETS | 4 | ||||
-rw-r--r-- | src/other_tools/ops_maps/content_cas_map.cpp | 356 | ||||
-rw-r--r-- | src/other_tools/ops_maps/content_cas_map.hpp | 17 |
5 files changed, 258 insertions, 125 deletions
diff --git a/src/other_tools/just_mr/fetch.cpp b/src/other_tools/just_mr/fetch.cpp index 56c5fdec..d5e72f28 100644 --- a/src/other_tools/just_mr/fetch.cpp +++ b/src/other_tools/just_mr/fetch.cpp @@ -415,10 +415,13 @@ auto MultiRepoFetch(std::shared_ptr<Configuration> const& config, common_args.remote_serve_address, auth_args); // create async maps + auto crit_git_op_ptr = std::make_shared<CriticalGitOpGuard>(); + auto critical_git_op_map = CreateCriticalGitOpMap(crit_git_op_ptr); auto content_cas_map = CreateContentCASMap(common_args.just_mr_paths, common_args.alternative_mirrors, common_args.ca_info, + &critical_git_op_map, serve_api_exists, local_api ? &(*local_api) : nullptr, remote_api ? &(*remote_api) : nullptr, @@ -429,8 +432,6 @@ auto MultiRepoFetch(std::shared_ptr<Configuration> const& config, (fetch_args.backup_to_remote and local_api) ? &(*local_api) : nullptr, (fetch_args.backup_to_remote and remote_api) ? &(*remote_api) : nullptr, common_args.jobs); - auto crit_git_op_ptr = std::make_shared<CriticalGitOpGuard>(); - auto critical_git_op_map = CreateCriticalGitOpMap(crit_git_op_ptr); auto import_to_git_map = CreateImportToGitMap(&critical_git_op_map, common_args.git_path->string(), diff --git a/src/other_tools/just_mr/setup.cpp b/src/other_tools/just_mr/setup.cpp index 814aa1e4..dedf9ecb 100644 --- a/src/other_tools/just_mr/setup.cpp +++ b/src/other_tools/just_mr/setup.cpp @@ -108,6 +108,7 @@ auto MultiRepoSetup(std::shared_ptr<Configuration> const& config, CreateContentCASMap(common_args.just_mr_paths, common_args.alternative_mirrors, common_args.ca_info, + &critical_git_op_map, serve_api_exists, local_api ? &(*local_api) : nullptr, remote_api ? &(*remote_api) : nullptr, diff --git a/src/other_tools/ops_maps/TARGETS b/src/other_tools/ops_maps/TARGETS index d7af943d..66a43e8a 100644 --- a/src/other_tools/ops_maps/TARGETS +++ b/src/other_tools/ops_maps/TARGETS @@ -64,6 +64,7 @@ , ["src/buildtool/execution_api/common", "common"] , ["src/buildtool/multithreading", "async_map_consumer"] , ["src/other_tools/just_mr", "mirrors"] + , ["src/other_tools/ops_maps", "critical_git_op_map"] , ["src/utils/cpp", "hash_combine"] ] , "stage": ["src", "other_tools", "ops_maps"] @@ -73,7 +74,10 @@ , ["src/buildtool/execution_api/local", "local"] , ["src/buildtool/file_system", "file_storage"] , ["src/buildtool/serve_api/remote", "serve_api"] + , ["src/buildtool/storage", "config"] , ["src/buildtool/storage", "fs_utils"] + , ["src/buildtool/storage", "storage"] + , ["src/other_tools/git_operations", "git_repo_remote"] , ["src/other_tools/just_mr/progress_reporting", "statistics"] , ["src/other_tools/just_mr/progress_reporting", "progress"] ] diff --git a/src/other_tools/ops_maps/content_cas_map.cpp b/src/other_tools/ops_maps/content_cas_map.cpp index 4a486836..65449d97 100644 --- a/src/other_tools/ops_maps/content_cas_map.cpp +++ b/src/other_tools/ops_maps/content_cas_map.cpp @@ -17,147 +17,271 @@ #include "fmt/core.h" #include "src/buildtool/file_system/file_storage.hpp" #include "src/buildtool/serve_api/remote/serve_api.hpp" +#include "src/buildtool/storage/config.hpp" #include "src/buildtool/storage/fs_utils.hpp" #include "src/buildtool/storage/storage.hpp" +#include "src/other_tools/git_operations/git_repo_remote.hpp" #include "src/other_tools/just_mr/progress_reporting/progress.hpp" #include "src/other_tools/just_mr/progress_reporting/statistics.hpp" #include "src/other_tools/utils/content.hpp" #include "src/other_tools/utils/curl_url_handle.hpp" -auto CreateContentCASMap(LocalPathsPtr const& just_mr_paths, - MirrorsPtr const& additional_mirrors, - CAInfoPtr const& ca_info, - bool serve_api_exists, - IExecutionApi* local_api, - IExecutionApi* remote_api, - std::size_t jobs) -> ContentCASMap { +namespace { + +void CheckRemoteAndFetchFromNetwork(ArchiveContent const& key, + ArtifactDigest const& digest, + MirrorsPtr const& additional_mirrors, + CAInfoPtr const& ca_info, + IExecutionApi* local_api, + IExecutionApi* remote_api, + ContentCASMap::SetterPtr const& setter, + ContentCASMap::LoggerPtr const& logger) { + // check if content is in remote CAS, if a remote is given + if (remote_api != nullptr and local_api != nullptr and + remote_api->IsAvailable(digest) and + remote_api->RetrieveToCas( + {Artifact::ObjectInfo{.digest = digest, .type = ObjectType::File}}, + local_api)) { + JustMRProgress::Instance().TaskTracker().Stop(key.origin); + (*setter)(nullptr); + return; + } + // archive needs network fetching; + // first, check that mandatory fields are provided + if (key.fetch_url.empty()) { + (*logger)("Failed to provide archive fetch url!", + /*fatal=*/true); + return; + } + // now do the actual fetch + auto res = NetworkFetchWithMirrors( + key.fetch_url, key.mirrors, ca_info, additional_mirrors); + auto* data = + std::get_if<1>(&res); // get pointer to fetched data, or nullptr + if (data == nullptr) { + (*logger)(fmt::format("Failed to fetch a file with id {} from provided " + "remotes:{}", + key.content, + std::get<0>(res)), + /*fatal=*/true); + return; + } + // check content wrt checksums + if (key.sha256) { + auto actual_sha256 = GetContentHash<Hasher::HashType::SHA256>(*data); + if (actual_sha256 != key.sha256.value()) { + (*logger)(fmt::format("SHA256 mismatch for {}: expected {}, got {}", + key.fetch_url, + key.sha256.value(), + actual_sha256), + /*fatal=*/true); + return; + } + } + if (key.sha512) { + auto actual_sha512 = GetContentHash<Hasher::HashType::SHA512>(*data); + if (actual_sha512 != key.sha512.value()) { + (*logger)(fmt::format("SHA512 mismatch for {}: expected {}, got {}", + key.fetch_url, + key.sha512.value(), + actual_sha512), + /*fatal=*/true); + return; + } + } + // add the fetched data to CAS + auto path = StorageUtils::AddToCAS(*data); + // check one last time if content is in CAS now + if (not path) { + (*logger)(fmt::format("Failed to store fetched content from {}", + key.fetch_url), + /*fatal=*/true); + return; + } + // check that the data we stored actually produces the requested digest + auto const& cas = Storage::Instance().CAS(); + if (not cas.BlobPath(digest, /*is_executable=*/false)) { + (*logger)( + fmt::format("Content {} was not found at given fetch location {}", + key.content, + key.fetch_url), + /*fatal=*/true); + return; + } + if (key.fetch_only) { + JustMRProgress::Instance().TaskTracker().Stop(key.origin); + } + // success! + (*setter)(nullptr); +} + +} // namespace + +auto CreateContentCASMap( + LocalPathsPtr const& just_mr_paths, + MirrorsPtr const& additional_mirrors, + CAInfoPtr const& ca_info, + gsl::not_null<CriticalGitOpMap*> const& critical_git_op_map, + bool serve_api_exists, + IExecutionApi* local_api, + IExecutionApi* remote_api, + std::size_t jobs) -> ContentCASMap { auto ensure_in_cas = [just_mr_paths, additional_mirrors, ca_info, + critical_git_op_map, serve_api_exists, local_api, - remote_api](auto /*unused*/, + remote_api](auto ts, auto setter, auto logger, auto /*unused*/, auto const& key) { - auto const& cas = Storage::Instance().CAS(); auto digest = ArtifactDigest(key.content, 0, false); // separate logic if we need a pure fetch if (key.fetch_only) { + auto const& cas = Storage::Instance().CAS(); if (cas.BlobPath(digest, /*is_executable=*/false)) { (*setter)(nullptr); return; } - JustMRProgress::Instance().TaskTracker().Start(key.origin); - // add distfile to CAS - auto repo_distfile = - (key.distfile ? key.distfile.value() - : std::filesystem::path(key.fetch_url) - .filename() - .string()); - StorageUtils::AddDistfileToCAS(repo_distfile, just_mr_paths); - // check if content is in CAS now - if (cas.BlobPath(digest, /*is_executable=*/false)) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - (*setter)(nullptr); - return; - } - // check if content is known to remote serve service - if (serve_api_exists and - ServeApi::ContentInRemoteCAS(key.content)) { - // try to get content from remote CAS - if (remote_api != nullptr and local_api != nullptr and - remote_api->RetrieveToCas( - {Artifact::ObjectInfo{.digest = digest, - .type = ObjectType::File}}, - local_api)) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - (*setter)(nullptr); - return; - } - } - } - // check if content is in remote CAS, if a remote is given - if (remote_api and local_api and remote_api->IsAvailable(digest) and - remote_api->RetrieveToCas( - {Artifact::ObjectInfo{.digest = digest, - .type = ObjectType::File}}, - local_api)) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - (*setter)(nullptr); - return; - } - // archive needs network fetching; - // first, check that mandatory fields are provided - if (key.fetch_url.empty()) { - (*logger)("Failed to provide archive fetch url!", - /*fatal=*/true); - return; - } - // now do the actual fetch - auto res = NetworkFetchWithMirrors( - key.fetch_url, key.mirrors, ca_info, additional_mirrors); - auto data = - std::get_if<1>(&res); // get pointer to fetched data, or nullptr - if (not data) { - (*logger)(fmt::format("Failed to fetch a file with id {} from " - "provided remotes:{}", - key.content, - std::get<0>(res)), - /*fatal=*/true); - return; - } - // check content wrt checksums - if (key.sha256) { - auto actual_sha256 = - GetContentHash<Hasher::HashType::SHA256>(*data); - if (actual_sha256 != key.sha256.value()) { - (*logger)( - fmt::format("SHA256 mismatch for {}: expected {}, got {}", - key.fetch_url, - key.sha256.value(), - actual_sha256), - /*fatal=*/true); - return; - } - } - if (key.sha512) { - auto actual_sha512 = - GetContentHash<Hasher::HashType::SHA512>(*data); - if (actual_sha512 != key.sha512.value()) { - (*logger)( - fmt::format("SHA512 mismatch for {}: expected {}, got {}", - key.fetch_url, - key.sha512.value(), - actual_sha512), - /*fatal=*/true); - return; - } - } - // add the fetched data to CAS - auto path = StorageUtils::AddToCAS(*data); - // check one last time if content is in CAS now - if (not path) { - (*logger)(fmt::format("Failed to store fetched content from {}", - key.fetch_url), - /*fatal=*/true); + // check if content is in Git cache; + // ensure Git cache + GitOpKey op_key = {.params = + { + StorageConfig::GitRoot(), // target_path + "", // git_hash + "", // branch + std::nullopt, // message + true // init_bare + }, + .op_type = GitOpType::ENSURE_INIT}; + critical_git_op_map->ConsumeAfterKeysReady( + ts, + {std::move(op_key)}, + [key, + digest, + just_mr_paths, + additional_mirrors, + ca_info, + serve_api_exists, + local_api, + remote_api, + setter, + logger](auto const& values) { + GitOpValue op_result = *values[0]; + // check flag + if (not op_result.result) { + (*logger)("Git init failed", + /*fatal=*/true); + return; + } + auto const just_git_cas = op_result.git_cas; + // open fake repo wrap for GitCAS + auto just_git_repo = GitRepoRemote::Open(just_git_cas); + if (not just_git_repo) { + (*logger)("Could not open Git cache repository!", + /*fatal=*/true); + return; + } + // verify if local Git knows content blob + auto wrapped_logger = + std::make_shared<AsyncMapConsumerLogger>( + [&logger, blob = key.content](auto const& msg, + bool fatal) { + (*logger)( + fmt::format("While verifying presence of " + "blob {}:\n{}", + blob, + msg), + fatal); + }); + auto res = + just_git_repo->TryReadBlob(key.content, wrapped_logger); + if (not res.first) { + // blob check failed + return; + } + auto const& cas = Storage::Instance().CAS(); + if (res.second) { + // blob found; add it to CAS + if (not cas.StoreBlob(*res.second, + /*is_executable=*/false)) { + (*logger)(fmt::format("Failed to store content {} " + "to local CAS", + key.content), + /*fatal=*/true); + return; + } + // content stored to CAS + (*setter)(nullptr); + return; + } + // blob not found in Git cache + JustMRProgress::Instance().TaskTracker().Start(key.origin); + // add distfile to CAS + auto repo_distfile = + (key.distfile ? key.distfile.value() + : std::filesystem::path(key.fetch_url) + .filename() + .string()); + StorageUtils::AddDistfileToCAS(repo_distfile, + just_mr_paths); + // check if content is in CAS now + if (cas.BlobPath(digest, /*is_executable=*/false)) { + JustMRProgress::Instance().TaskTracker().Stop( + key.origin); + (*setter)(nullptr); + return; + } + // check if content is known to remote serve service + if (serve_api_exists and + ServeApi::ContentInRemoteCAS(key.content)) { + // try to get content from remote CAS + if (remote_api != nullptr and local_api != nullptr and + remote_api->RetrieveToCas( + {Artifact::ObjectInfo{ + .digest = digest, + .type = ObjectType::File}}, + local_api)) { + JustMRProgress::Instance().TaskTracker().Stop( + key.origin); + (*setter)(nullptr); + return; + } + } + // check remote execution endpoint and if not found revert + // to network fetch + CheckRemoteAndFetchFromNetwork(key, + digest, + additional_mirrors, + ca_info, + local_api, + remote_api, + setter, + logger); + }, + [logger, target_path = StorageConfig::GitRoot()]( + auto const& msg, bool fatal) { + (*logger)(fmt::format("While running critical Git op " + "ENSURE_INIT for target {}:\n{}", + target_path.string(), + msg), + fatal); + }); + // done! return; } - // check that the data we stored actually produces the requested digest - if (not cas.BlobPath(digest, /*is_executable=*/false)) { - (*logger)(fmt::format( - "Content {} was not found at given fetch location {}", - key.content, - key.fetch_url), - /*fatal=*/true); - return; - } - if (key.fetch_only) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - } - // success! - (*setter)(nullptr); + // if not fetch only, then only check remote execution endpoint and + // revert to network fetch as last resort + CheckRemoteAndFetchFromNetwork(key, + digest, + additional_mirrors, + ca_info, + local_api, + remote_api, + setter, + logger); }; return AsyncMapConsumer<ArchiveContent, std::nullptr_t>(ensure_in_cas, jobs); diff --git a/src/other_tools/ops_maps/content_cas_map.hpp b/src/other_tools/ops_maps/content_cas_map.hpp index d32cfbd2..13c20387 100644 --- a/src/other_tools/ops_maps/content_cas_map.hpp +++ b/src/other_tools/ops_maps/content_cas_map.hpp @@ -24,6 +24,7 @@ #include "src/buildtool/file_system/symlinks_map/pragma_special.hpp" #include "src/buildtool/multithreading/async_map_consumer.hpp" #include "src/other_tools/just_mr/mirrors.hpp" +#include "src/other_tools/ops_maps/critical_git_op_map.hpp" #include "src/utils/cpp/hash_combine.hpp" struct ArchiveContent { @@ -65,13 +66,15 @@ struct ArchiveRepoInfo { /// the map fails or not. using ContentCASMap = AsyncMapConsumer<ArchiveContent, std::nullptr_t>; -[[nodiscard]] auto CreateContentCASMap(LocalPathsPtr const& just_mr_paths, - MirrorsPtr const& additional_mirrors, - CAInfoPtr const& ca_info, - bool serve_api_exists, - IExecutionApi* local_api, - IExecutionApi* remote_api, - std::size_t jobs) -> ContentCASMap; +[[nodiscard]] auto CreateContentCASMap( + LocalPathsPtr const& just_mr_paths, + MirrorsPtr const& additional_mirrors, + CAInfoPtr const& ca_info, + gsl::not_null<CriticalGitOpMap*> const& critical_git_op_map, + bool serve_api_exists, + IExecutionApi* local_api, + IExecutionApi* remote_api, + std::size_t jobs) -> ContentCASMap; namespace std { template <> |