From 089733c819066b801e28d6441cbff887e60aef51 Mon Sep 17 00:00:00 2001 From: Paul Cristian Sarbu Date: Thu, 24 Aug 2023 14:02:37 +0200 Subject: just-mr fetch: Allow to back up fetched archives to a given remote CAS --- src/other_tools/ops_maps/repo_fetch_map.cpp | 131 +++++++++++++++++----------- 1 file changed, 78 insertions(+), 53 deletions(-) (limited to 'src/other_tools/ops_maps/repo_fetch_map.cpp') diff --git a/src/other_tools/ops_maps/repo_fetch_map.cpp b/src/other_tools/ops_maps/repo_fetch_map.cpp index 1cc663d7..f8d28d8d 100644 --- a/src/other_tools/ops_maps/repo_fetch_map.cpp +++ b/src/other_tools/ops_maps/repo_fetch_map.cpp @@ -20,61 +20,96 @@ #include "src/other_tools/just_mr/progress_reporting/statistics.hpp" #include "src/other_tools/just_mr/utils.hpp" +namespace { + +void ProcessContent(std::filesystem::path const& content_path, + std::filesystem::path const& target_name, + IExecutionApi* local_api, + IExecutionApi* remote_api, + std::string const& content, + ArtifactDigest const& digest, + RepoFetchMap::SetterPtr const& setter, + RepoFetchMap::LoggerPtr const& logger) { + // try to back up to remote CAS + if (local_api != nullptr and remote_api != nullptr) { + if (not local_api->RetrieveToCas( + {Artifact::ObjectInfo{.digest = digest, + .type = ObjectType::File}}, + remote_api)) { + // give a warning + (*logger)(fmt::format("Failed to back up content {} from local CAS " + "to remote", + content), + /*fatal=*/false); + } + } + // then, copy content into fetch_dir + if (FileSystemManager::Exists(target_name)) { + std::filesystem::permissions(target_name, + std::filesystem::perms::owner_write, + std::filesystem::perm_options::add); + } + if (not FileSystemManager::CopyFile(content_path, target_name)) { + (*logger)(fmt::format("Failed to copy content {} from CAS to {}", + content, + target_name.string()), + /*fatal=*/true); + return; + } + // success + JustMRStatistics::Instance().IncrementExecutedCounter(); + (*setter)(true); +} + +} // namespace + auto CreateRepoFetchMap(gsl::not_null const& content_cas_map, std::filesystem::path const& fetch_dir, + IExecutionApi* local_api, + IExecutionApi* remote_api, std::size_t jobs) -> RepoFetchMap { - auto fetch_repo = [content_cas_map, fetch_dir](auto ts, - auto setter, - auto logger, - auto /* unused */, - auto const& key) { + auto fetch_repo = [content_cas_map, fetch_dir, local_api, remote_api]( + auto ts, + auto setter, + auto logger, + auto /* unused */, + auto const& key) { // get corresponding distfile auto distfile = (key.archive.distfile ? key.archive.distfile.value() : std::filesystem::path(key.archive.fetch_url) .filename() .string()); + auto target_name = fetch_dir / distfile; // check if content not already in CAS + auto digest = ArtifactDigest(key.archive.content, 0, false); auto const& cas = Storage::Instance().CAS(); - auto content_path = - cas.BlobPath(ArtifactDigest(key.archive.content, 0, false), - /*is_executable=*/false); + auto content_path = cas.BlobPath(digest, + /*is_executable=*/false); if (not content_path) { // make sure content is in CAS content_cas_map->ConsumeAfterKeysReady( ts, {key.archive}, - [fetch_dir, + [target_name, + local_api, + remote_api, content = key.archive.content, - distfile, + digest = std::move(digest), setter, logger]([[maybe_unused]] auto const& values) { - // content is now in CAS, so copy content into fetch_dir auto const& cas = Storage::Instance().CAS(); - auto content_path = - cas.BlobPath(ArtifactDigest(content, 0, false), - /*is_executable=*/false) - .value(); - auto target_name = fetch_dir / distfile; - if (FileSystemManager::Exists(target_name)) { - std::filesystem::permissions( - target_name, - std::filesystem::perms::owner_write, - std::filesystem::perm_options::add); - } - if (not FileSystemManager::CopyFile(content_path, - target_name)) { - (*logger)( - fmt::format("Failed to copy content {} from CAS " - "to {}", - content, - target_name.string()), - /*fatal=*/true); - return; - } - // success - JustMRStatistics::Instance().IncrementExecutedCounter(); - (*setter)(true); + auto content_path = cas.BlobPath(digest, + /*is_executable=*/false) + .value(); + ProcessContent(content_path, + target_name, + local_api, + remote_api, + content, + digest, + setter, + logger); }, [logger, content = key.archive.content](auto const& msg, bool fatal) { @@ -86,24 +121,14 @@ auto CreateRepoFetchMap(gsl::not_null const& content_cas_map, }); } else { - auto target_name = fetch_dir / distfile; - if (FileSystemManager::Exists(target_name)) { - std::filesystem::permissions( - target_name, - std::filesystem::perms::owner_write, - std::filesystem::perm_options::add); - } - if (not FileSystemManager::CopyFile(*content_path, target_name)) { - (*logger)(fmt::format("Failed to copy content {} from CAS " - "to {}", - key.archive.content, - target_name.string()), - /*fatal=*/true); - return; - } - // success - JustMRStatistics::Instance().IncrementCacheHitsCounter(); - (*setter)(true); + ProcessContent(*content_path, + target_name, + local_api, + remote_api, + key.archive.content, + digest, + setter, + logger); } }; return AsyncMapConsumer(fetch_repo, jobs); -- cgit v1.2.3