summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/execution_service/cas_server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildtool/execution_api/execution_service/cas_server.cpp')
-rw-r--r--src/buildtool/execution_api/execution_service/cas_server.cpp182
1 files changed, 97 insertions, 85 deletions
diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp
index 3ab31fc8..9960dc96 100644
--- a/src/buildtool/execution_api/execution_service/cas_server.cpp
+++ b/src/buildtool/execution_api/execution_service/cas_server.cpp
@@ -30,10 +30,11 @@
#include "src/buildtool/storage/garbage_collector.hpp"
#include "src/utils/cpp/verify_hash.hpp"
-static constexpr std::size_t kGitSHA1Length = 42;
-static constexpr std::size_t kSHA256Length = 64;
+namespace {
+inline constexpr std::size_t kGitSHA1Length = 42;
+inline constexpr std::size_t kSHA256Length = 64;
-static auto IsValidHash(std::string const& x) -> bool {
+[[nodiscard]] auto IsValidHash(std::string const& x) -> bool {
auto error_msg = IsAHash(x);
auto const& length = x.size();
return not error_msg and
@@ -41,8 +42,8 @@ static auto IsValidHash(std::string const& x) -> bool {
length == kGitSHA1Length);
}
-static auto ChunkingAlgorithmToString(::bazel_re::ChunkingAlgorithm_Value type)
- -> std::string {
+[[nodiscard]] auto ChunkingAlgorithmToString(
+ ::bazel_re::ChunkingAlgorithm_Value type) -> std::string {
switch (type) {
case ::bazel_re::ChunkingAlgorithm_Value::
ChunkingAlgorithm_Value_IDENTITY:
@@ -58,6 +59,31 @@ static auto ChunkingAlgorithmToString(::bazel_re::ChunkingAlgorithm_Value type)
}
}
+[[nodiscard]] auto CheckDigestConsistency(
+ bazel_re::Digest const& ref,
+ bazel_re::Digest const& computed) noexcept -> std::optional<std::string> {
+ bool valid = ref.hash() == computed.hash();
+ if (valid) {
+ bool const check_sizes =
+ Compatibility::IsCompatible() or ref.size_bytes() != 0;
+ if (check_sizes) {
+ valid = ref.size_bytes() == computed.size_bytes();
+ }
+ }
+ if (not valid) {
+ return fmt::format(
+ "Blob {} is corrupted: provided digest {}:{} and digest computed "
+ "from data {}:{} do not correspond.",
+ ref.hash(),
+ ref.hash(),
+ ref.size_bytes(),
+ computed.hash(),
+ computed.size_bytes());
+ }
+ return std::nullopt;
+}
+} // namespace
+
auto CASServiceImpl::FindMissingBlobs(
::grpc::ServerContext* /*context*/,
const ::bazel_re::FindMissingBlobsRequest* request,
@@ -72,22 +98,22 @@ auto CASServiceImpl::FindMissingBlobs(
for (auto const& x : request->blob_digests()) {
auto const& hash = x.hash();
- if (not IsValidHash(hash)) {
+ bool is_in_cas = false;
+ if (IsValidHash(hash)) {
+ logger_.Emit(LogLevel::Trace, "FindMissingBlobs: {}", hash);
+ ArtifactDigest const digest(x);
+ is_in_cas =
+ NativeSupport::IsTree(hash)
+ ? storage_.CAS().TreePath(digest).has_value()
+ : storage_.CAS().BlobPath(digest, false).has_value();
+ }
+ else {
logger_.Emit(LogLevel::Error,
"FindMissingBlobs: unsupported digest {}",
hash);
- auto* d = response->add_missing_blob_digests();
- d->CopyFrom(x);
- continue;
- }
- logger_.Emit(LogLevel::Trace, "FindMissingBlobs: {}", hash);
- if (NativeSupport::IsTree(hash)) {
- if (not storage_.CAS().TreePath(x)) {
- auto* d = response->add_missing_blob_digests();
- d->CopyFrom(x);
- }
}
- else if (not storage_.CAS().BlobPath(x, false)) {
+
+ if (not is_in_cas) {
auto* d = response->add_missing_blob_digests();
d->CopyFrom(x);
}
@@ -95,26 +121,6 @@ auto CASServiceImpl::FindMissingBlobs(
return ::grpc::Status::OK;
}
-auto CASServiceImpl::CheckDigestConsistency(bazel_re::Digest const& ref,
- bazel_re::Digest const& computed)
- const noexcept -> std::optional<std::string> {
- if (ref.hash() != computed.hash() or
- ((Compatibility::IsCompatible() or ref.size_bytes() > 0LL) and
- ref.size_bytes() != computed.size_bytes())) {
- auto const& str = fmt::format(
- "Blob {} is corrupted: provided digest {}:{} and digest computed "
- "from data {}:{} do not correspond.",
- ref.hash(),
- ref.hash(),
- ref.size_bytes(),
- computed.size_bytes(),
- computed.size_bytes());
- logger_.Emit(LogLevel::Error, "{}", str);
- return str;
- }
- return std::nullopt;
-}
-
auto CASServiceImpl::BatchUpdateBlobs(
::grpc::ServerContext* /*context*/,
const ::bazel_re::BatchUpdateBlobsRequest* request,
@@ -130,7 +136,7 @@ auto CASServiceImpl::BatchUpdateBlobs(
auto const& hash = x.digest().hash();
logger_.Emit(LogLevel::Trace, "BatchUpdateBlobs: {}", hash);
if (not IsValidHash(hash)) {
- auto const& str =
+ auto const str =
fmt::format("BatchUpdateBlobs: unsupported digest {}", hash);
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
@@ -138,42 +144,40 @@ auto CASServiceImpl::BatchUpdateBlobs(
logger_.Emit(LogLevel::Trace, "BatchUpdateBlobs: {}", hash);
auto* r = response->add_responses();
r->mutable_digest()->CopyFrom(x.digest());
- if (NativeSupport::IsTree(hash)) {
+
+ bool const is_tree = NativeSupport::IsTree(hash);
+ if (is_tree) {
// In native mode: for trees, check whether the tree invariant holds
// before storing the actual tree object.
if (auto err = CASUtils::EnsureTreeInvariant(
x.digest(), x.data(), storage_)) {
- auto str = fmt::format("BatchUpdateBlobs: {}", *err);
+ auto const str =
+ fmt::format("BatchUpdateBlobs: {}", *std::move(err));
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{grpc::StatusCode::FAILED_PRECONDITION,
str};
}
- auto const& dgst = storage_.CAS().StoreTree(x.data());
- if (not dgst) {
- auto const& str = fmt::format(
- "BatchUpdateBlobs: could not upload tree {}", hash);
- logger_.Emit(LogLevel::Error, "{}", str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
- }
- if (auto err = CheckDigestConsistency(x.digest(), *dgst)) {
- auto str = fmt::format("BatchUpdateBlobs: {}", *err);
- logger_.Emit(LogLevel::Error, "{}", str);
- return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
- }
}
- else {
- auto const& dgst = storage_.CAS().StoreBlob(x.data(), false);
- if (not dgst) {
- auto const& str = fmt::format(
- "BatchUpdateBlobs: could not upload blob {}", hash);
- logger_.Emit(LogLevel::Error, "{}", str);
- return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
- }
- if (auto err = CheckDigestConsistency(x.digest(), *dgst)) {
- auto str = fmt::format("BatchUpdateBlobs: {}", *err);
- logger_.Emit(LogLevel::Error, "{}", str);
- return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
- }
+
+ auto const cas_digest =
+ is_tree
+ ? storage_.CAS().StoreTree(x.data())
+ : storage_.CAS().StoreBlob(x.data(), /*is_executable=*/false);
+
+ if (not cas_digest) {
+ auto const str =
+ fmt::format("BatchUpdateBlobs: could not upload {} {}",
+ is_tree ? "tree" : "blob",
+ hash);
+ logger_.Emit(LogLevel::Error, "{}", str);
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+
+ if (auto err = CheckDigestConsistency(x.digest(), *cas_digest)) {
+ auto const str =
+ fmt::format("BatchUpdateBlobs: {}", *std::move(err));
+ logger_.Emit(LogLevel::Error, "{}", str);
+ return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
}
}
return ::grpc::Status::OK;
@@ -185,25 +189,25 @@ auto CASServiceImpl::BatchReadBlobs(
::bazel_re::BatchReadBlobsResponse* response) -> ::grpc::Status {
auto lock = GarbageCollector::SharedLock(storage_config_);
if (not lock) {
- auto str = fmt::format("BatchReadBlobs: Could not acquire SharedLock");
+ auto const str =
+ fmt::format("BatchReadBlobs: Could not acquire SharedLock");
logger_.Emit(LogLevel::Error, "{}", str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
- for (auto const& digest : request->digests()) {
+ for (auto const& x : request->digests()) {
auto* r = response->add_responses();
- r->mutable_digest()->CopyFrom(digest);
- std::optional<std::filesystem::path> path;
- if (NativeSupport::IsTree(digest.hash())) {
- path = storage_.CAS().TreePath(digest);
- }
- else {
- path = storage_.CAS().BlobPath(digest, false);
- }
+ r->mutable_digest()->CopyFrom(x);
+
+ ArtifactDigest const digest(x);
+ auto const path =
+ NativeSupport::IsTree(x.hash())
+ ? storage_.CAS().TreePath(digest)
+ : storage_.CAS().BlobPath(digest, /*is_executable=*/false);
+
if (not path) {
google::rpc::Status status;
status.set_code(grpc::StatusCode::NOT_FOUND);
r->mutable_status()->CopyFrom(status);
-
continue;
}
std::ifstream cert{*path};
@@ -339,25 +343,33 @@ auto CASServiceImpl::SpliceBlob(::grpc::ServerContext* /*context*/,
return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
}
- // Splice blob from chunks.
auto chunk_digests = std::vector<bazel_re::Digest>{};
- std::copy(request->chunk_digests().cbegin(),
- request->chunk_digests().cend(),
- std::back_inserter(chunk_digests));
+ chunk_digests.reserve(request->chunk_digests().size());
+ for (auto const& x : request->chunk_digests()) {
+ if (not IsValidHash(x.hash())) {
+ auto const str =
+ fmt::format("SpliceBlob: unsupported digest {}", x.hash());
+ logger_.Emit(LogLevel::Error, "{}", str);
+ return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
+ }
+ chunk_digests.push_back(x);
+ }
+
+ // Splice blob from chunks.
auto splice_result =
CASUtils::SpliceBlob(blob_digest, chunk_digests, storage_);
if (not splice_result) {
auto const& status = splice_result.error();
- auto str = fmt::format("SpliceBlob: {}", status.error_message());
+ auto const str = fmt::format("SpliceBlob: {}", status.error_message());
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{status.error_code(), str};
}
- auto const& digest = *splice_result;
- if (auto err = CheckDigestConsistency(blob_digest, digest)) {
- auto str = fmt::format("SpliceBlob: {}", *err);
+ if (auto err = CheckDigestConsistency(blob_digest, *splice_result)) {
+ auto const str = fmt::format("SpliceBlob: {}", *err);
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
}
- response->mutable_blob_digest()->CopyFrom(digest);
+
+ response->mutable_blob_digest()->CopyFrom(*splice_result);
return ::grpc::Status::OK;
}