diff options
author | Alberto Sartori <alberto.sartori@huawei.com> | 2023-02-27 10:27:52 +0100 |
---|---|---|
committer | Alberto Sartori <alberto.sartori@huawei.com> | 2023-03-10 09:38:39 +0100 |
commit | 55ba09ec97d2449b39d7fcc38c346969168d899b (patch) | |
tree | 4c97affeaae2e5bb88a41be6d389b2502bae6e24 /src | |
parent | 117a1dbf099d93dfe044971f90203a5d8d1975b4 (diff) | |
download | justbuild-55ba09ec97d2449b39d7fcc38c346969168d899b.tar.gz |
execution service: implement WaitExecution and google::longrunning::Operations::GetOperation
For each action that is executed, an entry is added to a shared,
thread-safe cache. Once the number of stored operations exceeds twice 2^n,
where n is given by the option --log-operations-threshold, at most 2^n
operations will be removed, following a FIFO scheme.
Diffstat (limited to 'src')
13 files changed, 441 insertions, 103 deletions
diff --git a/src/buildtool/common/cli.hpp b/src/buildtool/common/cli.hpp index da67d8e7..5590534a 100644 --- a/src/buildtool/common/cli.hpp +++ b/src/buildtool/common/cli.hpp @@ -150,6 +150,7 @@ struct ExecutionServiceArguments { std::optional<std::filesystem::path> info_file{std::nullopt}; std::optional<std::string> interface{std::nullopt}; std::optional<std::string> pid_file{std::nullopt}; + std::optional<uint8_t> op_exponent; }; static inline auto SetupCommonArguments( @@ -549,5 +550,13 @@ static inline auto SetupExecutionServiceArguments( es_args->pid_file, "Write pid to this file in plain txt. If the file exists, it " "will be overwritten."); + + app->add_option( + "--log-operations-threshold", + es_args->op_exponent, + "Once the number of operations stored exceeds twice 2^n, where n is " + "given by the option --log-operations-threshold, at most 2^n " + "operations will be removed, in a FIFO scheme. If unset, defaults to " + "14. Must be in the range [0,255]"); } #endif // INCLUDED_SRC_BUILDTOOL_COMMON_CLI_HPP diff --git a/src/buildtool/execution_api/execution_service/TARGETS b/src/buildtool/execution_api/execution_service/TARGETS index 968eb012..739e32e2 100644 --- a/src/buildtool/execution_api/execution_service/TARGETS +++ b/src/buildtool/execution_api/execution_service/TARGETS @@ -14,9 +14,10 @@ [ ["@", "fmt", "", "fmt"] , ["@", "gsl-lite", "", "gsl-lite"] , ["src/buildtool/execution_api/local", "garbage_collector"] - , ["src/buildtool/compatibility", "compatibility"] , ["src/buildtool/file_system", "file_system_manager"] + , "operation_cache" ] + , "private-ldflags": ["-pthread"] } , "ac_server": { "type": ["@", "rules", "CC", "library"] @@ -31,9 +32,7 @@ , ["src/buildtool/common", "bazel_types"] ] , "private-deps": - [ ["src/buildtool/execution_api/local", "garbage_collector"] - , ["src/buildtool/compatibility", "compatibility"] - ] + [["src/buildtool/execution_api/local", "garbage_collector"]] } , "cas_server": { "type": ["@", "rules", "CC", "library"] 
@@ -65,6 +64,7 @@ , "cas_server" , "bytestream_server" , "capabilities_server" + , "operations_server" , ["src/buildtool/execution_api/remote", "config"] , ["src/buildtool/auth", "auth"] , ["@", "json", "", "json"] @@ -105,4 +105,22 @@ , ["src/buildtool/compatibility", "compatibility"] ] } +, "operation_cache": + { "type": ["@", "rules", "CC", "library"] + , "name": ["operation_cache"] + , "hdrs": ["operation_cache.hpp"] + , "srcs": ["operation_cache.cpp"] + , "deps": [["src/buildtool/logging", "logging"]] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + , "proto": [["@", "googleapis", "", "google_longrunning_operations_proto"]] + } +, "operations_server": + { "type": ["@", "rules", "CC", "library"] + , "name": ["operations_server"] + , "hdrs": ["operations_server.hpp"] + , "srcs": ["operations_server.cpp"] + , "deps": [["src/buildtool/logging", "logging"], "operation_cache"] + , "proto": [["@", "googleapis", "", "google_longrunning_operations_proto"]] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + } } diff --git a/src/buildtool/execution_api/execution_service/ac_server.cpp b/src/buildtool/execution_api/execution_service/ac_server.cpp index 16dbdf3b..8746a0cd 100644 --- a/src/buildtool/execution_api/execution_service/ac_server.cpp +++ b/src/buildtool/execution_api/execution_service/ac_server.cpp @@ -15,7 +15,6 @@ #include "src/buildtool/execution_api/execution_service/ac_server.hpp" #include "fmt/format.h" -#include "src/buildtool/compatibility/native_support.hpp" #include "src/buildtool/execution_api/local/garbage_collector.hpp" auto ActionCacheServiceImpl::GetActionResult( @@ -24,7 +23,7 @@ auto ActionCacheServiceImpl::GetActionResult( ::bazel_re::ActionResult* response) -> ::grpc::Status { logger_.Emit(LogLevel::Trace, "GetActionResult: {}", - NativeSupport::Unprefix(request->action_digest().hash())); + request->action_digest().hash()); auto lock = GarbageCollector::SharedLock(); if (!lock) { auto str = 
fmt::format("Could not acquire SharedLock"); @@ -33,10 +32,9 @@ auto ActionCacheServiceImpl::GetActionResult( } auto x = ac_.CachedResult(request->action_digest()); if (!x) { - return grpc::Status{grpc::StatusCode::NOT_FOUND, - fmt::format("{} missing from AC", - NativeSupport::Unprefix( - request->action_digest().hash()))}; + return grpc::Status{ + grpc::StatusCode::NOT_FOUND, + fmt::format("{} missing from AC", request->action_digest().hash())}; } *response = *x; return ::grpc::Status::OK; diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp index fcc34f9c..07b9f3bf 100644 --- a/src/buildtool/execution_api/execution_service/cas_server.cpp +++ b/src/buildtool/execution_api/execution_service/cas_server.cpp @@ -21,21 +21,35 @@ static constexpr std::size_t kJustHashLength = 42; static constexpr std::size_t kSHA256Length = 64; +static auto IsValidHash(std::string const& x) -> bool { + auto const& length = x.size(); + return (Compatibility::IsCompatible() and length == kSHA256Length) or + length == kJustHashLength; +} + auto CASServiceImpl::FindMissingBlobs( ::grpc::ServerContext* /*context*/, const ::bazel_re::FindMissingBlobsRequest* request, ::bazel_re::FindMissingBlobsResponse* response) -> ::grpc::Status { auto lock = GarbageCollector::SharedLock(); if (!lock) { - auto str = fmt::format("Could not acquire SharedLock"); + auto str = + fmt::format("FindMissingBlobs: could not acquire SharedLock"); logger_.Emit(LogLevel::Error, str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } for (auto const& x : request->blob_digests()) { auto const& hash = x.hash(); - logger_.Emit(LogLevel::Trace, - "FindMissingBlobs: {}", - NativeSupport::Unprefix(hash)); + + if (!IsValidHash(hash)) { + logger_.Emit(LogLevel::Error, + "FindMissingBlobs: unsupported digest {}", + hash); + auto* d = response->add_missing_blob_digests(); + d->CopyFrom(x); + continue; + } + logger_.Emit(LogLevel::Trace, 
"FindMissingBlobs: {}", hash); if (NativeSupport::IsTree(hash)) { if (!storage_.TreePath(x)) { auto* d = response->add_missing_blob_digests(); @@ -68,29 +82,27 @@ auto CASServiceImpl::BatchUpdateBlobs( ::bazel_re::BatchUpdateBlobsResponse* response) -> ::grpc::Status { auto lock = GarbageCollector::SharedLock(); if (!lock) { - auto str = fmt::format("Could not acquire SharedLock"); + auto str = + fmt::format("BatchUpdateBlobs: could not acquire SharedLock"); logger_.Emit(LogLevel::Error, str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } for (auto const& x : request->requests()) { auto const& hash = x.digest().hash(); - auto const& hash_lenght = hash.size(); - if (hash_lenght != kJustHashLength and hash_lenght != kSHA256Length) { - auto const& str = fmt::format("Unsupported digest {}", hash); + if (!IsValidHash(hash)) { + auto const& str = + fmt::format("BatchUpdateBlobs: unsupported digest {}", hash); logger_.Emit(LogLevel::Error, str); return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; } - logger_.Emit(LogLevel::Trace, - "BatchUpdateBlobs: {}", - NativeSupport::Unprefix(hash)); + logger_.Emit(LogLevel::Trace, "BatchUpdateBlobs: {}", hash); auto* r = response->add_responses(); r->mutable_digest()->CopyFrom(x.digest()); if (NativeSupport::IsTree(hash)) { auto const& dgst = storage_.StoreTree(x.data()); if (!dgst) { - auto const& str = - fmt::format("BatchUpdateBlobs: could not upload tree {}", - NativeSupport::Unprefix(hash)); + auto const& str = fmt::format( + "BatchUpdateBlobs: could not upload tree {}", hash); logger_.Emit(LogLevel::Error, str); return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; } @@ -101,9 +113,8 @@ auto CASServiceImpl::BatchUpdateBlobs( else { auto const& dgst = storage_.StoreBlob(x.data(), false); if (!dgst) { - auto const& str = - fmt::format("BatchUpdateBlobs: could not upload blob {}", - NativeSupport::Unprefix(hash)); + auto const& str = fmt::format( + "BatchUpdateBlobs: could not upload blob {}", hash); 
logger_.Emit(LogLevel::Error, str); return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; } @@ -121,7 +132,7 @@ auto CASServiceImpl::BatchReadBlobs( ::bazel_re::BatchReadBlobsResponse* response) -> ::grpc::Status { auto lock = GarbageCollector::SharedLock(); if (!lock) { - auto str = fmt::format("Could not acquire SharedLock"); + auto str = fmt::format("BatchReadBlobs: Could not acquire SharedLock"); logger_.Emit(LogLevel::Error, str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } diff --git a/src/buildtool/execution_api/execution_service/execution_server.cpp b/src/buildtool/execution_api/execution_service/execution_server.cpp index ab0ebfc9..5f21f5ac 100644 --- a/src/buildtool/execution_api/execution_service/execution_server.cpp +++ b/src/buildtool/execution_api/execution_service/execution_server.cpp @@ -16,26 +16,34 @@ #include <algorithm> #include <fstream> -#include <iostream> #include <string> #include <unordered_map> #include <utility> +#include "execution_server.hpp" #include "fmt/format.h" #include "gsl-lite/gsl-lite.hpp" -#include "src/buildtool/compatibility/native_support.hpp" +#include "src/buildtool/execution_api/execution_service/operation_cache.hpp" #include "src/buildtool/execution_api/local/garbage_collector.hpp" #include "src/buildtool/file_system/file_system_manager.hpp" +static void UpdateTimeStamp(::google::longrunning::Operation* op) { + ::google::protobuf::Timestamp t; + t.set_seconds( + std::chrono::duration_cast<std::chrono::seconds>( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count()); + op->mutable_metadata()->PackFrom(t); +} + auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request) const noexcept -> std::pair<std::optional<::bazel_re::Action>, std::optional<std::string>> { // get action description auto path = storage_.BlobPath(request->action_digest(), false); if (!path) { - auto str = fmt::format( - "could not retrieve blob {} from cas", - 
NativeSupport::Unprefix(request->action_digest().hash())); + auto str = fmt::format("could not retrieve blob {} from cas", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -43,9 +51,8 @@ auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request) { std::ifstream f(*path); if (!action.ParseFromIstream(&f)) { - auto str = fmt::format( - "failed to parse action from blob {}", - NativeSupport::Unprefix(request->action_digest().hash())); + auto str = fmt::format("failed to parse action from blob {}", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -56,9 +63,8 @@ auto ExecutionServiceImpl::GetAction(::bazel_re::ExecuteRequest const* request) : storage_.TreePath(action.input_root_digest()); if (!path) { - auto str = fmt::format( - "could not retrieve input root {} from cas", - NativeSupport::Unprefix(action.input_root_digest().hash())); + auto str = fmt::format("could not retrieve input root {} from cas", + action.input_root_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -71,9 +77,8 @@ auto ExecutionServiceImpl::GetCommand(::bazel_re::Action const& action) auto path = storage_.BlobPath(action.command_digest(), false); if (!path) { - auto str = fmt::format( - "could not retrieve blob {} from cas", - NativeSupport::Unprefix(action.command_digest().hash())); + auto str = fmt::format("could not retrieve blob {} from cas", + action.command_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -82,9 +87,8 @@ auto ExecutionServiceImpl::GetCommand(::bazel_re::Action const& action) { std::ifstream f(*path); if (!c.ParseFromIstream(&f)) { - auto str = fmt::format( - "failed to parse command from blob {}", - NativeSupport::Unprefix(action.command_digest().hash())); + auto str = fmt::format("failed to parse command from blob {}", + action.command_digest().hash()); 
logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -125,9 +129,8 @@ auto ExecutionServiceImpl::GetIExecutionAction( env_vars, {}); if (!i_execution_action) { - auto str = fmt::format( - "could not create action from {}", - NativeSupport::Unprefix(request->action_digest().hash())); + auto str = fmt::format("could not create action from {}", + request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); return {std::nullopt, str}; } @@ -356,10 +359,8 @@ auto ExecutionServiceImpl::GetResponse( ::bazel_re::ExecuteResponse response{}; AddStatus(&response); - auto err = - AddResult(&response, - i_execution_response, - NativeSupport::Unprefix(request->action_digest().hash())); + auto err = AddResult( + &response, i_execution_response, request->action_digest().hash()); if (err) { return {std::nullopt, *err}; } @@ -367,42 +368,34 @@ auto ExecutionServiceImpl::GetResponse( return {response, std::nullopt}; } -auto ExecutionServiceImpl::WriteResponse( +auto ExecutionServiceImpl::StoreActionResult( ::bazel_re::ExecuteRequest const* request, IExecutionResponse::Ptr const& i_execution_response, - ::bazel_re::Action const& action, - ::grpc::ServerWriter<::google::longrunning::Operation>* writer) - const noexcept -> std::optional<std::string> { - - auto [execute_response, msg_r] = GetResponse(request, i_execution_response); - if (!execute_response) { - return *msg_r; - } - - // store action result + ::bazel_re::ExecuteResponse const& execute_response, + ::bazel_re::Action const& action) const noexcept + -> std::optional<std::string> { if (i_execution_response->ExitCode() == 0 && !action.do_not_cache() && !storage_.StoreActionResult(request->action_digest(), - execute_response->result())) { - auto str = fmt::format( - "Could not store action result for action {}", - NativeSupport::Unprefix(request->action_digest().hash())); + execute_response.result())) { + auto str = fmt::format("Could not store action result for action {}", + 
request->action_digest().hash()); logger_.Emit(LogLevel::Error, str); return str; } + return std::nullopt; +} +static void WriteResponse( + ::bazel_re::ExecuteResponse const& execute_response, + ::grpc::ServerWriter<::google::longrunning::Operation>* writer, + ::google::longrunning::Operation* op) noexcept { // send response to the client - auto op = ::google::longrunning::Operation{}; - op.mutable_response()->PackFrom(*execute_response); - op.set_name("just-remote-execution"); - op.set_done(true); - if (!writer->Write(op)) { - auto str = fmt::format( - "Could not write execution response for action {}", - NativeSupport::Unprefix(request->action_digest().hash())); - logger_.Emit(LogLevel::Error, str); - return str; - } - return std::nullopt; + op->mutable_response()->PackFrom(execute_response); + op->set_done(true); + UpdateTimeStamp(op); + + OperationCache::Set(op->name(), *op); + writer->Write(*op); } auto ExecutionServiceImpl::Execute( @@ -410,13 +403,13 @@ auto ExecutionServiceImpl::Execute( const ::bazel_re::ExecuteRequest* request, ::grpc::ServerWriter<::google::longrunning::Operation>* writer) -> ::grpc::Status { - auto lock = GarbageCollector::SharedLock(); if (!lock) { auto str = fmt::format("Could not acquire SharedLock"); logger_.Emit(LogLevel::Error, str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } + auto [action, msg_a] = GetAction(request); if (!action) { return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_a}; @@ -426,26 +419,57 @@ auto ExecutionServiceImpl::Execute( return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg}; } - logger_.Emit(LogLevel::Info, - "Execute {}", - NativeSupport::Unprefix(request->action_digest().hash())); + logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash()); + // send initial response to the client + auto op = ::google::longrunning::Operation{}; + auto const& op_name = request->action_digest().hash(); + op.set_name(op_name); + op.set_done(false); + UpdateTimeStamp(&op); + 
OperationCache::Set(op_name, op); + writer->Write(op); + auto t0 = std::chrono::high_resolution_clock::now(); auto i_execution_response = i_execution_action->get()->Execute(&logger_); - logger_.Emit(LogLevel::Trace, - "Finished execution of {}", - NativeSupport::Unprefix(request->action_digest().hash())); - auto err = WriteResponse(request, i_execution_response, *action, writer); - if (err) { - return ::grpc::Status{grpc::StatusCode::INTERNAL, *err}; + auto t1 = std::chrono::high_resolution_clock::now(); + logger_.Emit( + LogLevel::Trace, + "Finished execution of {} in {} seconds", + request->action_digest().hash(), + std::chrono::duration_cast<std::chrono::seconds>(t1 - t0).count()); + + auto [execute_response, msg_r] = GetResponse(request, i_execution_response); + if (!execute_response) { + return ::grpc::Status{grpc::StatusCode::INTERNAL, *msg_r}; + } + + auto err_s = StoreActionResult( + request, i_execution_response, *execute_response, *action); + if (err_s) { + return ::grpc::Status{grpc::StatusCode::INTERNAL, *err_s}; } + WriteResponse(*execute_response, writer, &op); return ::grpc::Status::OK; } auto ExecutionServiceImpl::WaitExecution( ::grpc::ServerContext* /*context*/, - const ::bazel_re::WaitExecutionRequest* /*request*/, - ::grpc::ServerWriter<::google::longrunning::Operation>* /*writer*/) + const ::bazel_re::WaitExecutionRequest* request, + ::grpc::ServerWriter<::google::longrunning::Operation>* writer) -> ::grpc::Status { - auto const* str = "WaitExecution not implemented"; - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; + auto const& hash = request->name(); + logger_.Emit(LogLevel::Trace, "WaitExecution: {}", hash); + std::optional<::google::longrunning::Operation> op; + do { + op = OperationCache::Query(hash); + if (!op) { + auto const& str = fmt::format( + "Executing action {} not found in internal cache.", hash); + logger_.Emit(LogLevel::Error, str); + return 
::grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } while (!op->done()); + writer->Write(*op); + logger_.Emit(LogLevel::Trace, "Finished WaitExecution {}", hash); + return ::grpc::Status::OK; } diff --git a/src/buildtool/execution_api/execution_service/execution_server.hpp b/src/buildtool/execution_api/execution_service/execution_server.hpp index d0ec5074..65ace208 100644 --- a/src/buildtool/execution_api/execution_service/execution_server.hpp +++ b/src/buildtool/execution_api/execution_service/execution_server.hpp @@ -107,6 +107,10 @@ class ExecutionServiceImpl final : public bazel_re::Execution::Service { -> ::grpc::Status override; private: + LocalStorage storage_{}; + IExecutionApi::Ptr api_{new LocalApi()}; + Logger logger_{"execution-service"}; + [[nodiscard]] auto GetAction(::bazel_re::ExecuteRequest const* request) const noexcept -> std::pair<std::optional<::bazel_re::Action>, std::optional<std::string>>; @@ -126,21 +130,17 @@ class ExecutionServiceImpl final : public bazel_re::Execution::Service { -> std::pair<std::optional<::bazel_re::ExecuteResponse>, std::optional<std::string>>; - [[nodiscard]] auto WriteResponse( + [[nodiscard]] auto StoreActionResult( ::bazel_re::ExecuteRequest const* request, IExecutionResponse::Ptr const& i_execution_response, - ::bazel_re::Action const& action, - ::grpc::ServerWriter<::google::longrunning::Operation>* writer) - const noexcept -> std::optional<std::string>; + ::bazel_re::ExecuteResponse const& execute_response, + ::bazel_re::Action const& action) const noexcept + -> std::optional<std::string>; [[nodiscard]] auto AddResult( ::bazel_re::ExecuteResponse* response, IExecutionResponse::Ptr const& i_execution_response, std::string const& hash) const noexcept -> std::optional<std::string>; - - LocalStorage storage_{}; - IExecutionApi::Ptr api_{new LocalApi()}; - Logger logger_{"execution-service"}; }; #endif diff --git 
a/src/buildtool/execution_api/execution_service/operation_cache.cpp b/src/buildtool/execution_api/execution_service/operation_cache.cpp new file mode 100644 index 00000000..7fccdd41 --- /dev/null +++ b/src/buildtool/execution_api/execution_service/operation_cache.cpp @@ -0,0 +1,49 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/buildtool/execution_api/execution_service/operation_cache.hpp" + +#include <algorithm> + +#include "google/protobuf/timestamp.pb.h" + +void OperationCache::GarbageCollection() { + std::shared_lock slock{mutex_}; + if (cache_.size() > (threshold_ << 1U)) { + std::vector<std::pair<std::string, ::google::longrunning::Operation>> + tmp; + tmp.reserve(cache_.size()); + std::copy(cache_.begin(), cache_.end(), std::back_insert_iterator(tmp)); + slock.release(); + std::sort(tmp.begin(), tmp.end(), [](auto const& x, auto const& y) { + ::google::protobuf::Timestamp tx; + ::google::protobuf::Timestamp ty; + x.second.metadata().UnpackTo(&tx); + y.second.metadata().UnpackTo(&ty); + return tx.seconds() < ty.seconds(); + }); + + std::size_t deleted = 0; + std::unique_lock ulock{mutex_}; + for (auto const& [key, op] : tmp) { + if (op.done()) { + DropInternal(key); + ++deleted; + } + if (deleted == threshold_) { + break; + } + } + } +} diff --git a/src/buildtool/execution_api/execution_service/operation_cache.hpp 
b/src/buildtool/execution_api/execution_service/operation_cache.hpp new file mode 100644 index 00000000..97b137fb --- /dev/null +++ b/src/buildtool/execution_api/execution_service/operation_cache.hpp @@ -0,0 +1,88 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OPERATION_CACHE_HPP +#define OPERATION_CACHE_HPP + +#include <atomic> +#include <optional> +#include <shared_mutex> +#include <string> +#include <thread> +#include <unordered_map> +#include <vector> + +#include "google/longrunning/operations.pb.h" +#include "google/protobuf/timestamp.pb.h" + +class OperationCache { + using Operation = ::google::longrunning::Operation; + + public: + [[nodiscard]] static auto Instance() -> OperationCache& { + static OperationCache x; + return x; + } + OperationCache() noexcept = default; + ~OperationCache() noexcept = default; + + OperationCache(OperationCache const&) = delete; + auto operator=(OperationCache const&) -> OperationCache& = delete; + OperationCache(OperationCache&&) = delete; + auto operator=(OperationCache&&) -> OperationCache& = delete; + + static void Set(std::string const& action, Operation const& op) { + Instance().SetInternal(action, op); + } + + [[nodiscard]] static auto Query(std::string const& x) noexcept + -> std::optional<Operation> { + return Instance().QueryInternal(x); + } + + static void SetExponent(uint8_t x) noexcept { + Instance().threshold_ = 1U << x; + } + + 
private: + mutable std::shared_mutex mutex_; + std::unordered_map<std::string, ::google::longrunning::Operation> cache_; + static constexpr uint8_t kDefaultExponent{14}; + std::size_t threshold_{1U << kDefaultExponent}; + + void SetInternal(std::string const& action, Operation const& op) { + GarbageCollection(); + std::unique_lock lock{mutex_}; + cache_[action] = op; + } + + [[nodiscard]] auto QueryInternal(std::string const& x) const noexcept + -> std::optional<Operation> { + std::shared_lock lock{mutex_}; + auto it = cache_.find(x); + if (it != cache_.end()) { + return it->second; + } + return std::nullopt; + } + + void DropInternal(std::string const& x) noexcept { + cache_[x].Clear(); + cache_.erase(x); + } + + void GarbageCollection(); +}; + +#endif diff --git a/src/buildtool/execution_api/execution_service/operations_server.cpp b/src/buildtool/execution_api/execution_service/operations_server.cpp new file mode 100644 index 00000000..37d3552a --- /dev/null +++ b/src/buildtool/execution_api/execution_service/operations_server.cpp @@ -0,0 +1,63 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "src/buildtool/execution_api/execution_service/operations_server.hpp" + +#include "src/buildtool/execution_api/execution_service/operation_cache.hpp" + +auto OperarationsServiceImpl::GetOperation( + ::grpc::ServerContext* /*context*/, + const ::google::longrunning::GetOperationRequest* request, + ::google::longrunning::Operation* response) -> ::grpc::Status { + auto const& hash = request->name(); + logger_.Emit(LogLevel::Trace, "GetOperation: {}", hash); + std::optional<::google::longrunning::Operation> op; + op = OperationCache::Query(hash); + if (!op) { + auto const& str = fmt::format( + "Executing action {} not found in internal cache.", hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + response->CopyFrom(*op); + return ::grpc::Status::OK; +} + +auto OperarationsServiceImpl::ListOperations( + ::grpc::ServerContext* /*context*/, + const ::google::longrunning::ListOperationsRequest* /*request*/, + ::google::longrunning::ListOperationsResponse* /*response*/) + -> ::grpc::Status { + auto const* str = "ListOperations not implemented"; + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; +} + +auto OperarationsServiceImpl::DeleteOperation( + ::grpc::ServerContext* /*context*/, + const ::google::longrunning::DeleteOperationRequest* /*request*/, + ::google::protobuf::Empty* /*response*/) -> ::grpc::Status { + auto const* str = "DeleteOperation not implemented"; + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; +} + +auto OperarationsServiceImpl::CancelOperation( + ::grpc::ServerContext* /*context*/, + const ::google::longrunning::CancelOperationRequest* /*request*/, + ::google::protobuf::Empty* /*response*/) -> ::grpc::Status { + auto const* str = "CancelOperation not implemented"; + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; +} diff --git 
a/src/buildtool/execution_api/execution_service/operations_server.hpp b/src/buildtool/execution_api/execution_service/operations_server.hpp new file mode 100644 index 00000000..44e3b887 --- /dev/null +++ b/src/buildtool/execution_api/execution_service/operations_server.hpp @@ -0,0 +1,70 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OPERATIONS_SERVER_HPP +#define OPERATIONS_SERVER_HPP + +#include "google/longrunning/operations.grpc.pb.h" +#include "src/buildtool/logging/logger.hpp" + +class OperarationsServiceImpl final + : public ::google::longrunning::Operations::Service { + public: + // Lists operations that match the specified filter in the request. If the + // server doesn't support this method, it returns `UNIMPLEMENTED`. + // + // NOTE: the `name` binding below allows API services to override the + // binding to use different resource name schemes, such as + // `users/*/operations`. + auto ListOperations( + ::grpc::ServerContext* context, + const ::google::longrunning::ListOperationsRequest* request, + ::google::longrunning::ListOperationsResponse* response) + -> ::grpc::Status override; + // Gets the latest state of a long-running operation. Clients can use this + // method to poll the operation result at intervals as recommended by the + // API service. 
+ auto GetOperation(::grpc::ServerContext* context, + const ::google::longrunning::GetOperationRequest* request, + ::google::longrunning::Operation* response) + -> ::grpc::Status override; + // Deletes a long-running operation. This method indicates that the client + // is no longer interested in the operation result. It does not cancel the + // operation. If the server doesn't support this method, it returns + // `google.rpc.Code.UNIMPLEMENTED`. + auto DeleteOperation( + ::grpc::ServerContext* context, + const ::google::longrunning::DeleteOperationRequest* request, + ::google::protobuf::Empty* response) -> ::grpc::Status override; + // Starts asynchronous cancellation on a long-running operation. The server + // makes a best effort to cancel the operation, but success is not + // guaranteed. If the server doesn't support this method, it returns + // `google.rpc.Code.UNIMPLEMENTED`. Clients can use + // [Operations.GetOperation][google.longrunning.Operations.GetOperation] or + // other methods to check whether the cancellation succeeded or whether the + // operation completed despite cancellation. On successful cancellation, + // the operation is not deleted; instead, it becomes an operation with + // an [Operation.error][google.longrunning.Operation.error] value with a + // [google.rpc.Status.code][google.rpc.Status.code] of 1, corresponding to + // `Code.CANCELLED`. 
+ auto CancelOperation( + ::grpc::ServerContext* context, + const ::google::longrunning::CancelOperationRequest* request, + ::google::protobuf::Empty* response) -> ::grpc::Status override; + + private: + Logger logger_{"execution-service:operations"}; +}; + +#endif diff --git a/src/buildtool/execution_api/execution_service/server_implementation.cpp b/src/buildtool/execution_api/execution_service/server_implementation.cpp index 73850686..bba2e4cf 100644 --- a/src/buildtool/execution_api/execution_service/server_implementation.cpp +++ b/src/buildtool/execution_api/execution_service/server_implementation.cpp @@ -29,6 +29,7 @@ #include "src/buildtool/execution_api/execution_service/capabilities_server.hpp" #include "src/buildtool/execution_api/execution_service/cas_server.hpp" #include "src/buildtool/execution_api/execution_service/execution_server.hpp" +#include "src/buildtool/execution_api/execution_service/operations_server.hpp" #include "src/buildtool/execution_api/remote/config.hpp" #include "src/buildtool/logging/logger.hpp" @@ -53,6 +54,7 @@ auto ServerImpl::Run() -> bool { CASServiceImpl cas{}; BytestreamServiceImpl b{}; CapabilitiesServiceImpl cap{}; + OperarationsServiceImpl op{}; grpc::ServerBuilder builder; @@ -60,7 +62,8 @@ auto ServerImpl::Run() -> bool { .RegisterService(&ac) .RegisterService(&cas) .RegisterService(&b) - .RegisterService(&cap); + .RegisterService(&cap) + .RegisterService(&op); std::shared_ptr<grpc::ServerCredentials> creds; if (Auth::GetAuthMethod() == AuthMethod::kTLS) { diff --git a/src/buildtool/main/TARGETS b/src/buildtool/main/TARGETS index 818c24af..6bfa5512 100644 --- a/src/buildtool/main/TARGETS +++ b/src/buildtool/main/TARGETS @@ -23,6 +23,7 @@ , [ "src/buildtool/execution_api/execution_service" , "server_implementation" ] + , ["src/buildtool/execution_api/execution_service", "operation_cache"] , "common" , "version" , "analyse" diff --git a/src/buildtool/main/main.cpp b/src/buildtool/main/main.cpp index 869ed146..a3f985b7 
100644 --- a/src/buildtool/main/main.cpp +++ b/src/buildtool/main/main.cpp @@ -40,6 +40,7 @@ #include "src/buildtool/main/install_cas.hpp" #ifndef BOOTSTRAP_BUILD_TOOL #include "src/buildtool/auth/authentication.hpp" +#include "src/buildtool/execution_api/execution_service/operation_cache.hpp" #include "src/buildtool/execution_api/execution_service/server_implementation.hpp" #include "src/buildtool/execution_api/local/garbage_collector.hpp" #include "src/buildtool/graph_traverser/graph_traverser.hpp" @@ -427,6 +428,9 @@ void SetupExecutionServiceConfig(ExecutionServiceArguments const& args) { std::exit(kExitFailure); } } + if (args.op_exponent) { + OperationCache::SetExponent(*args.op_exponent); + } } void SetupHashFunction() { |