summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlberto Sartori <alberto.sartori@huawei.com>2023-12-20 14:53:28 +0100
committerAlberto Sartori <alberto.sartori@huawei.com>2023-12-21 10:11:11 +0100
commitf3bee019b87b18cc746f65db0d9bff836ec78b13 (patch)
treefdda4468402ebb1e9d174903cec008aea460ee80 /src
parent1b19effe3ed9c35e03686fc1c92a196d73119b62 (diff)
downloadjustbuild-f3bee019b87b18cc746f65db0d9bff836ec78b13.tar.gz
BazelCasClient: split DoBatchUploadBlobs into multiple calls...
...to honor the message limit imposed by GRPC.
Diffstat (limited to 'src')
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp86
1 files changed, 45 insertions, 41 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
index cb8b2bd4..70ff27c4 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
@@ -402,14 +402,26 @@ auto BazelCasClient::DoBatchUpdateBlobs(std::string const& instance_name,
T_OutputIter const& start,
T_OutputIter const& end) noexcept
-> std::vector<bazel_re::Digest> {
- auto request = CreateUpdateBlobsRequest(instance_name, start, end);
-
- bazel_re::BatchUpdateBlobsResponse response;
- std::vector<bazel_re::Digest> result{};
+ std::vector<bazel_re::Digest> result;
+ if (start == end) {
+ return result;
+ }
try {
+ auto requests =
+ CreateBatchRequestsMaxSize<bazel_re::BatchUpdateBlobsRequest>(
+ instance_name,
+ start,
+ end,
+ "BatchUpdateBlobs",
+ [this](bazel_re::BatchUpdateBlobsRequest* request,
+ BazelBlob const& x) {
+ *(request->add_requests()) =
+ this->CreateUpdateBlobsSingleRequest(x);
+ });
+ result.reserve(std::distance(start, end));
auto batch_update_blobs =
- [this, &response, &request, &result, &start, &end, &instance_name]()
- -> RetryResponse {
+ [this, &result](auto const& request) -> RetryResponse {
+ bazel_re::BatchUpdateBlobsResponse response;
grpc::ClientContext context;
auto status = stub_->BatchUpdateBlobs(&context, request, &response);
if (status.ok()) {
@@ -421,57 +433,49 @@ auto BazelCasClient::DoBatchUpdateBlobs(std::string const& instance_name,
bazel_re::BatchUpdateBlobsResponse_Response const& r) {
v->push_back(r.digest());
});
- // todo: check status of each response
if (batch_response.ok) {
- result = std::move(batch_response.result);
+ std::move(std::begin(batch_response.result),
+ std::end(batch_response.result),
+ std::back_inserter(result));
return {.ok = true};
}
return {.ok = false,
.exit_retry_loop = batch_response.exit_retry_loop,
.error_msg = batch_response.error_msg};
}
- if (status.error_code() == grpc::StatusCode::RESOURCE_EXHAUSTED) {
- LogStatus(&logger_, LogLevel::Progress, status);
- logger_.Emit(LogLevel::Progress,
- "Falling back to single blob transfers");
- auto current = start;
- while (current != end) {
- if (UpdateSingleBlob(instance_name, (*current))) {
- result.emplace_back((*current).digest);
- }
- else {
- // just retry
- return {.ok = false};
- }
- ++current;
- }
- return {.ok = true};
- }
return {.ok = false,
.exit_retry_loop =
status.error_code() != grpc::StatusCode::UNAVAILABLE,
- .error_msg = status.error_message()};
+ .error_msg = StatusString(status, "BatchUpdateBlobs")};
};
- if (not WithRetry(batch_update_blobs, logger_)) {
+ if (not std::all_of(std::begin(requests),
+ std::end(requests),
+ [this, &batch_update_blobs](auto const& request) {
+ return WithRetry(
+ [&request, &batch_update_blobs]() {
+ return batch_update_blobs(request);
+ },
+ logger_);
+ })) {
logger_.Emit(LogLevel::Error, "Failed to BatchUpdateBlobs.");
}
-
- logger_.Emit(LogLevel::Trace, [&start, &end, &result]() {
- std::ostringstream oss{};
- oss << "upload blobs" << std::endl;
- std::for_each(start, end, [&oss](auto const& blob) {
- oss << fmt::format(" - {}", blob.digest.hash()) << std::endl;
- });
- oss << "received blobs" << std::endl;
- std::for_each(
- result.cbegin(), result.cend(), [&oss](auto const& digest) {
- oss << fmt::format(" - {}", digest.hash()) << std::endl;
- });
- return oss.str();
- });
} catch (...) {
logger_.Emit(LogLevel::Error, "Caught exception in DoBatchUpdateBlobs");
}
+ logger_.Emit(LogLevel::Trace, [&start, &end, &result]() {
+ std::ostringstream oss{};
+ oss << "upload blobs" << std::endl;
+ std::for_each(start, end, [&oss](auto const& blob) {
+ oss << fmt::format(" - {}", blob.digest.hash()) << std::endl;
+ });
+ oss << "received blobs" << std::endl;
+ std::for_each(
+ result.cbegin(), result.cend(), [&oss](auto const& digest) {
+ oss << fmt::format(" - {}", digest.hash()) << std::endl;
+ });
+ return oss.str();
+ });
+
return result;
}