summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/execution_service/cas_server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildtool/execution_api/execution_service/cas_server.cpp')
-rw-r--r--src/buildtool/execution_api/execution_service/cas_server.cpp103
1 files changed, 12 insertions, 91 deletions
diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp
index 49939c20..4fd78323 100644
--- a/src/buildtool/execution_api/execution_service/cas_server.cpp
+++ b/src/buildtool/execution_api/execution_service/cas_server.cpp
@@ -21,15 +21,10 @@
#include <vector>
#include "fmt/core.h"
-#include "src/buildtool/common/artifact_digest.hpp"
#include "src/buildtool/compatibility/compatibility.hpp"
#include "src/buildtool/compatibility/native_support.hpp"
-#include "src/buildtool/execution_api/execution_service/file_chunker.hpp"
-#include "src/buildtool/file_system/git_repo.hpp"
-#include "src/buildtool/file_system/object_type.hpp"
-#include "src/buildtool/logging/log_level.hpp"
+#include "src/buildtool/execution_api/execution_service/cas_utils.hpp"
#include "src/buildtool/storage/garbage_collector.hpp"
-#include "src/utils/cpp/hex_string.hpp"
#include "src/utils/cpp/verify_hash.hpp"
static constexpr std::size_t kGitSHA1Length = 42;
@@ -100,46 +95,6 @@ auto CASServiceImpl::CheckDigestConsistency(bazel_re::Digest const& ref,
return std::nullopt;
}
-auto CASServiceImpl::EnsureTreeInvariant(std::string const& data,
- std::string const& hash) const noexcept
- -> std::optional<std::string> {
- auto entries = GitRepo::ReadTreeData(
- data,
- NativeSupport::Unprefix(hash),
- [](auto const& /*unused*/) { return true; },
- /*is_hex_id=*/true);
- if (not entries) {
- auto str = fmt::format("Could not read tree data {}", hash);
- logger_.Emit(LogLevel::Error, str);
- return str;
- }
- for (auto const& entry : *entries) {
- for (auto const& item : entry.second) {
- auto digest = static_cast<bazel_re::Digest>(
- ArtifactDigest{ToHexString(entry.first),
- /*size is unknown*/ 0,
- IsTreeObject(item.type)});
- if (not(IsTreeObject(item.type)
- ? storage_->CAS().TreePath(digest)
- : storage_->CAS().BlobPath(digest, false))) {
- auto str = fmt::format(
- "Tree invariant violated {}: missing element {}",
- hash,
- digest.hash());
- logger_.Emit(LogLevel::Error, str);
- return str;
- }
- // The GitRepo::tree_entries_t data structure maps the object id to
- // a list of entries of that object in possibly multiple trees. It
- // is sufficient to check the existence of only one of these entries
- // to be sure that the object is in CAS since they all have the same
- // content.
- break;
- }
- }
- return std::nullopt;
-}
-
auto CASServiceImpl::BatchUpdateBlobs(
::grpc::ServerContext* /*context*/,
const ::bazel_re::BatchUpdateBlobsRequest* request,
@@ -166,7 +121,8 @@ auto CASServiceImpl::BatchUpdateBlobs(
if (NativeSupport::IsTree(hash)) {
// In native mode: for trees, check whether the tree invariant holds
// before storing the actual tree object.
- if (auto err = EnsureTreeInvariant(x.data(), hash)) {
+ if (auto err =
+ CASUtils::EnsureTreeInvariant(x.data(), hash, *storage_)) {
return ::grpc::Status{grpc::StatusCode::FAILED_PRECONDITION,
*err};
}
@@ -273,55 +229,20 @@ auto CASServiceImpl::SplitBlob(::grpc::ServerContext* /*context*/,
logger_.Emit(LogLevel::Info, "SplitBlob({})", blob_digest.hash());
- // Check blob existence.
- auto path = std::optional<std::filesystem::path>{};
- if (NativeSupport::IsTree(blob_digest.hash())) {
- path = storage_->CAS().TreePath(blob_digest);
- }
- else {
- path = storage_->CAS().BlobPath(blob_digest, true);
- if (not path) {
- path = storage_->CAS().BlobPath(blob_digest, false);
- }
- }
- if (not path) {
- auto str =
- fmt::format("SplitBlob: blob not found {}", blob_digest.hash());
- logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::NOT_FOUND, str};
- }
-
- // Split blob into chunks, store each chunk in CAS, and collect chunk
- // digests.
- auto chunker = FileChunker{*path};
- if (not chunker.IsOpen()) {
- auto str = fmt::format("SplitBlob: could not open blob for reading");
+ // Split blob into chunks.
+ auto split_result = CASUtils::SplitBlob(blob_digest, *storage_);
+ if (std::holds_alternative<grpc::Status>(split_result)) {
+ auto status = std::get<grpc::Status>(split_result);
+ auto str = fmt::format("SplitBlob: {}", status.error_message());
logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
- }
-
- auto chunk_digests = std::vector<bazel_re::Digest>{};
- while (auto chunk = chunker.NextChunk()) {
- auto chunk_digest = storage_->CAS().StoreBlob(*chunk, false);
- if (not chunk_digest) {
- auto str =
- fmt::format("SplitBlob: could not store chunk of blob {}",
- blob_digest.hash());
- logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::RESOURCE_EXHAUSTED, str};
- }
- chunk_digests.emplace_back(*chunk_digest);
- }
- if (not chunker.Finished()) {
- auto str =
- fmt::format("SplitBlob: could split blob {}", blob_digest.hash());
- logger_.Emit(LogLevel::Error, str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ return ::grpc::Status{status.error_code(), str};
}
+ auto chunk_digests = std::get<std::vector<bazel_re::Digest>>(split_result);
logger_.Emit(LogLevel::Debug, [&blob_digest, &chunk_digests]() {
std::stringstream ss{};
ss << "Split blob " << blob_digest.hash() << ":"
- << blob_digest.size_bytes() << " into [ ";
+ << blob_digest.size_bytes() << " into " << chunk_digests.size()
+ << " chunks: [ ";
for (auto const& chunk_digest : chunk_digests) {
ss << chunk_digest.hash() << ":" << chunk_digest.size_bytes()
<< " ";