summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp166
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp10
2 files changed, 65 insertions, 111 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
index fe84bced..c5e532b0 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
@@ -219,63 +219,74 @@ auto BazelCasClient::BatchReadBlobs(
return result;
}
+ auto request_creator = [&instance_name](bazel_re::Digest const& digest) {
+ bazel_re::BatchReadBlobsRequest request;
+ request.set_instance_name(instance_name);
+ *request.add_digests() = digest;
+ return request;
+ };
+
try {
result.reserve(blobs.size());
- auto requests =
- CreateBatchRequestsMaxSize<bazel_re::BatchReadBlobsRequest>(
- instance_name,
- back_map->GetKeys().begin(),
- back_map->GetKeys().end(),
- "BatchReadBlobs",
- [](bazel_re::BatchReadBlobsRequest* request,
- bazel_re::Digest const& x) {
- *(request->add_digests()) = x;
- });
- bazel_re::BatchReadBlobsResponse response;
- auto batch_read_blobs = [this, &response, &result, &back_map](
- auto const& request) -> RetryResponse {
- grpc::ClientContext context;
- auto status = stub_->BatchReadBlobs(&context, request, &response);
- if (status.ok()) {
- auto batch_response = ProcessBatchResponse<
- ArtifactBlob,
- bazel_re::BatchReadBlobsResponse_Response,
- bazel_re::BatchReadBlobsResponse>(
- response,
- [&back_map](
- std::vector<ArtifactBlob>* v,
- bazel_re::BatchReadBlobsResponse_Response const& r) {
- if (auto value = back_map->GetReference(r.digest())) {
- v->emplace_back(
- *value.value(), r.data(), /*is_exec=*/false);
+ bool has_failure = false;
+ for (auto it = back_map->GetKeys().begin();
+ it != back_map->GetKeys().end();) {
+ bazel_re::BatchReadBlobsRequest request;
+ it = InitRequest(&request,
+ request_creator,
+ it,
+ back_map->GetKeys().end(),
+ MessageLimits::kMaxGrpcLength);
+ logger_.Emit(LogLevel::Trace,
+ "BatchReadBlobs - Request size: {} bytes\n",
+ request.ByteSizeLong());
+
+ bool const retry_result = WithRetry(
+ [this, &request, &result, &back_map]() -> RetryResponse {
+ bazel_re::BatchReadBlobsResponse response;
+ grpc::ClientContext context;
+ auto status =
+ stub_->BatchReadBlobs(&context, request, &response);
+ if (status.ok()) {
+ auto batch_response = ProcessBatchResponse<
+ ArtifactBlob,
+ bazel_re::BatchReadBlobsResponse_Response,
+ bazel_re::BatchReadBlobsResponse>(
+ response,
+ [&back_map](
+ std::vector<ArtifactBlob>* v,
+ bazel_re::BatchReadBlobsResponse_Response const&
+ r) {
+ if (auto value =
+ back_map->GetReference(r.digest())) {
+ v->emplace_back(*value.value(),
+ r.data(),
+ /*is_exec=*/false);
+ }
+ });
+ if (batch_response.ok) {
+ std::move(batch_response.result.begin(),
+ batch_response.result.end(),
+ std::inserter(result, result.end()));
+ return {.ok = true};
}
- });
- if (batch_response.ok) {
- std::move(batch_response.result.begin(),
- batch_response.result.end(),
- std::inserter(result, result.end()));
- return {.ok = true};
- }
- return {.ok = false,
- .exit_retry_loop = batch_response.exit_retry_loop,
- .error_msg = batch_response.error_msg};
- }
- auto exit_retry_loop =
- status.error_code() != grpc::StatusCode::UNAVAILABLE;
- return {.ok = false,
- .exit_retry_loop = exit_retry_loop,
- .error_msg = StatusString(status, "BatchReadBlobs")};
- };
- if (not std::all_of(std::begin(requests),
- std::end(requests),
- [this, &batch_read_blobs](auto const& request) {
- return WithRetry(
- [&request, &batch_read_blobs]() {
- return batch_read_blobs(request);
- },
- retry_config_,
- logger_);
- })) {
+ return {
+ .ok = false,
+ .exit_retry_loop = batch_response.exit_retry_loop,
+ .error_msg = batch_response.error_msg};
+ }
+ auto exit_retry_loop =
+ status.error_code() != grpc::StatusCode::UNAVAILABLE;
+ return {
+ .ok = false,
+ .exit_retry_loop = exit_retry_loop,
+ .error_msg = StatusString(status, "BatchReadBlobs")};
+ },
+ retry_config_,
+ logger_);
+ has_failure = has_failure or retry_result;
+ }
+ if (has_failure) {
logger_.Emit(LogLevel::Error, "Failed to BatchReadBlobs.");
}
} catch (...) {
@@ -653,53 +664,6 @@ auto BazelCasClient::BatchUpdateBlobs(std::string const& instance_name,
return updated.size();
}
-template <typename TRequest, typename TForwardIter>
-auto BazelCasClient::CreateBatchRequestsMaxSize(
- std::string const& instance_name,
- TForwardIter const& first,
- TForwardIter const& last,
- std::string const& heading,
- std::function<void(TRequest*,
- typename TForwardIter::value_type const&)> const&
- request_builder) const noexcept -> std::vector<TRequest> {
- if (first == last) {
- return {};
- }
- std::vector<TRequest> result;
- TRequest accumulating_request;
- std::for_each(
- first,
- last,
- [&instance_name, &accumulating_request, &result, &request_builder](
- auto const& blob) {
- TRequest request;
- request.set_instance_name(instance_name);
- request_builder(&request, blob);
- if (accumulating_request.ByteSizeLong() + request.ByteSizeLong() >
- MessageLimits::kMaxGrpcLength) {
- result.emplace_back(std::move(accumulating_request));
- accumulating_request = std::move(request);
- }
- else {
- accumulating_request.MergeFrom(request);
- }
- });
- result.emplace_back(std::move(accumulating_request));
- logger_.Emit(LogLevel::Trace, [&heading, &result]() {
- std::ostringstream oss{};
- std::size_t count{0};
- oss << heading << " - Request sizes:" << std::endl;
- std::for_each(
- result.begin(), result.end(), [&oss, &count](auto const& request) {
- oss << fmt::format(
- " {}: {} bytes", ++count, request.ByteSizeLong())
- << std::endl;
- });
- return oss.str();
- });
- return result;
-}
-
auto BazelCasClient::CreateGetTreeRequest(
std::string const& instance_name,
bazel_re::Digest const& root_digest,
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp
index ee03edad..6e2f928b 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp
@@ -146,16 +146,6 @@ class BazelCasClient {
std::unique_ptr<bazel_re::ContentAddressableStorage::Stub> stub_;
Logger logger_{"RemoteCasClient"};
- template <typename TRequest, typename TForwardIter>
- [[nodiscard]] auto CreateBatchRequestsMaxSize(
- std::string const& instance_name,
- TForwardIter const& first,
- TForwardIter const& last,
- std::string const& heading,
- std::function<void(TRequest*,
- typename TForwardIter::value_type const&)> const&
- request_builder) const noexcept -> std::vector<TRequest>;
-
[[nodiscard]] static auto CreateGetTreeRequest(
std::string const& instance_name,
bazel_re::Digest const& root_digest,