diff options
author | Alberto Sartori <alberto.sartori@huawei.com> | 2023-02-27 10:27:52 +0100 |
---|---|---|
committer | Alberto Sartori <alberto.sartori@huawei.com> | 2023-03-10 09:38:39 +0100 |
commit | 55ba09ec97d2449b39d7fcc38c346969168d899b (patch) | |
tree | 4c97affeaae2e5bb88a41be6d389b2502bae6e24 /src/buildtool/execution_api/execution_service/execution_server.cpp | |
parent | 117a1dbf099d93dfe044971f90203a5d8d1975b4 (diff) | |
download | justbuild-55ba09ec97d2449b39d7fcc38c346969168d899b.tar.gz |
execution service: implement WaitExecution and google::longrunning::Operations::GetOperation
For each action that is executed, an entry is added to a shared thread
safe cache. Once the number of operations stored exceeds twice 2^n,
where n is given by the option --log-operations-threshold, at most 2^n
operations will be removed, in a FIFO scheme.
Diffstat (limited to 'src/buildtool/execution_api/execution_service/execution_server.cpp')
-rw-r--r-- | src/buildtool/execution_api/execution_service/execution_server.cpp | 156 |
1 files changed, 90 insertions, 66 deletions
diff --git a/src/buildtool/execution_api/execution_service/execution_server.cpp b/src/buildtool/execution_api/execution_service/execution_server.cpp index ab0ebfc9..5f21f5ac 100644 --- a/src/buildtool/execution_api/execution_service/execution_server.cpp +++ b/src/buildtool/execution_api/execution_service/execution_server.cpp @@ -16,26 +16,34 @@ #include <algorithm> #include <fstream> -#include <iostream> #include <string> #include <unordered_map> #include <utility> +#include "execution_server.hpp" #include "fmt/format.h" #include "gsl-lite/gsl-lite.hpp" -#include "src/buildtool/compatibility/native_support.hpp" +#include "src/buildtool/execution_api/execution_service/operation_cache.hpp" #include "src/buildtool/execution_api/local/garbage_collector.hpp" #include "src/buildtool/file_system/file_system_manager.hpp" +static void UpdateTimeStamp(::google::longrunning::Operation* op) { + ::google::protobuf::Timestamp t; + t.set_seconds( + std::chrono::duration_cast<std::chrono::seconds>( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count()); + op->mutable_metadata()->PackFrom(t); +} + auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request) const noexcept -> std::pair<std::optional<::bazel_re::Action>, std::optional<std::string>> { // get action description auto path = storage_.BlobPath(request->action_digest(), false); if (!path) { - auto str = fmt::format( - "could not retrieve blob {} from cas", - NativeSupport::Unprefix(request->action_digest().hash())); + auto str = fmt::format("could not retrieve blob {} from cas", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -43,9 +51,8 @@ auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request) { std::ifstream f(*path); if (!action.ParseFromIstream(&f)) { - auto str = fmt::format( - "failed to parse action from blob {}", - NativeSupport::Unprefix(request->action_digest().hash())); + auto str = fmt::format("failed to parse action from blob {}", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -56,9 +63,8 @@ auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request) : storage_.TreePath(action.input_root_digest()); if (!path) { - auto str = fmt::format( - "could not retrieve input root {} from cas", - NativeSupport::Unprefix(action.input_root_digest().hash())); + auto str = fmt::format("could not retrieve input root {} from cas", + action.input_root_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -71,9 +77,8 @@ auto ExecutionServiceImpl::GetCommand(::bazel_re::Action const& action) auto path = storage_.BlobPath(action.command_digest(), false); if (!path) { - auto str = fmt::format( - "could not retrieve blob {} from cas", - NativeSupport::Unprefix(action.command_digest().hash())); + auto str = fmt::format("could not retrieve blob {} from cas", + action.command_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -82,9 +87,8 @@ auto ExecutionServiceImpl::GetCommand(::bazel_re::Action const& action) { std::ifstream f(*path); if (!c.ParseFromIstream(&f)) { - auto str = fmt::format( - "failed to parse command from blob {}", - NativeSupport::Unprefix(action.command_digest().hash())); + auto str = fmt::format("failed to parse command from blob {}", + action.command_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -125,9 +129,8 @@ auto ExecutionServiceImpl::GetIExecutionAction( env_vars, {}); if (!i_execution_action) { - auto str = fmt::format( - "could not create action from {}", - NativeSupport::Unprefix(request->action_digest().hash())); + auto str = fmt::format("could not create action from {}", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -356,10 +359,8 @@ auto ExecutionServiceImpl::GetResponse( ::bazel_re::ExecuteResponse response{}; AddStatus(&response); - auto err = - AddResult(&response, - i_execution_response, - NativeSupport::Unprefix(request->action_digest().hash())); + auto err = AddResult( + &response, i_execution_response, request->action_digest().hash()); if (err) { return {std::nullopt, *err}; } @@ -367,42 +368,34 @@ auto ExecutionServiceImpl::GetResponse( return {response, std::nullopt}; } -auto ExecutionServiceImpl::WriteResponse( +auto ExecutionServiceImpl::StoreActionResult( ::bazel_re::ExecuteRequest const* request, IExecutionResponse::Ptr const& i_execution_response, - ::bazel_re::Action const& action, - ::grpc::ServerWriter<::google::longrunning::Operation>* writer) - const noexcept -> std::optional<std::string> { - - auto [execute_response, msg_r] = GetResponse(request, i_execution_response); - if (!execute_response) { - return *msg_r; - } - - // store action result + ::bazel_re::ExecuteResponse const& execute_response, + ::bazel_re::Action const& action) const noexcept + -> std::optional<std::string> { if (i_execution_response->ExitCode() == 0 && !action.do_not_cache() && !storage_.StoreActionResult(request->action_digest(), - execute_response->result())) { - auto str = fmt::format( - "Could not store action result for action {}", - NativeSupport::Unprefix(request->action_digest().hash())); + execute_response.result())) { + auto str = fmt::format("Could not store action result for action {}", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); return str; } + return std::nullopt; +} +static void WriteResponse( + ::bazel_re::ExecuteResponse const& execute_response, + ::grpc::ServerWriter<::google::longrunning::Operation>* writer, + ::google::longrunning::Operation* op) noexcept { // send response to the client - auto op = ::google::longrunning::Operation{}; - op.mutable_response()->PackFrom(*execute_response); - op.set_name("just-remote-execution"); - op.set_done(true); - if (!writer->Write(op)) { - auto str = fmt::format( - "Could not write execution response for action {}", - NativeSupport::Unprefix(request->action_digest().hash())); - logger_.Emit(LogLevel::Error, str); - return str; - } - return std::nullopt; + op->mutable_response()->PackFrom(execute_response); + op->set_done(true); + UpdateTimeStamp(op); + + OperationCache::Set(op->name(), *op); + writer->Write(*op); } auto ExecutionServiceImpl::Execute( @@ -410,13 +403,13 @@ auto ExecutionServiceImpl::Execute( const ::bazel_re::ExecuteRequest* request, ::grpc::ServerWriter<::google::longrunning::Operation>* writer) -> ::grpc::Status { - auto lock = GarbageCollector::SharedLock(); if (!lock) { auto str = fmt::format("Could not acquire SharedLock"); logger_.Emit(LogLevel::Error, str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } + auto [action, msg_a] = GetAction(request); if (!action) { return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_a}; @@ -426,26 +419,57 @@ auto ExecutionServiceImpl::Execute( return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg}; } - logger_.Emit(LogLevel::Info, - "Execute {}", - NativeSupport::Unprefix(request->action_digest().hash())); + logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash()); + // send initial response to the client + auto op = ::google::longrunning::Operation{}; + auto const& op_name = request->action_digest().hash(); + op.set_name(op_name); + op.set_done(false); + UpdateTimeStamp(&op); + OperationCache::Set(op_name, op); + writer->Write(op); + auto t0 = std::chrono::high_resolution_clock::now(); auto i_execution_response = i_execution_action->get()->Execute(&logger_); - logger_.Emit(LogLevel::Trace, - "Finished execution of {}", - NativeSupport::Unprefix(request->action_digest().hash())); - auto err = WriteResponse(request, i_execution_response, *action, writer); - if (err) { - return ::grpc::Status{grpc::StatusCode::INTERNAL, *err}; + auto t1 = std::chrono::high_resolution_clock::now(); + logger_.Emit( + LogLevel::Trace, + "Finished execution of {} in {} seconds", + request->action_digest().hash(), + std::chrono::duration_cast<std::chrono::seconds>(t1 - t0).count()); + + auto [execute_response, msg_r] = GetResponse(request, i_execution_response); + if (!execute_response) { + return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_r}; + } + + auto err_s = StoreActionResult( + request, i_execution_response, *execute_response, *action); + if (err_s) { + return ::grpc::Status{grpc::StatusCode::INTERNAL, *err_s}; } + WriteResponse(*execute_response, writer, &op); return ::grpc::Status::OK; } auto ExecutionServiceImpl::WaitExecution( ::grpc::ServerContext* /*context*/, - const ::bazel_re::WaitExecutionRequest* /*request*/, - ::grpc::ServerWriter<::google::longrunning::Operation>* /*writer*/) + const ::bazel_re::WaitExecutionRequest* request, + ::grpc::ServerWriter<::google::longrunning::Operation>* writer) -> ::grpc::Status { - auto const* str = "WaitExecution not implemented"; - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; + auto const& hash = request->name(); + logger_.Emit(LogLevel::Trace, "WaitExecution: {}", hash); + std::optional<::google::longrunning::Operation> op; + do { + op = OperationCache::Query(hash); + if (!op) { + auto const& str = fmt::format( + "Executing action {} not found in internal cache.", hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } while (!op->done()); + writer->Write(*op); + logger_.Emit(LogLevel::Trace, "Finished WaitExecution {}", hash); + return ::grpc::Status::OK; } |