diff options
author | Alberto Sartori <alberto.sartori@huawei.com> | 2023-01-23 18:31:14 +0100 |
---|---|---|
committer | Alberto Sartori <alberto.sartori@huawei.com> | 2023-02-02 17:57:19 +0100 |
commit | bd66d45945dc186a0d08db7d9845ef657d549577 (patch) | |
tree | 51fa0b9c630ed388fc8aa36f5314b30fdc6bd5ff /src | |
parent | 0658ef369e9dc27ca3a16075fc0f9e20931a2350 (diff) | |
download | justbuild-bd66d45945dc186a0d08db7d9845ef657d549577.tar.gz |
execution-service: add new subcommand execute
This subcommand starts a single node remote execution service honoring
the just native remote protocol.
If the flag --compatible is provided, the execution service will honor
the original remote build execution protocol.
New command line args supported by this subcommand:
-p,--port INT: Execution service will listen to this port. If unset,
the service will listen to the first available one.
--info-file TEXT: Write the used port, interface, and pid to this file
in JSON format. If the file exists, it will be overwritten.
-i,--interface TEXT: Interface to use. If unset, the loopback device
is used.
--pid-file TEXT: Write pid to this file in plain text. If the file
exists, it will be overwritten.
--tls-server-cert TEXT: Path to the TLS server certificate.
--tls-server-key TEXT: Path to the TLS server key.
Co-authored-by: Klaus Aehlig <klaus.aehlig@huawei.com>
Diffstat (limited to 'src')
18 files changed, 1504 insertions, 25 deletions
diff --git a/src/buildtool/auth/authentication.hpp b/src/buildtool/auth/authentication.hpp index c3f6eb71..447d9f1c 100644 --- a/src/buildtool/auth/authentication.hpp +++ b/src/buildtool/auth/authentication.hpp @@ -58,6 +58,14 @@ class Auth { return Instance().client_key_; } + [[nodiscard]] static auto ServerCert() noexcept -> const std::string& { + return Instance().server_cert_; + } + + [[nodiscard]] static auto ServerKey() noexcept -> const std::string& { + return Instance().server_key_; + } + [[nodiscard]] static auto SetCACertificate( std::filesystem::path const& cert_file) noexcept -> bool { return set(cert_file, &Instance().ca_cert_); @@ -73,6 +81,15 @@ class Auth { return set(key_file, &Instance().client_key_); } + [[nodiscard]] static auto SetServerCertificate( + std::filesystem::path const& cert_file) noexcept -> bool { + return set(cert_file, &Instance().server_cert_); + } + + [[nodiscard]] static auto SetServerKey( + std::filesystem::path const& key_file) noexcept -> bool { + return set(key_file, &Instance().server_key_); + } // must be called after the parsing of cmd line arguments // we ensure that either both tls_client_cert or tls_client_key are set // or none of the two. 
@@ -94,6 +111,19 @@ class Auth { "Please also provide tls-client-key"); return false; } + + // to enable mTLS, both tls_server_{ceritifcate,key} must be + // supplied + if (ServerCert().empty() && not(ServerKey().empty())) { + Logger::Log(LogLevel::Error, + "Please also provide tls-server-cert"); + return false; + } + if (not(ServerCert().empty()) && ServerKey().empty()) { + Logger::Log(LogLevel::Error, + "Please also provide tls-server-key"); + return false; + } return true; } @@ -101,7 +131,8 @@ class Auth { std::string ca_cert_; std::string client_cert_; std::string client_key_; - + std::string server_cert_; + std::string server_key_; // auxiliary function to set the content of the members of this class [[nodiscard]] static auto set( std::filesystem::path const& x, diff --git a/src/buildtool/common/cli.hpp b/src/buildtool/common/cli.hpp index feb58a6d..c638193b 100644 --- a/src/buildtool/common/cli.hpp +++ b/src/buildtool/common/cli.hpp @@ -125,14 +125,32 @@ struct GraphArguments { std::optional<std::filesystem::path> git_cas{}; }; -/// \brief Arguments for authentication methods. -struct AuthArguments { - // CA certificate used to verify server's identity +// Arguments for authentication methods. 
+ +/// \brief Arguments shared by both server and client +struct CommonAuthArguments { std::optional<std::filesystem::path> tls_ca_cert{std::nullopt}; +}; + +/// \brief Arguments used by the client +struct ClientAuthArguments { std::optional<std::filesystem::path> tls_client_cert{std::nullopt}; std::optional<std::filesystem::path> tls_client_key{std::nullopt}; }; +/// \brief Authentication arguments used by subcommand just execute +struct ServerAuthArguments { + std::optional<std::filesystem::path> tls_server_cert{std::nullopt}; + std::optional<std::filesystem::path> tls_server_key{std::nullopt}; +}; + +struct ExecutionServiceArguments { + std::optional<int> port{std::nullopt}; + std::optional<std::filesystem::path> info_file{std::nullopt}; + std::optional<std::string> interface{std::nullopt}; + std::optional<std::string> pid_file{std::nullopt}; +}; + static inline auto SetupCommonArguments( gsl::not_null<CLI::App*> const& app, gsl::not_null<CommonArguments*> const& clargs) { @@ -340,7 +358,7 @@ static inline auto SetupEndpointArguments( ->expected(1, 1); } -static inline auto SetupBuildArguments( +static inline auto SetupCommonBuildArguments( gsl::not_null<CLI::App*> const& app, gsl::not_null<BuildArguments*> const& clargs) { app->add_option_function<std::string>( @@ -354,6 +372,11 @@ static inline auto SetupBuildArguments( "prepend actions' commands before being executed locally.") ->type_name("JSON") ->default_val(nlohmann::json{"env", "--"}.dump()); +} + +static inline auto SetupBuildArguments( + gsl::not_null<CLI::App*> const& app, + gsl::not_null<BuildArguments*> const& clargs) { app->add_option_function<unsigned int>( "--action-timeout", @@ -470,13 +493,18 @@ static inline auto SetupCompatibilityArguments( "the flag must be used consistently for all related invocations."); } -static inline auto SetupAuthArguments( +static inline auto SetupCommonAuthArguments( gsl::not_null<CLI::App*> const& app, - gsl::not_null<AuthArguments*> const& authargs) { + 
gsl::not_null<CommonAuthArguments*> const& authargs) { app->add_option("--tls-ca-cert", authargs->tls_ca_cert, "Path to a TLS CA certificate that is trusted to sign the " "server certificate."); +} + +static inline auto SetupClientAuthArguments( + gsl::not_null<CLI::App*> const& app, + gsl::not_null<ClientAuthArguments*> const& authargs) { app->add_option("--tls-client-cert", authargs->tls_client_cert, "Path to the TLS client certificate."); @@ -484,4 +512,37 @@ static inline auto SetupAuthArguments( authargs->tls_client_key, "Path to the TLS client key."); } + +static inline auto SetupServerAuthArguments( + gsl::not_null<CLI::App*> const& app, + gsl::not_null<ServerAuthArguments*> const& authargs) { + app->add_option("--tls-server-cert", + authargs->tls_server_cert, + "Path to the TLS server certificate."); + app->add_option("--tls-server-key", + authargs->tls_server_key, + "Path to the TLS server key."); +} + +static inline auto SetupExecutionServiceArguments( + gsl::not_null<CLI::App*> const& app, + gsl::not_null<ExecutionServiceArguments*> const& es_args) { + app->add_option("-p,--port", + es_args->port, + "Execution service will listen to this port. If unset, the " + "service will listen to the first available one."); + app->add_option("--info-file", + es_args->info_file, + "Write the used port, interface, and pid to this file in " + "JSON format. If the file exists, it " + "will be overwritten."); + app->add_option("-i,--interface", + es_args->interface, + "Interface to use. If unset, the loopback device is used."); + app->add_option( + "--pid-file", + es_args->pid_file, + "Write pid to this file in plain txt. 
If the file exists, it " + "will be overwritten."); +} #endif // INCLUDED_SRC_BUILDTOOL_COMMON_CLI_HPP diff --git a/src/buildtool/execution_api/execution_service/TARGETS b/src/buildtool/execution_api/execution_service/TARGETS new file mode 100644 index 00000000..91726eac --- /dev/null +++ b/src/buildtool/execution_api/execution_service/TARGETS @@ -0,0 +1,98 @@ +{ "execution_server": + { "type": ["@", "rules", "CC", "library"] + , "name": ["execution_server"] + , "hdrs": ["execution_server.hpp"] + , "srcs": ["execution_server.cpp"] + , "proto": [["@", "bazel_remote_apis", "", "remote_execution_proto"]] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + , "deps": + [ ["src/buildtool/execution_api/local", "local"] + , ["src/buildtool/logging", "logging"] + ] + , "private-deps": + [ ["src/buildtool/compatibility", "compatibility"] + , ["@", "fmt", "", "fmt"] + , ["src/buildtool/execution_api/local", "garbage_collector"] + ] + } +, "ac_server": + { "type": ["@", "rules", "CC", "library"] + , "name": ["ac_server"] + , "hdrs": ["ac_server.hpp"] + , "srcs": ["ac_server.cpp"] + , "proto": [["@", "bazel_remote_apis", "", "remote_execution_proto"]] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + , "deps": + [ ["src/buildtool/execution_api/local", "local"] + , ["src/buildtool/logging", "logging"] + ] + , "private-deps": + [["src/buildtool/execution_api/local", "garbage_collector"]] + } +, "cas_server": + { "type": ["@", "rules", "CC", "library"] + , "name": ["cas_server"] + , "hdrs": ["cas_server.hpp"] + , "srcs": ["cas_server.cpp"] + , "proto": [["@", "bazel_remote_apis", "", "remote_execution_proto"]] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + , "deps": + [ ["src/buildtool/execution_api/local", "local"] + , ["src/buildtool/logging", "logging"] + ] + , "private-deps": + [ ["src/buildtool/compatibility", "compatibility"] + , ["@", "fmt", "", "fmt"] + , ["src/buildtool/execution_api/local", 
"garbage_collector"] + ] + } +, "server_implementation": + { "type": ["@", "rules", "CC", "library"] + , "name": ["sever_implemenation"] + , "hdrs": ["server_implementation.hpp"] + , "srcs": ["server_implementation.cpp"] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + , "private-deps": + [ "execution_server" + , "ac_server" + , "cas_server" + , "bytestream_server" + , "capabilities_server" + , ["src/buildtool/execution_api/remote", "config"] + , ["src/buildtool/auth", "auth"] + , ["@", "json", "", "json"] + , ["src/buildtool/execution_api/local", "local"] + , ["@", "grpc", "", "grpc++"] + , ["src/buildtool/execution_api/remote", "config"] + ] + } +, "bytestream_server": + { "type": ["@", "rules", "CC", "library"] + , "name": ["bytestream"] + , "hdrs": ["bytestream_server.hpp"] + , "srcs": ["bytestream_server.cpp"] + , "proto": [["@", "googleapis", "", "google_bytestream_proto"]] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + , "deps": + [ ["src/buildtool/execution_api/local", "local"] + , ["src/buildtool/logging", "logging"] + ] + , "private-deps": + [ ["src/buildtool/compatibility", "compatibility"] + , ["src/buildtool/execution_api/common", "bytestream-common"] + , ["src/utils/cpp", "tmp_dir"] + , ["@", "fmt", "", "fmt"] + , ["src/buildtool/execution_api/local", "garbage_collector"] + ] + } +, "capabilities_server": + { "type": ["@", "rules", "CC", "library"] + , "name": ["capabilities_server"] + , "hdrs": ["capabilities_server.hpp"] + , "srcs": ["capabilities_server.cpp"] + , "proto": [["@", "bazel_remote_apis", "", "remote_execution_proto"]] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + , "deps": [["src/buildtool/execution_api/local", "local"]] + , "private-deps": [["src/buildtool/logging", "logging"]] + } +} diff --git a/src/buildtool/execution_api/execution_service/ac_server.cpp b/src/buildtool/execution_api/execution_service/ac_server.cpp new file mode 100644 index 
00000000..1b5b9470 --- /dev/null +++ b/src/buildtool/execution_api/execution_service/ac_server.cpp @@ -0,0 +1,54 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/buildtool/execution_api/execution_service/ac_server.hpp" + +#include "fmt/format.h" +#include "src/buildtool/execution_api/local/garbage_collector.hpp" + +auto ActionCacheServiceImpl::GetActionResult( + ::grpc::ServerContext* /*context*/, + const ::build::bazel::remote::execution::v2::GetActionResultRequest* + request, + ::build::bazel::remote::execution::v2::ActionResult* response) + -> ::grpc::Status { + logger_.Emit(LogLevel::Trace, + "GetActionResult: {}", + request->action_digest().hash()); + auto lock = GarbageCollector::SharedLock(); + if (!lock) { + auto str = fmt::format("Could not acquire SharedLock"); + logger_.Emit(LogLevel::Error, str); + return grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + auto x = ac_.CachedResult(request->action_digest()); + if (!x) { + return grpc::Status{ + grpc::StatusCode::NOT_FOUND, + fmt::format("{} missing from AC", request->action_digest().hash())}; + } + *response = *x; + return ::grpc::Status::OK; +} + +auto ActionCacheServiceImpl::UpdateActionResult( + ::grpc::ServerContext* /*context*/, + const ::build::bazel::remote::execution::v2::UpdateActionResultRequest* + /*request*/, + ::build::bazel::remote::execution::v2::ActionResult* /*response*/) + -> ::grpc::Status { + 
auto const* str = "UpdateActionResult not implemented"; + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; +} diff --git a/src/buildtool/execution_api/execution_service/ac_server.hpp b/src/buildtool/execution_api/execution_service/ac_server.hpp new file mode 100644 index 00000000..8e4b993b --- /dev/null +++ b/src/buildtool/execution_api/execution_service/ac_server.hpp @@ -0,0 +1,72 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef AC_SERVER_HPP +#define AC_SERVER_HPP + +#include "build/bazel/remote/execution/v2/remote_execution.grpc.pb.h" +#include "src/buildtool/execution_api/local/local_ac.hpp" +#include "src/buildtool/logging/logger.hpp" + +class ActionCacheServiceImpl final + : public build::bazel::remote::execution::v2::ActionCache::Service { + public: + // Retrieve a cached execution result. + // + // Implementations SHOULD ensure that any blobs referenced from the + // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage] + // are available at the time of returning the + // [ActionResult][build.bazel.remote.execution.v2.ActionResult] and will be + // for some period of time afterwards. The TTLs of the referenced blobs + // SHOULD be increased if necessary and applicable. + // + // Errors: + // + // * `NOT_FOUND`: The requested `ActionResult` is not in the cache. 
+ auto GetActionResult( + ::grpc::ServerContext* context, + const ::build::bazel::remote::execution::v2::GetActionResultRequest* + request, + ::build::bazel::remote::execution::v2::ActionResult* response) + -> ::grpc::Status override; + // Upload a new execution result. + // + // In order to allow the server to perform access control based on the type + // of action, and to assist with client debugging, the client MUST first + // upload the [Action][build.bazel.remote.execution.v2.Execution] that + // produced the result, along with its + // [Command][build.bazel.remote.execution.v2.Command], into the + // `ContentAddressableStorage`. + // + // Errors: + // + // * `INVALID_ARGUMENT`: One or more arguments are invalid. + // * `FAILED_PRECONDITION`: One or more errors occurred in updating the + // action result, such as a missing command or action. + // * `RESOURCE_EXHAUSTED`: There is insufficient storage space to add the + // entry to the cache. + auto UpdateActionResult( + ::grpc::ServerContext* context, + const ::build::bazel::remote::execution::v2::UpdateActionResultRequest* + request, + ::build::bazel::remote::execution::v2::ActionResult* response) + -> ::grpc::Status override; + + private: + LocalCAS<ObjectType::File> cas_{}; + LocalAC ac_{&cas_}; + Logger logger_{"execution-service"}; +}; + +#endif diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.cpp b/src/buildtool/execution_api/execution_service/bytestream_server.cpp new file mode 100644 index 00000000..64d5e9eb --- /dev/null +++ b/src/buildtool/execution_api/execution_service/bytestream_server.cpp @@ -0,0 +1,166 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "bytestream_server.hpp" + +#include <fstream> +#include <sstream> +#include <utility> + +#include "fmt/format.h" +#include "src/buildtool/compatibility/native_support.hpp" +#include "src/buildtool/execution_api/common/bytestream_common.hpp" +#include "src/buildtool/execution_api/local/garbage_collector.hpp" +#include "src/utils/cpp/tmp_dir.hpp" + +namespace { +auto ParseResourceName(std::string const& x) -> std::optional<std::string> { + // resource name is like this + // remote-execution/uploads/c4f03510-7d56-4490-8934-01bce1b1288e/blobs/62183d7a696acf7e69e218efc82c93135f8c85f895/4424712 + if (auto end = x.rfind('/'); end != std::string::npos) { + if (auto start = x.rfind('/', end - 1); start != std::string::npos) { + return x.substr(start + 1, end - start - 1); + } + } + return std::nullopt; +} +} // namespace + +auto BytestreamServiceImpl::Read( + ::grpc::ServerContext* /*context*/, + const ::google::bytestream::ReadRequest* request, + ::grpc::ServerWriter<::google::bytestream::ReadResponse>* writer) + -> ::grpc::Status { + logger_.Emit(LogLevel::Trace, "Read {}", request->resource_name()); + // resource_name is of type + // remote-execution/blobs/62f408d64bca5de775c4b1dbc3288fc03afd6b19eb/0 + std::istringstream iss{request->resource_name()}; + auto hash = ParseResourceName(request->resource_name()); + if (!hash) { + auto str = fmt::format("could not parse {}", request->resource_name()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; + } + + 
std::optional<std::filesystem::path> path{}; + auto lock = GarbageCollector::SharedLock(); + if (!lock) { + auto str = fmt::format("Could not acquire SharedLock"); + logger_.Emit(LogLevel::Error, str); + return grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + if (NativeSupport::IsTree(*hash)) { + ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, true}; + path = storage_.TreePath(static_cast<bazel_re::Digest>(dgst)); + if (!path) { + auto str = fmt::format("could not find {}", *hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::NOT_FOUND, str}; + } + } + ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, false}; + path = storage_.BlobPath(static_cast<bazel_re::Digest>(dgst), false); + if (!path) { + auto str = fmt::format("could not find {}", *hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::NOT_FOUND, str}; + } + + std::ifstream blob{*path}; + + ::google::bytestream::ReadResponse response; + std::string buffer(kChunkSize, '\0'); + bool done = false; + blob.seekg(request->read_offset()); + while (!done) { + blob.read(buffer.data(), kChunkSize); + if (blob.eof()) { + // do not send random bytes + buffer.resize(static_cast<std::size_t>(blob.gcount())); + done = true; + } + *(response.mutable_data()) = buffer; + writer->Write(response); + } + return ::grpc::Status::OK; +} + +auto BytestreamServiceImpl::Write( + ::grpc::ServerContext* /*context*/, + ::grpc::ServerReader<::google::bytestream::WriteRequest>* reader, + ::google::bytestream::WriteResponse* response) -> ::grpc::Status { + ::google::bytestream::WriteRequest request; + reader->Read(&request); + logger_.Emit(LogLevel::Debug, "write {}", request.resource_name()); + auto hash = ParseResourceName(request.resource_name()); + if (!hash) { + auto str = fmt::format("could not parse {}", request.resource_name()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; + } + 
logger_.Emit(LogLevel::Trace, + "Write: {}, offset {}, finish write {}", + *hash, + request.write_offset(), + request.finish_write()); + auto tmp_dir = TmpDir::Create("execution-service"); + if (!tmp_dir) { + return ::grpc::Status{::grpc::StatusCode::INTERNAL, + "could not create TmpDir"}; + } + auto tmp = tmp_dir->GetPath() / *hash; + { + std::ofstream of{tmp, std::ios::binary}; + of.write(request.data().data(), + static_cast<std::streamsize>(request.data().size())); + } + while (!request.finish_write() && reader->Read(&request)) { + std::ofstream of{tmp, std::ios::binary | std::ios::app}; + of.write(request.data().data(), + static_cast<std::streamsize>(request.data().size())); + } + auto lock = GarbageCollector::SharedLock(); + if (!lock) { + auto str = fmt::format("Could not acquire SharedLock"); + logger_.Emit(LogLevel::Error, str); + return grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + if (NativeSupport::IsTree(*hash)) { + if (not storage_.StoreTree(tmp)) { + auto str = fmt::format("could not store tree {}", *hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; + } + } + else { + if (not storage_.StoreBlob(tmp)) { + auto str = fmt::format("could not store blob {}", *hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; + } + } + response->set_committed_size( + static_cast<google::protobuf::int64>(std::filesystem::file_size(tmp))); + return ::grpc::Status::OK; +} + +auto BytestreamServiceImpl::QueryWriteStatus( + ::grpc::ServerContext* /*context*/, + const ::google::bytestream::QueryWriteStatusRequest* /*request*/, + ::google::bytestream::QueryWriteStatusResponse* /*response*/) + -> ::grpc::Status { + auto const* str = "QueryWriteStatus not implemented"; + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; +} diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.hpp 
b/src/buildtool/execution_api/execution_service/bytestream_server.hpp new file mode 100644 index 00000000..e2627328 --- /dev/null +++ b/src/buildtool/execution_api/execution_service/bytestream_server.hpp @@ -0,0 +1,83 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef BYTESTREAM_SERVER_HPP +#define BYTESTREAM_SERVER_HPP + +#include "google/bytestream/bytestream.grpc.pb.h" +#include "src/buildtool/execution_api/local/local_storage.hpp" +#include "src/buildtool/logging/logger.hpp" + +class BytestreamServiceImpl : public ::google::bytestream::ByteStream::Service { + public: + // `Read()` is used to retrieve the contents of a resource as a sequence + // of bytes. The bytes are returned in a sequence of responses, and the + // responses are delivered as the results of a server-side streaming RPC. + auto Read(::grpc::ServerContext* context, + const ::google::bytestream::ReadRequest* request, + ::grpc::ServerWriter< ::google::bytestream::ReadResponse>* writer) + -> ::grpc::Status override; + // `Write()` is used to send the contents of a resource as a sequence of + // bytes. The bytes are sent in a sequence of request protos of a + // client-side streaming RPC. + // + // A `Write()` action is resumable. 
If there is an error or the connection + // is broken during the `Write()`, the client should check the status of the + // `Write()` by calling `QueryWriteStatus()` and continue writing from the + // returned `committed_size`. This may be less than the amount of data the + // client previously sent. + // + // Calling `Write()` on a resource name that was previously written and + // finalized could cause an error, depending on whether the underlying + // service allows over-writing of previously written resources. + // + // When the client closes the request channel, the service will respond with + // a `WriteResponse`. The service will not view the resource as `complete` + // until the client has sent a `WriteRequest` with `finish_write` set to + // `true`. Sending any requests on a stream after sending a request with + // `finish_write` set to `true` will cause an error. The client **should** + // check the `WriteResponse` it receives to determine how much data the + // service was able to commit and whether the service views the resource as + // `complete` or not. + auto Write( + ::grpc::ServerContext* context, + ::grpc::ServerReader< ::google::bytestream::WriteRequest>* reader, + ::google::bytestream::WriteResponse* response) + -> ::grpc::Status override; + // `QueryWriteStatus()` is used to find the `committed_size` for a resource + // that is being written, which can then be used as the `write_offset` for + // the next `Write()` call. + // + // If the resource does not exist (i.e., the resource has been deleted, or + // the first `Write()` has not yet reached the service), this method returns + // the error `NOT_FOUND`. + // + // The client **may** call `QueryWriteStatus()` at any time to determine how + // much data has been processed for this resource. This is useful if the + // client is buffering data and needs to know which data can be safely + // evicted. 
For any sequence of `QueryWriteStatus()` calls for a given + // resource name, the sequence of returned `committed_size` values will be + // non-decreasing. + auto QueryWriteStatus( + ::grpc::ServerContext* context, + const ::google::bytestream::QueryWriteStatusRequest* request, + ::google::bytestream::QueryWriteStatusResponse* response) + -> ::grpc::Status override; + + private: + LocalStorage storage_{}; + Logger logger_{"execution-service:bytestream"}; +}; + +#endif diff --git a/src/buildtool/execution_api/execution_service/capabilities_server.cpp b/src/buildtool/execution_api/execution_service/capabilities_server.cpp new file mode 100644 index 00000000..198d1fb6 --- /dev/null +++ b/src/buildtool/execution_api/execution_service/capabilities_server.cpp @@ -0,0 +1,57 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "src/buildtool/execution_api/execution_service/capabilities_server.hpp" + +#include "src/buildtool/compatibility/compatibility.hpp" +#include "src/buildtool/logging/logger.hpp" + +auto CapabilitiesServiceImpl::GetCapabilities( + ::grpc::ServerContext* /*context*/, + const ::build::bazel::remote::execution::v2::GetCapabilitiesRequest* + /*request*/, + ::build::bazel::remote::execution::v2::ServerCapabilities* response) + -> ::grpc::Status { + if (!Compatibility::IsCompatible()) { + auto const* str = "GetCapabilities not implemented"; + Logger::Log(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; + } + ::build::bazel::remote::execution::v2::CacheCapabilities cache; + ::build::bazel::remote::execution::v2::ExecutionCapabilities exec; + + cache.add_digest_function( + ::build::bazel::remote::execution::v2::DigestFunction_Value:: + DigestFunction_Value_SHA256); + cache.mutable_action_cache_update_capabilities()->set_update_enabled(false); + static constexpr std::size_t kMaxBatchTransferSize = 1024 * 1024; + cache.set_max_batch_total_size_bytes(kMaxBatchTransferSize); + static_assert(kMaxBatchTransferSize < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, + "Max batch transfer size too large."); + *(response->mutable_cache_capabilities()) = cache; + + exec.set_digest_function( + ::build::bazel::remote::execution::v2::DigestFunction_Value:: + DigestFunction_Value_SHA256); + exec.set_exec_enabled(true); + + *(response->mutable_execution_capabilities()) = exec; + ::build::bazel::semver::SemVer v{}; + v.set_major(2); + v.set_minor(0); + + *(response->mutable_low_api_version()) = v; + *(response->mutable_high_api_version()) = v; + return ::grpc::Status::OK; +} diff --git a/src/buildtool/execution_api/execution_service/capabilities_server.hpp b/src/buildtool/execution_api/execution_service/capabilities_server.hpp new file mode 100644 index 00000000..62bc4852 --- /dev/null +++ 
b/src/buildtool/execution_api/execution_service/capabilities_server.hpp @@ -0,0 +1,38 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef CAPABILITIES_SERVER_HPP +#define CAPABILITIES_SERVER_HPP + +#include "build/bazel/remote/execution/v2/remote_execution.grpc.pb.h" + +class CapabilitiesServiceImpl final + : public build::bazel::remote::execution::v2::Capabilities::Service { + public: + // GetCapabilities returns the server capabilities configuration of the + // remote endpoint. + // Only the capabilities of the services supported by the endpoint will + // be returned: + // * Execution + CAS + Action Cache endpoints should return both + // CacheCapabilities and ExecutionCapabilities. + // * Execution only endpoints should return ExecutionCapabilities. + // * CAS + Action Cache only endpoints should return CacheCapabilities. 
+ auto GetCapabilities( + ::grpc::ServerContext* context, + const ::build::bazel::remote::execution::v2::GetCapabilitiesRequest* + request, + ::build::bazel::remote::execution::v2::ServerCapabilities* response) + -> ::grpc::Status override; +}; +#endif diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp new file mode 100644 index 00000000..025a6757 --- /dev/null +++ b/src/buildtool/execution_api/execution_service/cas_server.cpp @@ -0,0 +1,132 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include "src/buildtool/execution_api/execution_service/cas_server.hpp"
+
+#include <filesystem>
+#include <fstream>
+#include <iterator>
+#include <optional>
+#include <string>
+#include <utility>
+
+#include "fmt/format.h"
+#include "src/buildtool/compatibility/native_support.hpp"
+#include "src/buildtool/execution_api/local/garbage_collector.hpp"
+
+// Report which of the requested digests have no entry in the local storage.
+// Digests for which NativeSupport::IsTree holds are looked up via TreePath,
+// all others via BlobPath; every digest without a path is copied into the
+// response. Returns INTERNAL if the garbage collector's shared lock cannot
+// be acquired, OK otherwise.
+auto CASServiceImpl::FindMissingBlobs(
+    ::grpc::ServerContext* /*context*/,
+    const ::build::bazel::remote::execution::v2::FindMissingBlobsRequest*
+        request,
+    ::build::bazel::remote::execution::v2::FindMissingBlobsResponse* response)
+    -> ::grpc::Status {
+    auto lock = GarbageCollector::SharedLock();
+    if (!lock) {
+        auto str = fmt::format("Could not acquire SharedLock");
+        logger_.Emit(LogLevel::Error, str);
+        return grpc::Status{grpc::StatusCode::INTERNAL, str};
+    }
+    for (auto const& x : request->blob_digests()) {
+        auto const& hash = x.hash();
+        logger_.Emit(LogLevel::Trace, "FindMissingBlobs: {}", hash);
+        if (NativeSupport::IsTree(hash)) {
+            if (!storage_.TreePath(x)) {
+                auto* d = response->add_missing_blob_digests();
+                d->CopyFrom(x);
+            }
+        }
+        else if (!storage_.BlobPath(x, false)) {
+            auto* d = response->add_missing_blob_digests();
+            d->CopyFrom(x);
+        }
+    }
+    return ::grpc::Status::OK;
+}
+
+// Store every blob of the request in the local storage (StoreTree for tree
+// digests, StoreBlob otherwise) and add one response entry per request.
+// Returns INTERNAL if the shared lock cannot be acquired or as soon as one
+// store operation fails; OK once all blobs are stored.
+// NOTE(review): the first failing blob aborts the whole RPC instead of being
+// reported in that blob's individual status; confirm this is intended.
+auto CASServiceImpl::BatchUpdateBlobs(
+    ::grpc::ServerContext* /*context*/,
+    const ::build::bazel::remote::execution::v2::BatchUpdateBlobsRequest*
+        request,
+    ::build::bazel::remote::execution::v2::BatchUpdateBlobsResponse* response)
+    -> ::grpc::Status {
+    auto lock = GarbageCollector::SharedLock();
+    if (!lock) {
+        auto str = fmt::format("Could not acquire SharedLock");
+        logger_.Emit(LogLevel::Error, str);
+        return grpc::Status{grpc::StatusCode::INTERNAL, str};
+    }
+    for (auto const& x : request->requests()) {
+        auto const& hash = x.digest().hash();
+        logger_.Emit(LogLevel::Trace, "BatchUpdateBlobs: {}", hash);
+        auto* r = response->add_responses();
+        r->mutable_digest()->CopyFrom(x.digest());
+        if (NativeSupport::IsTree(hash)) {
+            if (!storage_.StoreTree(x.data())) {
+                auto const& str = fmt::format(
+                    "BatchUpdateBlobs: could not upload tree {}", hash);
+                logger_.Emit(LogLevel::Error, str);
+                return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+            }
+        }
+        else if (!storage_.StoreBlob(x.data(), false)) {
+            auto const& str =
+                fmt::format("BatchUpdateBlobs: could not upload blob {}", hash);
+            logger_.Emit(LogLevel::Error, str);
+            return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+        }
+    }
+    return ::grpc::Status::OK;
+}
+
+// Read every requested digest from the local storage and return its content.
+// Each digest gets its own response entry: NOT_FOUND if the storage has no
+// path for it, INTERNAL if the file behind the path cannot be opened, and a
+// default (OK) status together with the data otherwise. The RPC itself only
+// fails (INTERNAL) if the shared lock cannot be acquired.
+auto CASServiceImpl::BatchReadBlobs(
+    ::grpc::ServerContext* /*context*/,
+    const ::build::bazel::remote::execution::v2::BatchReadBlobsRequest* request,
+    ::build::bazel::remote::execution::v2::BatchReadBlobsResponse* response)
+    -> ::grpc::Status {
+    auto lock = GarbageCollector::SharedLock();
+    if (!lock) {
+        auto str = fmt::format("Could not acquire SharedLock");
+        logger_.Emit(LogLevel::Error, str);
+        return grpc::Status{grpc::StatusCode::INTERNAL, str};
+    }
+    for (auto const& digest : request->digests()) {
+        auto* r = response->add_responses();
+        r->mutable_digest()->CopyFrom(digest);
+        std::optional<std::filesystem::path> path;
+        if (NativeSupport::IsTree(digest.hash())) {
+            path = storage_.TreePath(digest);
+        }
+        else {
+            path = storage_.BlobPath(digest, false);
+        }
+        if (!path) {
+            google::rpc::Status status;
+            status.set_code(grpc::StatusCode::NOT_FOUND);
+            r->mutable_status()->CopyFrom(status);
+
+            continue;
+        }
+        // Blobs are arbitrary bytes, so read them in binary mode, and do not
+        // hand out empty data with an OK status if the file is unreadable.
+        std::ifstream blob{*path, std::ios::binary};
+        if (!blob.is_open()) {
+            auto const str = fmt::format(
+                "BatchReadBlobs: could not read blob {}", digest.hash());
+            logger_.Emit(LogLevel::Error, str);
+            google::rpc::Status status;
+            status.set_code(grpc::StatusCode::INTERNAL);
+            status.set_message(str);
+            r->mutable_status()->CopyFrom(status);
+
+            continue;
+        }
+        std::string content((std::istreambuf_iterator<char>(blob)),
+                            std::istreambuf_iterator<char>());
+        *(r->mutable_data()) = std::move(content);
+
+        r->mutable_status()->CopyFrom(google::rpc::Status{});
+    }
+    return ::grpc::Status::OK;
+}
+
+// GetTree is not supported by this service: logs an error and returns
+// UNIMPLEMENTED without touching the request or the writer.
+auto CASServiceImpl::GetTree(
+    ::grpc::ServerContext* /*context*/,
+    const ::build::bazel::remote::execution::v2::GetTreeRequest* /*request*/,
+    ::grpc::ServerWriter<
+        ::build::bazel::remote::execution::v2::GetTreeResponse>* /*writer*/)
+    -> ::grpc::Status {
+    auto const* str = "GetTree not implemented";
+    logger_.Emit(LogLevel::Error, str);
+    return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
+}
diff --git a/src/buildtool/execution_api/execution_service/cas_server.hpp b/src/buildtool/execution_api/execution_service/cas_server.hpp
new file mode 100644
index 00000000..29045a0c
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/cas_server.hpp
@@ -0,0 +1,128 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef CAS_SERVER_HPP
+#define CAS_SERVER_HPP
+#include "build/bazel/remote/execution/v2/remote_execution.grpc.pb.h"
+#include "src/buildtool/execution_api/local/local_storage.hpp"
+#include "src/buildtool/logging/logger.hpp"
+
+// Handler for the ContentAddressableStorage service, answering all requests
+// from a LocalStorage instance owned by this object.
+class CASServiceImpl final : public build::bazel::remote::execution::v2::
+                                 ContentAddressableStorage::Service {
+  public:
+    // Determine if blobs are present in the CAS.
+    //
+    // Clients can use this API before uploading blobs to determine which ones
+    // are already present in the CAS and do not need to be uploaded again.
+    //
+    // There are no method-specific errors.
+    auto FindMissingBlobs(
+        ::grpc::ServerContext* context,
+        const ::build::bazel::remote::execution::v2::FindMissingBlobsRequest*
+            request,
+        ::build::bazel::remote::execution::v2::FindMissingBlobsResponse*
+            response) -> ::grpc::Status override;
+    // Upload many blobs at once.
+    //
+    // The server may enforce a limit of the combined total size of blobs
+    // to be uploaded using this API. This limit may be obtained using the
+    // [Capabilities][build.bazel.remote.execution.v2.Capabilities] API.
+    // Requests exceeding the limit should either be split into smaller
+    // chunks or uploaded using the
+    // [ByteStream API][google.bytestream.ByteStream], as appropriate.
+    //
+    // This request is equivalent to calling a Bytestream `Write` request
+    // on each individual blob, in parallel. The requests may succeed or fail
+    // independently.
+    //
+    // Errors:
+    //
+    // * `INVALID_ARGUMENT`: The client attempted to upload more than the
+    //   server supported limit.
+    //
+    // Individual requests may return the following errors, additionally:
+    //
+    // * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the
+    // blob.
+    // * `INVALID_ARGUMENT`: The
+    // [Digest][build.bazel.remote.execution.v2.Digest] does not match the
+    // provided data.
+    auto BatchUpdateBlobs(
+        ::grpc::ServerContext* context,
+        const ::build::bazel::remote::execution::v2::BatchUpdateBlobsRequest*
+            request,
+        ::build::bazel::remote::execution::v2::BatchUpdateBlobsResponse*
+            response) -> ::grpc::Status override;
+    // Download many blobs at once.
+    //
+    // The server may enforce a limit of the combined total size of blobs
+    // to be downloaded using this API. This limit may be obtained using the
+    // [Capabilities][build.bazel.remote.execution.v2.Capabilities] API.
+    // Requests exceeding the limit should either be split into smaller
+    // chunks or downloaded using the
+    // [ByteStream API][google.bytestream.ByteStream], as appropriate.
+    //
+    // This request is equivalent to calling a Bytestream `Read` request
+    // on each individual blob, in parallel. The requests may succeed or fail
+    // independently.
+    //
+    // Errors:
+    //
+    // * `INVALID_ARGUMENT`: The client attempted to read more than the
+    //   server supported limit.
+    //
+    // Every error on individual read will be returned in the corresponding
+    // digest status.
+    auto BatchReadBlobs(
+        ::grpc::ServerContext* context,
+        const ::build::bazel::remote::execution::v2::BatchReadBlobsRequest*
+            request,
+        ::build::bazel::remote::execution::v2::BatchReadBlobsResponse* response)
+        -> ::grpc::Status override;
+    // Fetch the entire directory tree rooted at a node.
+    //
+    // This request must be targeted at a
+    // [Directory][build.bazel.remote.execution.v2.Directory] stored in the
+    // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage]
+    // (CAS). The server will enumerate the `Directory` tree recursively and
+    // return every node descended from the root.
+    //
+    // The GetTreeRequest.page_token parameter can be used to skip ahead in
+    // the stream (e.g. when retrying a partially completed and aborted
+    // request), by setting it to a value taken from
+    // GetTreeResponse.next_page_token of the last successfully processed
+    // GetTreeResponse).
+    //
+    // The exact traversal order is unspecified and, unless retrieving
+    // subsequent pages from an earlier request, is not guaranteed to be stable
+    // across multiple invocations of `GetTree`.
+    //
+    // If part of the tree is missing from the CAS, the server will return the
+    // portion present and omit the rest.
+    //
+    // Errors:
+    //
+    // * `NOT_FOUND`: The requested tree root is not present in the CAS.
+    auto GetTree(
+        ::grpc::ServerContext* context,
+        const ::build::bazel::remote::execution::v2::GetTreeRequest* request,
+        ::grpc::ServerWriter<
+            ::build::bazel::remote::execution::v2::GetTreeResponse>* writer)
+        -> ::grpc::Status override;
+
+  private:
+    // Storage all handlers read from and write to.
+    LocalStorage storage_{};
+    // Logger shared by all handlers of this service.
+    Logger logger_{"execution-service"};
+};
+#endif
diff --git a/src/buildtool/execution_api/execution_service/execution_server.cpp b/src/buildtool/execution_api/execution_service/execution_server.cpp
new file mode 100644
index 00000000..774128a2
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/execution_server.cpp
@@ -0,0 +1,153 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/buildtool/execution_api/execution_service/execution_server.hpp"
+
+#include <algorithm>
+#include <fstream>
+#include <iostream>
+#include <map>
+#include <string>
+
+#include "fmt/format.h"
+#include "src/buildtool/execution_api/local/garbage_collector.hpp"
+
+// Run the action referenced by request->action_digest() and stream a single,
+// already completed Operation to the client.
+//
+// The Action, its Command and its input root are looked up in the local
+// storage; the action is then created and executed through api_. Produced
+// artifacts, exit code and stdout/stderr are packed into an ExecuteResponse.
+// If the exit code is 0, the result is also stored as action result.
+//
+// Returns INTERNAL if the shared lock cannot be acquired, if a required blob
+// is missing or cannot be parsed, if the action cannot be created or
+// executed, or if the action result cannot be stored; OK otherwise.
+auto ExecutionServiceImpl::Execute(
+    ::grpc::ServerContext* /*context*/,
+    const ::build::bazel::remote::execution::v2::ExecuteRequest* request,
+    ::grpc::ServerWriter<::google::longrunning::Operation>* writer)
+    -> ::grpc::Status {
+    auto lock = GarbageCollector::SharedLock();
+    if (!lock) {
+        auto str = fmt::format("Could not acquire SharedLock");
+        logger_.Emit(LogLevel::Error, str);
+        return grpc::Status{grpc::StatusCode::INTERNAL, str};
+    }
+    auto path = storage_.BlobPath(request->action_digest(), false);
+    if (!path) {
+        return ::grpc::Status{grpc::StatusCode::INTERNAL,
+                              fmt::format("could not retrieve blob {} from cas",
+                                          request->action_digest().hash())};
+    }
+    ::build::bazel::remote::execution::v2::Action a{};
+    {
+        std::ifstream f(*path);
+        if (!a.ParseFromIstream(&f)) {
+            return ::grpc::Status{
+                grpc::StatusCode::INTERNAL,
+                fmt::format("failed to parse action from blob {}",
+                            request->action_digest().hash())};
+        }
+    }
+    path = storage_.BlobPath(a.command_digest(), false);
+    if (!path) {
+        // Report the digest that is actually missing (the command), not the
+        // digest of the action that references it.
+        return ::grpc::Status{grpc::StatusCode::INTERNAL,
+                              fmt::format("could not retrieve blob {} from cas",
+                                          a.command_digest().hash())};
+    }
+    ::build::bazel::remote::execution::v2::Command c{};
+    {
+        std::ifstream f(*path);
+        if (!c.ParseFromIstream(&f)) {
+            return ::grpc::Status{
+                grpc::StatusCode::INTERNAL,
+                fmt::format("failed to parse command from blob {}",
+                            a.command_digest().hash())};
+        }
+    }
+    // In compatible mode the input root is looked up as a blob, otherwise
+    // as a tree.
+    if (Compatibility::IsCompatible()) {
+        path = storage_.BlobPath(a.input_root_digest(), false);
+    }
+    else {
+        path = storage_.TreePath(a.input_root_digest());
+    }
+    if (!path) {
+        return ::grpc::Status{grpc::StatusCode::INTERNAL,
+                              fmt::format("could not retrieve tree {} from cas",
+                                          a.input_root_digest().hash())};
+    }
+    auto op = ::google::longrunning::Operation{};
+    op.set_name("just-remote-execution");
+    std::map<std::string, std::string> env_vars;
+    for (auto const& x : c.environment_variables()) {
+        env_vars.emplace(x.name(), x.value());
+    }
+    auto action = api_->CreateAction(
+        ArtifactDigest{a.input_root_digest()},
+        {c.arguments().begin(), c.arguments().end()},
+        {c.output_files().begin(), c.output_files().end()},
+        {c.output_directories().begin(), c.output_directories().end()},
+        env_vars,
+        {});
+    if (!action) {
+        auto str = fmt::format("Could not create action {}",
+                               request->action_digest().hash());
+        logger_.Emit(LogLevel::Error, str);
+        return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+    }
+    logger_.Emit(LogLevel::Info, "Execute {}", request->action_digest().hash());
+    auto tmp = action->Execute(&logger_);
+    if (!tmp) {
+        // Without a response object there is nothing to report back.
+        auto str = fmt::format("Execution of action {} failed",
+                               request->action_digest().hash());
+        logger_.Emit(LogLevel::Error, str);
+        return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+    }
+    ::build::bazel::remote::execution::v2::ExecuteResponse response{};
+    for (auto const& [out_path, info] : tmp->Artifacts()) {
+        if (info.type == ObjectType::Tree) {
+            auto* d = response.mutable_result()->add_output_directories();
+            *(d->mutable_path()) = out_path;
+            *(d->mutable_tree_digest()) =
+                static_cast<::build::bazel::remote::execution::v2::Digest>(
+                    info.digest);
+        }
+        else {
+            auto* f = response.mutable_result()->add_output_files();
+            *(f->mutable_path()) = out_path;
+            *(f->mutable_digest()) =
+                static_cast<::build::bazel::remote::execution::v2::Digest>(
+                    info.digest);
+            if (info.type == ObjectType::Executable) {
+                f->set_is_executable(true);
+            }
+        }
+    }
+    response.set_cached_result(tmp->IsCached());
+
+    op.set_done(true);
+    ::google::rpc::Status status{};
+    *(response.mutable_status()) = status;
+    response.mutable_result()->set_exit_code(tmp->ExitCode());
+    // Hand over the complete output; going through a C string would cut raw
+    // output at the first NUL byte.
+    if (tmp->HasStdErr()) {
+        auto const err = tmp->StdErr();
+        logger_.Emit(LogLevel::Error, err);
+        response.mutable_result()->set_stderr_raw(err);
+    }
+    if (tmp->HasStdOut()) {
+        response.mutable_result()->set_stdout_raw(tmp->StdOut());
+    }
+
+    op.mutable_response()->PackFrom(response);
+    writer->Write(op);
+    // Only results of successful actions are stored.
+    if (tmp->ExitCode() == 0 &&
+        !storage_.StoreActionResult(request->action_digest(),
+                                    response.result())) {
+        auto str = fmt::format("Could not store action result for action {}",
+                               request->action_digest().hash());
+        logger_.Emit(LogLevel::Error, str);
+        return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+    }
+
+    return ::grpc::Status::OK;
+}
+
+// WaitExecution is not supported by this service: logs an error and returns
+// UNIMPLEMENTED without touching the request or the writer.
+auto ExecutionServiceImpl::WaitExecution(
+    ::grpc::ServerContext* /*context*/,
+    const ::build::bazel::remote::execution::v2::
+        WaitExecutionRequest* /*request*/,
+    ::grpc::ServerWriter<::google::longrunning::Operation>* /*writer*/)
+    -> ::grpc::Status {
+    auto const* str = "WaitExecution not implemented";
+    logger_.Emit(LogLevel::Error, str);
+    return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
+}
diff --git a/src/buildtool/execution_api/execution_service/execution_server.hpp b/src/buildtool/execution_api/execution_service/execution_server.hpp
new file mode 100644
index 00000000..02c4ed36
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/execution_server.hpp
@@ -0,0 +1,117 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef EXECUTION_SERVER_HPP
+#define EXECUTION_SERVER_HPP
+
+#include "build/bazel/remote/execution/v2/remote_execution.grpc.pb.h"
+#include "src/buildtool/execution_api/local/local_api.hpp"
+#include "src/buildtool/execution_api/local/local_storage.hpp"
+#include "src/buildtool/logging/logger.hpp"
+
+class ExecutionServiceImpl final  // handler for the Execution service
+    : public build::bazel::remote::execution::v2::Execution::Service {
+  public:
+    ExecutionServiceImpl() = default;
+    // Execute an action remotely.
+    //
+    // In order to execute an action, the client must first upload all of the
+    // inputs, the
+    // [Command][build.bazel.remote.execution.v2.Command] to run, and the
+    // [Action][build.bazel.remote.execution.v2.Action] into the
+    // [ContentAddressableStorage][build.bazel.remote.execution.v2.ContentAddressableStorage].
+    // It then calls `Execute` with an `action_digest` referring to them. The
+    // server will run the action and eventually return the result.
+    //
+    // The input `Action`'s fields MUST meet the various canonicalization
+    // requirements specified in the documentation for their types so that it
+    // has the same digest as other logically equivalent `Action`s. The server
+    // MAY enforce the requirements and return errors if a non-canonical input
+    // is received. It MAY also proceed without verifying some or all of the
+    // requirements, such as for performance reasons. If the server does not
+    // verify the requirement, then it will treat the `Action` as distinct from
+    // another logically equivalent action if they hash differently.
+    //
+    // Returns a stream of
+    // [google.longrunning.Operation][google.longrunning.Operation] messages
+    // describing the resulting execution, with eventual `response`
+    // [ExecuteResponse][build.bazel.remote.execution.v2.ExecuteResponse]. The
+    // `metadata` on the operation is of type
+    // [ExecuteOperationMetadata][build.bazel.remote.execution.v2.ExecuteOperationMetadata].
+    //
+    // If the client remains connected after the first response is returned
+    // after the server, then updates are streamed as if the client had called
+    // [WaitExecution][build.bazel.remote.execution.v2.Execution.WaitExecution]
+    // until the execution completes or the request reaches an error. The
+    // operation can also be queried using [Operations
+    // API][google.longrunning.Operations.GetOperation].
+    //
+    // The server NEED NOT implement other methods or functionality of the
+    // Operations API.
+    //
+    // Errors discovered during creation of the `Operation` will be reported
+    // as gRPC Status errors, while errors that occurred while running the
+    // action will be reported in the `status` field of the `ExecuteResponse`.
+    // The server MUST NOT set the `error` field of the `Operation` proto. The
+    // possible errors include:
+    //
+    // * `INVALID_ARGUMENT`: One or more arguments are invalid.
+    // * `FAILED_PRECONDITION`: One or more errors occurred in setting up the
+    //   action requested, such as a missing input or command or no worker being
+    //   available. The client may be able to fix the errors and retry.
+    // * `RESOURCE_EXHAUSTED`: There is insufficient quota of some resource to
+    // run
+    //   the action.
+    // * `UNAVAILABLE`: Due to a transient condition, such as all workers being
+    //   occupied (and the server does not support a queue), the action could
+    //   not be started. The client should retry.
+    // * `INTERNAL`: An internal error occurred in the execution engine or the
+    //   worker.
+    // * `DEADLINE_EXCEEDED`: The execution timed out.
+    // * `CANCELLED`: The operation was cancelled by the client. This status is
+    //   only possible if the server implements the Operations API
+    //   CancelOperation method, and it was called for the current execution.
+    //
+    // In the case of a missing input or command, the server SHOULD additionally
+    // send a [PreconditionFailure][google.rpc.PreconditionFailure] error detail
+    // where, for each requested blob not present in the CAS, there is a
+    // `Violation` with a `type` of `MISSING` and a `subject` of
+    // `"blobs/{hash}/{size}"` indicating the digest of the missing blob.
+    auto Execute(
+        ::grpc::ServerContext* context,
+        const ::build::bazel::remote::execution::v2::ExecuteRequest* request,
+        ::grpc::ServerWriter< ::google::longrunning::Operation>* writer)
+        -> ::grpc::Status override;
+
+    // Wait for an execution operation to complete. When the client initially
+    // makes the request, the server immediately responds with the current
+    // status of the execution. The server will leave the request stream open
+    // until the operation completes, and then respond with the completed
+    // operation. The server MAY choose to stream additional updates as
+    // execution progresses, such as to provide an update as to the state of the
+    // execution.
+    auto WaitExecution(
+        ::grpc::ServerContext* context,
+        const ::build::bazel::remote::execution::v2::WaitExecutionRequest*
+            request,
+        ::grpc::ServerWriter< ::google::longrunning::Operation>* writer)
+        -> ::grpc::Status override;
+
+  private:
+    LocalStorage storage_{};  // storage Execute reads blobs from and stores results in
+    IExecutionApi::Ptr api_{new LocalApi()};  // creates the actions run by Execute
+    Logger logger_{"execution-service"};  // logger used by both RPC handlers
+};
+
+#endif  // EXECUTION_SERVER_HPP
diff --git a/src/buildtool/execution_api/execution_service/server_implementation.cpp b/src/buildtool/execution_api/execution_service/server_implementation.cpp
new file mode 100644
index 00000000..d38a11d5
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/server_implementation.cpp
@@ -0,0 +1,135 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/buildtool/execution_api/execution_service/server_implementation.hpp"
+
+#include <iostream>
+#include <memory>
+
+#include <sys/types.h>
+#include <unistd.h>  // getpid
+
+#include "fmt/format.h"
+#include "grpcpp/grpcpp.h"
+#include "nlohmann/json.hpp"
+#include "src/buildtool/auth/authentication.hpp"
+#include "src/buildtool/execution_api/execution_service/ac_server.hpp"
+#include "src/buildtool/execution_api/execution_service/bytestream_server.hpp"
+#include "src/buildtool/execution_api/execution_service/capabilities_server.hpp"
+#include "src/buildtool/execution_api/execution_service/cas_server.hpp"
+#include "src/buildtool/execution_api/execution_service/execution_server.hpp"
+#include "src/buildtool/execution_api/remote/config.hpp"
+#include "src/buildtool/logging/logger.hpp"
+
+namespace {
+// Write content to file, truncating an existing file. Returns false (after
+// logging an error) if the file cannot be opened or the write fails.
+template <typename T>
+auto TryWrite(std::string const& file, T const& content) noexcept -> bool {
+    std::ofstream of{file};
+    if (!of.good()) {
+        Logger::Log(LogLevel::Error,
+                    "Could not open {}. Make sure to have write permissions",
+                    file);
+        return false;
+    }
+    of << content;
+    // Flush explicitly so that a failed write (e.g., full disk) is detected
+    // here instead of being silently dropped when the stream is destroyed.
+    of.flush();
+    if (!of.good()) {
+        Logger::Log(LogLevel::Error, "Could not write to {}", file);
+        return false;
+    }
+    return true;
+}
+}  // namespace
+
+// Register all services, start listening on interface_:port_ and block until
+// the server terminates. TLS credentials are used if the configured auth
+// method is TLS, insecure credentials otherwise. After a successful start
+// the pid file and the info file are written, if requested.
+// Returns false if the server cannot be started or one of the requested
+// files cannot be written (the server is shut down in the latter case).
+auto ServerImpl::Run() -> bool {
+    ExecutionServiceImpl es{};
+    ActionCacheServiceImpl ac{};
+    CASServiceImpl cas{};
+    BytestreamServiceImpl b{};
+    CapabilitiesServiceImpl cap{};
+
+    grpc::ServerBuilder builder;
+
+    builder.RegisterService(&es)
+        .RegisterService(&ac)
+        .RegisterService(&cas)
+        .RegisterService(&b)
+        .RegisterService(&cap);
+
+    std::shared_ptr<grpc::ServerCredentials> creds;
+    if (Auth::GetAuthMethod() == AuthMethod::kTLS) {
+        auto tls_opts = grpc::SslServerCredentialsOptions{};
+
+        tls_opts.pem_root_certs = Auth::TLS::CACert();
+        grpc::SslServerCredentialsOptions::PemKeyCertPair keycert = {
+            Auth::TLS::ServerKey(), Auth::TLS::ServerCert()};
+
+        tls_opts.pem_key_cert_pairs.emplace_back(keycert);
+
+        creds = grpc::SslServerCredentials(tls_opts);
+    }
+    else {
+        creds = grpc::InsecureServerCredentials();
+    }
+
+    // port_ is also passed as output argument, so it holds the port
+    // reported by gRPC once the server has been started.
+    builder.AddListeningPort(
+        fmt::format("{}:{}", interface_, port_), creds, &port_);
+
+    auto server = builder.BuildAndStart();
+    if (!server) {
+        Logger::Log(LogLevel::Error, "Could not start execution service");
+        return false;
+    }
+
+    auto pid = getpid();
+
+    nlohmann::json const& info = {
+        {"interface", interface_}, {"port", port_}, {"pid", pid}};
+
+    if (!pid_file_.empty()) {
+        if (!TryWrite(pid_file_, pid)) {
+            server->Shutdown();
+            return false;
+        }
+    }
+
+    auto const& info_str = nlohmann::to_string(info);
+    Logger::Log(LogLevel::Info,
+                fmt::format("execution service started: {}", info_str));
+
+    if (!info_file_.empty()) {
+        if (!TryWrite(info_file_, info_str)) {
+            server->Shutdown();
+            return false;
+        }
+    }
+
+    server->Wait();
+    return true;
+}
+
+// Remember the file the JSON info is written to by Run(); always succeeds.
+[[nodiscard]] auto ServerImpl::SetInfoFile(std::string const& x) noexcept
+    -> bool {
+    Instance().info_file_ = x;
+    return true;
+}
+
+// Remember the file the pid is written to by Run(); always succeeds.
+[[nodiscard]] auto ServerImpl::SetPidFile(std::string const& x) noexcept
+    -> bool {
+    Instance().pid_file_ = x;
+    return true;
+}
+
+// Store x as the port of the singleton instance if ParsePort accepts it;
+// otherwise leave the stored port untouched and return false.
+[[nodiscard]] auto ServerImpl::SetPort(int const x) noexcept -> bool {
+    if (auto const parsed = ParsePort(x)) {
+        Instance().port_ = static_cast<int>(*parsed);
+        return true;
+    }
+    return false;
+}
\ No newline at end of file
diff --git a/src/buildtool/execution_api/execution_service/server_implementation.hpp b/src/buildtool/execution_api/execution_service/server_implementation.hpp
new file mode 100644
index 00000000..4570a66c
--- /dev/null
+++ b/src/buildtool/execution_api/execution_service/server_implementation.hpp
@@ -0,0 +1,68 @@
+// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef SERVER_IMPLEMENTATION_HPP
+#define SERVER_IMPLEMENTATION_HPP
+
+#include <fstream>
+#include <string>
+
+// Process-wide configuration and entry point of the execution service.
+// The static setters store their value in the singleton returned by
+// Instance(); Run() then starts the server with the stored values.
+class ServerImpl {
+  public:
+    ServerImpl() noexcept = default;
+    // Return the function-local static instance that the setters modify.
+    [[nodiscard]] static auto Instance() noexcept -> ServerImpl& {
+        static ServerImpl x;
+        return x;
+    }
+
+    // Set the interface to listen on (default "127.0.0.1"); always succeeds.
+    [[nodiscard]] static auto SetInterface(std::string const& x) noexcept
+        -> bool {
+        Instance().interface_ = x;
+        return true;
+    }
+
+    // Set the file the pid is written to once the server is running.
+    [[nodiscard]] static auto SetPidFile(std::string const& x) noexcept -> bool;
+
+    // Set the listening port; returns false if x is rejected by ParsePort.
+    [[nodiscard]] static auto SetPort(int x) noexcept -> bool;
+
+    // Set the file the JSON info (interface, port, pid) is written to.
+    [[nodiscard]] static auto SetInfoFile(std::string const& x) noexcept
+        -> bool;
+
+    ServerImpl(ServerImpl const&) = delete;
+    auto operator=(ServerImpl const&) noexcept -> ServerImpl& = delete;
+
+    ServerImpl(ServerImpl&&) noexcept = delete;
+    auto operator=(ServerImpl&&) noexcept -> ServerImpl& = delete;
+
+    // Start the gRPC server and block until it terminates; returns false
+    // if the server could not be started or a requested file not written.
+    auto Run() -> bool;
+    ~ServerImpl() = default;
+
+  private:
+    std::string interface_{"127.0.0.1"};
+    int port_{0};
+    std::string info_file_{};
+    std::string pid_file_{};
+};
+
+#endif  // SERVER_IMPLEMENTATION_HPP
diff --git a/src/buildtool/execution_api/remote/config.hpp b/src/buildtool/execution_api/remote/config.hpp index 42d0be80..0d674220 100644 --- a/src/buildtool/execution_api/remote/config.hpp +++ b/src/buildtool/execution_api/remote/config.hpp @@ -39,7 +39,6 @@ using Port = type_safe_arithmetic<PortTag>; if (port_num >= 0 and port_num <= kMaxPortNumber) { return gsl::narrow_cast<Port::value_t>(port_num); } - } catch (std::out_of_range const& e) { Logger::Log(LogLevel::Error, "Port raised out_of_range exception."); } @@ -56,6 +55,7 @@ using Port = type_safe_arithmetic<PortTag>; } return std::nullopt; } + class RemoteExecutionConfig { public: struct ServerAddress { diff --git a/src/buildtool/main/TARGETS b/src/buildtool/main/TARGETS index 46c61343..725bd83a 100644 --- a/src/buildtool/main/TARGETS +++ b/src/buildtool/main/TARGETS @@ -20,6 +20,9 @@ , ["src/utils/cpp", "concepts"] , ["src/utils/cpp", "json"] , ["src/buildtool/auth", "auth"] + , [ "src/buildtool/execution_api/execution_service" + , "server_implementation" + ] , "common" , "version" , "analyse" diff --git a/src/buildtool/main/main.cpp b/src/buildtool/main/main.cpp index 60653651..146ec221 100644 --- a/src/buildtool/main/main.cpp +++ b/src/buildtool/main/main.cpp @@ -40,6 +40,7 @@ #include "src/buildtool/main/install_cas.hpp" #ifndef BOOTSTRAP_BUILD_TOOL #include "src/buildtool/auth/authentication.hpp" +#include "src/buildtool/execution_api/execution_service/server_implementation.hpp" #include "src/buildtool/execution_api/local/garbage_collector.hpp" #include "src/buildtool/graph_traverser/graph_traverser.hpp" #include "src/buildtool/progress_reporting/base_progress_reporter.hpp" @@ -68,7 +69,8 @@ enum class SubCommand { kRebuild, kInstallCas, kTraverse, - kGc + kGc, + kExecute }; struct CommandLineArguments { @@ -84,7 +86,10 @@ struct CommandLineArguments { RebuildArguments rebuild; FetchArguments fetch; GraphArguments graph; - AuthArguments auth; +
CommonAuthArguments auth; + ClientAuthArguments cauth; + ServerAuthArguments sauth; + ExecutionServiceArguments es; }; /// \brief Setup arguments for sub command "just describe". @@ -119,7 +124,9 @@ auto SetupBuildCommandArguments( SetupAnalysisArguments(app, &clargs->analysis); SetupCacheArguments(app, &clargs->endpoint); SetupEndpointArguments(app, &clargs->endpoint); - SetupAuthArguments(app, &clargs->auth); + SetupCommonAuthArguments(app, &clargs->auth); + SetupClientAuthArguments(app, &clargs->cauth); + SetupCommonBuildArguments(app, &clargs->build); SetupBuildArguments(app, &clargs->build); SetupCompatibilityArguments(app); } @@ -147,7 +154,8 @@ auto SetupInstallCasCommandArguments( SetupCompatibilityArguments(app); SetupCacheArguments(app, &clargs->endpoint); SetupEndpointArguments(app, &clargs->endpoint); - SetupAuthArguments(app, &clargs->auth); + SetupCommonAuthArguments(app, &clargs->auth); + SetupClientAuthArguments(app, &clargs->cauth); SetupFetchArguments(app, &clargs->fetch); SetupLogArguments(app, &clargs->log); } @@ -160,8 +168,10 @@ auto SetupTraverseCommandArguments( SetupLogArguments(app, &clargs->log); SetupCacheArguments(app, &clargs->endpoint); SetupEndpointArguments(app, &clargs->endpoint); - SetupAuthArguments(app, &clargs->auth); + SetupCommonAuthArguments(app, &clargs->auth); + SetupClientAuthArguments(app, &clargs->cauth); SetupGraphArguments(app, &clargs->graph); // instead of analysis + SetupCommonBuildArguments(app, &clargs->build); SetupBuildArguments(app, &clargs->build); SetupStageArguments(app, &clargs->stage); SetupCompatibilityArguments(app); @@ -174,6 +184,19 @@ auto SetupGcCommandArguments( SetupCacheArguments(app, &clargs->endpoint); } +/// \brief Setup arguments for sub command "just execute". 
+auto SetupExecutionServiceCommandArguments(
+    gsl::not_null<CLI::App*> const& app,
+    gsl::not_null<CommandLineArguments*> const& clargs) {
+    // Register the option groups of this subcommand on app, in this order:
+    // compatibility, common build, cache, execution service, logging,
+    // common auth and server auth. Where a target is passed, it is the
+    // corresponding member of clargs.
+    SetupCompatibilityArguments(app);
+    SetupCommonBuildArguments(app, &clargs->build);
+    SetupCacheArguments(app, &clargs->endpoint);
+    SetupExecutionServiceArguments(app, &clargs->es);
+    SetupLogArguments(app, &clargs->log);
+    // Only the common and the server-side auth options are registered here;
+    // the client-side auth options are not.
+    SetupCommonAuthArguments(app, &clargs->auth);
+    SetupServerAuthArguments(app, &clargs->sauth);
+}
+
auto ParseCommandLineArguments(int argc, char const* const* argv) -> CommandLineArguments { CLI::App app("just, a generic build tool"); @@ -194,6 +217,8 @@ auto ParseCommandLineArguments(int argc, char const* const* argv) app.add_subcommand("install-cas", "Fetch and stage artifact from CAS."); auto* cmd_gc = app.add_subcommand("gc", "Trigger garbage collection of local cache."); + auto* cmd_execution = app.add_subcommand( + "execute", "Start single node execution service on this machine."); auto* cmd_traverse = app.group("") // group for creating hidden options ->add_subcommand("traverse", @@ -209,7 +234,7 @@ auto ParseCommandLineArguments(int argc, char const* const* argv) SetupInstallCasCommandArguments(cmd_install_cas, &clargs); SetupTraverseCommandArguments(cmd_traverse, &clargs); SetupGcCommandArguments(cmd_gc, &clargs); - + SetupExecutionServiceCommandArguments(cmd_execution, &clargs); try { app.parse(argc, argv); } catch (CLI::Error& e) { @@ -246,6 +271,9 @@ auto ParseCommandLineArguments(int argc, char const* const* argv) else if (*cmd_gc) { clargs.cmd = SubCommand::kGc; } + else if (*cmd_execution) { + clargs.cmd = SubCommand::kExecute; + } return clargs; } @@ -266,7 +294,6 @@ void SetupLogging(LogArguments const& clargs) { #ifndef BOOTSTRAP_BUILD_TOOL void SetupExecutionConfig(EndpointArguments const& eargs, - AuthArguments const& authargs, BuildArguments const& bargs, RebuildArguments const& rargs) { using LocalConfig = LocalExecutionConfig; @@ -304,6 +331,11 @@ void
SetupExecutionConfig(EndpointArguments const& eargs, std::exit(kExitFailure); } } +} + +void SetupAuthConfig(CommonAuthArguments const& authargs, + ClientAuthArguments const& client_authargs, + ServerAuthArguments const& server_authargs) { auto use_tls = false; if (authargs.tls_ca_cert) { use_tls = true; @@ -314,24 +346,46 @@ void SetupExecutionConfig(EndpointArguments const& eargs, std::exit(kExitFailure); } } - if (authargs.tls_client_cert) { + if (client_authargs.tls_client_cert) { use_tls = true; - if (not Auth::TLS::SetClientCertificate(*authargs.tls_client_cert)) { + if (not Auth::TLS::SetClientCertificate( + *client_authargs.tls_client_cert)) { Logger::Log(LogLevel::Error, "Could not read '{}' certificate.", - authargs.tls_client_cert->string()); + client_authargs.tls_client_cert->string()); std::exit(kExitFailure); } } - if (authargs.tls_client_key) { + if (client_authargs.tls_client_key) { use_tls = true; - if (not Auth::TLS::SetClientKey(*authargs.tls_client_key)) { + if (not Auth::TLS::SetClientKey(*client_authargs.tls_client_key)) { Logger::Log(LogLevel::Error, "Could not read '{}' key.", - authargs.tls_client_key->string()); + client_authargs.tls_client_key->string()); std::exit(kExitFailure); } } + + if (server_authargs.tls_server_cert) { + use_tls = true; + if (not Auth::TLS::SetServerCertificate( + *server_authargs.tls_server_cert)) { + Logger::Log(LogLevel::Error, + "Could not read '{}' certificate.", + server_authargs.tls_server_cert->string()); + std::exit(kExitFailure); + } + } + if (server_authargs.tls_server_key) { + use_tls = true; + if (not Auth::TLS::SetServerKey(*server_authargs.tls_server_key)) { + Logger::Log(LogLevel::Error, + "Could not read '{}' key.", + server_authargs.tls_server_key->string()); + std::exit(kExitFailure); + } + } + if (use_tls) { if (not Auth::TLS::Validate()) { std::exit(kExitFailure); @@ -339,6 +393,39 @@ void SetupExecutionConfig(EndpointArguments const& eargs, } } +void 
SetupExecutionServiceConfig(ExecutionServiceArguments const& args) { + if (args.port) { + if (!ServerImpl::SetPort(*args.port)) { + Logger::Log(LogLevel::Error, "Invalid port '{}'", *args.port); + std::exit(kExitFailure); + } + } + if (args.info_file) { + if (!ServerImpl::SetInfoFile(*args.info_file)) { + Logger::Log(LogLevel::Error, + "Invalid info-file '{}'", + args.info_file->string()); + std::exit(kExitFailure); + } + } + if (args.interface) { + if (!ServerImpl::SetInterface(*args.interface)) { + Logger::Log(LogLevel::Error, + "Invalid interface '{}'", + args.info_file->string()); + std::exit(kExitFailure); + } + } + if (args.pid_file) { + if (!ServerImpl::SetPidFile(*args.pid_file)) { + Logger::Log(LogLevel::Error, + "Invalid pid-file '{}'", + args.info_file->string()); + std::exit(kExitFailure); + } + } +} + void SetupHashFunction() { HashFunction::SetHashType(Compatibility::IsCompatible() ? HashFunction::JustHash::Compatible @@ -1157,10 +1244,9 @@ auto main(int argc, char* argv[]) -> int { } #ifndef BOOTSTRAP_BUILD_TOOL SetupHashFunction(); - SetupExecutionConfig(arguments.endpoint, - arguments.auth, - arguments.build, - arguments.rebuild); + SetupExecutionConfig( + arguments.endpoint, arguments.build, arguments.rebuild); + SetupAuthConfig(arguments.auth, arguments.cauth, arguments.sauth); if (arguments.cmd == SubCommand::kGc) { if (GarbageCollector::TriggerGarbageCollection()) { @@ -1169,6 +1255,13 @@ auto main(int argc, char* argv[]) -> int { return kExitFailure; } + if (arguments.cmd == SubCommand::kExecute) { + SetupExecutionServiceConfig(arguments.es); + if (!ServerImpl::Instance().Run()) { + return kExitFailure; + } + return kExitSuccess; + } #endif auto jobs = arguments.build.build_jobs > 0 ? arguments.build.build_jobs |