diff options
Diffstat (limited to 'src/buildtool/execution_api/remote/bazel/bazel_api.cpp')
-rw-r--r-- | src/buildtool/execution_api/remote/bazel/bazel_api.cpp | 237 |
1 files changed, 157 insertions, 80 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp index 74e8286f..19ff52f6 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp @@ -16,12 +16,9 @@ #include <algorithm> #include <atomic> -#include <map> -#include <memory> -#include <optional> -#include <string> +#include <cstdint> #include <unordered_map> -#include <vector> +#include <unordered_set> #include "fmt/core.h" #include "src/buildtool/common/bazel_types.hpp" @@ -37,9 +34,150 @@ #include "src/buildtool/execution_api/remote/bazel/bazel_network.hpp" #include "src/buildtool/execution_api/remote/bazel/bazel_response.hpp" #include "src/buildtool/file_system/file_system_manager.hpp" +#include "src/buildtool/file_system/object_type.hpp" #include "src/buildtool/logging/logger.hpp" #include "src/buildtool/multithreading/task_system.hpp" +namespace { + +[[nodiscard]] auto IsAvailable( + std::vector<bazel_re::Digest> const& digests, + gsl::not_null<IExecutionApi*> const& api) noexcept + -> std::vector<bazel_re::Digest> { + std::vector<ArtifactDigest> artifact_digests; + artifact_digests.reserve(digests.size()); + for (auto const& digest : digests) { + artifact_digests.emplace_back(digest); + } + auto const& missing_artifact_digests = api->IsAvailable(artifact_digests); + std::vector<bazel_re::Digest> missing_digests; + missing_digests.reserve(missing_artifact_digests.size()); + for (auto const& digest : missing_artifact_digests) { + missing_digests.emplace_back(static_cast<bazel_re::Digest>(digest)); + } + return missing_digests; +} + +[[nodiscard]] auto RetrieveToCas( + std::vector<bazel_re::Digest> const& digests, + gsl::not_null<IExecutionApi*> const& api, + std::shared_ptr<BazelNetwork> const& network, + std::unordered_map<ArtifactDigest, Artifact::ObjectInfo> const& + info_map) noexcept -> bool { + + // Fetch blobs from this CAS. + auto size = digests.size(); + auto reader = network->ReadBlobs(digests); + auto blobs = reader.Next(); + std::size_t count{}; + BlobContainer container{}; + while (not blobs.empty()) { + if (count + blobs.size() > size) { + Logger::Log(LogLevel::Error, "received more blobs than requested."); + return false; + } + for (auto const& blob : blobs) { + try { + auto digest = ArtifactDigest{blob.digest}; + auto exec = info_map.contains(digest) + ? IsExecutableObject(info_map.at(digest).type) + : false; + container.Emplace(BazelBlob{blob.digest, blob.data, exec}); + } catch (std::exception const& ex) { + Logger::Log( + LogLevel::Error, "failed to emplace blob: ", ex.what()); + return false; + } + } + count += blobs.size(); + blobs = reader.Next(); + } + + if (count != size) { + Logger::Log(LogLevel::Error, "could not retrieve all requested blobs."); + return false; + } + + // Upload blobs to other CAS. + return api->Upload(container, /*skip_find_missing=*/true); +} + +[[nodiscard]] auto RetrieveToCasSplitted( + Artifact::ObjectInfo const& artifact_info, + gsl::not_null<IExecutionApi*> const& api, + std::shared_ptr<BazelNetwork> const& network, + std::unordered_map<ArtifactDigest, Artifact::ObjectInfo> const& + info_map) noexcept -> bool { + + // Split blob into chunks at the remote side and retrieve chunk digests. + auto chunk_digests = network->SplitBlob(artifact_info.digest); + if (not chunk_digests) { + // If blob splitting failed, fall back to regular fetching. + return ::RetrieveToCas({artifact_info.digest}, api, network, info_map); + } + + // Fetch unknown chunks. + auto digest_set = std::unordered_set<bazel_re::Digest>{ + (*chunk_digests).begin(), (*chunk_digests).end()}; + auto unique_digests = + std::vector<bazel_re::Digest>{digest_set.begin(), digest_set.end()}; + auto missing_digests = ::IsAvailable(unique_digests, api); + if (not ::RetrieveToCas(missing_digests, api, network, info_map)) { + return false; + } + + // Assemble blob from chunks. + std::string blob_data{}; + for (auto const& chunk_digest : *chunk_digests) { + auto info = Artifact::ObjectInfo{.digest = ArtifactDigest{chunk_digest}, + .type = ObjectType::File}; + auto chunk_data = api->RetrieveToMemory(info); + if (not chunk_data) { + Logger::Log(LogLevel::Error, + "could not load blob chunk in memory: ", + chunk_digest.hash()); + return false; + } + blob_data += *chunk_data; + } + + Logger::Log( + LogLevel::Debug, + [&artifact_info, &unique_digests, &missing_digests, &blob_data]() { + auto missing_digest_set = std::unordered_set<bazel_re::Digest>{ + missing_digests.begin(), missing_digests.end()}; + std::uint64_t transmitted_bytes{0}; + for (auto const& chunk_digest : unique_digests) { + if (missing_digest_set.contains(chunk_digest)) { + transmitted_bytes += chunk_digest.size_bytes(); + } + } + double transmission_factor = + not blob_data.empty() + ? 100.0 * transmitted_bytes / blob_data.size() + : 100.0; + return fmt::format( + "Blob splitting saved {} bytes ({:.2f}%) of network traffic " + "when fetching {}.\n", + blob_data.size() - transmitted_bytes, + 100.0 - transmission_factor, + artifact_info.ToString()); + }); + + // Upload blob to other CAS. + BlobContainer container{}; + try { + auto exec = IsExecutableObject(artifact_info.type); + container.Emplace(BazelBlob{artifact_info.digest, blob_data, exec}); + } catch (std::exception const& ex) { + Logger::Log(LogLevel::Error, "failed to emplace blob: ", ex.what()); + return false; + } + return api->Upload(container, /*skip_find_missing=*/true); +} + +} // namespace + BazelApi::BazelApi(std::string const& instance_name, std::string const& host, Port port, @@ -218,46 +356,15 @@ auto BazelApi::CreateAction( blob_digests.push_back(info.digest); } - // Fetch blobs from this CAS. - auto size = blob_digests.size(); - auto reader = network_->ReadBlobs(std::move(blob_digests)); - auto blobs = reader.Next(); - std::size_t count{}; - BlobContainer container{}; - while (not blobs.empty()) { - if (count + blobs.size() > size) { - Logger::Log(LogLevel::Error, "received more blobs than requested."); - return false; - } - for (auto& blob : blobs) { - try { - auto exec = IsExecutableObject( - info_map[ArtifactDigest{blob.digest}].type); - container.Emplace(BazelBlob{blob.digest, blob.data, exec}); - } catch (std::exception const& ex) { - Logger::Log( - LogLevel::Error, "failed to emplace blob: ", ex.what()); - return false; - } - } - count += blobs.size(); - blobs = reader.Next(); - } - - if (count != size) { - Logger::Log(LogLevel::Error, "could not retrieve all requested blobs."); - return false; - } - - // Upload blobs to other CAS. - return api->Upload(container, /*skip_find_missing=*/true); + return ::RetrieveToCas(blob_digests, api, network_, info_map); } /// NOLINTNEXTLINE(misc-no-recursion) [[nodiscard]] auto BazelApi::ParallelRetrieveToCas( std::vector<Artifact::ObjectInfo> const& artifacts_info, gsl::not_null<IExecutionApi*> const& api, - std::size_t jobs) noexcept -> bool { + std::size_t jobs, + bool use_blob_splitting) noexcept -> bool { // Return immediately if target CAS is this CAS if (this == api) { @@ -288,7 +395,8 @@ auto BazelApi::CreateAction( auto const infos = network_->ReadDirectTreeEntries( info.digest, std::filesystem::path{}); if (not infos or - not ParallelRetrieveToCas(infos->second, api, jobs)) { + not ParallelRetrieveToCas( + infos->second, api, jobs, use_blob_splitting)) { return false; } } @@ -297,47 +405,15 @@ auto BazelApi::CreateAction( // as size, but this is handled by the remote execution engine, so // no need to regenerate the digest. ts.QueueTask( - [this, digest = info.digest, &api, &failure, &info_map]() { - auto reader = network_->ReadBlobs({digest}); - auto blobs = reader.Next(); - std::size_t count{}; - BlobContainer container{}; - while (not blobs.empty()) { - if (count + blobs.size() > 1) { - Logger::Log(LogLevel::Error, - "received more blobs than requested."); - failure = true; - return; - } - for (auto& blob : blobs) { - try { - auto exec = IsExecutableObject( - info_map[ArtifactDigest{blob.digest}].type); - container.Emplace( - BazelBlob{blob.digest, blob.data, exec}); - } catch (std::exception const& ex) { - Logger::Log(LogLevel::Error, - "failed to emplace blob: {}", - ex.what()); - failure = true; - return; - } - } - count += blobs.size(); - blobs = reader.Next(); - } - if (count != 1) { - Logger::Log(LogLevel::Error, - "could not retrieve all requested blobs."); - failure = true; - return; - } - auto result = - api->Upload(container, /*skip_find_missing=*/true); - if (not result) { - failure = true; + [this, &info, &api, &failure, &info_map, use_blob_splitting]() { + if (use_blob_splitting + ? ::RetrieveToCasSplitted( + info, api, network_, info_map) + : ::RetrieveToCas( + {info.digest}, api, network_, info_map)) { return; } + failure = true; }); } } catch (std::exception const& ex) { @@ -350,7 +426,8 @@ auto BazelApi::CreateAction( } [[nodiscard]] auto BazelApi::RetrieveToMemory( - Artifact::ObjectInfo const& artifact_info) -> std::optional<std::string> { + Artifact::ObjectInfo const& artifact_info) noexcept + -> std::optional<std::string> { auto blobs = network_->ReadBlobs({artifact_info.digest}).Next(); if (blobs.size() == 1) { return blobs.at(0).data; |