diff options
Diffstat (limited to 'src/buildtool/execution_api/execution_service/cas_server.cpp')
-rw-r--r-- | src/buildtool/execution_api/execution_service/cas_server.cpp | 103 |
1 files changed, 12 insertions, 91 deletions
diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp index 49939c20..4fd78323 100644 --- a/src/buildtool/execution_api/execution_service/cas_server.cpp +++ b/src/buildtool/execution_api/execution_service/cas_server.cpp @@ -21,15 +21,10 @@ #include <vector> #include "fmt/core.h" -#include "src/buildtool/common/artifact_digest.hpp" #include "src/buildtool/compatibility/compatibility.hpp" #include "src/buildtool/compatibility/native_support.hpp" -#include "src/buildtool/execution_api/execution_service/file_chunker.hpp" -#include "src/buildtool/file_system/git_repo.hpp" -#include "src/buildtool/file_system/object_type.hpp" -#include "src/buildtool/logging/log_level.hpp" +#include "src/buildtool/execution_api/execution_service/cas_utils.hpp" #include "src/buildtool/storage/garbage_collector.hpp" -#include "src/utils/cpp/hex_string.hpp" #include "src/utils/cpp/verify_hash.hpp" static constexpr std::size_t kGitSHA1Length = 42; @@ -100,46 +95,6 @@ auto CASServiceImpl::CheckDigestConsistency(bazel_re::Digest const& ref, return std::nullopt; } -auto CASServiceImpl::EnsureTreeInvariant(std::string const& data, - std::string const& hash) const noexcept - -> std::optional<std::string> { - auto entries = GitRepo::ReadTreeData( - data, - NativeSupport::Unprefix(hash), - [](auto const& /*unused*/) { return true; }, - /*is_hex_id=*/true); - if (not entries) { - auto str = fmt::format("Could not read tree data {}", hash); - logger_.Emit(LogLevel::Error, str); - return str; - } - for (auto const& entry : *entries) { - for (auto const& item : entry.second) { - auto digest = static_cast<bazel_re::Digest>( - ArtifactDigest{ToHexString(entry.first), - /*size is unknown*/ 0, - IsTreeObject(item.type)}); - if (not(IsTreeObject(item.type) - ? storage_->CAS().TreePath(digest) - : storage_->CAS().BlobPath(digest, false))) { - auto str = fmt::format( - "Tree invariant violated {}: missing element {}", - hash, - digest.hash()); - logger_.Emit(LogLevel::Error, str); - return str; - } - // The GitRepo::tree_entries_t data structure maps the object id to - // a list of entries of that object in possibly multiple trees. It - // is sufficient to check the existence of only one of these entries - // to be sure that the object is in CAS since they all have the same - // content. - break; - } - } - return std::nullopt; -} - auto CASServiceImpl::BatchUpdateBlobs( ::grpc::ServerContext* /*context*/, const ::bazel_re::BatchUpdateBlobsRequest* request, @@ -166,7 +121,8 @@ auto CASServiceImpl::BatchUpdateBlobs( if (NativeSupport::IsTree(hash)) { // In native mode: for trees, check whether the tree invariant holds // before storing the actual tree object. - if (auto err = EnsureTreeInvariant(x.data(), hash)) { + if (auto err = + CASUtils::EnsureTreeInvariant(x.data(), hash, *storage_)) { return ::grpc::Status{grpc::StatusCode::FAILED_PRECONDITION, *err}; } @@ -273,55 +229,20 @@ auto CASServiceImpl::SplitBlob(::grpc::ServerContext* /*context*/, logger_.Emit(LogLevel::Info, "SplitBlob({})", blob_digest.hash()); - // Check blob existence. - auto path = std::optional<std::filesystem::path>{}; - if (NativeSupport::IsTree(blob_digest.hash())) { - path = storage_->CAS().TreePath(blob_digest); - } - else { - path = storage_->CAS().BlobPath(blob_digest, true); - if (not path) { - path = storage_->CAS().BlobPath(blob_digest, false); - } - } - if (not path) { - auto str = - fmt::format("SplitBlob: blob not found {}", blob_digest.hash()); - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::NOT_FOUND, str}; - } - - // Split blob into chunks, store each chunk in CAS, and collect chunk - // digests. - auto chunker = FileChunker{*path}; - if (not chunker.IsOpen()) { - auto str = fmt::format("SplitBlob: could not open blob for reading"); + // Split blob into chunks. + auto split_result = CASUtils::SplitBlob(blob_digest, *storage_); + if (std::holds_alternative<grpc::Status>(split_result)) { + auto status = std::get<grpc::Status>(split_result); + auto str = fmt::format("SplitBlob: {}", status.error_message()); logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; - } - - auto chunk_digests = std::vector<bazel_re::Digest>{}; - while (auto chunk = chunker.NextChunk()) { - auto chunk_digest = storage_->CAS().StoreBlob(*chunk, false); - if (not chunk_digest) { - auto str = - fmt::format("SplitBlob: could not store chunk of blob {}", - blob_digest.hash()); - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::RESOURCE_EXHAUSTED, str}; - } - chunk_digests.emplace_back(*chunk_digest); - } - if (not chunker.Finished()) { - auto str = - fmt::format("SplitBlob: could split blob {}", blob_digest.hash()); - logger_.Emit(LogLevel::Error, str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + return ::grpc::Status{status.error_code(), str}; } + auto chunk_digests = std::get<std::vector<bazel_re::Digest>>(split_result); logger_.Emit(LogLevel::Debug, [&blob_digest, &chunk_digests]() { std::stringstream ss{}; ss << "Split blob " << blob_digest.hash() << ":" - << blob_digest.size_bytes() << " into [ "; + << blob_digest.size_bytes() << " into " << chunk_digests.size() + << " chunks: [ "; for (auto const& chunk_digest : chunk_digests) { ss << chunk_digest.hash() << ":" << chunk_digest.size_bytes() << " "; |