summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_api.cpp36
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_network.cpp46
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_network.hpp29
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp86
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp61
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_response.cpp22
6 files changed, 163 insertions, 117 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
index f539f325..96acd065 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
@@ -58,25 +58,24 @@ namespace {
// Fetch blobs from this CAS.
auto size = digests.size();
- auto reader = network->ReadBlobs(digests);
- auto blobs = reader.Next();
+ auto reader = network->CreateReader();
std::size_t count{};
ArtifactBlobContainer container{};
- while (not blobs.empty()) {
+ for (auto blobs : reader.ReadIncrementally(digests)) {
if (count + blobs.size() > size) {
Logger::Log(LogLevel::Warning,
"received more blobs than requested.");
return false;
}
- for (auto const& blob : blobs) {
- auto digest = ArtifactDigest{blob.digest};
- auto exec = info_map.contains(digest)
- ? IsExecutableObject(info_map.at(digest).type)
- : false;
+ for (auto& blob : blobs) {
+ blob.is_exec =
+ info_map.contains(blob.digest)
+ ? IsExecutableObject(info_map.at(blob.digest).type)
+ : false;
// Collect blob and upload to other CAS if transfer size reached.
if (not UpdateContainerAndUpload<ArtifactDigest>(
&container,
- ArtifactBlob{std::move(digest), blob.data, exec},
+ std::move(blob),
/*exception_is_fatal=*/true,
[&api](ArtifactBlobContainer&& blobs) {
return api->Upload(std::move(blobs),
@@ -86,7 +85,6 @@ namespace {
}
}
count += blobs.size();
- blobs = reader.Next();
}
if (count != size) {
@@ -266,10 +264,9 @@ auto BazelApi::CreateAction(
// Request file blobs
auto size = file_digests.size();
- auto reader = network_->ReadBlobs(std::move(file_digests));
- auto blobs = reader.Next();
+ auto reader = network_->CreateReader();
std::size_t count{};
- while (not blobs.empty()) {
+ for (auto blobs : reader.ReadIncrementally(std::move(file_digests))) {
if (count + blobs.size() > size) {
Logger::Log(LogLevel::Warning,
"received more blobs than requested.");
@@ -288,7 +285,6 @@ auto BazelApi::CreateAction(
}
}
count += blobs.size();
- blobs = reader.Next();
}
if (count != size) {
@@ -437,9 +433,9 @@ auto BazelApi::CreateAction(
[[nodiscard]] auto BazelApi::RetrieveToMemory(
Artifact::ObjectInfo const& artifact_info) noexcept
-> std::optional<std::string> {
- auto blobs = network_->ReadBlobs({artifact_info.digest}).Next();
- if (blobs.size() == 1) {
- return *blobs.at(0).data;
+ auto reader = network_->CreateReader();
+ if (auto blob = reader.ReadSingleBlob(artifact_info.digest)) {
+ return *blob->data;
}
return std::nullopt;
}
@@ -468,14 +464,12 @@ auto BazelApi::CreateAction(
*build_root,
[&network = network_](std::vector<bazel_re::Digest> const& digests,
std::vector<std::string>* targets) {
- auto reader = network->ReadBlobs(digests);
- auto blobs = reader.Next();
+ auto reader = network->CreateReader();
targets->reserve(digests.size());
- while (not blobs.empty()) {
+ for (auto blobs : reader.ReadIncrementally(digests)) {
for (auto const& blob : blobs) {
targets->emplace_back(*blob.data);
}
- blobs = reader.Next();
}
});
}
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network.cpp b/src/buildtool/execution_api/remote/bazel/bazel_network.cpp
index ba30b564..38bb1aa0 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_network.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_network.cpp
@@ -134,52 +134,6 @@ auto BazelNetwork::ExecuteBazelActionSync(
return response.output;
}
-auto BazelNetwork::BlobReader::Next() noexcept -> std::vector<BazelBlob> {
- std::size_t size{};
- std::vector<BazelBlob> blobs{};
-
- try {
- while (current_ != ids_.end()) {
- auto blob_size = gsl::narrow<std::size_t>(current_->size_bytes());
- size += blob_size;
- // read if size is 0 (unknown) or exceeds transfer size
- if (blob_size == 0 or size > kMaxBatchTransferSize) {
- // perform read of range [begin_, current_)
- if (begin_ == current_) {
- auto blob = cas_->ReadSingleBlob(instance_name_, *begin_);
- if (blob) {
- blobs.emplace_back(std::move(*blob));
- }
- ++current_;
- }
- else {
- blobs =
- cas_->BatchReadBlobs(instance_name_, begin_, current_);
- }
- begin_ = current_;
- break;
- }
- ++current_;
- }
-
- if (begin_ != current_) {
- blobs = cas_->BatchReadBlobs(instance_name_, begin_, current_);
- begin_ = current_;
- }
- } catch (std::exception const& e) {
- Logger::Log(
- LogLevel::Warning, "Reading blobs failed with: {}", e.what());
- Ensures(false);
- }
-
- return blobs;
-}
-
-auto BazelNetwork::ReadBlobs(std::vector<bazel_re::Digest> ids) const noexcept
- -> BlobReader {
- return BlobReader{instance_name_, cas_.get(), std::move(ids)};
-}
-
auto BazelNetwork::CreateReader() const noexcept -> BazelNetworkReader {
return BazelNetworkReader{instance_name_, *cas_};
}
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network.hpp b/src/buildtool/execution_api/remote/bazel/bazel_network.hpp
index 422f59dd..853efa41 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_network.hpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_network.hpp
@@ -22,7 +22,6 @@
#include <utility>
#include <vector>
-#include "gsl/gsl"
#include "src/buildtool/common/bazel_types.hpp"
#include "src/buildtool/common/remote/port.hpp"
#include "src/buildtool/execution_api/bazel_msg/bazel_blob_container.hpp"
@@ -36,31 +35,6 @@
/// \brief Contains all network clients and is responsible for all network IO.
class BazelNetwork {
public:
- class BlobReader {
- friend class BazelNetwork;
-
- public:
- // Obtain the next batch of blobs that can be transferred in a single
- // request.
- [[nodiscard]] auto Next() noexcept -> std::vector<BazelBlob>;
-
- private:
- std::string instance_name_;
- gsl::not_null<BazelCasClient*> cas_;
- std::vector<bazel_re::Digest> const ids_;
- std::vector<bazel_re::Digest>::const_iterator begin_;
- std::vector<bazel_re::Digest>::const_iterator current_;
-
- BlobReader(std::string instance_name,
- gsl::not_null<BazelCasClient*> const& cas,
- std::vector<bazel_re::Digest> ids)
- : instance_name_{std::move(instance_name)},
- cas_{cas},
- ids_{std::move(ids)},
- begin_{ids_.begin()},
- current_{begin_} {};
- };
-
BazelNetwork(std::string instance_name,
std::string const& host,
Port port,
@@ -99,9 +73,6 @@ class BazelNetwork {
bazel_re::Digest const& action) noexcept
-> std::optional<BazelExecutionClient::ExecutionOutput>;
- [[nodiscard]] auto ReadBlobs(
- std::vector<bazel_re::Digest> ids) const noexcept -> BlobReader;
-
[[nodiscard]] auto CreateReader() const noexcept -> BazelNetworkReader;
[[nodiscard]] auto GetCachedActionResult(
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp b/src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp
index 1bd4a90a..d945fdb3 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_network_reader.cpp
@@ -67,16 +67,24 @@ auto BazelNetworkReader::ReadGitTree(ArtifactDigest const& digest)
return std::nullopt;
}
auto check_symlinks = [this](std::vector<bazel_re::Digest> const& ids) {
- // TODO(denisov) Fix non-incremental read
- auto blobs = BatchReadBlobs(ids);
- if (blobs.size() > ids.size()) {
- Logger::Log(LogLevel::Debug, "received more blobs than requested.");
- return false;
+ size_t const size = ids.size();
+ size_t count = 0;
+ for (auto blobs : ReadIncrementally(ids)) {
+ if (count + blobs.size() > size) {
+ Logger::Log(LogLevel::Debug,
+ "received more blobs than requested.");
+ return false;
+ }
+ bool valid = std::all_of(
+ blobs.begin(), blobs.end(), [](ArtifactBlob const& blob) {
+ return PathIsNonUpwards(*blob.data);
+ });
+ if (not valid) {
+ return false;
+ }
+ count += blobs.size();
}
- return std::all_of(
- blobs.begin(), blobs.end(), [](ArtifactBlob const& blob) {
- return PathIsNonUpwards(*blob.data);
- });
+ return true;
};
std::string const& content = *read_blob->data;
@@ -148,6 +156,11 @@ auto BazelNetworkReader::ReadSingleBlob(ArtifactDigest const& digest)
return std::nullopt;
}
+auto BazelNetworkReader::ReadIncrementally(
+ std::vector<bazel_re::Digest> digests) const noexcept -> IncrementalReader {
+ return IncrementalReader{*this, std::move(digests)};
+}
+
auto BazelNetworkReader::BatchReadBlobs(
std::vector<bazel_re::Digest> const& blobs) const noexcept
-> std::vector<ArtifactBlob> {
@@ -187,3 +200,58 @@ auto BazelNetworkReader::Validate(BazelBlob const& blob) noexcept -> bool {
rehashed_digest.hash());
return false;
}
+
+namespace {
+[[nodiscard]] auto FindBorderIterator(
+ std::vector<bazel_re::Digest>::const_iterator const& begin,
+ std::vector<bazel_re::Digest>::const_iterator const& end) noexcept {
+ std::int64_t size = 0;
+ for (auto it = begin; it != end; ++it) {
+ std::int64_t const blob_size = it->size_bytes();
+ size += blob_size;
+ if (blob_size == 0 or size > kMaxBatchTransferSize) {
+ return it;
+ }
+ }
+ return end;
+}
+
+[[nodiscard]] auto FindCurrentIterator(
+ std::vector<bazel_re::Digest>::const_iterator const& begin,
+ std::vector<bazel_re::Digest>::const_iterator const& end) noexcept {
+ auto it = FindBorderIterator(begin, end);
+ if (it == begin and begin != end) {
+ ++it;
+ }
+ return it;
+}
+} // namespace
+
+BazelNetworkReader::IncrementalReader::iterator::iterator(
+ BazelNetworkReader const& owner,
+ std::vector<bazel_re::Digest>::const_iterator begin,
+ std::vector<bazel_re::Digest>::const_iterator end) noexcept
+ : owner_{owner}, begin_{begin}, end_{end} {
+ current_ = FindCurrentIterator(begin_, end_);
+}
+
+auto BazelNetworkReader::IncrementalReader::iterator::operator*() const noexcept
+ -> value_type {
+ if (begin_ != current_) {
+ if (std::distance(begin_, current_) > 1) {
+ std::vector<bazel_re::Digest> request{begin_, current_};
+ return owner_.BatchReadBlobs(request);
+ }
+ if (auto blob = owner_.ReadSingleBlob(ArtifactDigest{*begin_})) {
+ return {std::move(*blob)};
+ }
+ }
+ return {};
+}
+
+auto BazelNetworkReader::IncrementalReader::iterator::operator++() noexcept
+ -> iterator& {
+ begin_ = current_;
+ current_ = FindCurrentIterator(begin_, end_);
+ return *this;
+}
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp b/src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp
index 66be173b..9f1fbef2 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_network_reader.hpp
@@ -15,8 +15,10 @@
#ifndef INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BAZEL_TREE_READER_HPP
#define INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BAZEL_TREE_READER_HPP
+#include <cstddef>
#include <filesystem>
#include <functional>
+#include <iterator>
#include <optional>
#include <string>
#include <unordered_map>
@@ -31,6 +33,8 @@
#include "src/buildtool/file_system/git_repo.hpp"
class BazelNetworkReader final {
+ class IncrementalReader;
+
public:
using DumpCallback = std::function<bool(std::string const&)>;
@@ -58,6 +62,9 @@ class BazelNetworkReader final {
[[nodiscard]] auto ReadSingleBlob(ArtifactDigest const& digest)
const noexcept -> std::optional<ArtifactBlob>;
+ [[nodiscard]] auto ReadIncrementally(std::vector<bazel_re::Digest> digests)
+ const noexcept -> IncrementalReader;
+
private:
using DirectoryMap =
std::unordered_map<ArtifactDigest, bazel_re::Directory>;
@@ -77,4 +84,58 @@ class BazelNetworkReader final {
[[nodiscard]] static auto Validate(BazelBlob const& blob) noexcept -> bool;
};
+class BazelNetworkReader::IncrementalReader final {
+ public:
+ IncrementalReader(BazelNetworkReader const& owner,
+ std::vector<bazel_re::Digest> digests) noexcept
+ : owner_(owner), digests_(std::move(digests)) {}
+
+ class iterator final {
+ public:
+ using value_type = std::vector<ArtifactBlob>;
+ using pointer = value_type*;
+ using reference = value_type&;
+ using difference_type = std::ptrdiff_t;
+ using iterator_category = std::forward_iterator_tag;
+
+ iterator(BazelNetworkReader const& owner,
+ std::vector<bazel_re::Digest>::const_iterator begin,
+ std::vector<bazel_re::Digest>::const_iterator end) noexcept;
+
+ auto operator*() const noexcept -> value_type;
+ auto operator++() noexcept -> iterator&;
+
+ [[nodiscard]] friend auto operator==(iterator const& lhs,
+ iterator const& rhs) noexcept
+ -> bool {
+ return lhs.begin_ == rhs.begin_ and lhs.end_ == rhs.end_ and
+ lhs.current_ == rhs.current_;
+ }
+
+ [[nodiscard]] friend auto operator!=(iterator const& lhs,
+ iterator const& rhs) noexcept
+ -> bool {
+ return not(lhs == rhs);
+ }
+
+ private:
+ BazelNetworkReader const& owner_;
+ std::vector<bazel_re::Digest>::const_iterator begin_;
+ std::vector<bazel_re::Digest>::const_iterator end_;
+ std::vector<bazel_re::Digest>::const_iterator current_;
+ };
+
+ [[nodiscard]] auto begin() const noexcept {
+ return iterator{owner_, digests_.begin(), digests_.end()};
+ }
+
+ [[nodiscard]] auto end() const noexcept {
+ return iterator{owner_, digests_.end(), digests_.end()};
+ }
+
+ private:
+ BazelNetworkReader const& owner_;
+ std::vector<bazel_re::Digest> digests_;
+};
+
#endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BAZEL_TREE_READER_HPP
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_response.cpp b/src/buildtool/execution_api/remote/bazel/bazel_response.cpp
index df1d4736..1b7682bd 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_response.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_response.cpp
@@ -38,14 +38,14 @@ auto ProcessDirectoryMessage(bazel_re::Directory const& dir) noexcept
auto BazelResponse::ReadStringBlob(bazel_re::Digest const& id) noexcept
-> std::string {
- auto blobs = network_->ReadBlobs({id}).Next();
- if (blobs.empty()) {
- Logger::Log(LogLevel::Warning,
- "reading digest {} from action response failed",
- id.hash());
- return std::string{};
+ auto reader = network_->CreateReader();
+ if (auto blob = reader.ReadSingleBlob(ArtifactDigest{id})) {
+ return *blob->data;
}
- return *blobs[0].data;
+ Logger::Log(LogLevel::Warning,
+ "reading digest {} from action response failed",
+ id.hash());
+ return std::string{};
}
auto BazelResponse::Artifacts() noexcept -> ArtifactInfos {
@@ -175,10 +175,9 @@ auto BazelResponse::Populate() noexcept -> bool {
[](auto dir) { return dir.tree_digest(); });
// collect root digests from trees and store them
- auto blob_reader = network_->ReadBlobs(tree_digests);
- auto tree_blobs = blob_reader.Next();
- int pos{};
- while (not tree_blobs.empty()) {
+ auto reader = network_->CreateReader();
+ int pos = 0;
+ for (auto tree_blobs : reader.ReadIncrementally(tree_digests)) {
for (auto const& tree_blob : tree_blobs) {
try {
auto tree = BazelMsgFactory::MessageFromString<bazel_re::Tree>(
@@ -204,7 +203,6 @@ auto BazelResponse::Populate() noexcept -> bool {
}
++pos;
}
- tree_blobs = blob_reader.Next();
}
artifacts_ = std::move(artifacts);
dir_symlinks_ = std::move(dir_symlinks);