diff options
author | Alberto Sartori <alberto.sartori@huawei.com> | 2023-12-20 14:53:28 +0100 |
---|---|---|
committer | Alberto Sartori <alberto.sartori@huawei.com> | 2023-12-21 10:11:11 +0100 |
commit | f3bee019b87b18cc746f65db0d9bff836ec78b13 (patch) | |
tree | fdda4468402ebb1e9d174903cec008aea460ee80 /src | |
parent | 1b19effe3ed9c35e03686fc1c92a196d73119b62 (diff) | |
download | justbuild-f3bee019b87b18cc746f65db0d9bff836ec78b13.tar.gz |
BazelCasClient: split DoBatchUploadBlobs into multiple calls...
...to honor the message limit imposed by GRPC.
Diffstat (limited to 'src')
-rw-r--r-- | src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp | 86 |
1 files changed, 45 insertions, 41 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp index cb8b2bd4..70ff27c4 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp @@ -402,14 +402,26 @@ auto BazelCasClient::DoBatchUpdateBlobs(std::string const& instance_name, T_OutputIter const& start, T_OutputIter const& end) noexcept -> std::vector<bazel_re::Digest> { - auto request = CreateUpdateBlobsRequest(instance_name, start, end); - - bazel_re::BatchUpdateBlobsResponse response; - std::vector<bazel_re::Digest> result{}; + std::vector<bazel_re::Digest> result; + if (start == end) { + return result; + } try { + auto requests = + CreateBatchRequestsMaxSize<bazel_re::BatchUpdateBlobsRequest>( + instance_name, + start, + end, + "BatchUpdateBlobs", + [this](bazel_re::BatchUpdateBlobsRequest* request, + BazelBlob const& x) { + *(request->add_requests()) = + this->CreateUpdateBlobsSingleRequest(x); + }); + result.reserve(std::distance(start, end)); auto batch_update_blobs = - [this, &response, &request, &result, &start, &end, &instance_name]() - -> RetryResponse { + [this, &result](auto const& request) -> RetryResponse { + bazel_re::BatchUpdateBlobsResponse response; grpc::ClientContext context; auto status = stub_->BatchUpdateBlobs(&context, request, &response); if (status.ok()) { @@ -421,57 +433,49 @@ auto BazelCasClient::DoBatchUpdateBlobs(std::string const& instance_name, bazel_re::BatchUpdateBlobsResponse_Response const& r) { v->push_back(r.digest()); }); - // todo: check status of each response if (batch_response.ok) { - result = std::move(batch_response.result); + std::move(std::begin(batch_response.result), + std::end(batch_response.result), + std::back_inserter(result)); return {.ok = true}; } return {.ok = false, .exit_retry_loop = batch_response.exit_retry_loop, .error_msg = batch_response.error_msg}; } - if (status.error_code() == grpc::StatusCode::RESOURCE_EXHAUSTED) { - LogStatus(&logger_, LogLevel::Progress, status); - logger_.Emit(LogLevel::Progress, - "Falling back to single blob transfers"); - auto current = start; - while (current != end) { - if (UpdateSingleBlob(instance_name, (*current))) { - result.emplace_back((*current).digest); - } - else { - // just retry - return {.ok = false}; - } - ++current; - } - return {.ok = true}; - } return {.ok = false, .exit_retry_loop = status.error_code() != grpc::StatusCode::UNAVAILABLE, - .error_msg = status.error_message()}; + .error_msg = StatusString(status, "BatchUpdateBlobs")}; }; - if (not WithRetry(batch_update_blobs, logger_)) { + if (not std::all_of(std::begin(requests), + std::end(requests), + [this, &batch_update_blobs](auto const& request) { + return WithRetry( + [&request, &batch_update_blobs]() { + return batch_update_blobs(request); + }, + logger_); + })) { logger_.Emit(LogLevel::Error, "Failed to BatchUpdateBlobs."); } - - logger_.Emit(LogLevel::Trace, [&start, &end, &result]() { - std::ostringstream oss{}; - oss << "upload blobs" << std::endl; - std::for_each(start, end, [&oss](auto const& blob) { - oss << fmt::format(" - {}", blob.digest.hash()) << std::endl; - }); - oss << "received blobs" << std::endl; - std::for_each( - result.cbegin(), result.cend(), [&oss](auto const& digest) { - oss << fmt::format(" - {}", digest.hash()) << std::endl; - }); - return oss.str(); - }); } catch (...) { logger_.Emit(LogLevel::Error, "Caught exception in DoBatchUpdateBlobs"); } + logger_.Emit(LogLevel::Trace, [&start, &end, &result]() { + std::ostringstream oss{}; + oss << "upload blobs" << std::endl; + std::for_each(start, end, [&oss](auto const& blob) { + oss << fmt::format(" - {}", blob.digest.hash()) << std::endl; + }); + oss << "received blobs" << std::endl; + std::for_each( + result.cbegin(), result.cend(), [&oss](auto const& digest) { + oss << fmt::format(" - {}", digest.hash()) << std::endl; + }); + return oss.str(); + }); + return result; } |