summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Klaus Aehlig <klaus.aehlig@huawei.com> 2023-11-07 10:53:07 +0100
committer: Klaus Aehlig <klaus.aehlig@huawei.com> 2023-11-07 17:31:15 +0100
commit: 188679f62e3e42a07154fa592bf682861d4ee3c1 (patch)
tree: e8a84e322dca6007038a46dd8a5863263cca79d6 /src
parent: 825491edfe7433bd59345b6d27e1e177be7e5674 (diff)
download: justbuild-188679f62e3e42a07154fa592bf682861d4ee3c1.tar.gz
Bazel API: implement ParallelRetrieveToCas
... using thread-based parallelism for the blobs of each tree.
Diffstat (limited to 'src')
-rw-r--r-- src/buildtool/execution_api/remote/TARGETS 1
-rw-r--r-- src/buildtool/execution_api/remote/bazel/bazel_api.cpp 98
-rw-r--r-- src/buildtool/execution_api/remote/bazel/bazel_api.hpp 5
3 files changed, 104 insertions, 0 deletions
diff --git a/src/buildtool/execution_api/remote/TARGETS b/src/buildtool/execution_api/remote/TARGETS
index 8543db0d..1ea38e40 100644
--- a/src/buildtool/execution_api/remote/TARGETS
+++ b/src/buildtool/execution_api/remote/TARGETS
@@ -65,6 +65,7 @@
[ "bazel_network"
, ["@", "fmt", "", "fmt"]
, ["src/buildtool/compatibility", "compatibility"]
+ , ["src/buildtool/multithreading", "task_system"]
]
}
, "config":
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
index e2da0188..74e8286f 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
@@ -15,6 +15,7 @@
#include "src/buildtool/execution_api/remote/bazel/bazel_api.hpp"
#include <algorithm>
+#include <atomic>
#include <map>
#include <memory>
#include <optional>
@@ -37,6 +38,7 @@
#include "src/buildtool/execution_api/remote/bazel/bazel_response.hpp"
#include "src/buildtool/file_system/file_system_manager.hpp"
#include "src/buildtool/logging/logger.hpp"
+#include "src/buildtool/multithreading/task_system.hpp"
BazelApi::BazelApi(std::string const& instance_name,
std::string const& host,
@@ -251,6 +253,102 @@ auto BazelApi::CreateAction(
return api->Upload(container, /*skip_find_missing=*/true);
}
+/// \brief Copy the given artifacts from the CAS behind this API into the CAS
+/// of another API, transferring each blob in a task queued on a TaskSystem.
+///
+/// Only the artifacts whose digests are returned by api->IsAvailable() (used
+/// here as the set missing in the target) are transferred. For a tree object,
+/// its direct entries are read via network_ and transferred by a recursive
+/// call before the blob of the tree itself is queued.
+/// \param artifacts_info  Artifacts to make available in the target CAS.
+/// \param api             Target API; if it is this object, nothing is done.
+/// \param jobs            Value handed to the TaskSystem constructor.
+/// \returns true if no failure was recorded; false if reading tree entries,
+/// a recursive transfer, a blob download/upload, or task setup failed.
+/// NOLINTNEXTLINE(misc-no-recursion)
+[[nodiscard]] auto BazelApi::ParallelRetrieveToCas(
+    std::vector<Artifact::ObjectInfo> const& artifacts_info,
+    gsl::not_null<IExecutionApi*> const& api,
+    std::size_t jobs) noexcept -> bool {
+
+    // Return immediately if target CAS is this CAS
+    if (this == api) {
+        return true;
+    }
+
+    std::atomic_bool failure{false};
+    // This function is noexcept: everything that may throw, including the
+    // container bookkeeping below, has to run inside the try block so that an
+    // exception is reported as failure instead of terminating the process.
+    try {
+        // Determine missing artifacts in other CAS.
+        std::vector<ArtifactDigest> digests;
+        digests.reserve(artifacts_info.size());
+        std::unordered_map<ArtifactDigest, Artifact::ObjectInfo> info_map;
+        for (auto const& info : artifacts_info) {
+            digests.push_back(info.digest);
+            info_map[info.digest] = info;
+        }
+        auto const& missing_digests = api->IsAvailable(digests);
+        std::vector<Artifact::ObjectInfo> missing_artifacts_info;
+        missing_artifacts_info.reserve(missing_digests.size());
+        for (auto const& digest : missing_digests) {
+            missing_artifacts_info.push_back(info_map[digest]);
+        }
+
+        // Recursively process trees.
+        // NOTE(review): the final result is read only after `ts` has left
+        // scope; this presumes that destroying the TaskSystem waits for all
+        // queued tasks -- confirm against task_system.hpp.
+        auto ts = TaskSystem{jobs};
+        for (auto const& info : missing_artifacts_info) {
+            if (IsTreeObject(info.type)) {
+                auto const infos = network_->ReadDirectTreeEntries(
+                    info.digest, std::filesystem::path{});
+                if (not infos or
+                    not ParallelRetrieveToCas(infos->second, api, jobs)) {
+                    return false;
+                }
+            }
+
+            // Determine the executable flag here, on the queuing thread, and
+            // hand it to the task by value. The tasks must not touch
+            // info_map: its operator[] is a non-const operation (it may
+            // insert) and is therefore not safe to call concurrently.
+            auto const exec = IsExecutableObject(info.type);
+
+            // Object infos obtained from tree entries may contain 0 as size
+            // (the original note refers to network_->ReadTreeInfos()), but
+            // this is handled by the remote execution engine, so no need to
+            // regenerate the digest.
+            ts.QueueTask(
+                [this, digest = info.digest, exec, &api, &failure]() {
+                    auto reader = network_->ReadBlobs({digest});
+                    auto blobs = reader.Next();
+                    std::size_t count{};
+                    BlobContainer container{};
+                    while (not blobs.empty()) {
+                        // Exactly one digest was requested, so more than one
+                        // blob in total is a protocol error.
+                        if (count + blobs.size() > 1) {
+                            Logger::Log(LogLevel::Error,
+                                        "received more blobs than requested.");
+                            failure = true;
+                            return;
+                        }
+                        for (auto& blob : blobs) {
+                            try {
+                                container.Emplace(
+                                    BazelBlob{blob.digest, blob.data, exec});
+                            } catch (std::exception const& ex) {
+                                Logger::Log(LogLevel::Error,
+                                            "failed to emplace blob: {}",
+                                            ex.what());
+                                failure = true;
+                                return;
+                            }
+                        }
+                        count += blobs.size();
+                        blobs = reader.Next();
+                    }
+                    if (count != 1) {
+                        Logger::Log(LogLevel::Error,
+                                    "could not retrieve all requested blobs.");
+                        failure = true;
+                        return;
+                    }
+                    // The blob was reported missing above, so the upload
+                    // skips the find-missing check.
+                    auto result =
+                        api->Upload(container, /*skip_find_missing=*/true);
+                    if (not result) {
+                        failure = true;
+                        return;
+                    }
+                });
+        }
+    } catch (std::exception const& ex) {
+        Logger::Log(
+            LogLevel::Error, "Artifact synchronization failed: {}", ex.what());
+        return false;
+    }
+
+    return not failure;
+}
+
[[nodiscard]] auto BazelApi::RetrieveToMemory(
Artifact::ObjectInfo const& artifact_info) -> std::optional<std::string> {
auto blobs = network_->ReadBlobs({artifact_info.digest}).Next();
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.hpp b/src/buildtool/execution_api/remote/bazel/bazel_api.hpp
index 2e2befba..3b1b5193 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.hpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.hpp
@@ -65,6 +65,11 @@ class BazelApi final : public IExecutionApi {
std::vector<int> const& fds,
bool raw_tree) noexcept -> bool final;
+ [[nodiscard]] auto ParallelRetrieveToCas(  // transfer artifacts into the CAS of another API; defined in bazel_api.cpp
+ std::vector<Artifact::ObjectInfo> const& artifacts_info,  // artifacts to make available in the target
+ gsl::not_null<IExecutionApi*> const& api,  // target API (never null)
+ std::size_t jobs) noexcept -> bool final;  // jobs: handed to the TaskSystem by the implementation; returns false on failure
+
[[nodiscard]] auto RetrieveToCas(
std::vector<Artifact::ObjectInfo> const& artifacts_info,
gsl::not_null<IExecutionApi*> const& api) noexcept -> bool final;