From 0d98a04c28eeb18d08f731c3f94de825d49daac5 Mon Sep 17 00:00:00 2001 From: Sascha Roloff Date: Fri, 23 Feb 2024 16:04:36 +0100 Subject: Implement blob splicing protocol at just server side --- .../execution_api/execution_service/cas_server.cpp | 59 +++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) (limited to 'src/buildtool/execution_api/execution_service/cas_server.cpp') diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp index 064308e5..b266cd9e 100644 --- a/src/buildtool/execution_api/execution_service/cas_server.cpp +++ b/src/buildtool/execution_api/execution_service/cas_server.cpp @@ -216,7 +216,7 @@ auto CASServiceImpl::BatchReadBlobs( auto CASServiceImpl::GetTree( ::grpc::ServerContext* /*context*/, const ::bazel_re::GetTreeRequest* /*request*/, - ::grpc::ServerWriter< ::bazel_re::GetTreeResponse>* /*writer*/) + ::grpc::ServerWriter<::bazel_re::GetTreeResponse>* /*writer*/) -> ::grpc::Status { auto const* str = "GetTree not implemented"; logger_.Emit(LogLevel::Error, str); @@ -305,3 +305,60 @@ auto CASServiceImpl::SplitBlob(::grpc::ServerContext* /*context*/, pb::back_inserter(response->mutable_chunk_digests())); return ::grpc::Status::OK; } + +auto CASServiceImpl::SpliceBlob(::grpc::ServerContext* /*context*/, + const ::bazel_re::SpliceBlobRequest* request, + ::bazel_re::SpliceBlobResponse* response) + -> ::grpc::Status { + if (not request->has_blob_digest()) { + auto str = fmt::format("SpliceBlob: no blob digest provided"); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; + } + + auto const& blob_digest = request->blob_digest(); + if (not IsValidHash(blob_digest.hash())) { + auto str = fmt::format("SpliceBlob: unsupported digest {}", + blob_digest.hash()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; + } + + logger_.Emit(LogLevel::Debug, + "SpliceBlob({}, {} chunks)", + blob_digest.hash(), + request->chunk_digests().size()); + + // Acquire garbage collection lock. + auto lock = GarbageCollector::SharedLock(); + if (not lock) { + auto str = fmt::format( + "SpliceBlob: could not acquire garbage collection lock"); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + + // Splice blob from chunks. + auto chunk_digests = std::vector{}; + std::copy(request->chunk_digests().cbegin(), + request->chunk_digests().cend(), + std::back_inserter(chunk_digests)); + auto splice_result = CASUtils::SpliceBlob(blob_digest, + chunk_digests, + *storage_, + /* check_tree_invariant= */ true); + if (std::holds_alternative(splice_result)) { + auto status = std::get(splice_result); + auto str = fmt::format("SpliceBlob: {}", status.error_message()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{status.error_code(), str}; + } + auto digest = std::get(splice_result); + if (auto err = CheckDigestConsistency(blob_digest, digest)) { + auto str = fmt::format("SpliceBlob: {}", *err); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; + } + response->mutable_blob_digest()->CopyFrom(digest); + return ::grpc::Status::OK; +} -- cgit v1.2.3