diff options
author | Alberto Sartori <alberto.sartori@huawei.com> | 2023-02-14 10:45:15 +0100 |
---|---|---|
committer | Alberto Sartori <alberto.sartori@huawei.com> | 2023-02-15 16:41:10 +0100 |
commit | ddf097f7a1c1bb438f1fc97150a263cf28be9293 (patch) | |
tree | 77a986d3c7b1143adc3bb71ce91ee72b6be41498 /src/buildtool/execution_api/execution_service/execution_server.cpp | |
parent | 1576b65e9365335aa7a2ed287eceaf93fbfac88a (diff) | |
download | justbuild-ddf097f7a1c1bb438f1fc97150a263cf28be9293.tar.gz |
ExecutionServiceImpl: refactor Execute
Diffstat (limited to 'src/buildtool/execution_api/execution_service/execution_server.cpp')
-rw-r--r-- | src/buildtool/execution_api/execution_service/execution_server.cpp | 306 |
1 files changed, 215 insertions, 91 deletions
diff --git a/src/buildtool/execution_api/execution_service/execution_server.cpp b/src/buildtool/execution_api/execution_service/execution_server.cpp index d3c05b94..2b2aa681 100644 --- a/src/buildtool/execution_api/execution_service/execution_server.cpp +++ b/src/buildtool/execution_api/execution_service/execution_server.cpp @@ -18,147 +18,271 @@ #include <fstream> #include <iostream> #include <string> +#include <utility> #include "fmt/format.h" #include "src/buildtool/execution_api/local/garbage_collector.hpp" -auto ExecutionServiceImpl::Execute( - ::grpc::ServerContext* /*context*/, - const ::build::bazel::remote::execution::v2::ExecuteRequest* request, - ::grpc::ServerWriter<::google::longrunning::Operation>* writer) - -> ::grpc::Status { - auto lock = GarbageCollector::SharedLock(); - if (!lock) { - auto str = fmt::format("Could not acquire SharedLock"); - logger_.Emit(LogLevel::Error, str); - return grpc::Status{grpc::StatusCode::INTERNAL, str}; - } +auto ExecutionServiceImpl::GetAction( + ::build::bazel::remote::execution::v2::ExecuteRequest const* request) + const noexcept + -> std::pair<std::optional<::build::bazel::remote::execution::v2::Action>, + std::optional<std::string>> { + // get action description auto path = storage_.BlobPath(request->action_digest(), false); if (!path) { - auto const& str = fmt::format("could not retrieve blob {} from cas", - request->action_digest().hash()); + auto str = fmt::format("could not retrieve blob {} from cas", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + return {std::nullopt, str}; } - ::build::bazel::remote::execution::v2::Action a{}; + ::build::bazel::remote::execution::v2::Action action{}; { std::ifstream f(*path); - if (!a.ParseFromIstream(&f)) { - auto const& str = fmt::format("failed to parse action from blob {}", - request->action_digest().hash()); + if (!action.ParseFromIstream(&f)) { + auto str = fmt::format("failed to parse action from blob {}", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + return {std::nullopt, str}; } } - path = storage_.BlobPath(a.command_digest(), false); + + path = Compatibility::IsCompatible() + ? storage_.BlobPath(action.input_root_digest(), false) + : storage_.TreePath(action.input_root_digest()); + if (!path) { - auto const& str = fmt::format("could not retrieve blob {} from cas", - a.command_digest().hash()); + auto str = fmt::format("could not retrieve input root {} from cas", + action.input_root_digest().hash()); logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + return {std::nullopt, str}; } + return {std::move(action), std::nullopt}; +} + +auto ExecutionServiceImpl::GetCommand( + ::build::bazel::remote::execution::v2::Action const& action) const noexcept + -> std::pair<std::optional<::build::bazel::remote::execution::v2::Command>, + std::optional<std::string>> { + + auto path = storage_.BlobPath(action.command_digest(), false); + if (!path) { + auto str = fmt::format("could not retrieve blob {} from cas", + action.command_digest().hash()); + logger_.Emit(LogLevel::Error, str); + return {std::nullopt, str}; + } + ::build::bazel::remote::execution::v2::Command c{}; { std::ifstream f(*path); if (!c.ParseFromIstream(&f)) { - auto const& str = - fmt::format("failed to parse command from blob {}", - a.command_digest().hash()); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + auto str = fmt::format("failed to parse command from blob {}", + action.command_digest().hash()); + logger_.Emit(LogLevel::Error, str); + return {std::nullopt, str}; } } - path = Compatibility::IsCompatible() - ? storage_.BlobPath(a.input_root_digest(), false) - : storage_.TreePath(a.input_root_digest()); + return {c, std::nullopt}; +} - if (!path) { - auto const& str = - fmt::format("could not retrieve input root {} from cas", - a.input_root_digest().hash()); - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; +static auto GetEnvVars(::build::bazel::remote::execution::v2::Command const& c) + -> std::map<std::string, std::string> { + std::map<std::string, std::string> env_vars{}; + std::transform(c.environment_variables().begin(), + c.environment_variables().end(), + std::inserter(env_vars, env_vars.begin()), + [](auto const& x) -> std::pair<std::string, std::string> { + return {x.name(), x.value()}; + }); + return env_vars; +} + +auto ExecutionServiceImpl::GetIExecutionAction( + ::build::bazel::remote::execution::v2::ExecuteRequest const* request, + ::build::bazel::remote::execution::v2::Action const& action) const + -> std::pair<std::optional<IExecutionAction::Ptr>, + std::optional<std::string>> { + + auto [c, msg_c] = GetCommand(action); + if (!c) { + return {std::nullopt, *msg_c}; } - auto op = ::google::longrunning::Operation{}; - op.set_name("just-remote-execution"); - std::map<std::string, std::string> env_vars; - for (auto const& x : c.environment_variables()) { - env_vars.emplace(x.name(), x.value()); - } - auto action = api_->CreateAction( - ArtifactDigest{a.input_root_digest()}, - {c.arguments().begin(), c.arguments().end()}, - {c.output_files().begin(), c.output_files().end()}, - {c.output_directories().begin(), c.output_directories().end()}, + + auto env_vars = GetEnvVars(*c); + + auto i_execution_action = api_->CreateAction( + ArtifactDigest{action.input_root_digest()}, + {c->arguments().begin(), c->arguments().end()}, + {c->output_files().begin(), c->output_files().end()}, + {c->output_directories().begin(), c->output_directories().end()}, env_vars, {}); - action->SetCacheFlag(a.do_not_cache() - ? IExecutionAction::CacheFlag::DoNotCacheOutput - : IExecutionAction::CacheFlag::CacheOutput); + if (!i_execution_action) { + auto str = fmt::format("could not create action from {}", + request->action_digest().hash()); + logger_.Emit(LogLevel::Error, str); + return {std::nullopt, str}; + } + i_execution_action->SetCacheFlag( + action.do_not_cache() ? IExecutionAction::CacheFlag::DoNotCacheOutput + : IExecutionAction::CacheFlag::CacheOutput); + return {std::move(i_execution_action), std::nullopt}; +} + +static void AddOutputPaths( + ::build::bazel::remote::execution::v2::ExecuteResponse* response, + IExecutionResponse::Ptr const& execution) noexcept { + auto const& size = static_cast<int>(execution->Artifacts().size()); + response->mutable_result()->mutable_output_files()->Reserve(size); + response->mutable_result()->mutable_output_directories()->Reserve(size); + + for (auto const& [path, info] : execution->Artifacts()) { + auto dgst = static_cast<::build::bazel::remote::execution::v2::Digest>( + info.digest); - logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash()); - auto tmp = action->Execute(&logger_); - logger_.Emit(LogLevel::Trace, - "Finished execution of {}", - request->action_digest().hash()); - ::build::bazel::remote::execution::v2::ExecuteResponse response{}; - for (auto const& [path, info] : tmp->Artifacts()) { if (info.type == ObjectType::Tree) { - auto* d = response.mutable_result()->add_output_directories(); - *(d->mutable_path()) = path; - *(d->mutable_tree_digest()) = - static_cast<::build::bazel::remote::execution::v2::Digest>( - info.digest); + ::build::bazel::remote::execution::v2::OutputDirectory out_dir; + *(out_dir.mutable_path()) = path; + *(out_dir.mutable_tree_digest()) = std::move(dgst); + response->mutable_result()->mutable_output_directories()->Add( + std::move(out_dir)); } else { - auto* f = response.mutable_result()->add_output_files(); - *(f->mutable_path()) = path; - *(f->mutable_digest()) = - static_cast<::build::bazel::remote::execution::v2::Digest>( - info.digest); - if (info.type == ObjectType::Executable) { - f->set_is_executable(true); - } + ::build::bazel::remote::execution::v2::OutputFile out_file; + *(out_file.mutable_path()) = path; + *(out_file.mutable_digest()) = std::move(dgst); + out_file.set_is_executable(info.type == ObjectType::Executable); + response->mutable_result()->mutable_output_files()->Add( + std::move(out_file)); } } - response.set_cached_result(tmp->IsCached()); +} - op.set_done(true); - ::google::rpc::Status status{}; - *(response.mutable_status()) = status; - auto* result = response.mutable_result(); - result->set_exit_code(tmp->ExitCode()); - if (tmp->HasStdErr()) { - auto dgst = storage_.StoreBlob(tmp->StdErr(), /*is_executable=*/false); +auto ExecutionServiceImpl::AddResult( + ::build::bazel::remote::execution::v2::ExecuteResponse* response, + IExecutionResponse::Ptr const& i_execution_response, + std::string const& hash) const noexcept -> std::optional<std::string> { + AddOutputPaths(response, i_execution_response); + auto* result = response->mutable_result(); + result->set_exit_code(i_execution_response->ExitCode()); + if (i_execution_response->HasStdErr()) { + auto dgst = storage_.StoreBlob(i_execution_response->StdErr(), + /*is_executable=*/false); if (!dgst) { - auto str = fmt::format("Could not store stderr of action {}", - request->action_digest().hash()); + auto str = fmt::format("Could not store stderr of action {}", hash); logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + return str; } result->mutable_stderr_digest()->CopyFrom(*dgst); } - if (tmp->HasStdOut()) { - auto dgst = storage_.StoreBlob(tmp->StdOut(), /*is_executable=*/false); + if (i_execution_response->HasStdOut()) { + auto dgst = storage_.StoreBlob(i_execution_response->StdOut(), + /*is_executable=*/false); if (!dgst) { - auto str = fmt::format("Could not store stdout of action {}", - request->action_digest().hash()); + auto str = fmt::format("Could not store stdout of action {}", hash); logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + return str; } result->mutable_stdout_digest()->CopyFrom(*dgst); } + return std::nullopt; +} - op.mutable_response()->PackFrom(response); - writer->Write(op); - if (tmp->ExitCode() == 0 && !a.do_not_cache() && +static void AddStatus( + ::build::bazel::remote::execution::v2::ExecuteResponse* response) noexcept { + ::google::rpc::Status status{}; + // we run the action locally, so no communication issues should happen + status.set_code(grpc::StatusCode::OK); + *(response->mutable_status()) = status; +} + +auto ExecutionServiceImpl::GetResponse( + ::build::bazel::remote::execution::v2::ExecuteRequest const* request, + IExecutionResponse::Ptr const& i_execution_response) const noexcept + -> std::pair< + std::optional<::build::bazel::remote::execution::v2::ExecuteResponse>, + std::optional<std::string>> { + + ::build::bazel::remote::execution::v2::ExecuteResponse response{}; + AddStatus(&response); + auto err = AddResult( + &response, i_execution_response, request->action_digest().hash()); + if (err) { + return {std::nullopt, *err}; + } + response.set_cached_result(i_execution_response->IsCached()); + return {response, std::nullopt}; +} + +auto ExecutionServiceImpl::WriteResponse( + ::build::bazel::remote::execution::v2::ExecuteRequest const* request, + IExecutionResponse::Ptr const& i_execution_response, + ::build::bazel::remote::execution::v2::Action const& action, + ::grpc::ServerWriter<::google::longrunning::Operation>* writer) + const noexcept -> std::optional<std::string> { + + auto [execute_response, msg_r] = GetResponse(request, i_execution_response); + if (!execute_response) { + + return *msg_r; + } + + // store action result + if (i_execution_response->ExitCode() == 0 && !action.do_not_cache() && !storage_.StoreActionResult(request->action_digest(), - response.result())) { + execute_response->result())) { auto str = fmt::format("Could not store action result for action {}", request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + return str; + } + + // send response to the client + auto op = ::google::longrunning::Operation{}; + op.mutable_response()->PackFrom(*execute_response); + op.set_name("just-remote-execution"); + op.set_done(true); + if (!writer->Write(op)) { + auto str = + fmt::format("Could not write execution response for action {}", + request->action_digest().hash()); + logger_.Emit(LogLevel::Error, str); + return str; } + return std::nullopt; +} + +auto ExecutionServiceImpl::Execute( + ::grpc::ServerContext* /*context*/, + const ::build::bazel::remote::execution::v2::ExecuteRequest* request, + ::grpc::ServerWriter<::google::longrunning::Operation>* writer) + -> ::grpc::Status { + auto lock = GarbageCollector::SharedLock(); + if (!lock) { + auto str = fmt::format("Could not acquire SharedLock"); + logger_.Emit(LogLevel::Error, str); + return grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + auto [action, msg_a] = GetAction(request); + if (!action) { + return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_a}; + } + auto [i_execution_action, msg] = GetIExecutionAction(request, *action); + if (!i_execution_action) { + return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg}; + } + + logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash()); + auto i_execution_response = i_execution_action->get()->Execute(&logger_); + logger_.Emit(LogLevel::Trace, + "Finished execution of {}", + request->action_digest().hash()); + auto err = WriteResponse(request, i_execution_response, *action, writer); + if (err) { + return ::grpc::Status{grpc::StatusCode::INTERNAL, *err}; + } return ::grpc::Status::OK; } |