Diffstat (limited to 'src/buildtool/execution_api/remote/bazel/bazel_api.cpp')
-rw-r--r--  src/buildtool/execution_api/remote/bazel/bazel_api.cpp | 98
1 file changed, 98 insertions(+), 0 deletions(-)
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
index e2da0188..74e8286f 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
@@ -15,6 +15,7 @@
#include "src/buildtool/execution_api/remote/bazel/bazel_api.hpp"
#include <algorithm>
+#include <atomic>
#include <map>
#include <memory>
#include <optional>
@@ -37,6 +38,7 @@
#include "src/buildtool/execution_api/remote/bazel/bazel_response.hpp"
#include "src/buildtool/file_system/file_system_manager.hpp"
#include "src/buildtool/logging/logger.hpp"
+#include "src/buildtool/multithreading/task_system.hpp"
BazelApi::BazelApi(std::string const& instance_name,
std::string const& host,
@@ -251,6 +253,102 @@ auto BazelApi::CreateAction(
return api->Upload(container, /*skip_find_missing=*/true);
}
+/// NOLINTNEXTLINE(misc-no-recursion)
+[[nodiscard]] auto BazelApi::ParallelRetrieveToCas(
+ std::vector<Artifact::ObjectInfo> const& artifacts_info,
+ gsl::not_null<IExecutionApi*> const& api,
+ std::size_t jobs) noexcept -> bool {
+
+ // Return immediately if target CAS is this CAS
+ if (this == api) {
+ return true;
+ }
+
+ // Determine missing artifacts in other CAS.
+ std::vector<ArtifactDigest> digests;
+ digests.reserve(artifacts_info.size());
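+    // Map digests back to their object infos, so the upload tasks below can
+    // recover the object type (in particular the executable bit) per digest.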
+ std::unordered_map<ArtifactDigest, Artifact::ObjectInfo> info_map;
+ for (auto const& info : artifacts_info) {
+ digests.push_back(info.digest);
+ info_map[info.digest] = info;
+ }
+ auto const& missing_digests = api->IsAvailable(digests);
+ std::vector<Artifact::ObjectInfo> missing_artifacts_info;
+ missing_artifacts_info.reserve(missing_digests.size());
+ for (auto const& digest : missing_digests) {
+ missing_artifacts_info.push_back(info_map[digest]);
+ }
+
+ // Recursively process trees.
+ std::atomic_bool failure{false};
+ try {
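+        // Reading the failure flag after the try block is only safe because
+        // the task system waits for all queued tasks when it leaves scope.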
+ auto ts = TaskSystem{jobs};
+ for (auto const& info : missing_artifacts_info) {
+ if (IsTreeObject(info.type)) {
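+                // Read the tree's direct entries and synchronize them
+                // recursively, so all children are present in the target
+                // CAS before the tree object itself is queued for upload.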
+ auto const infos = network_->ReadDirectTreeEntries(
+ info.digest, std::filesystem::path{});
+ if (not infos or
+ not ParallelRetrieveToCas(infos->second, api, jobs)) {
+ return false;
+ }
+ }
+
+            // Object infos created by network_->ReadDirectTreeEntries() will
+            // contain 0 as size, but this is handled by the remote execution
+            // engine, so there is no need to regenerate the digest.
+ ts.QueueTask(
+ [this, digest = info.digest, &api, &failure, &info_map]() {
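+                    // Each task fetches exactly one blob via network_ and
+                    // uploads it to the target CAS; receiving any other
+                    // number of blobs is reported as an error.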
+ auto reader = network_->ReadBlobs({digest});
+ auto blobs = reader.Next();
+ std::size_t count{};
+ BlobContainer container{};
+ while (not blobs.empty()) {
+ if (count + blobs.size() > 1) {
+ Logger::Log(LogLevel::Error,
+ "received more blobs than requested.");
+ failure = true;
+ return;
+ }
+ for (auto& blob : blobs) {
+ try {
+                            // Look up via at(): a read-only access that is
+                            // safe to perform concurrently from many tasks.
+                            auto exec = IsExecutableObject(
+                                info_map.at(ArtifactDigest{blob.digest}).type);
+ container.Emplace(
+ BazelBlob{blob.digest, blob.data, exec});
+ } catch (std::exception const& ex) {
+ Logger::Log(LogLevel::Error,
+ "failed to emplace blob: {}",
+ ex.what());
+ failure = true;
+ return;
+ }
+ }
+ count += blobs.size();
+ blobs = reader.Next();
+ }
+ if (count != 1) {
+ Logger::Log(LogLevel::Error,
+ "could not retrieve all requested blobs.");
+ failure = true;
+ return;
+ }
+ auto result =
+ api->Upload(container, /*skip_find_missing=*/true);
+ if (not result) {
+ failure = true;
+ return;
+ }
+ });
+ }
+ } catch (std::exception const& ex) {
+ Logger::Log(
+ LogLevel::Error, "Artifact synchronization failed: {}", ex.what());
+ return false;
+ }
+
+ return not failure;
+}
+
[[nodiscard]] auto BazelApi::RetrieveToMemory(
Artifact::ObjectInfo const& artifact_info) -> std::optional<std::string> {
auto blobs = network_->ReadBlobs({artifact_info.digest}).Next();