diff options
Diffstat (limited to 'src/buildtool/execution_api/execution_service/cas_server.cpp')
-rw-r--r-- | src/buildtool/execution_api/execution_service/cas_server.cpp | 182 |
1 files changed, 97 insertions, 85 deletions
diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp index 3ab31fc8..9960dc96 100644 --- a/src/buildtool/execution_api/execution_service/cas_server.cpp +++ b/src/buildtool/execution_api/execution_service/cas_server.cpp @@ -30,10 +30,11 @@ #include "src/buildtool/storage/garbage_collector.hpp" #include "src/utils/cpp/verify_hash.hpp" -static constexpr std::size_t kGitSHA1Length = 42; -static constexpr std::size_t kSHA256Length = 64; +namespace { +inline constexpr std::size_t kGitSHA1Length = 42; +inline constexpr std::size_t kSHA256Length = 64; -static auto IsValidHash(std::string const& x) -> bool { +[[nodiscard]] auto IsValidHash(std::string const& x) -> bool { auto error_msg = IsAHash(x); auto const& length = x.size(); return not error_msg and @@ -41,8 +42,8 @@ static auto IsValidHash(std::string const& x) -> bool { length == kGitSHA1Length); } -static auto ChunkingAlgorithmToString(::bazel_re::ChunkingAlgorithm_Value type) - -> std::string { +[[nodiscard]] auto ChunkingAlgorithmToString( + ::bazel_re::ChunkingAlgorithm_Value type) -> std::string { switch (type) { case ::bazel_re::ChunkingAlgorithm_Value:: ChunkingAlgorithm_Value_IDENTITY: @@ -58,6 +59,31 @@ static auto ChunkingAlgorithmToString(::bazel_re::ChunkingAlgorithm_Value type) } } +[[nodiscard]] auto CheckDigestConsistency( + bazel_re::Digest const& ref, + bazel_re::Digest const& computed) noexcept -> std::optional<std::string> { + bool valid = ref.hash() == computed.hash(); + if (valid) { + bool const check_sizes = + Compatibility::IsCompatible() or ref.size_bytes() != 0; + if (check_sizes) { + valid = ref.size_bytes() == computed.size_bytes(); + } + } + if (not valid) { + return fmt::format( + "Blob {} is corrupted: provided digest {}:{} and digest computed " + "from data {}:{} do not correspond.", + ref.hash(), + ref.hash(), + ref.size_bytes(), + computed.hash(), + computed.size_bytes()); + } + return std::nullopt; +} +} // namespace + auto CASServiceImpl::FindMissingBlobs( ::grpc::ServerContext* /*context*/, const ::bazel_re::FindMissingBlobsRequest* request, @@ -72,22 +98,22 @@ auto CASServiceImpl::FindMissingBlobs( for (auto const& x : request->blob_digests()) { auto const& hash = x.hash(); - if (not IsValidHash(hash)) { + bool is_in_cas = false; + if (IsValidHash(hash)) { + logger_.Emit(LogLevel::Trace, "FindMissingBlobs: {}", hash); + ArtifactDigest const digest(x); + is_in_cas = + NativeSupport::IsTree(hash) + ? storage_.CAS().TreePath(digest).has_value() + : storage_.CAS().BlobPath(digest, false).has_value(); + } + else { logger_.Emit(LogLevel::Error, "FindMissingBlobs: unsupported digest {}", hash); - auto* d = response->add_missing_blob_digests(); - d->CopyFrom(x); - continue; - } - logger_.Emit(LogLevel::Trace, "FindMissingBlobs: {}", hash); - if (NativeSupport::IsTree(hash)) { - if (not storage_.CAS().TreePath(x)) { - auto* d = response->add_missing_blob_digests(); - d->CopyFrom(x); - } } - else if (not storage_.CAS().BlobPath(x, false)) { + + if (not is_in_cas) { auto* d = response->add_missing_blob_digests(); d->CopyFrom(x); } @@ -95,26 +121,6 @@ auto CASServiceImpl::FindMissingBlobs( return ::grpc::Status::OK; } -auto CASServiceImpl::CheckDigestConsistency(bazel_re::Digest const& ref, - bazel_re::Digest const& computed) - const noexcept -> std::optional<std::string> { - if (ref.hash() != computed.hash() or - ((Compatibility::IsCompatible() or ref.size_bytes() > 0LL) and - ref.size_bytes() != computed.size_bytes())) { - auto const& str = fmt::format( - "Blob {} is corrupted: provided digest {}:{} and digest computed " - "from data {}:{} do not correspond.", - ref.hash(), - ref.hash(), - ref.size_bytes(), - computed.size_bytes(), - computed.size_bytes()); - logger_.Emit(LogLevel::Error, "{}", str); - return str; - } - return std::nullopt; -} - auto CASServiceImpl::BatchUpdateBlobs( ::grpc::ServerContext* /*context*/, const ::bazel_re::BatchUpdateBlobsRequest* request, @@ -130,7 +136,7 @@ auto CASServiceImpl::BatchUpdateBlobs( auto const& hash = x.digest().hash(); logger_.Emit(LogLevel::Trace, "BatchUpdateBlobs: {}", hash); if (not IsValidHash(hash)) { - auto const& str = + auto const str = fmt::format("BatchUpdateBlobs: unsupported digest {}", hash); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; @@ -138,42 +144,40 @@ auto CASServiceImpl::BatchUpdateBlobs( logger_.Emit(LogLevel::Trace, "BatchUpdateBlobs: {}", hash); auto* r = response->add_responses(); r->mutable_digest()->CopyFrom(x.digest()); - if (NativeSupport::IsTree(hash)) { + + bool const is_tree = NativeSupport::IsTree(hash); + if (is_tree) { // In native mode: for trees, check whether the tree invariant holds // before storing the actual tree object. if (auto err = CASUtils::EnsureTreeInvariant( x.digest(), x.data(), storage_)) { - auto str = fmt::format("BatchUpdateBlobs: {}", *err); + auto const str = + fmt::format("BatchUpdateBlobs: {}", *std::move(err)); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{grpc::StatusCode::FAILED_PRECONDITION, str}; } - auto const& dgst = storage_.CAS().StoreTree(x.data()); - if (not dgst) { - auto const& str = fmt::format( - "BatchUpdateBlobs: could not upload tree {}", hash); - logger_.Emit(LogLevel::Error, "{}", str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; - } - if (auto err = CheckDigestConsistency(x.digest(), *dgst)) { - auto str = fmt::format("BatchUpdateBlobs: {}", *err); - logger_.Emit(LogLevel::Error, "{}", str); - return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; - } } - else { - auto const& dgst = storage_.CAS().StoreBlob(x.data(), false); - if (not dgst) { - auto const& str = fmt::format( - "BatchUpdateBlobs: could not upload blob {}", hash); - logger_.Emit(LogLevel::Error, "{}", str); - return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; - } - if (auto err = CheckDigestConsistency(x.digest(), *dgst)) { - auto str = fmt::format("BatchUpdateBlobs: {}", *err); - logger_.Emit(LogLevel::Error, "{}", str); - return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; - } + + auto const cas_digest = + is_tree + ? storage_.CAS().StoreTree(x.data()) + : storage_.CAS().StoreBlob(x.data(), /*is_executable=*/false); + + if (not cas_digest) { + auto const str = + fmt::format("BatchUpdateBlobs: could not upload {} {}", + is_tree ? "tree" : "blob", + hash); + logger_.Emit(LogLevel::Error, "{}", str); + return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + + if (auto err = CheckDigestConsistency(x.digest(), *cas_digest)) { + auto const str = + fmt::format("BatchUpdateBlobs: {}", *std::move(err)); + logger_.Emit(LogLevel::Error, "{}", str); + return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; } } return ::grpc::Status::OK; @@ -185,25 +189,25 @@ auto CASServiceImpl::BatchReadBlobs( ::bazel_re::BatchReadBlobsResponse* response) -> ::grpc::Status { auto lock = GarbageCollector::SharedLock(storage_config_); if (not lock) { - auto str = fmt::format("BatchReadBlobs: Could not acquire SharedLock"); + auto const str = + fmt::format("BatchReadBlobs: Could not acquire SharedLock"); logger_.Emit(LogLevel::Error, "{}", str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } - for (auto const& digest : request->digests()) { + for (auto const& x : request->digests()) { auto* r = response->add_responses(); - r->mutable_digest()->CopyFrom(digest); - std::optional<std::filesystem::path> path; - if (NativeSupport::IsTree(digest.hash())) { - path = storage_.CAS().TreePath(digest); - } - else { - path = storage_.CAS().BlobPath(digest, false); - } + r->mutable_digest()->CopyFrom(x); + + ArtifactDigest const digest(x); + auto const path = + NativeSupport::IsTree(x.hash()) + ? storage_.CAS().TreePath(digest) + : storage_.CAS().BlobPath(digest, /*is_executable=*/false); + if (not path) { google::rpc::Status status; status.set_code(grpc::StatusCode::NOT_FOUND); r->mutable_status()->CopyFrom(status); - continue; } std::ifstream cert{*path}; @@ -339,25 +343,33 @@ auto CASServiceImpl::SpliceBlob(::grpc::ServerContext* /*context*/, return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; } - // Splice blob from chunks. auto chunk_digests = std::vector<bazel_re::Digest>{}; - std::copy(request->chunk_digests().cbegin(), - request->chunk_digests().cend(), - std::back_inserter(chunk_digests)); + chunk_digests.reserve(request->chunk_digests().size()); + for (auto const& x : request->chunk_digests()) { + if (not IsValidHash(x.hash())) { + auto const str = + fmt::format("SpliceBlob: unsupported digest {}", x.hash()); + logger_.Emit(LogLevel::Error, "{}", str); + return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; + } + chunk_digests.push_back(x); + } + + // Splice blob from chunks. auto splice_result = CASUtils::SpliceBlob(blob_digest, chunk_digests, storage_); if (not splice_result) { auto const& status = splice_result.error(); - auto str = fmt::format("SpliceBlob: {}", status.error_message()); + auto const str = fmt::format("SpliceBlob: {}", status.error_message()); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{status.error_code(), str}; } - auto const& digest = *splice_result; - if (auto err = CheckDigestConsistency(blob_digest, digest)) { - auto str = fmt::format("SpliceBlob: {}", *err); + if (auto err = CheckDigestConsistency(blob_digest, *splice_result)) { + auto const str = fmt::format("SpliceBlob: {}", *err); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; } - response->mutable_blob_digest()->CopyFrom(digest); + + response->mutable_blob_digest()->CopyFrom(*splice_result); return ::grpc::Status::OK; } |