-rw-r--r--  src/buildtool/execution_api/remote/TARGETS               |  1
-rw-r--r--  src/buildtool/execution_api/remote/bazel/bazel_api.cpp   | 98
-rw-r--r--  src/buildtool/execution_api/remote/bazel/bazel_api.hpp   |  5
3 files changed, 104 insertions, 0 deletions
diff --git a/src/buildtool/execution_api/remote/TARGETS b/src/buildtool/execution_api/remote/TARGETS
index 8543db0d..1ea38e40 100644
--- a/src/buildtool/execution_api/remote/TARGETS
+++ b/src/buildtool/execution_api/remote/TARGETS
@@ -65,6 +65,7 @@
     [ "bazel_network"
     , ["@", "fmt", "", "fmt"]
     , ["src/buildtool/compatibility", "compatibility"]
+    , ["src/buildtool/multithreading", "task_system"]
     ]
   }
 , "config":
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
index e2da0188..74e8286f 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
@@ -15,6 +15,7 @@
 #include "src/buildtool/execution_api/remote/bazel/bazel_api.hpp"
 
 #include <algorithm>
+#include <atomic>
 #include <map>
 #include <memory>
 #include <optional>
@@ -37,6 +38,7 @@
 #include "src/buildtool/execution_api/remote/bazel/bazel_response.hpp"
 #include "src/buildtool/file_system/file_system_manager.hpp"
 #include "src/buildtool/logging/logger.hpp"
+#include "src/buildtool/multithreading/task_system.hpp"
 
 BazelApi::BazelApi(std::string const& instance_name,
                    std::string const& host,
@@ -251,6 +253,102 @@ auto BazelApi::CreateAction(
     return api->Upload(container, /*skip_find_missing=*/true);
 }
 
+/// NOLINTNEXTLINE(misc-no-recursion)
+[[nodiscard]] auto BazelApi::ParallelRetrieveToCas(
+    std::vector<Artifact::ObjectInfo> const& artifacts_info,
+    gsl::not_null<IExecutionApi*> const& api,
+    std::size_t jobs) noexcept -> bool {
+
+    // Return immediately if target CAS is this CAS
+    if (this == api) {
+        return true;
+    }
+
+    // Determine missing artifacts in other CAS.
+    std::vector<ArtifactDigest> digests;
+    digests.reserve(artifacts_info.size());
+    std::unordered_map<ArtifactDigest, Artifact::ObjectInfo> info_map;
+    for (auto const& info : artifacts_info) {
+        digests.push_back(info.digest);
+        info_map[info.digest] = info;
+    }
+    auto const& missing_digests = api->IsAvailable(digests);
+    std::vector<Artifact::ObjectInfo> missing_artifacts_info;
+    missing_artifacts_info.reserve(missing_digests.size());
+    for (auto const& digest : missing_digests) {
+        missing_artifacts_info.push_back(info_map[digest]);
+    }
+
+    // Recursively process trees.
+    std::atomic_bool failure{false};
+    try {
+        auto ts = TaskSystem{jobs};
+        for (auto const& info : missing_artifacts_info) {
+            if (IsTreeObject(info.type)) {
+                auto const infos = network_->ReadDirectTreeEntries(
+                    info.digest, std::filesystem::path{});
+                if (not infos or
+                    not ParallelRetrieveToCas(infos->second, api, jobs)) {
+                    return false;
+                }
+            }
+
+            // Object infos created by network_->ReadTreeInfos() will contain
+            // 0 as size, but this is handled by the remote execution engine,
+            // so no need to regenerate the digest.
+            ts.QueueTask(
+                [this, digest = info.digest, &api, &failure, &info_map]() {
+                    auto reader = network_->ReadBlobs({digest});
+                    auto blobs = reader.Next();
+                    std::size_t count{};
+                    BlobContainer container{};
+                    while (not blobs.empty()) {
+                        if (count + blobs.size() > 1) {
+                            Logger::Log(LogLevel::Error,
+                                        "received more blobs than requested.");
+                            failure = true;
+                            return;
+                        }
+                        for (auto& blob : blobs) {
+                            try {
+                                auto exec = IsExecutableObject(
+                                    info_map[ArtifactDigest{blob.digest}].type);
+                                container.Emplace(
+                                    BazelBlob{blob.digest, blob.data, exec});
+                            } catch (std::exception const& ex) {
+                                Logger::Log(LogLevel::Error,
+                                            "failed to emplace blob: {}",
+                                            ex.what());
+                                failure = true;
+                                return;
+                            }
+                        }
+                        count += blobs.size();
+                        blobs = reader.Next();
+                    }
+                    if (count != 1) {
+                        Logger::Log(LogLevel::Error,
+                                    "could not retrieve all requested blobs.");
+                        failure = true;
+                        return;
+                    }
+                    auto result =
+                        api->Upload(container, /*skip_find_missing=*/true);
+                    if (not result) {
+                        failure = true;
+                        return;
+                    }
+                });
+        }
+    } catch (std::exception const& ex) {
+        Logger::Log(
+            LogLevel::Error, "Artifact synchronization failed: {}", ex.what());
+        return false;
+    }
+
+    return not failure;
+}
+
 [[nodiscard]] auto BazelApi::RetrieveToMemory(
     Artifact::ObjectInfo const& artifact_info) -> std::optional<std::string> {
     auto blobs = network_->ReadBlobs({artifact_info.digest}).Next();
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.hpp b/src/buildtool/execution_api/remote/bazel/bazel_api.hpp
index 2e2befba..3b1b5193 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.hpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.hpp
@@ -65,6 +65,11 @@
         std::vector<int> const& fds,
         bool raw_tree) noexcept -> bool final;
 
+    [[nodiscard]] auto ParallelRetrieveToCas(
+        std::vector<Artifact::ObjectInfo> const& artifacts_info,
+        gsl::not_null<IExecutionApi*> const& api,
+        std::size_t jobs) noexcept -> bool final;
+
     [[nodiscard]] auto RetrieveToCas(
         std::vector<Artifact::ObjectInfo> const& artifacts_info,
         gsl::not_null<IExecutionApi*> const& api) noexcept -> bool final;
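
Note (not part of the patch): a minimal sketch of how the new ParallelRetrieveToCas entry point could be driven from calling code. The helper name SyncToLocalCas, the variable names, the include paths, and the implicit raw-pointer-to-gsl::not_null conversion are assumptions for illustration only; only the member function itself is introduced by this change.

// Hypothetical call site; all names and include paths below are assumed.
#include <cstddef>
#include <vector>

#include "src/buildtool/common/artifact.hpp"
#include "src/buildtool/execution_api/common/execution_api.hpp"

// Copy every artifact in `infos` that is missing from `target` into the
// target CAS, transferring blobs with up to `jobs` parallel tasks.
[[nodiscard]] auto SyncToLocalCas(IExecutionApi& source,
                                  IExecutionApi& target,
                                  std::vector<Artifact::ObjectInfo> const& infos,
                                  std::size_t jobs) noexcept -> bool {
    // Trees are expanded recursively inside ParallelRetrieveToCas, so only
    // the top-level object infos need to be passed; &target is assumed to
    // convert implicitly to gsl::not_null<IExecutionApi*>.
    return source.ParallelRetrieveToCas(infos, &target, jobs);
}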