summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildtool/execution_api/remote/bazel/bazel_api.cpp')
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_api.cpp237
1 files changed, 157 insertions, 80 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
index 74e8286f..19ff52f6 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
@@ -16,12 +16,9 @@
#include <algorithm>
#include <atomic>
-#include <map>
-#include <memory>
-#include <optional>
-#include <string>
+#include <cstdint>
#include <unordered_map>
-#include <vector>
+#include <unordered_set>
#include "fmt/core.h"
#include "src/buildtool/common/bazel_types.hpp"
@@ -37,9 +34,150 @@
#include "src/buildtool/execution_api/remote/bazel/bazel_network.hpp"
#include "src/buildtool/execution_api/remote/bazel/bazel_response.hpp"
#include "src/buildtool/file_system/file_system_manager.hpp"
+#include "src/buildtool/file_system/object_type.hpp"
#include "src/buildtool/logging/logger.hpp"
#include "src/buildtool/multithreading/task_system.hpp"
+namespace {
+
+[[nodiscard]] auto IsAvailable(
+ std::vector<bazel_re::Digest> const& digests,
+ gsl::not_null<IExecutionApi*> const& api) noexcept
+ -> std::vector<bazel_re::Digest> {
+ std::vector<ArtifactDigest> artifact_digests;
+ artifact_digests.reserve(digests.size());
+ for (auto const& digest : digests) {
+ artifact_digests.emplace_back(digest);
+ }
+ auto const& missing_artifact_digests = api->IsAvailable(artifact_digests);
+ std::vector<bazel_re::Digest> missing_digests;
+ missing_digests.reserve(missing_artifact_digests.size());
+ for (auto const& digest : missing_artifact_digests) {
+ missing_digests.emplace_back(static_cast<bazel_re::Digest>(digest));
+ }
+ return missing_digests;
+}
+
+[[nodiscard]] auto RetrieveToCas(
+ std::vector<bazel_re::Digest> const& digests,
+ gsl::not_null<IExecutionApi*> const& api,
+ std::shared_ptr<BazelNetwork> const& network,
+ std::unordered_map<ArtifactDigest, Artifact::ObjectInfo> const&
+ info_map) noexcept -> bool {
+
+ // Fetch blobs from this CAS.
+ auto size = digests.size();
+ auto reader = network->ReadBlobs(digests);
+ auto blobs = reader.Next();
+ std::size_t count{};
+ BlobContainer container{};
+ while (not blobs.empty()) {
+ if (count + blobs.size() > size) {
+ Logger::Log(LogLevel::Error, "received more blobs than requested.");
+ return false;
+ }
+ for (auto const& blob : blobs) {
+ try {
+ auto digest = ArtifactDigest{blob.digest};
+ auto exec = info_map.contains(digest)
+ ? IsExecutableObject(info_map.at(digest).type)
+ : false;
+ container.Emplace(BazelBlob{blob.digest, blob.data, exec});
+ } catch (std::exception const& ex) {
+ Logger::Log(
+ LogLevel::Error, "failed to emplace blob: ", ex.what());
+ return false;
+ }
+ }
+ count += blobs.size();
+ blobs = reader.Next();
+ }
+
+ if (count != size) {
+ Logger::Log(LogLevel::Error, "could not retrieve all requested blobs.");
+ return false;
+ }
+
+ // Upload blobs to other CAS.
+ return api->Upload(container, /*skip_find_missing=*/true);
+}
+
+[[nodiscard]] auto RetrieveToCasSplitted(
+ Artifact::ObjectInfo const& artifact_info,
+ gsl::not_null<IExecutionApi*> const& api,
+ std::shared_ptr<BazelNetwork> const& network,
+ std::unordered_map<ArtifactDigest, Artifact::ObjectInfo> const&
+ info_map) noexcept -> bool {
+
+ // Split blob into chunks at the remote side and retrieve chunk digests.
+ auto chunk_digests = network->SplitBlob(artifact_info.digest);
+ if (not chunk_digests) {
+ // If blob splitting failed, fall back to regular fetching.
+ return ::RetrieveToCas({artifact_info.digest}, api, network, info_map);
+ }
+
+ // Fetch unknown chunks.
+ auto digest_set = std::unordered_set<bazel_re::Digest>{
+ (*chunk_digests).begin(), (*chunk_digests).end()};
+ auto unique_digests =
+ std::vector<bazel_re::Digest>{digest_set.begin(), digest_set.end()};
+ auto missing_digests = ::IsAvailable(unique_digests, api);
+ if (not ::RetrieveToCas(missing_digests, api, network, info_map)) {
+ return false;
+ }
+
+ // Assemble blob from chunks.
+ std::string blob_data{};
+ for (auto const& chunk_digest : *chunk_digests) {
+ auto info = Artifact::ObjectInfo{.digest = ArtifactDigest{chunk_digest},
+ .type = ObjectType::File};
+ auto chunk_data = api->RetrieveToMemory(info);
+ if (not chunk_data) {
+ Logger::Log(LogLevel::Error,
+ "could not load blob chunk in memory: ",
+ chunk_digest.hash());
+ return false;
+ }
+ blob_data += *chunk_data;
+ }
+
+ Logger::Log(
+ LogLevel::Debug,
+ [&artifact_info, &unique_digests, &missing_digests, &blob_data]() {
+ auto missing_digest_set = std::unordered_set<bazel_re::Digest>{
+ missing_digests.begin(), missing_digests.end()};
+ std::uint64_t transmitted_bytes{0};
+ for (auto const& chunk_digest : unique_digests) {
+ if (missing_digest_set.contains(chunk_digest)) {
+ transmitted_bytes += chunk_digest.size_bytes();
+ }
+ }
+ double transmission_factor =
+ not blob_data.empty()
+ ? 100.0 * transmitted_bytes / blob_data.size()
+ : 100.0;
+ return fmt::format(
+ "Blob splitting saved {} bytes ({:.2f}%) of network traffic "
+ "when fetching {}.\n",
+ blob_data.size() - transmitted_bytes,
+ 100.0 - transmission_factor,
+ artifact_info.ToString());
+ });
+
+ // Upload blob to other CAS.
+ BlobContainer container{};
+ try {
+ auto exec = IsExecutableObject(artifact_info.type);
+ container.Emplace(BazelBlob{artifact_info.digest, blob_data, exec});
+ } catch (std::exception const& ex) {
+ Logger::Log(LogLevel::Error, "failed to emplace blob: ", ex.what());
+ return false;
+ }
+ return api->Upload(container, /*skip_find_missing=*/true);
+}
+
+} // namespace
+
BazelApi::BazelApi(std::string const& instance_name,
std::string const& host,
Port port,
@@ -218,46 +356,15 @@ auto BazelApi::CreateAction(
blob_digests.push_back(info.digest);
}
- // Fetch blobs from this CAS.
- auto size = blob_digests.size();
- auto reader = network_->ReadBlobs(std::move(blob_digests));
- auto blobs = reader.Next();
- std::size_t count{};
- BlobContainer container{};
- while (not blobs.empty()) {
- if (count + blobs.size() > size) {
- Logger::Log(LogLevel::Error, "received more blobs than requested.");
- return false;
- }
- for (auto& blob : blobs) {
- try {
- auto exec = IsExecutableObject(
- info_map[ArtifactDigest{blob.digest}].type);
- container.Emplace(BazelBlob{blob.digest, blob.data, exec});
- } catch (std::exception const& ex) {
- Logger::Log(
- LogLevel::Error, "failed to emplace blob: ", ex.what());
- return false;
- }
- }
- count += blobs.size();
- blobs = reader.Next();
- }
-
- if (count != size) {
- Logger::Log(LogLevel::Error, "could not retrieve all requested blobs.");
- return false;
- }
-
- // Upload blobs to other CAS.
- return api->Upload(container, /*skip_find_missing=*/true);
+ return ::RetrieveToCas(blob_digests, api, network_, info_map);
}
/// NOLINTNEXTLINE(misc-no-recursion)
[[nodiscard]] auto BazelApi::ParallelRetrieveToCas(
std::vector<Artifact::ObjectInfo> const& artifacts_info,
gsl::not_null<IExecutionApi*> const& api,
- std::size_t jobs) noexcept -> bool {
+ std::size_t jobs,
+ bool use_blob_splitting) noexcept -> bool {
// Return immediately if target CAS is this CAS
if (this == api) {
@@ -288,7 +395,8 @@ auto BazelApi::CreateAction(
auto const infos = network_->ReadDirectTreeEntries(
info.digest, std::filesystem::path{});
if (not infos or
- not ParallelRetrieveToCas(infos->second, api, jobs)) {
+ not ParallelRetrieveToCas(
+ infos->second, api, jobs, use_blob_splitting)) {
return false;
}
}
@@ -297,47 +405,15 @@ auto BazelApi::CreateAction(
// as size, but this is handled by the remote execution engine, so
// no need to regenerate the digest.
ts.QueueTask(
- [this, digest = info.digest, &api, &failure, &info_map]() {
- auto reader = network_->ReadBlobs({digest});
- auto blobs = reader.Next();
- std::size_t count{};
- BlobContainer container{};
- while (not blobs.empty()) {
- if (count + blobs.size() > 1) {
- Logger::Log(LogLevel::Error,
- "received more blobs than requested.");
- failure = true;
- return;
- }
- for (auto& blob : blobs) {
- try {
- auto exec = IsExecutableObject(
- info_map[ArtifactDigest{blob.digest}].type);
- container.Emplace(
- BazelBlob{blob.digest, blob.data, exec});
- } catch (std::exception const& ex) {
- Logger::Log(LogLevel::Error,
- "failed to emplace blob: {}",
- ex.what());
- failure = true;
- return;
- }
- }
- count += blobs.size();
- blobs = reader.Next();
- }
- if (count != 1) {
- Logger::Log(LogLevel::Error,
- "could not retrieve all requested blobs.");
- failure = true;
- return;
- }
- auto result =
- api->Upload(container, /*skip_find_missing=*/true);
- if (not result) {
- failure = true;
+ [this, &info, &api, &failure, &info_map, use_blob_splitting]() {
+ if (use_blob_splitting
+ ? ::RetrieveToCasSplitted(
+ info, api, network_, info_map)
+ : ::RetrieveToCas(
+ {info.digest}, api, network_, info_map)) {
return;
}
+ failure = true;
});
}
} catch (std::exception const& ex) {
@@ -350,7 +426,8 @@ auto BazelApi::CreateAction(
}
[[nodiscard]] auto BazelApi::RetrieveToMemory(
- Artifact::ObjectInfo const& artifact_info) -> std::optional<std::string> {
+ Artifact::ObjectInfo const& artifact_info) noexcept
+ -> std::optional<std::string> {
auto blobs = network_->ReadBlobs({artifact_info.digest}).Next();
if (blobs.size() == 1) {
return blobs.at(0).data;