diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp | 166 | ||||
-rw-r--r-- | src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp | 10 |
2 files changed, 65 insertions, 111 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp index fe84bced..c5e532b0 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp @@ -219,63 +219,74 @@ auto BazelCasClient::BatchReadBlobs( return result; } + auto request_creator = [&instance_name](bazel_re::Digest const& digest) { + bazel_re::BatchReadBlobsRequest request; + request.set_instance_name(instance_name); + *request.add_digests() = digest; + return request; + }; + try { result.reserve(blobs.size()); - auto requests = - CreateBatchRequestsMaxSize<bazel_re::BatchReadBlobsRequest>( - instance_name, - back_map->GetKeys().begin(), - back_map->GetKeys().end(), - "BatchReadBlobs", - [](bazel_re::BatchReadBlobsRequest* request, - bazel_re::Digest const& x) { - *(request->add_digests()) = x; - }); - bazel_re::BatchReadBlobsResponse response; - auto batch_read_blobs = [this, &response, &result, &back_map]( - auto const& request) -> RetryResponse { - grpc::ClientContext context; - auto status = stub_->BatchReadBlobs(&context, request, &response); - if (status.ok()) { - auto batch_response = ProcessBatchResponse< - ArtifactBlob, - bazel_re::BatchReadBlobsResponse_Response, - bazel_re::BatchReadBlobsResponse>( - response, - [&back_map]( - std::vector<ArtifactBlob>* v, - bazel_re::BatchReadBlobsResponse_Response const& r) { - if (auto value = back_map->GetReference(r.digest())) { - v->emplace_back( - *value.value(), r.data(), /*is_exec=*/false); + bool has_failure = false; + for (auto it = back_map->GetKeys().begin(); + it != back_map->GetKeys().end();) { + bazel_re::BatchReadBlobsRequest request; + it = InitRequest(&request, + request_creator, + it, + back_map->GetKeys().end(), + MessageLimits::kMaxGrpcLength); + logger_.Emit(LogLevel::Trace, + "BatchReadBlobs - Request size: {} bytes\n", + request.ByteSizeLong()); + + bool const retry_result = WithRetry( + [this, &request, &result, &back_map]() -> RetryResponse { + bazel_re::BatchReadBlobsResponse response; + grpc::ClientContext context; + auto status = + stub_->BatchReadBlobs(&context, request, &response); + if (status.ok()) { + auto batch_response = ProcessBatchResponse< + ArtifactBlob, + bazel_re::BatchReadBlobsResponse_Response, + bazel_re::BatchReadBlobsResponse>( + response, + [&back_map]( + std::vector<ArtifactBlob>* v, + bazel_re::BatchReadBlobsResponse_Response const& + r) { + if (auto value = + back_map->GetReference(r.digest())) { + v->emplace_back(*value.value(), + r.data(), + /*is_exec=*/false); + } + }); + if (batch_response.ok) { + std::move(batch_response.result.begin(), + batch_response.result.end(), + std::inserter(result, result.end())); + return {.ok = true}; } - }); - if (batch_response.ok) { - std::move(batch_response.result.begin(), - batch_response.result.end(), - std::inserter(result, result.end())); - return {.ok = true}; - } - return {.ok = false, - .exit_retry_loop = batch_response.exit_retry_loop, - .error_msg = batch_response.error_msg}; - } - auto exit_retry_loop = - status.error_code() != grpc::StatusCode::UNAVAILABLE; - return {.ok = false, - .exit_retry_loop = exit_retry_loop, - .error_msg = StatusString(status, "BatchReadBlobs")}; - }; - if (not std::all_of(std::begin(requests), - std::end(requests), - [this, &batch_read_blobs](auto const& request) { - return WithRetry( - [&request, &batch_read_blobs]() { - return batch_read_blobs(request); - }, - retry_config_, - logger_); - })) { + return { + .ok = false, + .exit_retry_loop = batch_response.exit_retry_loop, + .error_msg = batch_response.error_msg}; + } + auto exit_retry_loop = + status.error_code() != grpc::StatusCode::UNAVAILABLE; + return { + .ok = false, + .exit_retry_loop = exit_retry_loop, + .error_msg = StatusString(status, "BatchReadBlobs")}; + }, + retry_config_, + logger_); + has_failure = has_failure or retry_result; + } + if (has_failure) { logger_.Emit(LogLevel::Error, "Failed to BatchReadBlobs."); } } catch (...) { @@ -653,53 +664,6 @@ auto BazelCasClient::BatchUpdateBlobs(std::string const& instance_name, return updated.size(); } -template <typename TRequest, typename TForwardIter> -auto BazelCasClient::CreateBatchRequestsMaxSize( - std::string const& instance_name, - TForwardIter const& first, - TForwardIter const& last, - std::string const& heading, - std::function<void(TRequest*, - typename TForwardIter::value_type const&)> const& - request_builder) const noexcept -> std::vector<TRequest> { - if (first == last) { - return {}; - } - std::vector<TRequest> result; - TRequest accumulating_request; - std::for_each( - first, - last, - [&instance_name, &accumulating_request, &result, &request_builder]( - auto const& blob) { - TRequest request; - request.set_instance_name(instance_name); - request_builder(&request, blob); - if (accumulating_request.ByteSizeLong() + request.ByteSizeLong() > - MessageLimits::kMaxGrpcLength) { - result.emplace_back(std::move(accumulating_request)); - accumulating_request = std::move(request); - } - else { - accumulating_request.MergeFrom(request); - } - }); - result.emplace_back(std::move(accumulating_request)); - logger_.Emit(LogLevel::Trace, [&heading, &result]() { - std::ostringstream oss{}; - std::size_t count{0}; - oss << heading << " - Request sizes:" << std::endl; - std::for_each( - result.begin(), result.end(), [&oss, &count](auto const& request) { - oss << fmt::format( - " {}: {} bytes", ++count, request.ByteSizeLong()) - << std::endl; - }); - return oss.str(); - }); - return result; -} - auto BazelCasClient::CreateGetTreeRequest( std::string const& instance_name, bazel_re::Digest const& root_digest, diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp index ee03edad..6e2f928b 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp @@ -146,16 +146,6 @@ class BazelCasClient { std::unique_ptr<bazel_re::ContentAddressableStorage::Stub> stub_; Logger logger_{"RemoteCasClient"}; - template <typename TRequest, typename TForwardIter> - [[nodiscard]] auto CreateBatchRequestsMaxSize( - std::string const& instance_name, - TForwardIter const& first, - TForwardIter const& last, - std::string const& heading, - std::function<void(TRequest*, - typename TForwardIter::value_type const&)> const& - request_builder) const noexcept -> std::vector<TRequest>; - [[nodiscard]] static auto CreateGetTreeRequest( std::string const& instance_name, bazel_re::Digest const& root_digest, |