summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/execution_service/execution_server.cpp
diff options
context:
space:
mode:
authorAlberto Sartori <alberto.sartori@huawei.com>2023-02-27 10:27:52 +0100
committerAlberto Sartori <alberto.sartori@huawei.com>2023-03-10 09:38:39 +0100
commit55ba09ec97d2449b39d7fcc38c346969168d899b (patch)
tree4c97affeaae2e5bb88a41be6d389b2502bae6e24 /src/buildtool/execution_api/execution_service/execution_server.cpp
parent117a1dbf099d93dfe044971f90203a5d8d1975b4 (diff)
downloadjustbuild-55ba09ec97d2449b39d7fcc38c346969168d899b.tar.gz
execution service: implement WaitExecution and google::longrunning::Operations::GetOperation
For each action that is executed, an entry is added to a shared, thread-safe cache. Once the number of stored operations exceeds 2 * 2^n, where n is given by the option --log-operations-threshold, the oldest operations are removed in FIFO order, at most 2^n at a time.
Diffstat (limited to 'src/buildtool/execution_api/execution_service/execution_server.cpp')
-rw-r--r--src/buildtool/execution_api/execution_service/execution_server.cpp156
1 files changed, 90 insertions, 66 deletions
diff --git a/src/buildtool/execution_api/execution_service/execution_server.cpp b/src/buildtool/execution_api/execution_service/execution_server.cpp
index ab0ebfc9..5f21f5ac 100644
--- a/src/buildtool/execution_api/execution_service/execution_server.cpp
+++ b/src/buildtool/execution_api/execution_service/execution_server.cpp
@@ -16,26 +16,34 @@
#include <algorithm>
#include <fstream>
-#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
+#include "execution_server.hpp"
#include "fmt/format.h"
#include "gsl-lite/gsl-lite.hpp"
-#include "src/buildtool/compatibility/native_support.hpp"
+#include "src/buildtool/execution_api/execution_service/operation_cache.hpp"
#include "src/buildtool/execution_api/local/garbage_collector.hpp"
#include "src/buildtool/file_system/file_system_manager.hpp"
+/// Stamp \p op with the current wall-clock time.
+///
+/// The time, truncated to whole seconds, is packed as a
+/// google::protobuf::Timestamp into the metadata field of the operation.
+static void UpdateTimeStamp(::google::longrunning::Operation* op) {
+    ::google::protobuf::Timestamp t;
+    // Use system_clock: the epoch of high_resolution_clock is unspecified (it
+    // may be an alias of steady_clock), whereas a protobuf Timestamp is
+    // defined as seconds since the Unix epoch.
+    t.set_seconds(
+        std::chrono::duration_cast<std::chrono::seconds>(
+            std::chrono::system_clock::now().time_since_epoch())
+            .count());
+    op->mutable_metadata()->PackFrom(t);
+}
+
auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request)
const noexcept -> std::pair<std::optional<::bazel_re::Action>,
std::optional<std::string>> {
// get action description
auto path = storage_.BlobPath(request->action_digest(), false);
if (!path) {
- auto str = fmt::format(
- "could not retrieve blob {} from cas",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ auto str = fmt::format("could not retrieve blob {} from cas",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -43,9 +51,8 @@ auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request)
{
std::ifstream f(*path);
if (!action.ParseFromIstream(&f)) {
- auto str = fmt::format(
- "failed to parse action from blob {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ auto str = fmt::format("failed to parse action from blob {}",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -56,9 +63,8 @@ auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request)
: storage_.TreePath(action.input_root_digest());
if (!path) {
- auto str = fmt::format(
- "could not retrieve input root {} from cas",
- NativeSupport::Unprefix(action.input_root_digest().hash()));
+ auto str = fmt::format("could not retrieve input root {} from cas",
+ action.input_root_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -71,9 +77,8 @@ auto ExecutionServiceImpl::GetCommand(::bazel_re::Action const& action)
auto path = storage_.BlobPath(action.command_digest(), false);
if (!path) {
- auto str = fmt::format(
- "could not retrieve blob {} from cas",
- NativeSupport::Unprefix(action.command_digest().hash()));
+ auto str = fmt::format("could not retrieve blob {} from cas",
+ action.command_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -82,9 +87,8 @@ auto ExecutionServiceImpl::GetCommand(::bazel_re::Action const& action)
{
std::ifstream f(*path);
if (!c.ParseFromIstream(&f)) {
- auto str = fmt::format(
- "failed to parse command from blob {}",
- NativeSupport::Unprefix(action.command_digest().hash()));
+ auto str = fmt::format("failed to parse command from blob {}",
+ action.command_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -125,9 +129,8 @@ auto ExecutionServiceImpl::GetIExecutionAction(
env_vars,
{});
if (!i_execution_action) {
- auto str = fmt::format(
- "could not create action from {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ auto str = fmt::format("could not create action from {}",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -356,10 +359,8 @@ auto ExecutionServiceImpl::GetResponse(
::bazel_re::ExecuteResponse response{};
AddStatus(&response);
- auto err =
- AddResult(&response,
- i_execution_response,
- NativeSupport::Unprefix(request->action_digest().hash()));
+ auto err = AddResult(
+ &response, i_execution_response, request->action_digest().hash());
if (err) {
return {std::nullopt, *err};
}
@@ -367,42 +368,34 @@ auto ExecutionServiceImpl::GetResponse(
return {response, std::nullopt};
}
-auto ExecutionServiceImpl::WriteResponse(
+// Store the result contained in execute_response in the action cache, keyed
+// by the action digest of the request. Nothing is stored when the action
+// exited with a non-zero code or is marked do_not_cache.
+// Returns std::nullopt on success (or when nothing had to be stored) and the
+// logged error message if storing the action result failed.
+auto ExecutionServiceImpl::StoreActionResult(
::bazel_re::ExecuteRequest const* request,
IExecutionResponse::Ptr const& i_execution_response,
- ::bazel_re::Action const& action,
- ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
- const noexcept -> std::optional<std::string> {
-
- auto [execute_response, msg_r] = GetResponse(request, i_execution_response);
- if (!execute_response) {
- return *msg_r;
- }
-
- // store action result
+ ::bazel_re::ExecuteResponse const& execute_response,
+ ::bazel_re::Action const& action) const noexcept
+ -> std::optional<std::string> {
+ // The store is attempted only if the first two conditions hold; the whole
+ // condition is true only when that attempt reports failure.
if (i_execution_response->ExitCode() == 0 && !action.do_not_cache() &&
!storage_.StoreActionResult(request->action_digest(),
- execute_response->result())) {
- auto str = fmt::format(
- "Could not store action result for action {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ execute_response.result())) {
+ auto str = fmt::format("Could not store action result for action {}",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
return str;
}
+ return std::nullopt;
+}
+// Finalize the operation op with execute_response and publish it: the
+// response is packed into op, op is marked done and time-stamped, the updated
+// operation is stored in the OperationCache under its name, and it is finally
+// written to the client stream.
+// NOTE(review): the return value of writer->Write() is ignored here, whereas
+// the removed code reported a failed write as an error; confirm that dropping
+// this check is intended.
+static void WriteResponse(
+ ::bazel_re::ExecuteResponse const& execute_response,
+ ::grpc::ServerWriter<::google::longrunning::Operation>* writer,
+ ::google::longrunning::Operation* op) noexcept {
// send response to the client
- auto op = ::google::longrunning::Operation{};
- op.mutable_response()->PackFrom(*execute_response);
- op.set_name("just-remote-execution");
- op.set_done(true);
- if (!writer->Write(op)) {
- auto str = fmt::format(
- "Could not write execution response for action {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
- logger_.Emit(LogLevel::Error, str);
- return str;
- }
- return std::nullopt;
+ op->mutable_response()->PackFrom(execute_response);
+ op->set_done(true);
+ UpdateTimeStamp(op);
+
+ // Update the cache before writing to the stream.
+ OperationCache::Set(op->name(), *op);
+ writer->Write(*op);
}
auto ExecutionServiceImpl::Execute(
@@ -410,13 +403,13 @@ auto ExecutionServiceImpl::Execute(
const ::bazel_re::ExecuteRequest* request,
::grpc::ServerWriter<::google::longrunning::Operation>* writer)
-> ::grpc::Status {
-
auto lock = GarbageCollector::SharedLock();
if (!lock) {
auto str = fmt::format("Could not acquire SharedLock");
logger_.Emit(LogLevel::Error, str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
+
auto [action, msg_a] = GetAction(request);
if (!action) {
return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_a};
@@ -426,26 +419,57 @@ auto ExecutionServiceImpl::Execute(
return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg};
}
- logger_.Emit(LogLevel::Info,
- "Execute {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash());
+ // send initial response to the client
+ auto op = ::google::longrunning::Operation{};
+ auto const& op_name = request->action_digest().hash();
+ op.set_name(op_name);
+ op.set_done(false);
+ UpdateTimeStamp(&op);
+ OperationCache::Set(op_name, op);
+ writer->Write(op);
+ auto t0 = std::chrono::high_resolution_clock::now();
auto i_execution_response = i_execution_action->get()->Execute(&logger_);
- logger_.Emit(LogLevel::Trace,
- "Finished execution of {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
- auto err = WriteResponse(request, i_execution_response, *action, writer);
- if (err) {
- return ::grpc::Status{grpc::StatusCode::INTERNAL, *err};
+ auto t1 = std::chrono::high_resolution_clock::now();
+ logger_.Emit(
+ LogLevel::Trace,
+ "Finished execution of {} in {} seconds",
+ request->action_digest().hash(),
+ std::chrono::duration_cast<std::chrono::seconds>(t1 - t0).count());
+
+ auto [execute_response, msg_r] = GetResponse(request, i_execution_response);
+ if (!execute_response) {
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_r};
+ }
+
+ auto err_s = StoreActionResult(
+ request, i_execution_response, *execute_response, *action);
+ if (err_s) {
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, *err_s};
}
+ WriteResponse(*execute_response, writer, &op);
return ::grpc::Status::OK;
}
auto ExecutionServiceImpl::WaitExecution(
::grpc::ServerContext* /*context*/,
- const ::bazel_re::WaitExecutionRequest* /*request*/,
- ::grpc::ServerWriter<::google::longrunning::Operation>* /*writer*/)
+ const ::bazel_re::WaitExecutionRequest* request,
+ ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
-> ::grpc::Status {
- auto const* str = "WaitExecution not implemented";
- logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
+    // Wait until the operation named in the request is marked done in the
+    // OperationCache, then write that operation to the client stream.
+    // Returns INTERNAL if the operation is not (or no longer) in the cache,
+    // OK otherwise.
+    auto const& hash = request->name();
+    logger_.Emit(LogLevel::Trace, "WaitExecution: {}", hash);
+    auto op = OperationCache::Query(hash);
+    // Poll once per second, but only while the operation is still pending, so
+    // that an already finished operation is reported without any delay.
+    while (op && !op->done()) {
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+        op = OperationCache::Query(hash);
+    }
+    if (!op) {
+        auto const str = fmt::format(
+            "Executing action {} not found in internal cache.", hash);
+        logger_.Emit(LogLevel::Error, str);
+        return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+    }
+    writer->Write(*op);
+    logger_.Emit(LogLevel::Trace, "Finished WaitExecution {}", hash);
+    return ::grpc::Status::OK;
}