summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/execution_service/execution_server.cpp
diff options
context:
space:
mode:
authorAlberto Sartori <alberto.sartori@huawei.com>2023-02-14 10:45:15 +0100
committerAlberto Sartori <alberto.sartori@huawei.com>2023-02-15 16:41:10 +0100
commitddf097f7a1c1bb438f1fc97150a263cf28be9293 (patch)
tree77a986d3c7b1143adc3bb71ce91ee72b6be41498 /src/buildtool/execution_api/execution_service/execution_server.cpp
parent1576b65e9365335aa7a2ed287eceaf93fbfac88a (diff)
downloadjustbuild-ddf097f7a1c1bb438f1fc97150a263cf28be9293.tar.gz
ExecutionServiceImpl: refactor Execute
Diffstat (limited to 'src/buildtool/execution_api/execution_service/execution_server.cpp')
-rw-r--r--src/buildtool/execution_api/execution_service/execution_server.cpp306
1 files changed, 215 insertions, 91 deletions
diff --git a/src/buildtool/execution_api/execution_service/execution_server.cpp b/src/buildtool/execution_api/execution_service/execution_server.cpp
index d3c05b94..2b2aa681 100644
--- a/src/buildtool/execution_api/execution_service/execution_server.cpp
+++ b/src/buildtool/execution_api/execution_service/execution_server.cpp
@@ -18,147 +18,271 @@
#include <fstream>
#include <iostream>
#include <string>
+#include <utility>
#include "fmt/format.h"
#include "src/buildtool/execution_api/local/garbage_collector.hpp"
-auto ExecutionServiceImpl::Execute(
- ::grpc::ServerContext* /*context*/,
- const ::build::bazel::remote::execution::v2::ExecuteRequest* request,
- ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
- -> ::grpc::Status {
- auto lock = GarbageCollector::SharedLock();
- if (!lock) {
- auto str = fmt::format("Could not acquire SharedLock");
- logger_.Emit(LogLevel::Error, str);
- return grpc::Status{grpc::StatusCode::INTERNAL, str};
- }
+auto ExecutionServiceImpl::GetAction(
+ ::build::bazel::remote::execution::v2::ExecuteRequest const* request)
+ const noexcept
+ -> std::pair<std::optional<::build::bazel::remote::execution::v2::Action>,
+ std::optional<std::string>> {
+ // get action description
auto path = storage_.BlobPath(request->action_digest(), false);
if (!path) {
- auto const& str = fmt::format("could not retrieve blob {} from cas",
- request->action_digest().hash());
+ auto str = fmt::format("could not retrieve blob {} from cas",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ return {std::nullopt, str};
}
- ::build::bazel::remote::execution::v2::Action a{};
+ ::build::bazel::remote::execution::v2::Action action{};
{
std::ifstream f(*path);
- if (!a.ParseFromIstream(&f)) {
- auto const& str = fmt::format("failed to parse action from blob {}",
- request->action_digest().hash());
+ if (!action.ParseFromIstream(&f)) {
+ auto str = fmt::format("failed to parse action from blob {}",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ return {std::nullopt, str};
}
}
- path = storage_.BlobPath(a.command_digest(), false);
+
+ path = Compatibility::IsCompatible()
+ ? storage_.BlobPath(action.input_root_digest(), false)
+ : storage_.TreePath(action.input_root_digest());
+
if (!path) {
- auto const& str = fmt::format("could not retrieve blob {} from cas",
- a.command_digest().hash());
+ auto str = fmt::format("could not retrieve input root {} from cas",
+ action.input_root_digest().hash());
logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ return {std::nullopt, str};
}
+ return {std::move(action), std::nullopt};
+}
+
+auto ExecutionServiceImpl::GetCommand(
+ ::build::bazel::remote::execution::v2::Action const& action) const noexcept
+ -> std::pair<std::optional<::build::bazel::remote::execution::v2::Command>,
+ std::optional<std::string>> {
+
+ auto path = storage_.BlobPath(action.command_digest(), false);
+ if (!path) {
+ auto str = fmt::format("could not retrieve blob {} from cas",
+ action.command_digest().hash());
+ logger_.Emit(LogLevel::Error, str);
+ return {std::nullopt, str};
+ }
+
::build::bazel::remote::execution::v2::Command c{};
{
std::ifstream f(*path);
if (!c.ParseFromIstream(&f)) {
- auto const& str =
- fmt::format("failed to parse command from blob {}",
- a.command_digest().hash());
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ auto str = fmt::format("failed to parse command from blob {}",
+ action.command_digest().hash());
+ logger_.Emit(LogLevel::Error, str);
+ return {std::nullopt, str};
}
}
- path = Compatibility::IsCompatible()
- ? storage_.BlobPath(a.input_root_digest(), false)
- : storage_.TreePath(a.input_root_digest());
+ return {c, std::nullopt};
+}
- if (!path) {
- auto const& str =
- fmt::format("could not retrieve input root {} from cas",
- a.input_root_digest().hash());
- logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+static auto GetEnvVars(::build::bazel::remote::execution::v2::Command const& c)
+ -> std::map<std::string, std::string> {
+ std::map<std::string, std::string> env_vars{};
+ std::transform(c.environment_variables().begin(),
+ c.environment_variables().end(),
+ std::inserter(env_vars, env_vars.begin()),
+ [](auto const& x) -> std::pair<std::string, std::string> {
+ return {x.name(), x.value()};
+ });
+ return env_vars;
+}
+
+auto ExecutionServiceImpl::GetIExecutionAction(
+ ::build::bazel::remote::execution::v2::ExecuteRequest const* request,
+ ::build::bazel::remote::execution::v2::Action const& action) const
+ -> std::pair<std::optional<IExecutionAction::Ptr>,
+ std::optional<std::string>> {
+
+ auto [c, msg_c] = GetCommand(action);
+ if (!c) {
+ return {std::nullopt, *msg_c};
}
- auto op = ::google::longrunning::Operation{};
- op.set_name("just-remote-execution");
- std::map<std::string, std::string> env_vars;
- for (auto const& x : c.environment_variables()) {
- env_vars.emplace(x.name(), x.value());
- }
- auto action = api_->CreateAction(
- ArtifactDigest{a.input_root_digest()},
- {c.arguments().begin(), c.arguments().end()},
- {c.output_files().begin(), c.output_files().end()},
- {c.output_directories().begin(), c.output_directories().end()},
+
+ auto env_vars = GetEnvVars(*c);
+
+ auto i_execution_action = api_->CreateAction(
+ ArtifactDigest{action.input_root_digest()},
+ {c->arguments().begin(), c->arguments().end()},
+ {c->output_files().begin(), c->output_files().end()},
+ {c->output_directories().begin(), c->output_directories().end()},
env_vars,
{});
- action->SetCacheFlag(a.do_not_cache()
- ? IExecutionAction::CacheFlag::DoNotCacheOutput
- : IExecutionAction::CacheFlag::CacheOutput);
+ if (!i_execution_action) {
+ auto str = fmt::format("could not create action from {}",
+ request->action_digest().hash());
+ logger_.Emit(LogLevel::Error, str);
+ return {std::nullopt, str};
+ }
+ i_execution_action->SetCacheFlag(
+ action.do_not_cache() ? IExecutionAction::CacheFlag::DoNotCacheOutput
+ : IExecutionAction::CacheFlag::CacheOutput);
+ return {std::move(i_execution_action), std::nullopt};
+}
+
+static void AddOutputPaths(
+ ::build::bazel::remote::execution::v2::ExecuteResponse* response,
+ IExecutionResponse::Ptr const& execution) noexcept {
+ auto const& size = static_cast<int>(execution->Artifacts().size());
+ response->mutable_result()->mutable_output_files()->Reserve(size);
+ response->mutable_result()->mutable_output_directories()->Reserve(size);
+
+ for (auto const& [path, info] : execution->Artifacts()) {
+ auto dgst = static_cast<::build::bazel::remote::execution::v2::Digest>(
+ info.digest);
- logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash());
- auto tmp = action->Execute(&logger_);
- logger_.Emit(LogLevel::Trace,
- "Finished execution of {}",
- request->action_digest().hash());
- ::build::bazel::remote::execution::v2::ExecuteResponse response{};
- for (auto const& [path, info] : tmp->Artifacts()) {
if (info.type == ObjectType::Tree) {
- auto* d = response.mutable_result()->add_output_directories();
- *(d->mutable_path()) = path;
- *(d->mutable_tree_digest()) =
- static_cast<::build::bazel::remote::execution::v2::Digest>(
- info.digest);
+ ::build::bazel::remote::execution::v2::OutputDirectory out_dir;
+ *(out_dir.mutable_path()) = path;
+ *(out_dir.mutable_tree_digest()) = std::move(dgst);
+ response->mutable_result()->mutable_output_directories()->Add(
+ std::move(out_dir));
}
else {
- auto* f = response.mutable_result()->add_output_files();
- *(f->mutable_path()) = path;
- *(f->mutable_digest()) =
- static_cast<::build::bazel::remote::execution::v2::Digest>(
- info.digest);
- if (info.type == ObjectType::Executable) {
- f->set_is_executable(true);
- }
+ ::build::bazel::remote::execution::v2::OutputFile out_file;
+ *(out_file.mutable_path()) = path;
+ *(out_file.mutable_digest()) = std::move(dgst);
+ out_file.set_is_executable(info.type == ObjectType::Executable);
+ response->mutable_result()->mutable_output_files()->Add(
+ std::move(out_file));
}
}
- response.set_cached_result(tmp->IsCached());
+}
- op.set_done(true);
- ::google::rpc::Status status{};
- *(response.mutable_status()) = status;
- auto* result = response.mutable_result();
- result->set_exit_code(tmp->ExitCode());
- if (tmp->HasStdErr()) {
- auto dgst = storage_.StoreBlob(tmp->StdErr(), /*is_executable=*/false);
+auto ExecutionServiceImpl::AddResult(
+ ::build::bazel::remote::execution::v2::ExecuteResponse* response,
+ IExecutionResponse::Ptr const& i_execution_response,
+ std::string const& hash) const noexcept -> std::optional<std::string> {
+ AddOutputPaths(response, i_execution_response);
+ auto* result = response->mutable_result();
+ result->set_exit_code(i_execution_response->ExitCode());
+ if (i_execution_response->HasStdErr()) {
+ auto dgst = storage_.StoreBlob(i_execution_response->StdErr(),
+ /*is_executable=*/false);
if (!dgst) {
- auto str = fmt::format("Could not store stderr of action {}",
- request->action_digest().hash());
+ auto str = fmt::format("Could not store stderr of action {}", hash);
logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ return str;
}
result->mutable_stderr_digest()->CopyFrom(*dgst);
}
- if (tmp->HasStdOut()) {
- auto dgst = storage_.StoreBlob(tmp->StdOut(), /*is_executable=*/false);
+ if (i_execution_response->HasStdOut()) {
+ auto dgst = storage_.StoreBlob(i_execution_response->StdOut(),
+ /*is_executable=*/false);
if (!dgst) {
- auto str = fmt::format("Could not store stdout of action {}",
- request->action_digest().hash());
+ auto str = fmt::format("Could not store stdout of action {}", hash);
logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ return str;
}
result->mutable_stdout_digest()->CopyFrom(*dgst);
}
+ return std::nullopt;
+}
- op.mutable_response()->PackFrom(response);
- writer->Write(op);
- if (tmp->ExitCode() == 0 && !a.do_not_cache() &&
+static void AddStatus(
+ ::build::bazel::remote::execution::v2::ExecuteResponse* response) noexcept {
+ ::google::rpc::Status status{};
+ // we run the action locally, so no communication issues should happen
+ status.set_code(grpc::StatusCode::OK);
+ *(response->mutable_status()) = status;
+}
+
+auto ExecutionServiceImpl::GetResponse(
+ ::build::bazel::remote::execution::v2::ExecuteRequest const* request,
+ IExecutionResponse::Ptr const& i_execution_response) const noexcept
+ -> std::pair<
+ std::optional<::build::bazel::remote::execution::v2::ExecuteResponse>,
+ std::optional<std::string>> {
+
+ ::build::bazel::remote::execution::v2::ExecuteResponse response{};
+ AddStatus(&response);
+ auto err = AddResult(
+ &response, i_execution_response, request->action_digest().hash());
+ if (err) {
+ return {std::nullopt, *err};
+ }
+ response.set_cached_result(i_execution_response->IsCached());
+ return {response, std::nullopt};
+}
+
+auto ExecutionServiceImpl::WriteResponse(
+ ::build::bazel::remote::execution::v2::ExecuteRequest const* request,
+ IExecutionResponse::Ptr const& i_execution_response,
+ ::build::bazel::remote::execution::v2::Action const& action,
+ ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
+ const noexcept -> std::optional<std::string> {
+
+ auto [execute_response, msg_r] = GetResponse(request, i_execution_response);
+ if (!execute_response) {
+
+ return *msg_r;
+ }
+
+ // store action result
+ if (i_execution_response->ExitCode() == 0 && !action.do_not_cache() &&
!storage_.StoreActionResult(request->action_digest(),
- response.result())) {
+ execute_response->result())) {
auto str = fmt::format("Could not store action result for action {}",
request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ return str;
+ }
+
+ // send response to the client
+ auto op = ::google::longrunning::Operation{};
+ op.mutable_response()->PackFrom(*execute_response);
+ op.set_name("just-remote-execution");
+ op.set_done(true);
+ if (!writer->Write(op)) {
+ auto str =
+ fmt::format("Could not write execution response for action {}",
+ request->action_digest().hash());
+ logger_.Emit(LogLevel::Error, str);
+ return str;
}
+ return std::nullopt;
+}
+
+auto ExecutionServiceImpl::Execute(
+ ::grpc::ServerContext* /*context*/,
+ const ::build::bazel::remote::execution::v2::ExecuteRequest* request,
+ ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
+ -> ::grpc::Status {
+ auto lock = GarbageCollector::SharedLock();
+ if (!lock) {
+ auto str = fmt::format("Could not acquire SharedLock");
+ logger_.Emit(LogLevel::Error, str);
+ return grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+ auto [action, msg_a] = GetAction(request);
+ if (!action) {
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_a};
+ }
+ auto [i_execution_action, msg] = GetIExecutionAction(request, *action);
+ if (!i_execution_action) {
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg};
+ }
+
+ logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash());
+ auto i_execution_response = i_execution_action->get()->Execute(&logger_);
+ logger_.Emit(LogLevel::Trace,
+ "Finished execution of {}",
+ request->action_digest().hash());
+ auto err = WriteResponse(request, i_execution_response, *action, writer);
+ if (err) {
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, *err};
+ }
return ::grpc::Status::OK;
}