summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/execution_service/cas_server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildtool/execution_api/execution_service/cas_server.cpp')
-rw-r--r--src/buildtool/execution_api/execution_service/cas_server.cpp91
1 files changed, 91 insertions, 0 deletions
diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp
index abdab5b1..87faee66 100644
--- a/src/buildtool/execution_api/execution_service/cas_server.cpp
+++ b/src/buildtool/execution_api/execution_service/cas_server.cpp
@@ -14,8 +14,15 @@
#include "src/buildtool/execution_api/execution_service/cas_server.hpp"
+#include <algorithm>
+#include <filesystem>
+#include <fstream>
+#include <sstream>
+#include <vector>
+
#include "fmt/core.h"
#include "src/buildtool/compatibility/native_support.hpp"
+#include "src/buildtool/execution_api/execution_service/file_chunker.hpp"
#include "src/buildtool/storage/garbage_collector.hpp"
#include "src/utils/cpp/verify_hash.hpp"
@@ -176,3 +183,87 @@ auto CASServiceImpl::GetTree(
logger_.Emit(LogLevel::Error, str);
return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
}
+
+auto CASServiceImpl::SplitBlob(::grpc::ServerContext* /*context*/,
+ const ::bazel_re::SplitBlobRequest* request,
+ ::bazel_re::SplitBlobResponse* response)
+ -> ::grpc::Status {
+ if (not request->has_blob_digest()) {
+ auto str = fmt::format("SplitBlob: no blob digest provided");
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
+ }
+
+ // Acquire garbage collection lock.
+ auto lock = GarbageCollector::SharedLock();
+ if (not lock) {
+ auto str =
+ fmt::format("SplitBlob: could not acquire garbage collection lock");
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+
+ auto const& blob_digest = request->blob_digest();
+ logger_.Emit(LogLevel::Info, "SplitBlob({})", blob_digest.hash());
+
+ // Check blob existence.
+ auto path = std::optional<std::filesystem::path>{};
+ if (NativeSupport::IsTree(blob_digest.hash())) {
+ path = storage_->CAS().TreePath(blob_digest);
+ }
+ else {
+ path = storage_->CAS().BlobPath(blob_digest, true);
+ if (not path) {
+ path = storage_->CAS().BlobPath(blob_digest, false);
+ }
+ }
+ if (not path) {
+ auto str =
+ fmt::format("SplitBlob: blob not found {}", blob_digest.hash());
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::NOT_FOUND, str};
+ }
+
+ // Split blob into chunks, store each chunk in CAS, and collect chunk
+ // digests.
+ auto chunker = FileChunker{*path};
+ if (not chunker.IsOpen()) {
+ auto str = fmt::format("SplitBlob: could not open blob for reading");
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+
+ auto chunk_digests = std::vector<bazel_re::Digest>{};
+ while (auto chunk = chunker.NextChunk()) {
+ auto chunk_digest = storage_->CAS().StoreBlob(*chunk, false);
+ if (not chunk_digest) {
+ auto str =
+ fmt::format("SplitBlob: could not store chunk of blob {}",
+ blob_digest.hash());
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::RESOURCE_EXHAUSTED, str};
+ }
+ chunk_digests.emplace_back(*chunk_digest);
+ }
+ if (not chunker.Finished()) {
+ auto str =
+ fmt::format("SplitBlob: could split blob {}", blob_digest.hash());
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+ logger_.Emit(LogLevel::Debug, [&blob_digest, &chunk_digests]() {
+ std::stringstream ss{};
+ ss << "Split blob " << blob_digest.hash() << ":"
+ << blob_digest.size_bytes() << " into [ ";
+ for (auto const& chunk_digest : chunk_digests) {
+ ss << chunk_digest.hash() << ":" << chunk_digest.size_bytes()
+ << " ";
+ }
+ ss << "]";
+ return ss.str();
+ });
+ std::copy(chunk_digests.cbegin(),
+ chunk_digests.cend(),
+ pb::back_inserter(response->mutable_chunk_digests()));
+ return ::grpc::Status::OK;
+}