summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlberto Sartori <alberto.sartori@huawei.com>2023-02-27 10:27:52 +0100
committerAlberto Sartori <alberto.sartori@huawei.com>2023-03-10 09:38:39 +0100
commit55ba09ec97d2449b39d7fcc38c346969168d899b (patch)
tree4c97affeaae2e5bb88a41be6d389b2502bae6e24 /src
parent117a1dbf099d93dfe044971f90203a5d8d1975b4 (diff)
downloadjustbuild-55ba09ec97d2449b39d7fcc38c346969168d899b.tar.gz
execution service: implement WaitExecution and google::longrunning::Operations::GetOperation
For each action that is executed, an entry is added to a shared thread safe cache. Once the number of operations stored exceeds twice 2^n, where n is given by the option --log-operations-threshold, at most 2^n operations will be removed, in a FIFO scheme.
Diffstat (limited to 'src')
-rw-r--r--src/buildtool/common/cli.hpp9
-rw-r--r--src/buildtool/execution_api/execution_service/TARGETS26
-rw-r--r--src/buildtool/execution_api/execution_service/ac_server.cpp10
-rw-r--r--src/buildtool/execution_api/execution_service/cas_server.cpp47
-rw-r--r--src/buildtool/execution_api/execution_service/execution_server.cpp156
-rw-r--r--src/buildtool/execution_api/execution_service/execution_server.hpp16
-rw-r--r--src/buildtool/execution_api/execution_service/operation_cache.cpp49
-rw-r--r--src/buildtool/execution_api/execution_service/operation_cache.hpp88
-rw-r--r--src/buildtool/execution_api/execution_service/operations_server.cpp63
-rw-r--r--src/buildtool/execution_api/execution_service/operations_server.hpp70
-rw-r--r--src/buildtool/execution_api/execution_service/server_implementation.cpp5
-rw-r--r--src/buildtool/main/TARGETS1
-rw-r--r--src/buildtool/main/main.cpp4
13 files changed, 441 insertions, 103 deletions
diff --git a/src/buildtool/common/cli.hpp b/src/buildtool/common/cli.hpp
index da67d8e7..5590534a 100644
--- a/src/buildtool/common/cli.hpp
+++ b/src/buildtool/common/cli.hpp
@@ -150,6 +150,7 @@ struct ExecutionServiceArguments {
std::optional<std::filesystem::path> info_file{std::nullopt};
std::optional<std::string> interface{std::nullopt};
std::optional<std::string> pid_file{std::nullopt};
+ std::optional<uint8_t> op_exponent;
};
static inline auto SetupCommonArguments(
@@ -549,5 +550,13 @@ static inline auto SetupExecutionServiceArguments(
es_args->pid_file,
"Write pid to this file in plain txt. If the file exists, it "
"will be overwritten.");
+
+ app->add_option(
+ "--log-operations-threshold",
+ es_args->op_exponent,
+ "Once the number of operations stored exceeds twice 2^n, where n is "
+ "given by the option --log-operations-threshold, at most 2^n "
+ "operations will be removed, in a FIFO scheme. If unset, defaults to "
+ "14. Must be in the range [0,255]");
}
#endif // INCLUDED_SRC_BUILDTOOL_COMMON_CLI_HPP
diff --git a/src/buildtool/execution_api/execution_service/TARGETS b/src/buildtool/execution_api/execution_service/TARGETS
index 968eb012..739e32e2 100644
--- a/src/buildtool/execution_api/execution_service/TARGETS
+++ b/src/buildtool/execution_api/execution_service/TARGETS
@@ -14,9 +14,10 @@
[ ["@", "fmt", "", "fmt"]
, ["@", "gsl-lite", "", "gsl-lite"]
, ["src/buildtool/execution_api/local", "garbage_collector"]
- , ["src/buildtool/compatibility", "compatibility"]
, ["src/buildtool/file_system", "file_system_manager"]
+ , "operation_cache"
]
+ , "private-ldflags": ["-pthread"]
}
, "ac_server":
{ "type": ["@", "rules", "CC", "library"]
@@ -31,9 +32,7 @@
, ["src/buildtool/common", "bazel_types"]
]
, "private-deps":
- [ ["src/buildtool/execution_api/local", "garbage_collector"]
- , ["src/buildtool/compatibility", "compatibility"]
- ]
+ [["src/buildtool/execution_api/local", "garbage_collector"]]
}
, "cas_server":
{ "type": ["@", "rules", "CC", "library"]
@@ -65,6 +64,7 @@
, "cas_server"
, "bytestream_server"
, "capabilities_server"
+ , "operations_server"
, ["src/buildtool/execution_api/remote", "config"]
, ["src/buildtool/auth", "auth"]
, ["@", "json", "", "json"]
@@ -105,4 +105,22 @@
, ["src/buildtool/compatibility", "compatibility"]
]
}
+, "operation_cache":
+ { "type": ["@", "rules", "CC", "library"]
+ , "name": ["operation_cache"]
+ , "hdrs": ["operation_cache.hpp"]
+ , "srcs": ["operation_cache.cpp"]
+ , "deps": [["src/buildtool/logging", "logging"]]
+ , "stage": ["src", "buildtool", "execution_api", "execution_service"]
+ , "proto": [["@", "googleapis", "", "google_longrunning_operations_proto"]]
+ }
+, "operations_server":
+ { "type": ["@", "rules", "CC", "library"]
+ , "name": ["operations_server"]
+ , "hdrs": ["operations_server.hpp"]
+ , "srcs": ["operations_server.cpp"]
+ , "deps": [["src/buildtool/logging", "logging"], "operation_cache"]
+ , "proto": [["@", "googleapis", "", "google_longrunning_operations_proto"]]
+ , "stage": ["src", "buildtool", "execution_api", "execution_service"]
+ }
}
diff --git a/src/buildtool/execution_api/execution_service/ac_server.cpp b/src/buildtool/execution_api/execution_service/ac_server.cpp
index 16dbdf3b..8746a0cd 100644
--- a/src/buildtool/execution_api/execution_service/ac_server.cpp
+++ b/src/buildtool/execution_api/execution_service/ac_server.cpp
@@ -15,7 +15,6 @@
#include "src/buildtool/execution_api/execution_service/ac_server.hpp"
#include "fmt/format.h"
-#include "src/buildtool/compatibility/native_support.hpp"
#include "src/buildtool/execution_api/local/garbage_collector.hpp"
auto ActionCacheServiceImpl::GetActionResult(
@@ -24,7 +23,7 @@ auto ActionCacheServiceImpl::GetActionResult(
::bazel_re::ActionResult* response) -> ::grpc::Status {
logger_.Emit(LogLevel::Trace,
"GetActionResult: {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ request->action_digest().hash());
auto lock = GarbageCollector::SharedLock();
if (!lock) {
auto str = fmt::format("Could not acquire SharedLock");
@@ -33,10 +32,9 @@ auto ActionCacheServiceImpl::GetActionResult(
}
auto x = ac_.CachedResult(request->action_digest());
if (!x) {
- return grpc::Status{grpc::StatusCode::NOT_FOUND,
- fmt::format("{} missing from AC",
- NativeSupport::Unprefix(
- request->action_digest().hash()))};
+ return grpc::Status{
+ grpc::StatusCode::NOT_FOUND,
+ fmt::format("{} missing from AC", request->action_digest().hash())};
}
*response = *x;
return ::grpc::Status::OK;
diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp
index fcc34f9c..07b9f3bf 100644
--- a/src/buildtool/execution_api/execution_service/cas_server.cpp
+++ b/src/buildtool/execution_api/execution_service/cas_server.cpp
@@ -21,21 +21,35 @@
static constexpr std::size_t kJustHashLength = 42;
static constexpr std::size_t kSHA256Length = 64;
+static auto IsValidHash(std::string const& x) -> bool {
+ auto const& length = x.size();
+ return (Compatibility::IsCompatible() and length == kSHA256Length) or
+ length == kJustHashLength;
+}
+
auto CASServiceImpl::FindMissingBlobs(
::grpc::ServerContext* /*context*/,
const ::bazel_re::FindMissingBlobsRequest* request,
::bazel_re::FindMissingBlobsResponse* response) -> ::grpc::Status {
auto lock = GarbageCollector::SharedLock();
if (!lock) {
- auto str = fmt::format("Could not acquire SharedLock");
+ auto str =
+ fmt::format("FindMissingBlobs: could not acquire SharedLock");
logger_.Emit(LogLevel::Error, str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
for (auto const& x : request->blob_digests()) {
auto const& hash = x.hash();
- logger_.Emit(LogLevel::Trace,
- "FindMissingBlobs: {}",
- NativeSupport::Unprefix(hash));
+
+ if (!IsValidHash(hash)) {
+ logger_.Emit(LogLevel::Error,
+ "FindMissingBlobs: unsupported digest {}",
+ hash);
+ auto* d = response->add_missing_blob_digests();
+ d->CopyFrom(x);
+ continue;
+ }
+ logger_.Emit(LogLevel::Trace, "FindMissingBlobs: {}", hash);
if (NativeSupport::IsTree(hash)) {
if (!storage_.TreePath(x)) {
auto* d = response->add_missing_blob_digests();
@@ -68,29 +82,27 @@ auto CASServiceImpl::BatchUpdateBlobs(
::bazel_re::BatchUpdateBlobsResponse* response) -> ::grpc::Status {
auto lock = GarbageCollector::SharedLock();
if (!lock) {
- auto str = fmt::format("Could not acquire SharedLock");
+ auto str =
+ fmt::format("BatchUpdateBlobs: could not acquire SharedLock");
logger_.Emit(LogLevel::Error, str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
for (auto const& x : request->requests()) {
auto const& hash = x.digest().hash();
- auto const& hash_lenght = hash.size();
- if (hash_lenght != kJustHashLength and hash_lenght != kSHA256Length) {
- auto const& str = fmt::format("Unsupported digest {}", hash);
+ if (!IsValidHash(hash)) {
+ auto const& str =
+ fmt::format("BatchUpdateBlobs: unsupported digest {}", hash);
logger_.Emit(LogLevel::Error, str);
return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
}
- logger_.Emit(LogLevel::Trace,
- "BatchUpdateBlobs: {}",
- NativeSupport::Unprefix(hash));
+ logger_.Emit(LogLevel::Trace, "BatchUpdateBlobs: {}", hash);
auto* r = response->add_responses();
r->mutable_digest()->CopyFrom(x.digest());
if (NativeSupport::IsTree(hash)) {
auto const& dgst = storage_.StoreTree(x.data());
if (!dgst) {
- auto const& str =
- fmt::format("BatchUpdateBlobs: could not upload tree {}",
- NativeSupport::Unprefix(hash));
+ auto const& str = fmt::format(
+ "BatchUpdateBlobs: could not upload tree {}", hash);
logger_.Emit(LogLevel::Error, str);
return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
}
@@ -101,9 +113,8 @@ auto CASServiceImpl::BatchUpdateBlobs(
else {
auto const& dgst = storage_.StoreBlob(x.data(), false);
if (!dgst) {
- auto const& str =
- fmt::format("BatchUpdateBlobs: could not upload blob {}",
- NativeSupport::Unprefix(hash));
+ auto const& str = fmt::format(
+ "BatchUpdateBlobs: could not upload blob {}", hash);
logger_.Emit(LogLevel::Error, str);
return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
}
@@ -121,7 +132,7 @@ auto CASServiceImpl::BatchReadBlobs(
::bazel_re::BatchReadBlobsResponse* response) -> ::grpc::Status {
auto lock = GarbageCollector::SharedLock();
if (!lock) {
- auto str = fmt::format("Could not acquire SharedLock");
+ auto str = fmt::format("BatchReadBlobs: Could not acquire SharedLock");
logger_.Emit(LogLevel::Error, str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
diff --git a/src/buildtool/execution_api/execution_service/execution_server.cpp b/src/buildtool/execution_api/execution_service/execution_server.cpp
index ab0ebfc9..5f21f5ac 100644
--- a/src/buildtool/execution_api/execution_service/execution_server.cpp
+++ b/src/buildtool/execution_api/execution_service/execution_server.cpp
@@ -16,26 +16,34 @@
#include <algorithm>
#include <fstream>
-#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
+#include "execution_server.hpp"
#include "fmt/format.h"
#include "gsl-lite/gsl-lite.hpp"
-#include "src/buildtool/compatibility/native_support.hpp"
+#include "src/buildtool/execution_api/execution_service/operation_cache.hpp"
#include "src/buildtool/execution_api/local/garbage_collector.hpp"
#include "src/buildtool/file_system/file_system_manager.hpp"
+static void UpdateTimeStamp(::google::longrunning::Operation* op) {
+ ::google::protobuf::Timestamp t;
+ t.set_seconds(
+ std::chrono::duration_cast<std::chrono::seconds>(
+ std::chrono::high_resolution_clock::now().time_since_epoch())
+ .count());
+ op->mutable_metadata()->PackFrom(t);
+}
+
auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request)
const noexcept -> std::pair<std::optional<::bazel_re::Action>,
std::optional<std::string>> {
// get action description
auto path = storage_.BlobPath(request->action_digest(), false);
if (!path) {
- auto str = fmt::format(
- "could not retrieve blob {} from cas",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ auto str = fmt::format("could not retrieve blob {} from cas",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -43,9 +51,8 @@ auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request)
{
std::ifstream f(*path);
if (!action.ParseFromIstream(&f)) {
- auto str = fmt::format(
- "failed to parse action from blob {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ auto str = fmt::format("failed to parse action from blob {}",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -56,9 +63,8 @@ auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request)
: storage_.TreePath(action.input_root_digest());
if (!path) {
- auto str = fmt::format(
- "could not retrieve input root {} from cas",
- NativeSupport::Unprefix(action.input_root_digest().hash()));
+ auto str = fmt::format("could not retrieve input root {} from cas",
+ action.input_root_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -71,9 +77,8 @@ auto ExecutionServiceImpl::GetCommand(::bazel_re::Action const& action)
auto path = storage_.BlobPath(action.command_digest(), false);
if (!path) {
- auto str = fmt::format(
- "could not retrieve blob {} from cas",
- NativeSupport::Unprefix(action.command_digest().hash()));
+ auto str = fmt::format("could not retrieve blob {} from cas",
+ action.command_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -82,9 +87,8 @@ auto ExecutionServiceImpl::GetCommand(::bazel_re::Action const& action)
{
std::ifstream f(*path);
if (!c.ParseFromIstream(&f)) {
- auto str = fmt::format(
- "failed to parse command from blob {}",
- NativeSupport::Unprefix(action.command_digest().hash()));
+ auto str = fmt::format("failed to parse command from blob {}",
+ action.command_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -125,9 +129,8 @@ auto ExecutionServiceImpl::GetIExecutionAction(
env_vars,
{});
if (!i_execution_action) {
- auto str = fmt::format(
- "could not create action from {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ auto str = fmt::format("could not create action from {}",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
return {std::nullopt, str};
}
@@ -356,10 +359,8 @@ auto ExecutionServiceImpl::GetResponse(
::bazel_re::ExecuteResponse response{};
AddStatus(&response);
- auto err =
- AddResult(&response,
- i_execution_response,
- NativeSupport::Unprefix(request->action_digest().hash()));
+ auto err = AddResult(
+ &response, i_execution_response, request->action_digest().hash());
if (err) {
return {std::nullopt, *err};
}
@@ -367,42 +368,34 @@ auto ExecutionServiceImpl::GetResponse(
return {response, std::nullopt};
}
-auto ExecutionServiceImpl::WriteResponse(
+auto ExecutionServiceImpl::StoreActionResult(
::bazel_re::ExecuteRequest const* request,
IExecutionResponse::Ptr const& i_execution_response,
- ::bazel_re::Action const& action,
- ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
- const noexcept -> std::optional<std::string> {
-
- auto [execute_response, msg_r] = GetResponse(request, i_execution_response);
- if (!execute_response) {
- return *msg_r;
- }
-
- // store action result
+ ::bazel_re::ExecuteResponse const& execute_response,
+ ::bazel_re::Action const& action) const noexcept
+ -> std::optional<std::string> {
if (i_execution_response->ExitCode() == 0 && !action.do_not_cache() &&
!storage_.StoreActionResult(request->action_digest(),
- execute_response->result())) {
- auto str = fmt::format(
- "Could not store action result for action {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ execute_response.result())) {
+ auto str = fmt::format("Could not store action result for action {}",
+ request->action_digest().hash());
logger_.Emit(LogLevel::Error, str);
return str;
}
+ return std::nullopt;
+}
+static void WriteResponse(
+ ::bazel_re::ExecuteResponse const& execute_response,
+ ::grpc::ServerWriter<::google::longrunning::Operation>* writer,
+ ::google::longrunning::Operation* op) noexcept {
// send response to the client
- auto op = ::google::longrunning::Operation{};
- op.mutable_response()->PackFrom(*execute_response);
- op.set_name("just-remote-execution");
- op.set_done(true);
- if (!writer->Write(op)) {
- auto str = fmt::format(
- "Could not write execution response for action {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
- logger_.Emit(LogLevel::Error, str);
- return str;
- }
- return std::nullopt;
+ op->mutable_response()->PackFrom(execute_response);
+ op->set_done(true);
+ UpdateTimeStamp(op);
+
+ OperationCache::Set(op->name(), *op);
+ writer->Write(*op);
}
auto ExecutionServiceImpl::Execute(
@@ -410,13 +403,13 @@ auto ExecutionServiceImpl::Execute(
const ::bazel_re::ExecuteRequest* request,
::grpc::ServerWriter<::google::longrunning::Operation>* writer)
-> ::grpc::Status {
-
auto lock = GarbageCollector::SharedLock();
if (!lock) {
auto str = fmt::format("Could not acquire SharedLock");
logger_.Emit(LogLevel::Error, str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
+
auto [action, msg_a] = GetAction(request);
if (!action) {
return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_a};
@@ -426,26 +419,57 @@ auto ExecutionServiceImpl::Execute(
return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg};
}
- logger_.Emit(LogLevel::Info,
- "Execute {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
+ logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash());
+ // send initial response to the client
+ auto op = ::google::longrunning::Operation{};
+ auto const& op_name = request->action_digest().hash();
+ op.set_name(op_name);
+ op.set_done(false);
+ UpdateTimeStamp(&op);
+ OperationCache::Set(op_name, op);
+ writer->Write(op);
+ auto t0 = std::chrono::high_resolution_clock::now();
auto i_execution_response = i_execution_action->get()->Execute(&logger_);
- logger_.Emit(LogLevel::Trace,
- "Finished execution of {}",
- NativeSupport::Unprefix(request->action_digest().hash()));
- auto err = WriteResponse(request, i_execution_response, *action, writer);
- if (err) {
- return ::grpc::Status{grpc::StatusCode::INTERNAL, *err};
+ auto t1 = std::chrono::high_resolution_clock::now();
+ logger_.Emit(
+ LogLevel::Trace,
+ "Finished execution of {} in {} seconds",
+ request->action_digest().hash(),
+ std::chrono::duration_cast<std::chrono::seconds>(t1 - t0).count());
+
+ auto [execute_response, msg_r] = GetResponse(request, i_execution_response);
+ if (!execute_response) {
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_r};
+ }
+
+ auto err_s = StoreActionResult(
+ request, i_execution_response, *execute_response, *action);
+ if (err_s) {
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, *err_s};
}
+ WriteResponse(*execute_response, writer, &op);
return ::grpc::Status::OK;
}
auto ExecutionServiceImpl::WaitExecution(
::grpc::ServerContext* /*context*/,
- const ::bazel_re::WaitExecutionRequest* /*request*/,
- ::grpc::ServerWriter<::google::longrunning::Operation>* /*writer*/)
+ const ::bazel_re::WaitExecutionRequest* request,
+ ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
-> ::grpc::Status {
- auto const* str = "WaitExecution not implemented";
- logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
+ auto const& hash = request->name();
+ logger_.Emit(LogLevel::Trace, "WaitExecution: {}", hash);
+ std::optional<::google::longrunning::Operation> op;
+ do {
+ op = OperationCache::Query(hash);
+ if (!op) {
+ auto const& str = fmt::format(
+ "Executing action {} not found in internal cache.", hash);
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ } while (!op->done());
+ writer->Write(*op);
+ logger_.Emit(LogLevel::Trace, "Finished WaitExecution {}", hash);
+ return ::grpc::Status::OK;
}
diff --git a/src/buildtool/execution_api/execution_service/execution_server.hpp b/src/buildtool/execution_api/execution_service/execution_server.hpp
index d0ec5074..65ace208 100644
--- a/src/buildtool/execution_api/execution_service/execution_server.hpp
+++ b/src/buildtool/execution_api/execution_service/execution_server.hpp
@@ -107,6 +107,10 @@ class ExecutionServiceImpl final : public bazel_re::Execution::Service {
-> ::grpc::Status override;
private:
+ LocalStorage storage_{};
+ IExecutionApi::Ptr api_{new LocalApi()};
+ Logger logger_{"execution-service"};
+
[[nodiscard]] auto GetAction(::bazel_re::ExecuteRequest const* request)
const noexcept -> std::pair<std::optional<::bazel_re::Action>,
std::optional<std::string>>;
@@ -126,21 +130,17 @@ class ExecutionServiceImpl final : public bazel_re::Execution::Service {
-> std::pair<std::optional<::bazel_re::ExecuteResponse>,
std::optional<std::string>>;
- [[nodiscard]] auto WriteResponse(
+ [[nodiscard]] auto StoreActionResult(
::bazel_re::ExecuteRequest const* request,
IExecutionResponse::Ptr const& i_execution_response,
- ::bazel_re::Action const& action,
- ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
- const noexcept -> std::optional<std::string>;
+ ::bazel_re::ExecuteResponse const& execute_response,
+ ::bazel_re::Action const& action) const noexcept
+ -> std::optional<std::string>;
[[nodiscard]] auto AddResult(
::bazel_re::ExecuteResponse* response,
IExecutionResponse::Ptr const& i_execution_response,
std::string const& hash) const noexcept -> std::optional<std::string>;
-
- LocalStorage storage_{};
- IExecutionApi::Ptr api_{new LocalApi()};
- Logger logger_{"execution-service"};
};
#endif
diff --git a/src/buildtool/execution_api/execution_service/operation_cache.cpp b/src/buildtool/execution_api/execution_service/operation_cache.cpp
new file mode 100644
index 00000000..7fccdd41
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/operation_cache.cpp
@@ -0,0 +1,49 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/buildtool/execution_api/execution_service/operation_cache.hpp"
+
+#include <algorithm>
+
+#include "google/protobuf/timestamp.pb.h"
+
+void OperationCache::GarbageCollection() {
+ std::shared_lock slock{mutex_};
+ if (cache_.size() > (threshold_ << 1U)) {
+ std::vector<std::pair<std::string, ::google::longrunning::Operation>>
+ tmp;
+ tmp.reserve(cache_.size());
+ std::copy(cache_.begin(), cache_.end(), std::back_insert_iterator(tmp));
+ slock.release();
+ std::sort(tmp.begin(), tmp.end(), [](auto const& x, auto const& y) {
+ ::google::protobuf::Timestamp tx;
+ ::google::protobuf::Timestamp ty;
+ x.second.metadata().UnpackTo(&tx);
+ y.second.metadata().UnpackTo(&ty);
+ return tx.seconds() < ty.seconds();
+ });
+
+ std::size_t deleted = 0;
+ std::unique_lock ulock{mutex_};
+ for (auto const& [key, op] : tmp) {
+ if (op.done()) {
+ DropInternal(key);
+ ++deleted;
+ }
+ if (deleted == threshold_) {
+ break;
+ }
+ }
+ }
+}
diff --git a/src/buildtool/execution_api/execution_service/operation_cache.hpp b/src/buildtool/execution_api/execution_service/operation_cache.hpp
new file mode 100644
index 00000000..97b137fb
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/operation_cache.hpp
@@ -0,0 +1,88 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef OPERATION_CACHE_HPP
+#define OPERATION_CACHE_HPP
+
+#include <atomic>
+#include <optional>
+#include <shared_mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include "google/longrunning/operations.pb.h"
+#include "google/protobuf/timestamp.pb.h"
+
+class OperationCache {
+ using Operation = ::google::longrunning::Operation;
+
+ public:
+ [[nodiscard]] static auto Instance() -> OperationCache& {
+ static OperationCache x;
+ return x;
+ }
+ OperationCache() noexcept = default;
+ ~OperationCache() noexcept = default;
+
+ OperationCache(OperationCache const&) = delete;
+ auto operator=(OperationCache const&) -> OperationCache& = delete;
+ OperationCache(OperationCache&&) = delete;
+ auto operator=(OperationCache&&) -> OperationCache& = delete;
+
+ static void Set(std::string const& action, Operation const& op) {
+ Instance().SetInternal(action, op);
+ }
+
+ [[nodiscard]] static auto Query(std::string const& x) noexcept
+ -> std::optional<Operation> {
+ return Instance().QueryInternal(x);
+ }
+
+ static void SetExponent(uint8_t x) noexcept {
+ Instance().threshold_ = 1U << x;
+ }
+
+ private:
+ mutable std::shared_mutex mutex_;
+ std::unordered_map<std::string, ::google::longrunning::Operation> cache_;
+ static constexpr uint8_t kDefaultExponent{14};
+ std::size_t threshold_{1U << kDefaultExponent};
+
+ void SetInternal(std::string const& action, Operation const& op) {
+ GarbageCollection();
+ std::unique_lock lock{mutex_};
+ cache_[action] = op;
+ }
+
+ [[nodiscard]] auto QueryInternal(std::string const& x) const noexcept
+ -> std::optional<Operation> {
+ std::shared_lock lock{mutex_};
+ auto it = cache_.find(x);
+ if (it != cache_.end()) {
+ return it->second;
+ }
+ return std::nullopt;
+ }
+
+ void DropInternal(std::string const& x) noexcept {
+ cache_[x].Clear();
+ cache_.erase(x);
+ }
+
+ void GarbageCollection();
+};
+
+#endif
diff --git a/src/buildtool/execution_api/execution_service/operations_server.cpp b/src/buildtool/execution_api/execution_service/operations_server.cpp
new file mode 100644
index 00000000..37d3552a
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/operations_server.cpp
@@ -0,0 +1,63 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/buildtool/execution_api/execution_service/operations_server.hpp"
+
+#include "src/buildtool/execution_api/execution_service/operation_cache.hpp"
+
+auto OperarationsServiceImpl::GetOperation(
+ ::grpc::ServerContext* /*context*/,
+ const ::google::longrunning::GetOperationRequest* request,
+ ::google::longrunning::Operation* response) -> ::grpc::Status {
+ auto const& hash = request->name();
+ logger_.Emit(LogLevel::Trace, "GetOperation: {}", hash);
+ std::optional<::google::longrunning::Operation> op;
+ op = OperationCache::Query(hash);
+ if (!op) {
+ auto const& str = fmt::format(
+ "Executing action {} not found in internal cache.", hash);
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+ response->CopyFrom(*op);
+ return ::grpc::Status::OK;
+}
+
+auto OperarationsServiceImpl::ListOperations(
+ ::grpc::ServerContext* /*context*/,
+ const ::google::longrunning::ListOperationsRequest* /*request*/,
+ ::google::longrunning::ListOperationsResponse* /*response*/)
+ -> ::grpc::Status {
+ auto const* str = "ListOperations not implemented";
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
+}
+
+auto OperarationsServiceImpl::DeleteOperation(
+ ::grpc::ServerContext* /*context*/,
+ const ::google::longrunning::DeleteOperationRequest* /*request*/,
+ ::google::protobuf::Empty* /*response*/) -> ::grpc::Status {
+ auto const* str = "DeleteOperation not implemented";
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
+}
+
+auto OperarationsServiceImpl::CancelOperation(
+ ::grpc::ServerContext* /*context*/,
+ const ::google::longrunning::CancelOperationRequest* /*request*/,
+ ::google::protobuf::Empty* /*response*/) -> ::grpc::Status {
+ auto const* str = "CancelOperation not implemented";
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
+}
diff --git a/src/buildtool/execution_api/execution_service/operations_server.hpp b/src/buildtool/execution_api/execution_service/operations_server.hpp
new file mode 100644
index 00000000..44e3b887
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/operations_server.hpp
@@ -0,0 +1,70 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef OPERATIONS_SERVER_HPP
+#define OPERATIONS_SERVER_HPP
+
+#include "google/longrunning/operations.grpc.pb.h"
+#include "src/buildtool/logging/logger.hpp"
+
+class OperarationsServiceImpl final
+ : public ::google::longrunning::Operations::Service {
+ public:
+ // Lists operations that match the specified filter in the request. If the
+ // server doesn't support this method, it returns `UNIMPLEMENTED`.
+ //
+ // NOTE: the `name` binding below allows API services to override the
+ // binding to use different resource name schemes, such as
+ // `users/*/operations`.
+ auto ListOperations(
+ ::grpc::ServerContext* context,
+ const ::google::longrunning::ListOperationsRequest* request,
+ ::google::longrunning::ListOperationsResponse* response)
+ -> ::grpc::Status override;
+ // Gets the latest state of a long-running operation. Clients can use this
+ // method to poll the operation result at intervals as recommended by the
+ // API service.
+ auto GetOperation(::grpc::ServerContext* context,
+ const ::google::longrunning::GetOperationRequest* request,
+ ::google::longrunning::Operation* response)
+ -> ::grpc::Status override;
+ // Deletes a long-running operation. This method indicates that the client
+ // is no longer interested in the operation result. It does not cancel the
+ // operation. If the server doesn't support this method, it returns
+ // `google.rpc.Code.UNIMPLEMENTED`.
+ auto DeleteOperation(
+ ::grpc::ServerContext* context,
+ const ::google::longrunning::DeleteOperationRequest* request,
+ ::google::protobuf::Empty* response) -> ::grpc::Status override;
+ // Starts asynchronous cancellation on a long-running operation. The server
+ // makes a best effort to cancel the operation, but success is not
+ // guaranteed. If the server doesn't support this method, it returns
+ // `google.rpc.Code.UNIMPLEMENTED`. Clients can use
+ // [Operations.GetOperation][google.longrunning.Operations.GetOperation] or
+ // other methods to check whether the cancellation succeeded or whether the
+ // operation completed despite cancellation. On successful cancellation,
+ // the operation is not deleted; instead, it becomes an operation with
+ // an [Operation.error][google.longrunning.Operation.error] value with a
+ // [google.rpc.Status.code][google.rpc.Status.code] of 1, corresponding to
+ // `Code.CANCELLED`.
+ auto CancelOperation(
+ ::grpc::ServerContext* context,
+ const ::google::longrunning::CancelOperationRequest* request,
+ ::google::protobuf::Empty* response) -> ::grpc::Status override;
+
+ private:
+ Logger logger_{"execution-service:operations"};
+};
+
+#endif
diff --git a/src/buildtool/execution_api/execution_service/server_implementation.cpp b/src/buildtool/execution_api/execution_service/server_implementation.cpp
index 73850686..bba2e4cf 100644
--- a/src/buildtool/execution_api/execution_service/server_implementation.cpp
+++ b/src/buildtool/execution_api/execution_service/server_implementation.cpp
@@ -29,6 +29,7 @@
#include "src/buildtool/execution_api/execution_service/capabilities_server.hpp"
#include "src/buildtool/execution_api/execution_service/cas_server.hpp"
#include "src/buildtool/execution_api/execution_service/execution_server.hpp"
+#include "src/buildtool/execution_api/execution_service/operations_server.hpp"
#include "src/buildtool/execution_api/remote/config.hpp"
#include "src/buildtool/logging/logger.hpp"
@@ -53,6 +54,7 @@ auto ServerImpl::Run() -> bool {
CASServiceImpl cas{};
BytestreamServiceImpl b{};
CapabilitiesServiceImpl cap{};
+ OperarationsServiceImpl op{};
grpc::ServerBuilder builder;
@@ -60,7 +62,8 @@ auto ServerImpl::Run() -> bool {
.RegisterService(&ac)
.RegisterService(&cas)
.RegisterService(&b)
- .RegisterService(&cap);
+ .RegisterService(&cap)
+ .RegisterService(&op);
std::shared_ptr<grpc::ServerCredentials> creds;
if (Auth::GetAuthMethod() == AuthMethod::kTLS) {
diff --git a/src/buildtool/main/TARGETS b/src/buildtool/main/TARGETS
index 818c24af..6bfa5512 100644
--- a/src/buildtool/main/TARGETS
+++ b/src/buildtool/main/TARGETS
@@ -23,6 +23,7 @@
, [ "src/buildtool/execution_api/execution_service"
, "server_implementation"
]
+ , ["src/buildtool/execution_api/execution_service", "operation_cache"]
, "common"
, "version"
, "analyse"
diff --git a/src/buildtool/main/main.cpp b/src/buildtool/main/main.cpp
index 869ed146..a3f985b7 100644
--- a/src/buildtool/main/main.cpp
+++ b/src/buildtool/main/main.cpp
@@ -40,6 +40,7 @@
#include "src/buildtool/main/install_cas.hpp"
#ifndef BOOTSTRAP_BUILD_TOOL
#include "src/buildtool/auth/authentication.hpp"
+#include "src/buildtool/execution_api/execution_service/operation_cache.hpp"
#include "src/buildtool/execution_api/execution_service/server_implementation.hpp"
#include "src/buildtool/execution_api/local/garbage_collector.hpp"
#include "src/buildtool/graph_traverser/graph_traverser.hpp"
@@ -427,6 +428,9 @@ void SetupExecutionServiceConfig(ExecutionServiceArguments const& args) {
std::exit(kExitFailure);
}
}
+ if (args.op_exponent) {
+ OperationCache::SetExponent(*args.op_exponent);
+ }
}
void SetupHashFunction() {