summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/other_tools/just_mr/fetch.cpp5
-rw-r--r--src/other_tools/just_mr/setup.cpp1
-rw-r--r--src/other_tools/ops_maps/TARGETS4
-rw-r--r--src/other_tools/ops_maps/content_cas_map.cpp356
-rw-r--r--src/other_tools/ops_maps/content_cas_map.hpp17
5 files changed, 258 insertions, 125 deletions
diff --git a/src/other_tools/just_mr/fetch.cpp b/src/other_tools/just_mr/fetch.cpp
index 56c5fdec..d5e72f28 100644
--- a/src/other_tools/just_mr/fetch.cpp
+++ b/src/other_tools/just_mr/fetch.cpp
@@ -415,10 +415,13 @@ auto MultiRepoFetch(std::shared_ptr<Configuration> const& config,
common_args.remote_serve_address, auth_args);
// create async maps
+ auto crit_git_op_ptr = std::make_shared<CriticalGitOpGuard>();
+ auto critical_git_op_map = CreateCriticalGitOpMap(crit_git_op_ptr);
auto content_cas_map =
CreateContentCASMap(common_args.just_mr_paths,
common_args.alternative_mirrors,
common_args.ca_info,
+ &critical_git_op_map,
serve_api_exists,
local_api ? &(*local_api) : nullptr,
remote_api ? &(*remote_api) : nullptr,
@@ -429,8 +432,6 @@ auto MultiRepoFetch(std::shared_ptr<Configuration> const& config,
(fetch_args.backup_to_remote and local_api) ? &(*local_api) : nullptr,
(fetch_args.backup_to_remote and remote_api) ? &(*remote_api) : nullptr,
common_args.jobs);
- auto crit_git_op_ptr = std::make_shared<CriticalGitOpGuard>();
- auto critical_git_op_map = CreateCriticalGitOpMap(crit_git_op_ptr);
auto import_to_git_map =
CreateImportToGitMap(&critical_git_op_map,
common_args.git_path->string(),
diff --git a/src/other_tools/just_mr/setup.cpp b/src/other_tools/just_mr/setup.cpp
index 814aa1e4..dedf9ecb 100644
--- a/src/other_tools/just_mr/setup.cpp
+++ b/src/other_tools/just_mr/setup.cpp
@@ -108,6 +108,7 @@ auto MultiRepoSetup(std::shared_ptr<Configuration> const& config,
CreateContentCASMap(common_args.just_mr_paths,
common_args.alternative_mirrors,
common_args.ca_info,
+ &critical_git_op_map,
serve_api_exists,
local_api ? &(*local_api) : nullptr,
remote_api ? &(*remote_api) : nullptr,
diff --git a/src/other_tools/ops_maps/TARGETS b/src/other_tools/ops_maps/TARGETS
index d7af943d..66a43e8a 100644
--- a/src/other_tools/ops_maps/TARGETS
+++ b/src/other_tools/ops_maps/TARGETS
@@ -64,6 +64,7 @@
, ["src/buildtool/execution_api/common", "common"]
, ["src/buildtool/multithreading", "async_map_consumer"]
, ["src/other_tools/just_mr", "mirrors"]
+ , ["src/other_tools/ops_maps", "critical_git_op_map"]
, ["src/utils/cpp", "hash_combine"]
]
, "stage": ["src", "other_tools", "ops_maps"]
@@ -73,7 +74,10 @@
, ["src/buildtool/execution_api/local", "local"]
, ["src/buildtool/file_system", "file_storage"]
, ["src/buildtool/serve_api/remote", "serve_api"]
+ , ["src/buildtool/storage", "config"]
, ["src/buildtool/storage", "fs_utils"]
+ , ["src/buildtool/storage", "storage"]
+ , ["src/other_tools/git_operations", "git_repo_remote"]
, ["src/other_tools/just_mr/progress_reporting", "statistics"]
, ["src/other_tools/just_mr/progress_reporting", "progress"]
]
diff --git a/src/other_tools/ops_maps/content_cas_map.cpp b/src/other_tools/ops_maps/content_cas_map.cpp
index 4a486836..65449d97 100644
--- a/src/other_tools/ops_maps/content_cas_map.cpp
+++ b/src/other_tools/ops_maps/content_cas_map.cpp
@@ -17,147 +17,271 @@
#include "fmt/core.h"
#include "src/buildtool/file_system/file_storage.hpp"
#include "src/buildtool/serve_api/remote/serve_api.hpp"
+#include "src/buildtool/storage/config.hpp"
#include "src/buildtool/storage/fs_utils.hpp"
#include "src/buildtool/storage/storage.hpp"
+#include "src/other_tools/git_operations/git_repo_remote.hpp"
#include "src/other_tools/just_mr/progress_reporting/progress.hpp"
#include "src/other_tools/just_mr/progress_reporting/statistics.hpp"
#include "src/other_tools/utils/content.hpp"
#include "src/other_tools/utils/curl_url_handle.hpp"
-auto CreateContentCASMap(LocalPathsPtr const& just_mr_paths,
- MirrorsPtr const& additional_mirrors,
- CAInfoPtr const& ca_info,
- bool serve_api_exists,
- IExecutionApi* local_api,
- IExecutionApi* remote_api,
- std::size_t jobs) -> ContentCASMap {
+namespace {
+
+void CheckRemoteAndFetchFromNetwork(ArchiveContent const& key,
+ ArtifactDigest const& digest,
+ MirrorsPtr const& additional_mirrors,
+ CAInfoPtr const& ca_info,
+ IExecutionApi* local_api,
+ IExecutionApi* remote_api,
+ ContentCASMap::SetterPtr const& setter,
+ ContentCASMap::LoggerPtr const& logger) {
+ // check if content is in remote CAS, if a remote is given
+ if (remote_api != nullptr and local_api != nullptr and
+ remote_api->IsAvailable(digest) and
+ remote_api->RetrieveToCas(
+ {Artifact::ObjectInfo{.digest = digest, .type = ObjectType::File}},
+ local_api)) {
+ JustMRProgress::Instance().TaskTracker().Stop(key.origin);
+ (*setter)(nullptr);
+ return;
+ }
+ // archive needs network fetching;
+ // first, check that mandatory fields are provided
+ if (key.fetch_url.empty()) {
+ (*logger)("Failed to provide archive fetch url!",
+ /*fatal=*/true);
+ return;
+ }
+ // now do the actual fetch
+ auto res = NetworkFetchWithMirrors(
+ key.fetch_url, key.mirrors, ca_info, additional_mirrors);
+ auto* data =
+ std::get_if<1>(&res); // get pointer to fetched data, or nullptr
+ if (data == nullptr) {
+ (*logger)(fmt::format("Failed to fetch a file with id {} from provided "
+ "remotes:{}",
+ key.content,
+ std::get<0>(res)),
+ /*fatal=*/true);
+ return;
+ }
+ // check content wrt checksums
+ if (key.sha256) {
+ auto actual_sha256 = GetContentHash<Hasher::HashType::SHA256>(*data);
+ if (actual_sha256 != key.sha256.value()) {
+ (*logger)(fmt::format("SHA256 mismatch for {}: expected {}, got {}",
+ key.fetch_url,
+ key.sha256.value(),
+ actual_sha256),
+ /*fatal=*/true);
+ return;
+ }
+ }
+ if (key.sha512) {
+ auto actual_sha512 = GetContentHash<Hasher::HashType::SHA512>(*data);
+ if (actual_sha512 != key.sha512.value()) {
+ (*logger)(fmt::format("SHA512 mismatch for {}: expected {}, got {}",
+ key.fetch_url,
+ key.sha512.value(),
+ actual_sha512),
+ /*fatal=*/true);
+ return;
+ }
+ }
+ // add the fetched data to CAS
+ auto path = StorageUtils::AddToCAS(*data);
+ // check one last time if content is in CAS now
+ if (not path) {
+ (*logger)(fmt::format("Failed to store fetched content from {}",
+ key.fetch_url),
+ /*fatal=*/true);
+ return;
+ }
+ // check that the data we stored actually produces the requested digest
+ auto const& cas = Storage::Instance().CAS();
+ if (not cas.BlobPath(digest, /*is_executable=*/false)) {
+ (*logger)(
+ fmt::format("Content {} was not found at given fetch location {}",
+ key.content,
+ key.fetch_url),
+ /*fatal=*/true);
+ return;
+ }
+ if (key.fetch_only) {
+ JustMRProgress::Instance().TaskTracker().Stop(key.origin);
+ }
+ // success!
+ (*setter)(nullptr);
+}
+
+} // namespace
+
+auto CreateContentCASMap(
+ LocalPathsPtr const& just_mr_paths,
+ MirrorsPtr const& additional_mirrors,
+ CAInfoPtr const& ca_info,
+ gsl::not_null<CriticalGitOpMap*> const& critical_git_op_map,
+ bool serve_api_exists,
+ IExecutionApi* local_api,
+ IExecutionApi* remote_api,
+ std::size_t jobs) -> ContentCASMap {
auto ensure_in_cas = [just_mr_paths,
additional_mirrors,
ca_info,
+ critical_git_op_map,
serve_api_exists,
local_api,
- remote_api](auto /*unused*/,
+ remote_api](auto ts,
auto setter,
auto logger,
auto /*unused*/,
auto const& key) {
- auto const& cas = Storage::Instance().CAS();
auto digest = ArtifactDigest(key.content, 0, false);
// separate logic if we need a pure fetch
if (key.fetch_only) {
+ auto const& cas = Storage::Instance().CAS();
if (cas.BlobPath(digest, /*is_executable=*/false)) {
(*setter)(nullptr);
return;
}
- JustMRProgress::Instance().TaskTracker().Start(key.origin);
- // add distfile to CAS
- auto repo_distfile =
- (key.distfile ? key.distfile.value()
- : std::filesystem::path(key.fetch_url)
- .filename()
- .string());
- StorageUtils::AddDistfileToCAS(repo_distfile, just_mr_paths);
- // check if content is in CAS now
- if (cas.BlobPath(digest, /*is_executable=*/false)) {
- JustMRProgress::Instance().TaskTracker().Stop(key.origin);
- (*setter)(nullptr);
- return;
- }
- // check if content is known to remote serve service
- if (serve_api_exists and
- ServeApi::ContentInRemoteCAS(key.content)) {
- // try to get content from remote CAS
- if (remote_api != nullptr and local_api != nullptr and
- remote_api->RetrieveToCas(
- {Artifact::ObjectInfo{.digest = digest,
- .type = ObjectType::File}},
- local_api)) {
- JustMRProgress::Instance().TaskTracker().Stop(key.origin);
- (*setter)(nullptr);
- return;
- }
- }
- }
- // check if content is in remote CAS, if a remote is given
- if (remote_api and local_api and remote_api->IsAvailable(digest) and
- remote_api->RetrieveToCas(
- {Artifact::ObjectInfo{.digest = digest,
- .type = ObjectType::File}},
- local_api)) {
- JustMRProgress::Instance().TaskTracker().Stop(key.origin);
- (*setter)(nullptr);
- return;
- }
- // archive needs network fetching;
- // first, check that mandatory fields are provided
- if (key.fetch_url.empty()) {
- (*logger)("Failed to provide archive fetch url!",
- /*fatal=*/true);
- return;
- }
- // now do the actual fetch
- auto res = NetworkFetchWithMirrors(
- key.fetch_url, key.mirrors, ca_info, additional_mirrors);
- auto data =
- std::get_if<1>(&res); // get pointer to fetched data, or nullptr
- if (not data) {
- (*logger)(fmt::format("Failed to fetch a file with id {} from "
- "provided remotes:{}",
- key.content,
- std::get<0>(res)),
- /*fatal=*/true);
- return;
- }
- // check content wrt checksums
- if (key.sha256) {
- auto actual_sha256 =
- GetContentHash<Hasher::HashType::SHA256>(*data);
- if (actual_sha256 != key.sha256.value()) {
- (*logger)(
- fmt::format("SHA256 mismatch for {}: expected {}, got {}",
- key.fetch_url,
- key.sha256.value(),
- actual_sha256),
- /*fatal=*/true);
- return;
- }
- }
- if (key.sha512) {
- auto actual_sha512 =
- GetContentHash<Hasher::HashType::SHA512>(*data);
- if (actual_sha512 != key.sha512.value()) {
- (*logger)(
- fmt::format("SHA512 mismatch for {}: expected {}, got {}",
- key.fetch_url,
- key.sha512.value(),
- actual_sha512),
- /*fatal=*/true);
- return;
- }
- }
- // add the fetched data to CAS
- auto path = StorageUtils::AddToCAS(*data);
- // check one last time if content is in CAS now
- if (not path) {
- (*logger)(fmt::format("Failed to store fetched content from {}",
- key.fetch_url),
- /*fatal=*/true);
+ // check if content is in Git cache;
+ // ensure Git cache
+ GitOpKey op_key = {.params =
+ {
+ StorageConfig::GitRoot(), // target_path
+ "", // git_hash
+ "", // branch
+ std::nullopt, // message
+ true // init_bare
+ },
+ .op_type = GitOpType::ENSURE_INIT};
+ critical_git_op_map->ConsumeAfterKeysReady(
+ ts,
+ {std::move(op_key)},
+ [key,
+ digest,
+ just_mr_paths,
+ additional_mirrors,
+ ca_info,
+ serve_api_exists,
+ local_api,
+ remote_api,
+ setter,
+ logger](auto const& values) {
+ GitOpValue op_result = *values[0];
+ // check flag
+ if (not op_result.result) {
+ (*logger)("Git init failed",
+ /*fatal=*/true);
+ return;
+ }
+ auto const just_git_cas = op_result.git_cas;
+ // open fake repo wrap for GitCAS
+ auto just_git_repo = GitRepoRemote::Open(just_git_cas);
+ if (not just_git_repo) {
+ (*logger)("Could not open Git cache repository!",
+ /*fatal=*/true);
+ return;
+ }
+ // verify if local Git knows content blob
+ auto wrapped_logger =
+ std::make_shared<AsyncMapConsumerLogger>(
+ [&logger, blob = key.content](auto const& msg,
+ bool fatal) {
+ (*logger)(
+ fmt::format("While verifying presence of "
+ "blob {}:\n{}",
+ blob,
+ msg),
+ fatal);
+ });
+ auto res =
+ just_git_repo->TryReadBlob(key.content, wrapped_logger);
+ if (not res.first) {
+ // blob check failed
+ return;
+ }
+ auto const& cas = Storage::Instance().CAS();
+ if (res.second) {
+ // blob found; add it to CAS
+ if (not cas.StoreBlob(*res.second,
+ /*is_executable=*/false)) {
+ (*logger)(fmt::format("Failed to store content {} "
+ "to local CAS",
+ key.content),
+ /*fatal=*/true);
+ return;
+ }
+ // content stored to CAS
+ (*setter)(nullptr);
+ return;
+ }
+ // blob not found in Git cache
+ JustMRProgress::Instance().TaskTracker().Start(key.origin);
+ // add distfile to CAS
+ auto repo_distfile =
+ (key.distfile ? key.distfile.value()
+ : std::filesystem::path(key.fetch_url)
+ .filename()
+ .string());
+ StorageUtils::AddDistfileToCAS(repo_distfile,
+ just_mr_paths);
+ // check if content is in CAS now
+ if (cas.BlobPath(digest, /*is_executable=*/false)) {
+ JustMRProgress::Instance().TaskTracker().Stop(
+ key.origin);
+ (*setter)(nullptr);
+ return;
+ }
+ // check if content is known to remote serve service
+ if (serve_api_exists and
+ ServeApi::ContentInRemoteCAS(key.content)) {
+ // try to get content from remote CAS
+ if (remote_api != nullptr and local_api != nullptr and
+ remote_api->RetrieveToCas(
+ {Artifact::ObjectInfo{
+ .digest = digest,
+ .type = ObjectType::File}},
+ local_api)) {
+ JustMRProgress::Instance().TaskTracker().Stop(
+ key.origin);
+ (*setter)(nullptr);
+ return;
+ }
+ }
+ // check remote execution endpoint and if not found revert
+ // to network fetch
+ CheckRemoteAndFetchFromNetwork(key,
+ digest,
+ additional_mirrors,
+ ca_info,
+ local_api,
+ remote_api,
+ setter,
+ logger);
+ },
+ [logger, target_path = StorageConfig::GitRoot()](
+ auto const& msg, bool fatal) {
+ (*logger)(fmt::format("While running critical Git op "
+ "ENSURE_INIT for target {}:\n{}",
+ target_path.string(),
+ msg),
+ fatal);
+ });
+ // done!
return;
}
- // check that the data we stored actually produces the requested digest
- if (not cas.BlobPath(digest, /*is_executable=*/false)) {
- (*logger)(fmt::format(
- "Content {} was not found at given fetch location {}",
- key.content,
- key.fetch_url),
- /*fatal=*/true);
- return;
- }
- if (key.fetch_only) {
- JustMRProgress::Instance().TaskTracker().Stop(key.origin);
- }
- // success!
- (*setter)(nullptr);
+ // if not fetch only, then only check remote execution endpoint and
+ // revert to network fetch as last resort
+ CheckRemoteAndFetchFromNetwork(key,
+ digest,
+ additional_mirrors,
+ ca_info,
+ local_api,
+ remote_api,
+ setter,
+ logger);
};
return AsyncMapConsumer<ArchiveContent, std::nullptr_t>(ensure_in_cas,
jobs);
diff --git a/src/other_tools/ops_maps/content_cas_map.hpp b/src/other_tools/ops_maps/content_cas_map.hpp
index d32cfbd2..13c20387 100644
--- a/src/other_tools/ops_maps/content_cas_map.hpp
+++ b/src/other_tools/ops_maps/content_cas_map.hpp
@@ -24,6 +24,7 @@
#include "src/buildtool/file_system/symlinks_map/pragma_special.hpp"
#include "src/buildtool/multithreading/async_map_consumer.hpp"
#include "src/other_tools/just_mr/mirrors.hpp"
+#include "src/other_tools/ops_maps/critical_git_op_map.hpp"
#include "src/utils/cpp/hash_combine.hpp"
struct ArchiveContent {
@@ -65,13 +66,15 @@ struct ArchiveRepoInfo {
/// the map fails or not.
using ContentCASMap = AsyncMapConsumer<ArchiveContent, std::nullptr_t>;
-[[nodiscard]] auto CreateContentCASMap(LocalPathsPtr const& just_mr_paths,
- MirrorsPtr const& additional_mirrors,
- CAInfoPtr const& ca_info,
- bool serve_api_exists,
- IExecutionApi* local_api,
- IExecutionApi* remote_api,
- std::size_t jobs) -> ContentCASMap;
+[[nodiscard]] auto CreateContentCASMap(
+ LocalPathsPtr const& just_mr_paths,
+ MirrorsPtr const& additional_mirrors,
+ CAInfoPtr const& ca_info,
+ gsl::not_null<CriticalGitOpMap*> const& critical_git_op_map,
+ bool serve_api_exists,
+ IExecutionApi* local_api,
+ IExecutionApi* remote_api,
+ std::size_t jobs) -> ContentCASMap;
namespace std {
template <>