diff options
Diffstat (limited to 'src')
6 files changed, 163 insertions, 117 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp index f539f325..96acd065 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp @@ -58,25 +58,24 @@ namespace { // Fetch blobs from this CAS. auto size = digests.size(); - auto reader = network->ReadBlobs(digests); - auto blobs = reader.Next(); + auto reader = network->CreateReader(); std::size_t count{}; ArtifactBlobContainer container{}; - while (not blobs.empty()) { + for (auto blobs : reader.ReadIncrementally(digests)) { if (count + blobs.size() > size) { Logger::Log(LogLevel::Warning, "received more blobs than requested."); return false; } - for (auto const& blob : blobs) { - auto digest = ArtifactDigest{blob.digest}; - auto exec = info_map.contains(digest) - ? IsExecutableObject(info_map.at(digest).type) - : false; + for (auto& blob : blobs) { + blob.is_exec = + info_map.contains(blob.digest) + ? IsExecutableObject(info_map.at(blob.digest).type) + : false; // Collect blob and upload to other CAS if transfer size reached. if (not UpdateContainerAndUpload<ArtifactDigest>( &container, - ArtifactBlob{std::move(digest), blob.data, exec}, + std::move(blob), /*exception_is_fatal=*/true, [&api](ArtifactBlobContainer&& blobs) { return api->Upload(std::move(blobs), @@ -86,7 +85,6 @@ namespace { } } count += blobs.size(); - blobs = reader.Next(); } if (count != size) { @@ -266,10 +264,9 @@ auto BazelApi::CreateAction( // Request file blobs auto size = file_digests.size(); - auto reader = network_->ReadBlobs(std::move(file_digests)); - auto blobs = reader.Next(); + auto reader = network_->CreateReader(); std::size_t count{}; - while (not blobs.empty()) { + for (auto blobs : reader.ReadIncrementally(std::move(file_digests))) { if (count + blobs.size() > size) { Logger::Log(LogLevel::Warning, "received more blobs than requested."); @@ -288,7 +285,6 @@ auto BazelApi::CreateAction( } } count += blobs.size(); - blobs = reader.Next(); } if (count != size) { @@ -437,9 +433,9 @@ auto BazelApi::CreateAction( [[nodiscard]] auto BazelApi::RetrieveToMemory( Artifact::ObjectInfo const& artifact_info) noexcept -> std::optional<std::string> { - auto blobs = network_->ReadBlobs({artifact_info.digest}).Next(); - if (blobs.size() == 1) { - return *blobs.at(0).data; + auto reader = network_->CreateReader(); + if (auto blob = reader.ReadSingleBlob(artifact_info.digest)) { + return *blob->data; } return std::nullopt; } @@ -468,14 +464,12 @@ auto BazelApi::CreateAction( *build_root, [&network = network_](std::vector<bazel_re::Digest> const& digests, std::vector<std::string>* targets) { - auto reader = network->ReadBlobs(digests); - auto blobs = reader.Next(); + auto reader = network->CreateReader(); targets->reserve(digests.size()); - while (not blobs.empty()) { + for (auto blobs : reader.ReadIncrementally(digests)) { for (auto const& blob : blobs) { targets->emplace_back(*blob.data); } - blobs = reader.Next(); } }); } diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network.cpp b/src/buildtool/execution_api/remote/bazel/bazel_network.cpp index ba30b564..38bb1aa0 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_network.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_network.cpp @@ -134,52 +134,6 @@ auto BazelNetwork::ExecuteBazelActionSync( return response.output; } -auto BazelNetwork::BlobReader::Next() noexcept -> std::vector<BazelBlob> { - std::size_t size{}; - std::vector<BazelBlob> blobs{}; - - try { - while (current_ != ids_.end()) { - auto blob_size = gsl::narrow<std::size_t>(current_->size_bytes()); - size += blob_size; - // read if size is 0 (unknown) or exceeds transfer size - if (blob_size == 0 or size > kMaxBatchTransferSize) { - // perform read of range [begin_, current_) - if (begin_ == current_) { - auto blob = cas_->ReadSingleBlob(instance_name_, *begin_); - if (blob) { - blobs.emplace_back(std::move(*blob)); - } - ++current_; - } - else { - blobs = - cas_->BatchReadBlobs(instance_name_, begin_, current_); - } - begin_ = current_; - break; - } - ++current_; - } - - if (begin_ != current_) { - blobs = cas_->BatchReadBlobs(instance_name_, begin_, current_); - begin_ = current_; - } - } catch (std::exception const& e) { - Logger::Log( - LogLevel::Warning, "Reading blobs failed with: {}", e.what()); - Ensures(false); - } - - return blobs; -} - -auto BazelNetwork::ReadBlobs(std::vector<bazel_re::Digest> ids) const noexcept - -> BlobReader { - return BlobReader{instance_name_, cas_.get(), std::move(ids)}; -} - auto BazelNetwork::CreateReader() const noexcept -> BazelNetworkReader { return BazelNetworkReader{instance_name_, *cas_}; } diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network.hpp b/src/buildtool/execution_api/remote/bazel/bazel_network.hpp index 422f59dd..853efa41 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_network.hpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_network.hpp @@ -22,7 +22,6 @@ #include <utility> #include <vector> -#include "gsl/gsl" #include "src/buildtool/common/bazel_types.hpp" #include "src/buildtool/common/remote/port.hpp" #include "src/buildtool/execution_api/bazel_msg/bazel_blob_container.hpp" @@ -36,31 +35,6 @@ /// \brief Contains all network clients and is responsible for all network IO. class BazelNetwork { public: - class BlobReader { - friend class BazelNetwork; - - public: - // Obtain the next batch of blobs that can be transferred in a single - // request. - [[nodiscard]] auto Next() noexcept -> std::vector<BazelBlob>; - - private: - std::string instance_name_; - gsl::not_null<BazelCasClient*> cas_; - std::vector<bazel_re::Digest> const ids_; - std::vector<bazel_re::Digest>::const_iterator begin_; - std::vector<bazel_re::Digest>::const_iterator current_; - - BlobReader(std::string instance_name, - gsl::not_null<BazelCasClient*> const& cas, - std::vector<bazel_re::Digest> ids) - : instance_name_{std::move(instance_name)}, - cas_{cas}, - ids_{std::move(ids)}, - begin_{ids_.begin()}, - current_{begin_} {}; - }; - BazelNetwork(std::string instance_name, std::string const& host, Port port, @@ -99,9 +73,6 @@ class BazelNetwork { bazel_re::Digest const& action) noexcept -> std::optional<BazelExecutionClient::ExecutionOutput>; - [[nodiscard]] auto ReadBlobs( - std::vector<bazel_re::Digest> ids) const noexcept -> BlobReader; - [[nodiscard]] auto CreateReader() const noexcept -> BazelNetworkReader; [[nodiscard]] auto GetCachedActionResult( diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp b/src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp index 1bd4a90a..d945fdb3 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp @@ -67,16 +67,24 @@ auto BazelNetworkReader::ReadGitTree(ArtifactDigest const& digest) return std::nullopt; } auto check_symlinks = [this](std::vector<bazel_re::Digest> const& ids) { - // TODO(denisov) Fix non-incremental read - auto blobs = BatchReadBlobs(ids); - if (blobs.size() > ids.size()) { - Logger::Log(LogLevel::Debug, "received more blobs than requested."); - return false; + size_t const size = ids.size(); + size_t count = 0; + for (auto blobs : ReadIncrementally(ids)) { + if (count + blobs.size() > size) { + Logger::Log(LogLevel::Debug, + "received more blobs than requested."); + return false; + } + bool valid = std::all_of( + blobs.begin(), blobs.end(), [](ArtifactBlob const& blob) { + return PathIsNonUpwards(*blob.data); + }); + if (not valid) { + return false; + } + count += blobs.size(); } - return std::all_of( - blobs.begin(), blobs.end(), [](ArtifactBlob const& blob) { - return PathIsNonUpwards(*blob.data); - }); + return true; }; std::string const& content = *read_blob->data; @@ -148,6 +156,11 @@ auto BazelNetworkReader::ReadSingleBlob(ArtifactDigest const& digest) return std::nullopt; } +auto BazelNetworkReader::ReadIncrementally( + std::vector<bazel_re::Digest> digests) const noexcept -> IncrementalReader { + return IncrementalReader{*this, std::move(digests)}; +} + auto BazelNetworkReader::BatchReadBlobs( std::vector<bazel_re::Digest> const& blobs) const noexcept -> std::vector<ArtifactBlob> { @@ -187,3 +200,58 @@ auto BazelNetworkReader::Validate(BazelBlob const& blob) noexcept -> bool { rehashed_digest.hash()); return false; } + +namespace { +[[nodiscard]] auto FindBorderIterator( + std::vector<bazel_re::Digest>::const_iterator const& begin, + std::vector<bazel_re::Digest>::const_iterator const& end) noexcept { + std::int64_t size = 0; + for (auto it = begin; it != end; ++it) { + std::int64_t const blob_size = it->size_bytes(); + size += blob_size; + if (blob_size == 0 or size > kMaxBatchTransferSize) { + return it; + } + } + return end; +} + +[[nodiscard]] auto FindCurrentIterator( + std::vector<bazel_re::Digest>::const_iterator const& begin, + std::vector<bazel_re::Digest>::const_iterator const& end) noexcept { + auto it = FindBorderIterator(begin, end); + if (it == begin and begin != end) { + ++it; + } + return it; +} +} // namespace + +BazelNetworkReader::IncrementalReader::iterator::iterator( + BazelNetworkReader const& owner, + std::vector<bazel_re::Digest>::const_iterator begin, + std::vector<bazel_re::Digest>::const_iterator end) noexcept + : owner_{owner}, begin_{begin}, end_{end} { + current_ = FindCurrentIterator(begin_, end_); +} + +auto BazelNetworkReader::IncrementalReader::iterator::operator*() const noexcept + -> value_type { + if (begin_ != current_) { + if (std::distance(begin_, current_) > 1) { + std::vector<bazel_re::Digest> request{begin_, current_}; + return owner_.BatchReadBlobs(request); + } + if (auto blob = owner_.ReadSingleBlob(ArtifactDigest{*begin_})) { + return {std::move(*blob)}; + } + } + return {}; +} + +auto BazelNetworkReader::IncrementalReader::iterator::operator++() noexcept + -> iterator& { + begin_ = current_; + current_ = FindCurrentIterator(begin_, end_); + return *this; +} diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp b/src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp index 66be173b..9f1fbef2 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp @@ -15,8 +15,10 @@ #ifndef INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BAZEL_TREE_READER_HPP #define INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BAZEL_TREE_READER_HPP +#include <cstddef> #include <filesystem> #include <functional> +#include <iterator> #include <optional> #include <string> #include <unordered_map> @@ -31,6 +33,8 @@ #include "src/buildtool/file_system/git_repo.hpp" class BazelNetworkReader final { + class IncrementalReader; + public: using DumpCallback = std::function<bool(std::string const&)>; @@ -58,6 +62,9 @@ class BazelNetworkReader final { [[nodiscard]] auto ReadSingleBlob(ArtifactDigest const& digest) const noexcept -> std::optional<ArtifactBlob>; + [[nodiscard]] auto ReadIncrementally(std::vector<bazel_re::Digest> digests) + const noexcept -> IncrementalReader; + private: using DirectoryMap = std::unordered_map<ArtifactDigest, bazel_re::Directory>; @@ -77,4 +84,58 @@ class BazelNetworkReader final { [[nodiscard]] static auto Validate(BazelBlob const& blob) noexcept -> bool; }; +class BazelNetworkReader::IncrementalReader final { + public: + IncrementalReader(BazelNetworkReader const& owner, + std::vector<bazel_re::Digest> digests) noexcept + : owner_(owner), digests_(std::move(digests)) {} + + class iterator final { + public: + using value_type = std::vector<ArtifactBlob>; + using pointer = value_type*; + using reference = value_type&; + using difference_type = std::ptrdiff_t; + using iterator_category = std::forward_iterator_tag; + + iterator(BazelNetworkReader const& owner, + std::vector<bazel_re::Digest>::const_iterator begin, + std::vector<bazel_re::Digest>::const_iterator end) noexcept; + + auto operator*() const noexcept -> value_type; + auto operator++() noexcept -> iterator&; + + [[nodiscard]] friend auto operator==(iterator const& lhs, + iterator const& rhs) noexcept + -> bool { + return lhs.begin_ == rhs.begin_ and lhs.end_ == rhs.end_ and + lhs.current_ == rhs.current_; + } + + [[nodiscard]] friend auto operator!=(iterator const& lhs, + iterator const& rhs) noexcept + -> bool { + return not(lhs == rhs); + } + + private: + BazelNetworkReader const& owner_; + std::vector<bazel_re::Digest>::const_iterator begin_; + std::vector<bazel_re::Digest>::const_iterator end_; + std::vector<bazel_re::Digest>::const_iterator current_; + }; + + [[nodiscard]] auto begin() const noexcept { + return iterator{owner_, digests_.begin(), digests_.end()}; + } + + [[nodiscard]] auto end() const noexcept { + return iterator{owner_, digests_.end(), digests_.end()}; + } + + private: + BazelNetworkReader const& owner_; + std::vector<bazel_re::Digest> digests_; +}; + #endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BAZEL_TREE_READER_HPP diff --git a/src/buildtool/execution_api/remote/bazel/bazel_response.cpp b/src/buildtool/execution_api/remote/bazel/bazel_response.cpp index df1d4736..1b7682bd 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_response.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_response.cpp @@ -38,14 +38,14 @@ auto ProcessDirectoryMessage(bazel_re::Directory const& dir) noexcept auto BazelResponse::ReadStringBlob(bazel_re::Digest const& id) noexcept -> std::string { - auto blobs = network_->ReadBlobs({id}).Next(); - if (blobs.empty()) { - Logger::Log(LogLevel::Warning, - "reading digest {} from action response failed", - id.hash()); - return std::string{}; + auto reader = network_->CreateReader(); + if (auto blob = reader.ReadSingleBlob(ArtifactDigest{id})) { + return *blob->data; } - return *blobs[0].data; + Logger::Log(LogLevel::Warning, + "reading digest {} from action response failed", + id.hash()); + return std::string{}; } auto BazelResponse::Artifacts() noexcept -> ArtifactInfos { @@ -175,10 +175,9 @@ auto BazelResponse::Populate() noexcept -> bool { [](auto dir) { return dir.tree_digest(); }); // collect root digests from trees and store them - auto blob_reader = network_->ReadBlobs(tree_digests); - auto tree_blobs = blob_reader.Next(); - int pos{}; - while (not tree_blobs.empty()) { + auto reader = network_->CreateReader(); + int pos = 0; + for (auto tree_blobs : reader.ReadIncrementally(tree_digests)) { for (auto const& tree_blob : tree_blobs) { try { auto tree = BazelMsgFactory::MessageFromString<bazel_re::Tree>( @@ -204,7 +203,6 @@ auto BazelResponse::Populate() noexcept -> bool { } ++pos; } - tree_blobs = blob_reader.Next(); } artifacts_ = std::move(artifacts); dir_symlinks_ = std::move(dir_symlinks); |