diff options
Diffstat (limited to 'src/other_tools/ops_maps/content_cas_map.cpp')
-rw-r--r-- | src/other_tools/ops_maps/content_cas_map.cpp | 284 |
1 files changed, 126 insertions, 158 deletions
diff --git a/src/other_tools/ops_maps/content_cas_map.cpp b/src/other_tools/ops_maps/content_cas_map.cpp index 7fe4efef..d6a3e36e 100644 --- a/src/other_tools/ops_maps/content_cas_map.cpp +++ b/src/other_tools/ops_maps/content_cas_map.cpp @@ -28,25 +28,11 @@ namespace { -void CheckRemoteAndFetchFromNetwork( - ArchiveContent const& key, - ArtifactDigest const& digest, - MirrorsPtr const& additional_mirrors, - CAInfoPtr const& ca_info, - gsl::not_null<IExecutionApi*> const& local_api, - std::optional<gsl::not_null<IExecutionApi*>> const& remote_api, - ContentCASMap::SetterPtr const& setter, - ContentCASMap::LoggerPtr const& logger) { - // check if content is in remote CAS, if a remote is given - if (remote_api and - remote_api.value()->RetrieveToCas( - {Artifact::ObjectInfo{.digest = digest, .type = ObjectType::File}}, - local_api)) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - (*setter)(nullptr); - return; - } - // archive needs network fetching; +void FetchFromNetwork(ArchiveContent const& key, + MirrorsPtr const& additional_mirrors, + CAInfoPtr const& ca_info, + ContentCASMap::SetterPtr const& setter, + ContentCASMap::LoggerPtr const& logger) { // first, check that mandatory fields are provided if (key.fetch_url.empty()) { (*logger)("Failed to provide archive fetch url!", @@ -100,7 +86,8 @@ void CheckRemoteAndFetchFromNetwork( } // check that the data we stored actually produces the requested digest auto const& cas = Storage::Instance().CAS(); - if (not cas.BlobPath(digest, /*is_executable=*/false)) { + if (not cas.BlobPath(ArtifactDigest{key.content, 0, /*is_tree=*/false}, + /*is_executable=*/false)) { (*logger)( fmt::format("Content {} was not found at given fetch location {}", key.content, @@ -108,9 +95,7 @@ void CheckRemoteAndFetchFromNetwork( /*fatal=*/true); return; } - if (key.fetch_only) { - JustMRProgress::Instance().TaskTracker().Stop(key.origin); - } + JustMRProgress::Instance().TaskTracker().Stop(key.origin); // success! (*setter)(nullptr); } @@ -138,149 +123,132 @@ auto CreateContentCASMap( auto /*unused*/, auto const& key) { auto digest = ArtifactDigest(key.content, 0, false); - // separate logic if we need a pure fetch - if (key.fetch_only) { - auto const& cas = Storage::Instance().CAS(); - if (cas.BlobPath(digest, /*is_executable=*/false)) { - (*setter)(nullptr); - return; - } - // check if content is in Git cache; - // ensure Git cache - GitOpKey op_key = {.params = - { - StorageConfig::GitRoot(), // target_path - "", // git_hash - "", // branch - std::nullopt, // message - true // init_bare - }, - .op_type = GitOpType::ENSURE_INIT}; - critical_git_op_map->ConsumeAfterKeysReady( - ts, - {std::move(op_key)}, - [key, - digest, - just_mr_paths, - additional_mirrors, - ca_info, - serve_api_exists, - local_api, - remote_api, - setter, - logger](auto const& values) { - GitOpValue op_result = *values[0]; - // check flag - if (not op_result.result) { - (*logger)("Git init failed", - /*fatal=*/true); - return; - } - auto const just_git_cas = op_result.git_cas; - // open fake repo wrap for GitCAS - auto just_git_repo = GitRepoRemote::Open(just_git_cas); - if (not just_git_repo) { - (*logger)("Could not open Git cache repository!", + // check local CAS + auto const& cas = Storage::Instance().CAS(); + if (cas.BlobPath(digest, /*is_executable=*/false)) { + (*setter)(nullptr); + return; + } + // check if content is in Git cache; + // ensure Git cache + GitOpKey op_key = {.params = + { + StorageConfig::GitRoot(), // target_path + "", // git_hash + "", // branch + std::nullopt, // message + true // init_bare + }, + .op_type = GitOpType::ENSURE_INIT}; + critical_git_op_map->ConsumeAfterKeysReady( + ts, + {std::move(op_key)}, + [key, + digest, + just_mr_paths, + additional_mirrors, + ca_info, + serve_api_exists, + local_api, + remote_api, + setter, + logger](auto const& values) { + GitOpValue op_result = *values[0]; + // check flag + if (not op_result.result) { + (*logger)("Git init failed", + /*fatal=*/true); + return; + } + auto const just_git_cas = op_result.git_cas; + // open fake repo wrap for GitCAS + auto just_git_repo = GitRepoRemote::Open(just_git_cas); + if (not just_git_repo) { + (*logger)("Could not open Git cache repository!", + /*fatal=*/true); + return; + } + // verify if local Git knows content blob + auto wrapped_logger = std::make_shared<AsyncMapConsumerLogger>( + [&logger, blob = key.content](auto const& msg, bool fatal) { + (*logger)(fmt::format("While verifying presence of " + "blob {}:\n{}", + blob, + msg), + fatal); + }); + auto res = + just_git_repo->TryReadBlob(key.content, wrapped_logger); + if (not res.first) { + // blob check failed + return; + } + auto const& cas = Storage::Instance().CAS(); + if (res.second) { + // blob found; add it to CAS + if (not cas.StoreBlob(*res.second, + /*is_executable=*/false)) { + (*logger)(fmt::format("Failed to store content {} " + "to local CAS", + key.content), /*fatal=*/true); return; } - // verify if local Git knows content blob - auto wrapped_logger = - std::make_shared<AsyncMapConsumerLogger>( - [&logger, blob = key.content](auto const& msg, - bool fatal) { - (*logger)( - fmt::format("While verifying presence of " - "blob {}:\n{}", - blob, - msg), - fatal); - }); - auto res = - just_git_repo->TryReadBlob(key.content, wrapped_logger); - if (not res.first) { - // blob check failed - return; - } - auto const& cas = Storage::Instance().CAS(); - if (res.second) { - // blob found; add it to CAS - if (not cas.StoreBlob(*res.second, - /*is_executable=*/false)) { - (*logger)(fmt::format("Failed to store content {} " - "to local CAS", - key.content), - /*fatal=*/true); - return; - } - // content stored to CAS - (*setter)(nullptr); - return; - } - // blob not found in Git cache - JustMRProgress::Instance().TaskTracker().Start(key.origin); - // add distfile to CAS - auto repo_distfile = - (key.distfile ? key.distfile.value() - : std::filesystem::path(key.fetch_url) - .filename() - .string()); - StorageUtils::AddDistfileToCAS(repo_distfile, - just_mr_paths); - // check if content is in CAS now - if (cas.BlobPath(digest, /*is_executable=*/false)) { + // content stored to CAS + (*setter)(nullptr); + return; + } + // blob not found in Git cache + JustMRProgress::Instance().TaskTracker().Start(key.origin); + // add distfile to CAS + auto repo_distfile = + (key.distfile ? key.distfile.value() + : std::filesystem::path(key.fetch_url) + .filename() + .string()); + StorageUtils::AddDistfileToCAS(repo_distfile, just_mr_paths); + // check if content is in CAS now + if (cas.BlobPath(digest, /*is_executable=*/false)) { + JustMRProgress::Instance().TaskTracker().Stop(key.origin); + (*setter)(nullptr); + return; + } + // check if content is known to remote serve service + if (serve_api_exists and remote_api and + ServeApi::ContentInRemoteCAS(key.content)) { + // try to get content from remote CAS + if (remote_api.value()->RetrieveToCas( + {Artifact::ObjectInfo{.digest = digest, + .type = ObjectType::File}}, + local_api)) { JustMRProgress::Instance().TaskTracker().Stop( key.origin); (*setter)(nullptr); return; } - // check if content is known to remote serve service - if (serve_api_exists and - ServeApi::ContentInRemoteCAS(key.content)) { - // try to get content from remote CAS - if (remote_api and remote_api.value()->RetrieveToCas( - {Artifact::ObjectInfo{ - .digest = digest, - .type = ObjectType::File}}, - local_api)) { - JustMRProgress::Instance().TaskTracker().Stop( - key.origin); - (*setter)(nullptr); - return; - } - } - // check remote execution endpoint and if not found revert - // to network fetch - CheckRemoteAndFetchFromNetwork(key, - digest, - additional_mirrors, - ca_info, - local_api, - remote_api, - setter, - logger); - }, - [logger, target_path = StorageConfig::GitRoot()]( - auto const& msg, bool fatal) { - (*logger)(fmt::format("While running critical Git op " - "ENSURE_INIT for target {}:\n{}", - target_path.string(), - msg), - fatal); - }); - // done! - return; - } - // if not fetch only, then only check remote execution endpoint and - // revert to network fetch as last resort - CheckRemoteAndFetchFromNetwork(key, - digest, - additional_mirrors, - ca_info, - local_api, - remote_api, - setter, - logger); + } + // check remote execution endpoint, if given + if (remote_api and + remote_api.value()->RetrieveToCas( + {Artifact::ObjectInfo{.digest = digest, + .type = ObjectType::File}}, + local_api)) { + JustMRProgress::Instance().TaskTracker().Stop(key.origin); + (*setter)(nullptr); + return; + } + // revert to network fetch + FetchFromNetwork( + key, additional_mirrors, ca_info, setter, logger); + }, + [logger, target_path = StorageConfig::GitRoot()](auto const& msg, + bool fatal) { + (*logger)(fmt::format("While running critical Git op " + "ENSURE_INIT for target {}:\n{}", + target_path.string(), + msg), + fatal); + }); }; return AsyncMapConsumer<ArchiveContent, std::nullptr_t>(ensure_in_cas, jobs); |