diff options
author | Paul Cristian Sarbu <paul.cristian.sarbu@huawei.com> | 2023-12-11 11:33:50 +0100 |
---|---|---|
committer | Paul Cristian Sarbu <paul.cristian.sarbu@huawei.com> | 2023-12-12 14:37:18 +0100 |
commit | 3fdfd22e7f02effbf28ad99b00e4916ae6f4b4ad (patch) | |
tree | 793686e03f90e67b103713cfafa7c03743ac7311 /src/buildtool/serve_api/serve_service/target.cpp | |
parent | 21a0e5ca4c9bb8d8b92822fac7d8dc66b5a4e0e8 (diff) | |
download | justbuild-3fdfd22e7f02effbf28ad99b00e4916ae6f4b4ad.tar.gz |
serve target: Update server-side to compute correct target cache shard
Diffstat (limited to 'src/buildtool/serve_api/serve_service/target.cpp')
-rw-r--r-- | src/buildtool/serve_api/serve_service/target.cpp | 115 |
1 files changed, 104 insertions, 11 deletions
diff --git a/src/buildtool/serve_api/serve_service/target.cpp b/src/buildtool/serve_api/serve_service/target.cpp index b4a0968a..8e46b77b 100644 --- a/src/buildtool/serve_api/serve_service/target.cpp +++ b/src/buildtool/serve_api/serve_service/target.cpp @@ -14,8 +14,9 @@ #include "src/buildtool/serve_api/serve_service/target.hpp" +#include <vector> + #include "fmt/core.h" -#include "nlohmann/json.hpp" #include "src/buildtool/build_engine/base_maps/entity_name.hpp" #include "src/buildtool/build_engine/base_maps/entity_name_data.hpp" #include "src/buildtool/build_engine/expression/configuration.hpp" @@ -24,7 +25,7 @@ #include "src/buildtool/build_engine/target_map/configured_target.hpp" #include "src/buildtool/build_engine/target_map/result_map.hpp" #include "src/buildtool/common/artifact.hpp" -#include "src/buildtool/common/artifact_digest.hpp" +#include "src/buildtool/common/remote/remote_common.hpp" #include "src/buildtool/file_system/object_type.hpp" #include "src/buildtool/graph_traverser/graph_traverser.hpp" #include "src/buildtool/main/analyse.hpp" @@ -38,19 +39,59 @@ #include "src/buildtool/storage/target_cache_key.hpp" #include "src/utils/cpp/verify_hash.hpp" +auto TargetService::GetDispatchList(ArtifactDigest const& dispatch_digest) + -> std::variant<::grpc::Status, nlohmann::json> { + using result_t = std::variant<::grpc::Status, nlohmann::json>; + // get blob from remote cas + auto const& dispatch_info = Artifact::ObjectInfo{.digest = dispatch_digest, + .type = ObjectType::File}; + if (!local_api_->IsAvailable(dispatch_digest) and + !remote_api_->RetrieveToCas({dispatch_info}, &*local_api_)) { + return result_t( + std::in_place_index<0>, + ::grpc::Status{::grpc::StatusCode::FAILED_PRECONDITION, + fmt::format("Could not retrieve from " + "remote-execution end point blob {}", + dispatch_info.ToString())}); + } + // get blob content + auto const& dispatch_str = local_api_->RetrieveToMemory(dispatch_info); + if (not dispatch_str) { + // this should not fail unless something really broke... + return result_t( + std::in_place_index<0>, + ::grpc::Status{ + ::grpc::StatusCode::INTERNAL, + fmt::format("Unexpected failure in retrieving blob {} from CAS", + dispatch_info.ToString())}); + } + // parse content + auto parsed = ParseDispatch(*dispatch_str); + if (parsed.index() == 0) { + // pass the parsing error forward + return result_t(std::in_place_index<0>, + ::grpc::Status{::grpc::StatusCode::FAILED_PRECONDITION, + std::get<0>(parsed)}); + } + auto const& dispatch = std::get<1>(parsed); + auto dispatch_list = nlohmann::json::array(); + for (auto const& [props, endpoint] : dispatch) { + auto entry = nlohmann::json::array(); + entry.push_back(nlohmann::json(props)); + entry.push_back(endpoint.ToJson()); + dispatch_list.push_back(entry); + } + return result_t(std::in_place_index<1>, std::move(dispatch_list)); +} + auto TargetService::ServeTarget( ::grpc::ServerContext* /*context*/, const ::justbuild::just_serve::ServeTargetRequest* request, ::justbuild::just_serve::ServeTargetResponse* response) -> ::grpc::Status { + // check target cache key hash for validity if (auto error_msg = IsAHash(request->target_cache_key_id().hash()); error_msg) { - logger_->Emit(LogLevel::Debug, *error_msg); - return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, *error_msg}; - } - if (auto error_msg = - IsAHash(request->execution_backend_description_id().hash()); - error_msg) { - logger_->Emit(LogLevel::Debug, *error_msg); + logger_->Emit(LogLevel::Error, *error_msg); return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, *error_msg}; } auto const& target_cache_key_digest = @@ -64,8 +105,51 @@ auto TargetService::ServeTarget( return ::grpc::Status{::grpc::StatusCode::INTERNAL, error_msg}; } + // start filling in the backend description + auto address = RemoteExecutionConfig::RemoteAddress(); + auto description = nlohmann::json{ + {"remote_address", address ? address->ToJson() : nlohmann::json{}}}; + + // read in the execution properties and add it to the description + description["platform_properties"] = std::map<std::string, std::string>{}; + for (auto const& p : request->execution_properties()) { + description["platform_properties"][p.name()] = p.value(); + } + + // read in the dispatch list and add it to the description, if not empty + if (auto error_msg = IsAHash(request->dispatch_info().hash()); error_msg) { + logger_->Emit(LogLevel::Error, *error_msg); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, *error_msg}; + } + auto const& dispatch_info_digest = ArtifactDigest{request->dispatch_info()}; + auto res = GetDispatchList(dispatch_info_digest); + if (res.index() == 0) { + auto err = std::get<0>(res); + logger_->Emit(LogLevel::Error, err.error_message()); + return err; + } + if (auto dispatch_list = std::get<1>(res); not dispatch_list.empty()) { + description["endpoint dispatch list"] = std::move(dispatch_list); + } + + // add backend description to CAS + auto const description_str = description.dump( + 2, ' ', false, nlohmann::json::error_handler_t::replace); + auto execution_backend_dgst = + ArtifactDigest::Create<ObjectType::File>(description_str); + auto const& execution_info = + Artifact::ObjectInfo{.digest = ArtifactDigest{execution_backend_dgst}, + .type = ObjectType::File}; + if (!local_api_->RetrieveToCas({execution_info}, &*remote_api_)) { + auto msg = fmt::format("Failed to upload blob {} to remote CAS", + execution_info.ToString()); + logger_->Emit(LogLevel::Error, msg); + return ::grpc::Status{::grpc::StatusCode::UNAVAILABLE, msg}; + } + + // get a target cache instance with the correct computed shard auto const& tc = Storage::Instance().TargetCache().WithShard( - ArtifactDigest{request->execution_backend_description_id()}.hash()); + execution_backend_dgst.hash()); auto const& tc_key = TargetCacheKey{{target_cache_key_digest, ObjectType::File}}; @@ -120,6 +204,14 @@ auto TargetService::ServeTarget( auto const& target_description_str = local_api_->RetrieveToMemory(target_cache_key_info); + if (not target_description_str) { + // this should not fail unless something really broke... + auto msg = + fmt::format("Unexpected failure in retrieving blob {} from CAS", + target_cache_key_info.ToString()); + logger_->Emit(LogLevel::Error, msg); + return ::grpc::Status{::grpc::StatusCode::INTERNAL, msg}; + } ExpressionPtr target_description_dict{}; try { @@ -340,7 +432,8 @@ auto TargetService::ServeTarget( jobs, traverser.GetLocalApi(), traverser.GetRemoteApi(), - RemoteServeConfig::TCStrategy()); + RemoteServeConfig::TCStrategy(), + *tc); if (build_result->failed_artifacts) { auto msg = |