diff options
7 files changed, 268 insertions, 115 deletions
diff --git a/src/buildtool/execution_api/remote/TARGETS b/src/buildtool/execution_api/remote/TARGETS index 4e02ca89..a82bb6f7 100644 --- a/src/buildtool/execution_api/remote/TARGETS +++ b/src/buildtool/execution_api/remote/TARGETS @@ -46,6 +46,7 @@ , ["src/buildtool/compatibility", "compatibility"] , ["src/buildtool/crypto", "hash_function"] , ["@", "grpc", "", "grpc++"] + , ["src/buildtool/common/remote", "retry"] ] } , "bazel": diff --git a/src/buildtool/execution_api/remote/bazel/bazel_ac_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_ac_client.cpp index 2a478271..78675073 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_ac_client.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_ac_client.cpp @@ -17,6 +17,7 @@ #include "gsl/gsl" #include "src/buildtool/common/bazel_types.hpp" #include "src/buildtool/common/remote/client_common.hpp" +#include "src/buildtool/common/remote/retry.hpp" BazelAcClient::BazelAcClient(std::string const& server, Port port) noexcept { stub_ = bazel_re::ActionCache::NewStub( @@ -40,17 +41,20 @@ auto BazelAcClient::GetActionResult( inline_output_files.end(), pb::back_inserter(request.mutable_inline_output_files())); - grpc::ClientContext context; bazel_re::ActionResult response; - grpc::Status status = stub_->GetActionResult(&context, request, &response); - - if (not status.ok()) { + auto [ok, status] = WithRetry( + [this, &response, &request]() { + grpc::ClientContext context; + return stub_->GetActionResult(&context, request, &response); + }, + logger_); + if (not ok) { if (status.error_code() == grpc::StatusCode::NOT_FOUND) { logger_.Emit( LogLevel::Debug, "cache miss '{}'", status.error_message()); } else { - LogStatus(&logger_, LogLevel::Debug, status); + LogStatus(&logger_, LogLevel::Error, status); } return std::nullopt; } diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp index ae6d6da1..181a6b8f 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp @@ -23,6 +23,7 @@ #include "gsl/gsl" #include "src/buildtool/common/bazel_types.hpp" #include "src/buildtool/common/remote/client_common.hpp" +#include "src/buildtool/common/remote/retry.hpp" #include "src/buildtool/compatibility/native_support.hpp" #include "src/buildtool/crypto/hash_function.hpp" #include "src/buildtool/execution_api/common/execution_common.hpp" @@ -155,25 +156,47 @@ auto BazelCasClient::BatchReadBlobs( auto request = CreateRequest<bazel_re::BatchReadBlobsRequest, bazel_re::Digest>( instance_name, begin, end); - grpc::ClientContext context; bazel_re::BatchReadBlobsResponse response; - grpc::Status status = stub_->BatchReadBlobs(&context, request, &response); - std::vector<BazelBlob> result{}; - if (status.ok()) { - result = - ProcessBatchResponse<BazelBlob, - bazel_re::BatchReadBlobsResponse_Response>( - response, - [](std::vector<BazelBlob>* v, - bazel_re::BatchReadBlobsResponse_Response const& r) { - v->emplace_back(r.digest(), r.data(), /*is_exec=*/false); - }); - } - else { - LogStatus(&logger_, LogLevel::Debug, status); + try { + auto batch_read_blobs = + [this, &response, &request, &result]() -> RetryResponse { + grpc::ClientContext context; + auto status = stub_->BatchReadBlobs(&context, request, &response); + if (status.ok()) { + auto batch_response = ProcessBatchResponse< + BazelBlob, + bazel_re::BatchReadBlobsResponse_Response>( + response, + [](std::vector<BazelBlob>* v, + bazel_re::BatchReadBlobsResponse_Response const& r) { + v->emplace_back( + r.digest(), r.data(), /*is_exec=*/false); + }); + if (batch_response.ok) { + result = std::move(batch_response.result); + return {.ok = true}; + } + return {.ok = false, + .exit_retry_loop = batch_response.exit_retry_loop, + .error_msg = batch_response.error_msg}; + } + auto exit_retry_loop = + status.error_code() != grpc::StatusCode::UNAVAILABLE; + return { + .ok = false, + .exit_retry_loop = exit_retry_loop, + .error_msg = fmt::format("{}: {}", + static_cast<int>(status.error_code()), + status.error_message())}; + }; + + if (not WithRetry(batch_read_blobs, logger_)) { + logger_.Emit(LogLevel::Error, "Failed to BatchReadBlobs"); + } + } catch (...) { + logger_.Emit(LogLevel::Error, "Caught exception in BatchReadBlobs"); } - return result; } @@ -206,7 +229,7 @@ auto BazelCasClient::GetTree(std::string const& instance_name, auto status = stream->Finish(); if (not status.ok()) { - LogStatus(&logger_, LogLevel::Debug, status); + LogStatus(&logger_, LogLevel::Error, status); } return result; @@ -273,15 +296,18 @@ auto BazelCasClient::SplitBlob(std::string const& instance_name, if (not BlobSplitSupportCached(instance_name, stub_, &logger_)) { return std::nullopt; } - grpc::ClientContext context{}; bazel_re::SplitBlobRequest request{}; request.set_instance_name(instance_name); request.mutable_blob_digest()->CopyFrom(digest); bazel_re::SplitBlobResponse response{}; - grpc::Status status = stub_->SplitBlob(&context, request, &response); - std::vector<bazel_re::Digest> result{}; - if (not status.ok()) { - LogStatus(&logger_, LogLevel::Debug, status); + auto [ok, status] = WithRetry( + [this, &response, &request]() { + grpc::ClientContext context; + return stub_->SplitBlob(&context, request, &response); + }, + logger_); + if (not ok) { + LogStatus(&logger_, LogLevel::Error, status); return std::nullopt; } return ProcessResponseContents<bazel_re::Digest>(response); @@ -296,16 +322,19 @@ auto BazelCasClient::FindMissingBlobs(std::string const& instance_name, CreateRequest<bazel_re::FindMissingBlobsRequest, bazel_re::Digest>( instance_name, start, end); - grpc::ClientContext context; bazel_re::FindMissingBlobsResponse response; - grpc::Status status = stub_->FindMissingBlobs(&context, request, &response); - + auto [ok, status] = WithRetry( + [this, &response, &request]() { + grpc::ClientContext context; + return stub_->FindMissingBlobs(&context, request, &response); + }, + logger_); std::vector<bazel_re::Digest> result{}; - if (status.ok()) { + if (ok) { result = ProcessResponseContents<bazel_re::Digest>(response); } else { - LogStatus(&logger_, LogLevel::Debug, status); + LogStatus(&logger_, LogLevel::Error, status); } logger_.Emit(LogLevel::Trace, [&start, &end, &result]() { @@ -332,50 +361,74 @@ auto BazelCasClient::DoBatchUpdateBlobs(std::string const& instance_name, -> std::vector<bazel_re::Digest> { auto request = CreateUpdateBlobsRequest(instance_name, start, end); - grpc::ClientContext context; bazel_re::BatchUpdateBlobsResponse response; - grpc::Status status = stub_->BatchUpdateBlobs(&context, request, &response); - std::vector<bazel_re::Digest> result{}; - if (status.ok()) { - result = - ProcessBatchResponse<bazel_re::Digest, - bazel_re::BatchUpdateBlobsResponse_Response>( - response, - [](std::vector<bazel_re::Digest>* v, - bazel_re::BatchUpdateBlobsResponse_Response const& r) { - v->push_back(r.digest()); - }); - } - else { - LogStatus(&logger_, LogLevel::Debug, status); - if (status.error_code() == grpc::StatusCode::RESOURCE_EXHAUSTED) { - logger_.Emit(LogLevel::Debug, - "Falling back to single blob transfers"); - auto current = start; - while (current != end) { - if (UpdateSingleBlob(instance_name, (*current))) { - result.emplace_back((*current).digest); + try { + auto batch_update_blobs = + [this, &response, &request, &result, &start, &end, &instance_name]() + -> RetryResponse { + grpc::ClientContext context; + auto status = stub_->BatchUpdateBlobs(&context, request, &response); + if (status.ok()) { + auto batch_response = ProcessBatchResponse< + bazel_re::Digest, + bazel_re::BatchUpdateBlobsResponse_Response>( + response, + [](std::vector<bazel_re::Digest>* v, + bazel_re::BatchUpdateBlobsResponse_Response const& r) { + v->push_back(r.digest()); + }); + // todo: check status of each response + if (batch_response.ok) { + result = std::move(batch_response.result); + return {.ok = true}; + } + return {.ok = false, + .exit_retry_loop = batch_response.exit_retry_loop, + .error_msg = batch_response.error_msg}; + } + if (status.error_code() == grpc::StatusCode::RESOURCE_EXHAUSTED) { + LogStatus(&logger_, LogLevel::Progress, status); + logger_.Emit(LogLevel::Progress, + "Falling back to single blob transfers"); + auto current = start; + while (current != end) { + if (UpdateSingleBlob(instance_name, (*current))) { + result.emplace_back((*current).digest); + } + else { + // just retry + return {.ok = false}; + } + ++current; } - ++current; + return {.ok = true}; } + return {.ok = false, + .exit_retry_loop = + status.error_code() != grpc::StatusCode::UNAVAILABLE, + .error_msg = status.error_message()}; + }; + if (not WithRetry(batch_update_blobs, logger_)) { + logger_.Emit(LogLevel::Error, "Failed to BatchUpdateBlobs."); } - } - logger_.Emit(LogLevel::Trace, [&start, &end, &result]() { - std::ostringstream oss{}; - oss << "upload blobs" << std::endl; - std::for_each(start, end, [&oss](auto const& blob) { - oss << fmt::format(" - {}", blob.digest.hash()) << std::endl; - }); - oss << "received blobs" << std::endl; - std::for_each( - result.cbegin(), result.cend(), [&oss](auto const& digest) { - oss << fmt::format(" - {}", digest.hash()) << std::endl; + logger_.Emit(LogLevel::Trace, [&start, &end, &result]() { + std::ostringstream oss{}; + oss << "upload blobs" << std::endl; + std::for_each(start, end, [&oss](auto const& blob) { + oss << fmt::format(" - {}", blob.digest.hash()) << std::endl; }); - return oss.str(); - }); - + oss << "received blobs" << std::endl; + std::for_each( + result.cbegin(), result.cend(), [&oss](auto const& digest) { + oss << fmt::format(" - {}", digest.hash()) << std::endl; + }); + return oss.str(); + }); + } catch (...) { + logger_.Emit(LogLevel::Error, "Caught exception in DoBatchUpdateBlobs"); + } return result; } @@ -490,18 +543,26 @@ template <class T_Content, class T_Inner, class T_Response> auto BazelCasClient::ProcessBatchResponse( T_Response const& response, std::function<void(std::vector<T_Content>*, T_Inner const&)> const& - inserter) const noexcept -> std::vector<T_Content> { + inserter) const noexcept -> RetryProcessBatchResponse<T_Content> { std::vector<T_Content> output; for (auto const& res : response.responses()) { + bazel_re::BatchUpdateBlobsResponse_Response r; auto const& res_status = res.status(); if (res_status.code() == static_cast<int>(grpc::StatusCode::OK)) { inserter(&output, res); } else { - LogStatus(&logger_, LogLevel::Debug, res_status); + auto exit_retry_loop = + (res_status.code() != + static_cast<int>(grpc::StatusCode::UNAVAILABLE)); + return { + .ok = false, + .exit_retry_loop = exit_retry_loop, + .error_msg = fmt::format("While processing batch response: {}", + res_status.ShortDebugString())}; } } - return output; + return {.ok = true, .result = std::move(output)}; } template <class T_Content, class T_Response> diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp index 99da5c3e..5f3060df 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.hpp @@ -177,11 +177,21 @@ class BazelCasClient { int page_size, std::string const& page_token) noexcept -> bazel_re::GetTreeRequest; + /// \brief Utility class for supporting the Retry strategy while parsing a + /// BatchResponse + template <typename T_Content> + struct RetryProcessBatchResponse { + bool ok{false}; + std::vector<T_Content> result{}; + bool exit_retry_loop{false}; + std::optional<std::string> error_msg{}; + }; + template <class T_Content, class T_Inner, class T_Response> auto ProcessBatchResponse( T_Response const& response, std::function<void(std::vector<T_Content>*, T_Inner const&)> const& - inserter) const noexcept -> std::vector<T_Content>; + inserter) const noexcept -> RetryProcessBatchResponse<T_Content>; template <class T_Content, class T_Response> auto ProcessResponseContents(T_Response const& response) const noexcept diff --git a/src/buildtool/execution_api/remote/bazel/bazel_execution_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_execution_client.cpp index e4505ece..e5d9e6b7 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_execution_client.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_execution_client.cpp @@ -17,6 +17,7 @@ #include "grpcpp/grpcpp.h" #include "gsl/gsl" #include "src/buildtool/common/remote/client_common.hpp" +#include "src/buildtool/common/remote/retry.hpp" namespace bazel_re = build::bazel::remote::execution::v2; @@ -33,17 +34,23 @@ void LogExecutionStatus(gsl::not_null<Logger const*> const& logger, // Due to a transient condition, such as all workers being occupied // (and the server does not support a queue), the action could not // be started. The client should retry. - logger->Emit(LogLevel::Error, + logger->Emit(LogLevel::Debug, fmt::format("Execution could not be started.\n{}", - s.DebugString())); + s.ShortDebugString())); break; default: // fallback to default status logging - LogStatus(logger, LogLevel::Warning, s); + LogStatus(logger, LogLevel::Error, s); break; } } +auto DebugString(grpc::Status const& status) -> std::string { + return fmt::format("{}: {}", + static_cast<int>(status.error_code()), + status.error_message()); +} + } // namespace BazelExecutionClient::BazelExecutionClient(std::string const& server, @@ -71,12 +78,33 @@ auto BazelExecutionClient::Execute(std::string const& instance_name, gsl::owner<bazel_re::Digest*>{new bazel_re::Digest(action_digest)}); request.set_allocated_execution_policy(execution_policy.release()); request.set_allocated_results_cache_policy(results_cache_policy.release()); - - grpc::ClientContext context; - std::unique_ptr<grpc::ClientReader<google::longrunning::Operation>> reader( - stub_->Execute(&context, request)); - - return ExtractContents(ReadExecution(reader.get(), wait)); + BazelExecutionClient::ExecutionResponse response; + auto execute = [this, &request, wait, &response]() -> RetryResponse { + grpc::ClientContext context; + std::unique_ptr<grpc::ClientReader<google::longrunning::Operation>> + reader(stub_->Execute(&context, request)); + + auto [op, fatal, error_msg] = ReadExecution(reader.get(), wait); + if (!op.has_value()) { + return { + .ok = false, .exit_retry_loop = fatal, .error_msg = error_msg}; + } + auto contents = ExtractContents(std::move(op)); + response = contents.response; + if (response.state == ExecutionResponse::State::Finished) { + return {.ok = true}; + } + return {.ok = false, + .exit_retry_loop = + response.state != ExecutionResponse::State::Retry, + .error_msg = contents.error_msg}; + }; + if (not WithRetry(execute, logger_)) { + logger_.Emit(LogLevel::Error, + "Failed to execute action {}.", + action_digest.ShortDebugString()); + } + return response; } auto BazelExecutionClient::WaitExecution(std::string const& execution_handle) @@ -84,30 +112,56 @@ auto BazelExecutionClient::WaitExecution(std::string const& execution_handle) bazel_re::WaitExecutionRequest request; request.set_name(execution_handle); - grpc::ClientContext context; - std::unique_ptr<grpc::ClientReader<google::longrunning::Operation>> reader( - stub_->WaitExecution(&context, request)); + BazelExecutionClient::ExecutionResponse response; + + auto wait_execution = [this, &request, &response]() -> RetryResponse { + grpc::ClientContext context; + std::unique_ptr<grpc::ClientReader<google::longrunning::Operation>> + reader(stub_->WaitExecution(&context, request)); - return ExtractContents(ReadExecution(reader.get(), true)); + auto [op, fatal, error_msg] = + ReadExecution(reader.get(), /*wait=*/true); + if (!op.has_value()) { + return { + .ok = false, .exit_retry_loop = fatal, .error_msg = error_msg}; + } + auto contents = ExtractContents(std::move(op)); + response = contents.response; + if (response.state == ExecutionResponse::State::Finished) { + return {.ok = true}; + } + return {.ok = false, + .exit_retry_loop = + response.state != ExecutionResponse::State::Retry, + .error_msg = contents.error_msg}; + }; + if (not WithRetry(wait_execution, logger_)) { + logger_.Emit( + LogLevel::Error, "Failed to Execute action {}.", request.name()); + } + return response; } auto BazelExecutionClient::ReadExecution( grpc::ClientReader<google::longrunning::Operation>* reader, - bool wait) -> std::optional<google::longrunning::Operation> { + bool wait) -> RetryReadOperation { if (reader == nullptr) { - LogStatus( - &logger_, - LogLevel::Error, - grpc::Status{grpc::StatusCode::UNKNOWN, "Reader unavailable"}); - return std::nullopt; + grpc::Status status{grpc::StatusCode::UNKNOWN, "Reader unavailable"}; + LogStatus(&logger_, LogLevel::Error, status); + return {.operation = std::nullopt, + .exit_retry_loop = true, + .error_msg = DebugString(status)}; } google::longrunning::Operation operation; if (not reader->Read(&operation)) { grpc::Status status = reader->Finish(); - // TODO(vmoreno): log error using data in status and operation - LogStatus(&logger_, LogLevel::Error, status); - return std::nullopt; + auto exit_retry_loop = + status.error_code() != grpc::StatusCode::UNAVAILABLE; + LogStatus(&logger_, + (exit_retry_loop ? LogLevel::Error : LogLevel::Debug), + status); + return {std::nullopt, exit_retry_loop, DebugString(status)}; } // Important note: do not call reader->Finish() unless reader->Read() // returned false, otherwise the thread will be never released @@ -116,33 +170,40 @@ auto BazelExecutionClient::ReadExecution( } grpc::Status status = reader->Finish(); if (not status.ok()) { - // TODO(vmoreno): log error from status and operation - LogStatus(&logger_, LogLevel::Error, status); - return std::nullopt; + auto exit_retry_loop = + status.error_code() != grpc::StatusCode::UNAVAILABLE; + LogStatus(&logger_, + (exit_retry_loop ? LogLevel::Error : LogLevel::Debug), + status); + return {std::nullopt, exit_retry_loop, DebugString(status)}; } } - return operation; + return {.operation = operation, .exit_retry_loop = false}; } auto BazelExecutionClient::ExtractContents( std::optional<google::longrunning::Operation>&& operation) - -> BazelExecutionClient::ExecutionResponse { + -> RetryExtractContents { if (not operation) { // Error was already logged in ReadExecution() - return ExecutionResponse::MakeEmptyFailed(); + return {ExecutionResponse::MakeEmptyFailed(), std::nullopt}; } auto op = *operation; ExecutionResponse response; response.execution_handle = op.name(); if (not op.done()) { response.state = ExecutionResponse::State::Ongoing; - return response; + return {response, std::nullopt}; } if (op.has_error()) { - // TODO(vmoreno): log error from google::rpc::Status s = op.error() LogStatus(&logger_, LogLevel::Debug, op.error()); - response.state = ExecutionResponse::State::Failed; - return response; + if (op.error().code() == grpc::StatusCode::UNAVAILABLE) { + response.state = ExecutionResponse::State::Retry; + } + else { + response.state = ExecutionResponse::State::Failed; + } + return {response, op.error().ShortDebugString()}; } // Get execution response Unpacked from Protobufs Any type to the actual @@ -150,18 +211,23 @@ auto BazelExecutionClient::ExtractContents( auto const& raw_response = op.response(); if (not raw_response.Is<bazel_re::ExecuteResponse>()) { // Fatal error, the type should be correct + logger_.Emit(LogLevel::Error, "Corrupted ExecuteResponse"); response.state = ExecutionResponse::State::Failed; - return response; + return {response, "Corrupted ExecuteResponse"}; } bazel_re::ExecuteResponse exec_response; raw_response.UnpackTo(&exec_response); - - if (exec_response.status().code() != grpc::StatusCode::OK) { - // For now, treat all execution errors (e.g., action timeout) as fatal. + auto status_code = exec_response.status().code(); + if (status_code != grpc::StatusCode::OK) { LogExecutionStatus(&logger_, exec_response.status()); - response.state = ExecutionResponse::State::Failed; - return response; + if (status_code == grpc::StatusCode::UNAVAILABLE) { + response.state = ExecutionResponse::State::Retry; + } + else { + response.state = ExecutionResponse::State::Failed; + } + return {response, exec_response.status().ShortDebugString()}; } ExecutionOutput output; @@ -172,5 +238,5 @@ auto BazelExecutionClient::ExtractContents( response.output = output; response.state = ExecutionResponse::State::Finished; - return response; + return {response, std::nullopt}; } diff --git a/src/buildtool/execution_api/remote/bazel/bazel_execution_client.hpp b/src/buildtool/execution_api/remote/bazel/bazel_execution_client.hpp index 31ee1144..c258d833 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_execution_client.hpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_execution_client.hpp @@ -41,7 +41,7 @@ class BazelExecutionClient { }; struct ExecutionResponse { - enum class State { Failed, Ongoing, Finished, Unknown }; + enum class State { Failed, Ongoing, Finished, Unknown, Retry }; std::string execution_handle{}; State state{State::Unknown}; @@ -67,14 +67,24 @@ class BazelExecutionClient { private: std::unique_ptr<bazel_re::Execution::Stub> stub_; Logger logger_{"RemoteExecutionClient"}; + struct RetryReadOperation { + std::optional<google::longrunning::Operation> operation{std::nullopt}; + bool exit_retry_loop{false}; + std::optional<std::string> error_msg{std::nullopt}; + }; + + struct RetryExtractContents { + ExecutionResponse response; + std::optional<std::string> error_msg{std::nullopt}; + }; [[nodiscard]] auto ReadExecution( grpc::ClientReader<google::longrunning::Operation>* reader, - bool wait) -> std::optional<google::longrunning::Operation>; + bool wait) -> RetryReadOperation; [[nodiscard]] auto ExtractContents( std::optional<google::longrunning::Operation>&& operation) - -> ExecutionResponse; + -> RetryExtractContents; }; #endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BAZEL_EXECUTION_CLIENT_HPP diff --git a/src/buildtool/execution_api/remote/bazel/bazel_network.cpp b/src/buildtool/execution_api/remote/bazel/bazel_network.cpp index ef3837d3..95f88195 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_network.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_network.cpp @@ -204,6 +204,7 @@ auto BazelNetwork::DoUploadBlobs(T_Iter const& first, return true; } } catch (...) { + Logger::Log(LogLevel::Warning, "unknonwn exception"); } Logger::Log(LogLevel::Warning, "Failed to update all blobs"); |