summaryrefslogtreecommitdiff
path: root/src/other_tools/ops_maps/content_cas_map.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/other_tools/ops_maps/content_cas_map.cpp')
-rw-r--r--src/other_tools/ops_maps/content_cas_map.cpp284
1 files changed, 126 insertions, 158 deletions
diff --git a/src/other_tools/ops_maps/content_cas_map.cpp b/src/other_tools/ops_maps/content_cas_map.cpp
index 7fe4efef..d6a3e36e 100644
--- a/src/other_tools/ops_maps/content_cas_map.cpp
+++ b/src/other_tools/ops_maps/content_cas_map.cpp
@@ -28,25 +28,11 @@
namespace {
-void CheckRemoteAndFetchFromNetwork(
- ArchiveContent const& key,
- ArtifactDigest const& digest,
- MirrorsPtr const& additional_mirrors,
- CAInfoPtr const& ca_info,
- gsl::not_null<IExecutionApi*> const& local_api,
- std::optional<gsl::not_null<IExecutionApi*>> const& remote_api,
- ContentCASMap::SetterPtr const& setter,
- ContentCASMap::LoggerPtr const& logger) {
- // check if content is in remote CAS, if a remote is given
- if (remote_api and
- remote_api.value()->RetrieveToCas(
- {Artifact::ObjectInfo{.digest = digest, .type = ObjectType::File}},
- local_api)) {
- JustMRProgress::Instance().TaskTracker().Stop(key.origin);
- (*setter)(nullptr);
- return;
- }
- // archive needs network fetching;
+void FetchFromNetwork(ArchiveContent const& key,
+ MirrorsPtr const& additional_mirrors,
+ CAInfoPtr const& ca_info,
+ ContentCASMap::SetterPtr const& setter,
+ ContentCASMap::LoggerPtr const& logger) {
// first, check that mandatory fields are provided
if (key.fetch_url.empty()) {
(*logger)("Failed to provide archive fetch url!",
@@ -100,7 +86,8 @@ void CheckRemoteAndFetchFromNetwork(
}
// check that the data we stored actually produces the requested digest
auto const& cas = Storage::Instance().CAS();
- if (not cas.BlobPath(digest, /*is_executable=*/false)) {
+ if (not cas.BlobPath(ArtifactDigest{key.content, 0, /*is_tree=*/false},
+ /*is_executable=*/false)) {
(*logger)(
fmt::format("Content {} was not found at given fetch location {}",
key.content,
@@ -108,9 +95,7 @@ void CheckRemoteAndFetchFromNetwork(
/*fatal=*/true);
return;
}
- if (key.fetch_only) {
- JustMRProgress::Instance().TaskTracker().Stop(key.origin);
- }
+ JustMRProgress::Instance().TaskTracker().Stop(key.origin);
// success!
(*setter)(nullptr);
}
@@ -138,149 +123,132 @@ auto CreateContentCASMap(
auto /*unused*/,
auto const& key) {
auto digest = ArtifactDigest(key.content, 0, false);
- // separate logic if we need a pure fetch
- if (key.fetch_only) {
- auto const& cas = Storage::Instance().CAS();
- if (cas.BlobPath(digest, /*is_executable=*/false)) {
- (*setter)(nullptr);
- return;
- }
- // check if content is in Git cache;
- // ensure Git cache
- GitOpKey op_key = {.params =
- {
- StorageConfig::GitRoot(), // target_path
- "", // git_hash
- "", // branch
- std::nullopt, // message
- true // init_bare
- },
- .op_type = GitOpType::ENSURE_INIT};
- critical_git_op_map->ConsumeAfterKeysReady(
- ts,
- {std::move(op_key)},
- [key,
- digest,
- just_mr_paths,
- additional_mirrors,
- ca_info,
- serve_api_exists,
- local_api,
- remote_api,
- setter,
- logger](auto const& values) {
- GitOpValue op_result = *values[0];
- // check flag
- if (not op_result.result) {
- (*logger)("Git init failed",
- /*fatal=*/true);
- return;
- }
- auto const just_git_cas = op_result.git_cas;
- // open fake repo wrap for GitCAS
- auto just_git_repo = GitRepoRemote::Open(just_git_cas);
- if (not just_git_repo) {
- (*logger)("Could not open Git cache repository!",
+ // check local CAS
+ auto const& cas = Storage::Instance().CAS();
+ if (cas.BlobPath(digest, /*is_executable=*/false)) {
+ (*setter)(nullptr);
+ return;
+ }
+ // check if content is in Git cache;
+ // ensure Git cache
+ GitOpKey op_key = {.params =
+ {
+ StorageConfig::GitRoot(), // target_path
+ "", // git_hash
+ "", // branch
+ std::nullopt, // message
+ true // init_bare
+ },
+ .op_type = GitOpType::ENSURE_INIT};
+ critical_git_op_map->ConsumeAfterKeysReady(
+ ts,
+ {std::move(op_key)},
+ [key,
+ digest,
+ just_mr_paths,
+ additional_mirrors,
+ ca_info,
+ serve_api_exists,
+ local_api,
+ remote_api,
+ setter,
+ logger](auto const& values) {
+ GitOpValue op_result = *values[0];
+ // check flag
+ if (not op_result.result) {
+ (*logger)("Git init failed",
+ /*fatal=*/true);
+ return;
+ }
+ auto const just_git_cas = op_result.git_cas;
+ // open fake repo wrap for GitCAS
+ auto just_git_repo = GitRepoRemote::Open(just_git_cas);
+ if (not just_git_repo) {
+ (*logger)("Could not open Git cache repository!",
+ /*fatal=*/true);
+ return;
+ }
+ // verify if local Git knows content blob
+ auto wrapped_logger = std::make_shared<AsyncMapConsumerLogger>(
+ [&logger, blob = key.content](auto const& msg, bool fatal) {
+ (*logger)(fmt::format("While verifying presence of "
+ "blob {}:\n{}",
+ blob,
+ msg),
+ fatal);
+ });
+ auto res =
+ just_git_repo->TryReadBlob(key.content, wrapped_logger);
+ if (not res.first) {
+ // blob check failed
+ return;
+ }
+ auto const& cas = Storage::Instance().CAS();
+ if (res.second) {
+ // blob found; add it to CAS
+ if (not cas.StoreBlob(*res.second,
+ /*is_executable=*/false)) {
+ (*logger)(fmt::format("Failed to store content {} "
+ "to local CAS",
+ key.content),
/*fatal=*/true);
return;
}
- // verify if local Git knows content blob
- auto wrapped_logger =
- std::make_shared<AsyncMapConsumerLogger>(
- [&logger, blob = key.content](auto const& msg,
- bool fatal) {
- (*logger)(
- fmt::format("While verifying presence of "
- "blob {}:\n{}",
- blob,
- msg),
- fatal);
- });
- auto res =
- just_git_repo->TryReadBlob(key.content, wrapped_logger);
- if (not res.first) {
- // blob check failed
- return;
- }
- auto const& cas = Storage::Instance().CAS();
- if (res.second) {
- // blob found; add it to CAS
- if (not cas.StoreBlob(*res.second,
- /*is_executable=*/false)) {
- (*logger)(fmt::format("Failed to store content {} "
- "to local CAS",
- key.content),
- /*fatal=*/true);
- return;
- }
- // content stored to CAS
- (*setter)(nullptr);
- return;
- }
- // blob not found in Git cache
- JustMRProgress::Instance().TaskTracker().Start(key.origin);
- // add distfile to CAS
- auto repo_distfile =
- (key.distfile ? key.distfile.value()
- : std::filesystem::path(key.fetch_url)
- .filename()
- .string());
- StorageUtils::AddDistfileToCAS(repo_distfile,
- just_mr_paths);
- // check if content is in CAS now
- if (cas.BlobPath(digest, /*is_executable=*/false)) {
+ // content stored to CAS
+ (*setter)(nullptr);
+ return;
+ }
+ // blob not found in Git cache
+ JustMRProgress::Instance().TaskTracker().Start(key.origin);
+ // add distfile to CAS
+ auto repo_distfile =
+ (key.distfile ? key.distfile.value()
+ : std::filesystem::path(key.fetch_url)
+ .filename()
+ .string());
+ StorageUtils::AddDistfileToCAS(repo_distfile, just_mr_paths);
+ // check if content is in CAS now
+ if (cas.BlobPath(digest, /*is_executable=*/false)) {
+ JustMRProgress::Instance().TaskTracker().Stop(key.origin);
+ (*setter)(nullptr);
+ return;
+ }
+ // check if content is known to remote serve service
+ if (serve_api_exists and remote_api and
+ ServeApi::ContentInRemoteCAS(key.content)) {
+ // try to get content from remote CAS
+ if (remote_api.value()->RetrieveToCas(
+ {Artifact::ObjectInfo{.digest = digest,
+ .type = ObjectType::File}},
+ local_api)) {
JustMRProgress::Instance().TaskTracker().Stop(
key.origin);
(*setter)(nullptr);
return;
}
- // check if content is known to remote serve service
- if (serve_api_exists and
- ServeApi::ContentInRemoteCAS(key.content)) {
- // try to get content from remote CAS
- if (remote_api and remote_api.value()->RetrieveToCas(
- {Artifact::ObjectInfo{
- .digest = digest,
- .type = ObjectType::File}},
- local_api)) {
- JustMRProgress::Instance().TaskTracker().Stop(
- key.origin);
- (*setter)(nullptr);
- return;
- }
- }
- // check remote execution endpoint and if not found revert
- // to network fetch
- CheckRemoteAndFetchFromNetwork(key,
- digest,
- additional_mirrors,
- ca_info,
- local_api,
- remote_api,
- setter,
- logger);
- },
- [logger, target_path = StorageConfig::GitRoot()](
- auto const& msg, bool fatal) {
- (*logger)(fmt::format("While running critical Git op "
- "ENSURE_INIT for target {}:\n{}",
- target_path.string(),
- msg),
- fatal);
- });
- // done!
- return;
- }
- // if not fetch only, then only check remote execution endpoint and
- // revert to network fetch as last resort
- CheckRemoteAndFetchFromNetwork(key,
- digest,
- additional_mirrors,
- ca_info,
- local_api,
- remote_api,
- setter,
- logger);
+ }
+ // check remote execution endpoint, if given
+ if (remote_api and
+ remote_api.value()->RetrieveToCas(
+ {Artifact::ObjectInfo{.digest = digest,
+ .type = ObjectType::File}},
+ local_api)) {
+ JustMRProgress::Instance().TaskTracker().Stop(key.origin);
+ (*setter)(nullptr);
+ return;
+ }
+ // revert to network fetch
+ FetchFromNetwork(
+ key, additional_mirrors, ca_info, setter, logger);
+ },
+ [logger, target_path = StorageConfig::GitRoot()](auto const& msg,
+ bool fatal) {
+ (*logger)(fmt::format("While running critical Git op "
+ "ENSURE_INIT for target {}:\n{}",
+ target_path.string(),
+ msg),
+ fatal);
+ });
};
return AsyncMapConsumer<ArchiveContent, std::nullptr_t>(ensure_in_cas,
jobs);