diff options
Diffstat (limited to 'src/buildtool/execution_api/execution_service')
5 files changed, 236 insertions, 101 deletions
diff --git a/src/buildtool/execution_api/execution_service/TARGETS b/src/buildtool/execution_api/execution_service/TARGETS index 85068f0f..e494711c 100644 --- a/src/buildtool/execution_api/execution_service/TARGETS +++ b/src/buildtool/execution_api/execution_service/TARGETS @@ -58,12 +58,7 @@ , ["@", "fmt", "", "fmt"] , ["src/buildtool/storage", "storage"] , ["src/utils/cpp", "verify_hash"] - , ["src/buildtool/common", "common"] - , ["src/buildtool/file_system", "git_repo"] - , ["src/buildtool/file_system", "object_type"] - , ["src/buildtool/logging", "log_level"] - , ["src/utils/cpp", "hex_string"] - , "file_chunker" + , "cas_utils" ] } , "server_implementation": @@ -151,4 +146,27 @@ , "stage": ["src", "buildtool", "execution_api", "execution_service"] , "private-deps": [["@", "gsl", "", "gsl"]] } +, "cas_utils": + { "type": ["@", "rules", "CC", "library"] + , "name": ["cas_utils"] + , "hdrs": ["cas_utils.hpp"] + , "srcs": ["cas_utils.cpp"] + , "stage": ["src", "buildtool", "execution_api", "execution_service"] + , "deps": + [ ["@", "grpc", "", "grpc++"] + , ["src/buildtool/common", "bazel_types"] + , ["src/buildtool/storage", "storage"] + ] + , "private-deps": + [ "file_chunker" + , ["@", "fmt", "", "fmt"] + , ["src/buildtool/common", "common"] + , ["src/buildtool/compatibility", "compatibility"] + , ["src/buildtool/file_system", "git_repo"] + , ["src/buildtool/file_system", "object_type"] + , ["src/buildtool/file_system", "file_system_manager"] + , ["src/buildtool/storage", "fs_utils"] + , ["src/utils/cpp", "hex_string"] + ] + } } diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp index 49939c20..4fd78323 100644 --- a/src/buildtool/execution_api/execution_service/cas_server.cpp +++ b/src/buildtool/execution_api/execution_service/cas_server.cpp @@ -21,15 +21,10 @@ #include <vector> #include "fmt/core.h" -#include "src/buildtool/common/artifact_digest.hpp" #include "src/buildtool/compatibility/compatibility.hpp" #include "src/buildtool/compatibility/native_support.hpp" -#include "src/buildtool/execution_api/execution_service/file_chunker.hpp" -#include "src/buildtool/file_system/git_repo.hpp" -#include "src/buildtool/file_system/object_type.hpp" -#include "src/buildtool/logging/log_level.hpp" +#include "src/buildtool/execution_api/execution_service/cas_utils.hpp" #include "src/buildtool/storage/garbage_collector.hpp" -#include "src/utils/cpp/hex_string.hpp" #include "src/utils/cpp/verify_hash.hpp" static constexpr std::size_t kGitSHA1Length = 42; @@ -100,46 +95,6 @@ auto CASServiceImpl::CheckDigestConsistency(bazel_re::Digest const& ref, return std::nullopt; } -auto CASServiceImpl::EnsureTreeInvariant(std::string const& data, - std::string const& hash) const noexcept - -> std::optional<std::string> { - auto entries = GitRepo::ReadTreeData( - data, - NativeSupport::Unprefix(hash), - [](auto const& /*unused*/) { return true; }, - /*is_hex_id=*/true); - if (not entries) { - auto str = fmt::format("Could not read tree data {}", hash); - logger_.Emit(LogLevel::Error, str); - return str; - } - for (auto const& entry : *entries) { - for (auto const& item : entry.second) { - auto digest = static_cast<bazel_re::Digest>( - ArtifactDigest{ToHexString(entry.first), - /*size is unknown*/ 0, - IsTreeObject(item.type)}); - if (not(IsTreeObject(item.type) - ? storage_->CAS().TreePath(digest) - : storage_->CAS().BlobPath(digest, false))) { - auto str = fmt::format( - "Tree invariant violated {}: missing element {}", - hash, - digest.hash()); - logger_.Emit(LogLevel::Error, str); - return str; - } - // The GitRepo::tree_entries_t data structure maps the object id to - // a list of entries of that object in possibly multiple trees. It - // is sufficient to check the existence of only one of these entries - // to be sure that the object is in CAS since they all have the same - // content. - break; - } - } - return std::nullopt; -} - auto CASServiceImpl::BatchUpdateBlobs( ::grpc::ServerContext* /*context*/, const ::bazel_re::BatchUpdateBlobsRequest* request, @@ -166,7 +121,8 @@ auto CASServiceImpl::BatchUpdateBlobs( if (NativeSupport::IsTree(hash)) { // In native mode: for trees, check whether the tree invariant holds // before storing the actual tree object. - if (auto err = EnsureTreeInvariant(x.data(), hash)) { + if (auto err = + CASUtils::EnsureTreeInvariant(x.data(), hash, *storage_)) { return ::grpc::Status{grpc::StatusCode::FAILED_PRECONDITION, *err}; } @@ -273,55 +229,20 @@ auto CASServiceImpl::SplitBlob(::grpc::ServerContext* /*context*/, logger_.Emit(LogLevel::Info, "SplitBlob({})", blob_digest.hash()); - // Check blob existence. - auto path = std::optional<std::filesystem::path>{}; - if (NativeSupport::IsTree(blob_digest.hash())) { - path = storage_->CAS().TreePath(blob_digest); - } - else { - path = storage_->CAS().BlobPath(blob_digest, true); - if (not path) { - path = storage_->CAS().BlobPath(blob_digest, false); - } - } - if (not path) { - auto str = - fmt::format("SplitBlob: blob not found {}", blob_digest.hash()); - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::NOT_FOUND, str}; - } - - // Split blob into chunks, store each chunk in CAS, and collect chunk - // digests. - auto chunker = FileChunker{*path}; - if (not chunker.IsOpen()) { - auto str = fmt::format("SplitBlob: could not open blob for reading"); + // Split blob into chunks. + auto split_result = CASUtils::SplitBlob(blob_digest, *storage_); + if (std::holds_alternative<grpc::Status>(split_result)) { + auto status = std::get<grpc::Status>(split_result); + auto str = fmt::format("SplitBlob: {}", status.error_message()); logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; - } - - auto chunk_digests = std::vector<bazel_re::Digest>{}; - while (auto chunk = chunker.NextChunk()) { - auto chunk_digest = storage_->CAS().StoreBlob(*chunk, false); - if (not chunk_digest) { - auto str = - fmt::format("SplitBlob: could not store chunk of blob {}", - blob_digest.hash()); - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::RESOURCE_EXHAUSTED, str}; - } - chunk_digests.emplace_back(*chunk_digest); - } - if (not chunker.Finished()) { - auto str = - fmt::format("SplitBlob: could split blob {}", blob_digest.hash()); - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + return ::grpc::Status{status.error_code(), str}; } + auto chunk_digests = std::get<std::vector<bazel_re::Digest>>(split_result); logger_.Emit(LogLevel::Debug, [&blob_digest, &chunk_digests]() { std::stringstream ss{}; ss << "Split blob " << blob_digest.hash() << ":" - << blob_digest.size_bytes() << " into [ "; + << blob_digest.size_bytes() << " into " << chunk_digests.size() + << " chunks: [ "; for (auto const& chunk_digest : chunk_digests) { ss << chunk_digest.hash() << ":" << chunk_digest.size_bytes() << " "; diff --git a/src/buildtool/execution_api/execution_service/cas_server.hpp b/src/buildtool/execution_api/execution_service/cas_server.hpp index c9f7cff4..306c2833 100644 --- a/src/buildtool/execution_api/execution_service/cas_server.hpp +++ b/src/buildtool/execution_api/execution_service/cas_server.hpp @@ -147,10 +147,6 @@ class CASServiceImpl final bazel_re::Digest const& computed) const noexcept -> std::optional<std::string>; - [[nodiscard]] auto EnsureTreeInvariant( - std::string const& data, - std::string const& hash) const noexcept -> std::optional<std::string>; - gsl::not_null<Storage const*> storage_ = &Storage::Instance(); Logger logger_{"execution-service"}; }; diff --git a/src/buildtool/execution_api/execution_service/cas_utils.cpp b/src/buildtool/execution_api/execution_service/cas_utils.cpp new file mode 100644 index 00000000..3da56e28 --- /dev/null +++ b/src/buildtool/execution_api/execution_service/cas_utils.cpp @@ -0,0 +1,155 @@ +// Copyright 2024 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/buildtool/execution_api/execution_service/cas_utils.hpp" + +#include <fstream> + +#include "fmt/core.h" +#include "src/buildtool/common/artifact_digest.hpp" +#include "src/buildtool/compatibility/native_support.hpp" +#include "src/buildtool/execution_api/execution_service/file_chunker.hpp" +#include "src/buildtool/file_system/file_system_manager.hpp" +#include "src/buildtool/file_system/git_repo.hpp" +#include "src/buildtool/file_system/object_type.hpp" +#include "src/buildtool/storage/fs_utils.hpp" +#include "src/utils/cpp/hex_string.hpp" + +auto CASUtils::EnsureTreeInvariant(std::string const& data, + std::string const& hash, + Storage const& storage) noexcept + -> std::optional<std::string> { + auto entries = GitRepo::ReadTreeData( + data, + NativeSupport::Unprefix(hash), + [](auto const& /*unused*/) { return true; }, + /*is_hex_id=*/true); + if (not entries) { + return fmt::format("could not read tree data {}", hash); + } + for (auto const& entry : *entries) { + for (auto const& item : entry.second) { + auto digest = static_cast<bazel_re::Digest>( + ArtifactDigest{ToHexString(entry.first), + /*size is unknown*/ 0, + IsTreeObject(item.type)}); + if (not(IsTreeObject(item.type) + ? storage.CAS().TreePath(digest) + : storage.CAS().BlobPath(digest, false))) { + return fmt::format( + "tree invariant violated {}: missing element {}", + hash, + digest.hash()); + } + // The GitRepo::tree_entries_t data structure maps the object id to + // a list of entries of that object in possibly multiple trees. It + // is sufficient to check the existence of only one of these entries + // to be sure that the object is in CAS since they all have the same + // content. + break; + } + } + return std::nullopt; +} + +auto CASUtils::SplitBlob(bazel_re::Digest const& blob_digest, + Storage const& storage) noexcept + -> std::variant<std::vector<bazel_re::Digest>, grpc::Status> { + + // Check blob existence. + auto path = NativeSupport::IsTree(blob_digest.hash()) + ? storage.CAS().TreePath(blob_digest) + : storage.CAS().BlobPath(blob_digest, false); + if (not path) { + return grpc::Status{ + grpc::StatusCode::NOT_FOUND, + fmt::format("blob not found {}", blob_digest.hash())}; + } + + // Split blob into chunks, store each chunk in CAS, and collect chunk + // digests. + auto chunker = FileChunker{*path}; + if (not chunker.IsOpen()) { + return grpc::Status{ + grpc::StatusCode::INTERNAL, + fmt::format("could not open blob {}", blob_digest.hash())}; + } + auto chunk_digests = std::vector<bazel_re::Digest>{}; + while (auto chunk = chunker.NextChunk()) { + auto chunk_digest = storage.CAS().StoreBlob(*chunk, false); + if (not chunk_digest) { + return grpc::Status{grpc::StatusCode::INTERNAL, + fmt::format("could not store chunk of blob {}", + blob_digest.hash())}; + } + chunk_digests.emplace_back(*chunk_digest); + } + if (not chunker.Finished()) { + return grpc::Status{ + grpc::StatusCode::INTERNAL, + fmt::format("could not split blob {}", blob_digest.hash())}; + } + + return chunk_digests; +} + +auto CASUtils::SpliceBlob(bazel_re::Digest const& blob_digest, + std::vector<bazel_re::Digest> const& chunk_digests, + Storage const& storage) noexcept + -> std::variant<bazel_re::Digest, grpc::Status> { + + // Assemble blob from chunks. + auto tmp_dir = StorageUtils::CreateTypedTmpDir("splice"); + auto tmp_file = tmp_dir->GetPath() / "blob"; + { + std::ofstream tmp(tmp_file, std::ios::binary); + for (auto const& chunk_digest : chunk_digests) { + // Check chunk existence (only check file CAS). + auto path = storage.CAS().BlobPath(chunk_digest, false); + if (not path) { + return grpc::Status{ + grpc::StatusCode::NOT_FOUND, + fmt::format("chunk not found {}", chunk_digest.hash())}; + } + // Load chunk data. + auto chunk_data = FileSystemManager::ReadFile(*path); + if (not chunk_data) { + return grpc::Status{grpc::StatusCode::INTERNAL, + fmt::format("could read chunk data {}", + chunk_digest.hash())}; + } + tmp << *chunk_data; + } + } + + // Store resulting blob in according CAS. + auto const& hash = blob_digest.hash(); + if (NativeSupport::IsTree(hash)) { + auto const& digest = + storage.CAS().StoreTree</* kOwner= */ true>(tmp_file); + if (not digest) { + return grpc::Status{grpc::StatusCode::INTERNAL, + fmt::format("could not store tree {}", hash)}; + } + return *digest; + } + + auto const& digest = + storage.CAS().StoreBlob</* kOwner= */ true>(tmp_file, false); + if (not digest) { + return grpc::Status{grpc::StatusCode::INTERNAL, + fmt::format("could not store blob {}", hash)}; + } + return *digest; +} diff --git a/src/buildtool/execution_api/execution_service/cas_utils.hpp b/src/buildtool/execution_api/execution_service/cas_utils.hpp new file mode 100644 index 00000000..b22260b8 --- /dev/null +++ b/src/buildtool/execution_api/execution_service/cas_utils.hpp @@ -0,0 +1,45 @@ +// Copyright 2024 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef INCLUDED_SRC_BUILDTOOL_EXECUTION_API_EXECUTION_SERVICE_CAS_UTILS_HPP +#define INCLUDED_SRC_BUILDTOOL_EXECUTION_API_EXECUTION_SERVICE_CAS_UTILS_HPP + +#include <optional> +#include <string> +#include <variant> +#include <vector> + +#include "grpcpp/support/status.h" +#include "src/buildtool/common/bazel_types.hpp" +#include "src/buildtool/storage/storage.hpp" + +class CASUtils { + public: + [[nodiscard]] static auto EnsureTreeInvariant( + std::string const& data, + std::string const& hash, + Storage const& storage) noexcept -> std::optional<std::string>; + + [[nodiscard]] static auto SplitBlob(bazel_re::Digest const& blob_digest, + Storage const& storage) noexcept + -> std::variant<std::vector<bazel_re::Digest>, grpc::Status>; + + [[nodiscard]] static auto SpliceBlob( + bazel_re::Digest const& blob_digest, + std::vector<bazel_re::Digest> const& chunk_digests, + Storage const& storage) noexcept + -> std::variant<bazel_re::Digest, grpc::Status>; +}; + +#endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_API_EXECUTION_SERVICE_CAS_UTILS_HPP |