diff options
author | Sascha Roloff <sascha.roloff@huawei.com> | 2023-10-31 20:48:32 +0100 |
---|---|---|
committer | Sascha Roloff <sascha.roloff@huawei.com> | 2023-11-22 16:18:17 +0100 |
commit | 44a7c680289ba6812583746013f350d63942c894 (patch) | |
tree | b6482b81c89768fcae95363080e0c9dc765b33be /src/buildtool/execution_api/execution_service/cas_server.cpp | |
parent | 9dc43626cf863ecfee29ef36fe3637e52b876f85 (diff) | |
download | justbuild-44a7c680289ba6812583746013f350d63942c894.tar.gz |
Implement blob splitting protocol on just server side
Diffstat (limited to 'src/buildtool/execution_api/execution_service/cas_server.cpp')
-rw-r--r-- | src/buildtool/execution_api/execution_service/cas_server.cpp | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp index abdab5b1..87faee66 100644 --- a/src/buildtool/execution_api/execution_service/cas_server.cpp +++ b/src/buildtool/execution_api/execution_service/cas_server.cpp @@ -14,8 +14,15 @@ #include "src/buildtool/execution_api/execution_service/cas_server.hpp" +#include <algorithm> +#include <filesystem> +#include <fstream> +#include <sstream> +#include <vector> + #include "fmt/core.h" #include "src/buildtool/compatibility/native_support.hpp" +#include "src/buildtool/execution_api/execution_service/file_chunker.hpp" #include "src/buildtool/storage/garbage_collector.hpp" #include "src/utils/cpp/verify_hash.hpp" @@ -176,3 +183,87 @@ auto CASServiceImpl::GetTree( logger_.Emit(LogLevel::Error, str); return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; } + +auto CASServiceImpl::SplitBlob(::grpc::ServerContext* /*context*/, + const ::bazel_re::SplitBlobRequest* request, + ::bazel_re::SplitBlobResponse* response) + -> ::grpc::Status { + if (not request->has_blob_digest()) { + auto str = fmt::format("SplitBlob: no blob digest provided"); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; + } + + // Acquire garbage collection lock. + auto lock = GarbageCollector::SharedLock(); + if (not lock) { + auto str = + fmt::format("SplitBlob: could not acquire garbage collection lock"); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + + auto const& blob_digest = request->blob_digest(); + logger_.Emit(LogLevel::Info, "SplitBlob({})", blob_digest.hash()); + + // Check blob existence. + auto path = std::optional<std::filesystem::path>{}; + if (NativeSupport::IsTree(blob_digest.hash())) { + path = storage_->CAS().TreePath(blob_digest); + } + else { + path = storage_->CAS().BlobPath(blob_digest, true); + if (not path) { + path = storage_->CAS().BlobPath(blob_digest, false); + } + } + if (not path) { + auto str = + fmt::format("SplitBlob: blob not found {}", blob_digest.hash()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::NOT_FOUND, str}; + } + + // Split blob into chunks, store each chunk in CAS, and collect chunk + // digests. + auto chunker = FileChunker{*path}; + if (not chunker.IsOpen()) { + auto str = fmt::format("SplitBlob: could not open blob for reading"); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + + auto chunk_digests = std::vector<bazel_re::Digest>{}; + while (auto chunk = chunker.NextChunk()) { + auto chunk_digest = storage_->CAS().StoreBlob(*chunk, false); + if (not chunk_digest) { + auto str = + fmt::format("SplitBlob: could not store chunk of blob {}", + blob_digest.hash()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::RESOURCE_EXHAUSTED, str}; + } + chunk_digests.emplace_back(*chunk_digest); + } + if (not chunker.Finished()) { + auto str = + fmt::format("SplitBlob: could split blob {}", blob_digest.hash()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + logger_.Emit(LogLevel::Debug, [&blob_digest, &chunk_digests]() { + std::stringstream ss{}; + ss << "Split blob " << blob_digest.hash() << ":" + << blob_digest.size_bytes() << " into [ "; + for (auto const& chunk_digest : chunk_digests) { + ss << chunk_digest.hash() << ":" << chunk_digest.size_bytes() + << " "; + } + ss << "]"; + return ss.str(); + }); + std::copy(chunk_digests.cbegin(), + chunk_digests.cend(), + pb::back_inserter(response->mutable_chunk_digests())); + return ::grpc::Status::OK; +} |