author     Sascha Roloff <sascha.roloff@huawei.com>  2024-02-23 16:04:36 +0100
committer  Sascha Roloff <sascha.roloff@huawei.com>  2024-02-26 17:16:21 +0100
commit     0d98a04c28eeb18d08f731c3f94de825d49daac5 (patch)
tree       8218dcfca1fb1c9dca5da6a1afd5c30677788726 /src
parent     25ef9672988f008e61193228756dcfed069bda57 (diff)
download   justbuild-0d98a04c28eeb18d08f731c3f94de825d49daac5.tar.gz
Implement blob splicing protocol at just server side
Diffstat (limited to 'src')
-rw-r--r--  src/buildtool/execution_api/execution_service/cas_server.cpp  59
-rw-r--r--  src/buildtool/execution_api/execution_service/cas_server.hpp  43
-rw-r--r--  src/buildtool/execution_api/execution_service/cas_utils.cpp   17
-rw-r--r--  src/buildtool/execution_api/execution_service/cas_utils.hpp    3
-rw-r--r--  src/buildtool/execution_api/local/local_api.hpp                 7
5 files changed, 123 insertions, 6 deletions
diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp
index 064308e5..b266cd9e 100644
--- a/src/buildtool/execution_api/execution_service/cas_server.cpp
+++ b/src/buildtool/execution_api/execution_service/cas_server.cpp
@@ -216,7 +216,7 @@ auto CASServiceImpl::BatchReadBlobs(
auto CASServiceImpl::GetTree(
::grpc::ServerContext* /*context*/,
const ::bazel_re::GetTreeRequest* /*request*/,
- ::grpc::ServerWriter< ::bazel_re::GetTreeResponse>* /*writer*/)
+ ::grpc::ServerWriter<::bazel_re::GetTreeResponse>* /*writer*/)
-> ::grpc::Status {
auto const* str = "GetTree not implemented";
logger_.Emit(LogLevel::Error, str);
@@ -305,3 +305,60 @@ auto CASServiceImpl::SplitBlob(::grpc::ServerContext* /*context*/,
pb::back_inserter(response->mutable_chunk_digests()));
return ::grpc::Status::OK;
}
+
+auto CASServiceImpl::SpliceBlob(::grpc::ServerContext* /*context*/,
+ const ::bazel_re::SpliceBlobRequest* request,
+ ::bazel_re::SpliceBlobResponse* response)
+ -> ::grpc::Status {
+ if (not request->has_blob_digest()) {
+ auto str = fmt::format("SpliceBlob: no blob digest provided");
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
+ }
+
+ auto const& blob_digest = request->blob_digest();
+ if (not IsValidHash(blob_digest.hash())) {
+ auto str = fmt::format("SpliceBlob: unsupported digest {}",
+ blob_digest.hash());
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
+ }
+
+ logger_.Emit(LogLevel::Debug,
+ "SpliceBlob({}, {} chunks)",
+ blob_digest.hash(),
+ request->chunk_digests().size());
+
+ // Acquire garbage collection lock.
+ auto lock = GarbageCollector::SharedLock();
+ if (not lock) {
+ auto str = fmt::format(
+ "SpliceBlob: could not acquire garbage collection lock");
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+
+ // Splice blob from chunks.
+ auto chunk_digests = std::vector<bazel_re::Digest>{};
+ std::copy(request->chunk_digests().cbegin(),
+ request->chunk_digests().cend(),
+ std::back_inserter(chunk_digests));
+ auto splice_result = CASUtils::SpliceBlob(blob_digest,
+ chunk_digests,
+ *storage_,
+ /* check_tree_invariant= */ true);
+ if (std::holds_alternative<grpc::Status>(splice_result)) {
+ auto status = std::get<grpc::Status>(splice_result);
+ auto str = fmt::format("SpliceBlob: {}", status.error_message());
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{status.error_code(), str};
+ }
+ auto digest = std::get<bazel_re::Digest>(splice_result);
+ if (auto err = CheckDigestConsistency(blob_digest, digest)) {
+ auto str = fmt::format("SpliceBlob: {}", *err);
+ logger_.Emit(LogLevel::Error, str);
+ return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str};
+ }
+ response->mutable_blob_digest()->CopyFrom(digest);
+ return ::grpc::Status::OK;
+}
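
For illustration, a minimal client-side sketch of calling the new RPC (not part of this commit). It assumes the gRPC stub generated from justbuild's extended remote-execution proto and a plausible generated-header path; the request fields blob_digest and chunk_digests mirror the server code above.

#include <vector>

#include <grpcpp/grpcpp.h>

// Generated header path assumed; it ships with justbuild's extended
// remote-execution proto.
#include "build/bazel/remote/execution/v2/remote_execution.grpc.pb.h"

namespace bazel_re = build::bazel::remote::execution::v2;

// Ask the server to splice a blob from already-uploaded chunks.
auto RequestSpliceBlob(bazel_re::ContentAddressableStorage::Stub& stub,
                       bazel_re::Digest const& blob_digest,
                       std::vector<bazel_re::Digest> const& chunk_digests)
    -> grpc::Status {
    bazel_re::SpliceBlobRequest request{};
    // The digest the spliced result must reproduce (verified by the server).
    request.mutable_blob_digest()->CopyFrom(blob_digest);
    // The chunks, in splice order; each must already be in the remote CAS.
    for (auto const& chunk : chunk_digests) {
        request.add_chunk_digests()->CopyFrom(chunk);
    }
    bazel_re::SpliceBlobResponse response{};
    grpc::ClientContext context{};
    return stub.SpliceBlob(&context, request, &response);
}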
diff --git a/src/buildtool/execution_api/execution_service/cas_server.hpp b/src/buildtool/execution_api/execution_service/cas_server.hpp
index fd77a03e..4a6b6092 100644
--- a/src/buildtool/execution_api/execution_service/cas_server.hpp
+++ b/src/buildtool/execution_api/execution_service/cas_server.hpp
@@ -116,7 +116,7 @@ class CASServiceImpl final
// * `NOT_FOUND`: The requested tree root is not present in the CAS.
auto GetTree(::grpc::ServerContext* context,
const ::bazel_re::GetTreeRequest* request,
- ::grpc::ServerWriter< ::bazel_re::GetTreeResponse>* writer)
+ ::grpc::ServerWriter<::bazel_re::GetTreeResponse>* writer)
-> ::grpc::Status override;
// Split a blob into chunks.
//
@@ -168,6 +168,47 @@ class CASServiceImpl final
const ::bazel_re::SplitBlobRequest* request,
::bazel_re::SplitBlobResponse* response)
-> ::grpc::Status override;
+ // Splice a blob from chunks.
+ //
+ // This is the complementary operation to the
+ // [ContentAddressableStorage.SplitBlob][build.bazel.remote.execution.v2.ContentAddressableStorage.SplitBlob]
+ // function; it handles the split upload of large blobs in order to save
+ // upload traffic.
+ //
+ // If a client needs to upload a large blob and is able to split a blob into
+ // chunks locally according to some content-defined chunking algorithm, it
+ // can first determine which parts of the blob are already available in the
+ // remote CAS and upload the missing chunks, and then use this API to
+ // instruct the server to splice the original blob from the remotely
+ // available blob chunks.
+ //
+ // In order to ensure data consistency of the CAS, the server verifies
+ // that the digest of the spliced result matches the digest provided in
+ // the request and rejects the splice request if this verification
+ // fails.
+ //
+ // The usage of this API is optional for clients but it allows them to
+ // upload only the missing parts of a large blob instead of the entire blob
+ // data, which in turn can considerably reduce upload network traffic.
+ //
+ // In order to split a blob into chunks, it is recommended that the client
+ // use one of the chunking algorithms advertised by the server in
+ // [CacheCapabilities.supported_chunking_algorithms][build.bazel.remote.execution.v2.CacheCapabilities.supported_chunking_algorithms]
+ // so that clients and servers can benefit from each other's chunking data.
+ // If several clients use blob splicing, it is recommended that they use
+ // the same algorithm to split their blobs into chunks.
+ //
+ // Errors:
+ //
+ // * `NOT_FOUND`: At least one of the blob chunks is not present in the CAS.
+ // * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the
+ // spliced blob.
+ // * `INVALID_ARGUMENT`: The digest of the spliced blob is different from
+ // the provided expected digest.
+ auto SpliceBlob(::grpc::ServerContext* context,
+ const ::bazel_re::SpliceBlobRequest* request,
+ ::bazel_re::SpliceBlobResponse* response)
+ -> ::grpc::Status override;
private:
[[nodiscard]] auto CheckDigestConsistency(bazel_re::Digest const& ref,
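
The error contract documented above suggests how a client might react to a failed splice. The following sketch is illustrative only (the SpliceAction enum and function are hypothetical, not part of this commit); it maps the documented status codes to follow-up actions, exploiting that blob splicing is optional and a full upload always remains a valid fallback.

#include <grpcpp/grpcpp.h>

enum class SpliceAction {
    kDone,                 // OK: spliced blob is now in the remote CAS
    kUploadMissingChunks,  // NOT_FOUND: upload missing chunks and retry
    kFallBackToFullUpload  // anything else: upload the whole blob instead
};

auto ClassifySpliceStatus(grpc::Status const& status) -> SpliceAction {
    switch (status.error_code()) {
        case grpc::StatusCode::OK:
            return SpliceAction::kDone;
        case grpc::StatusCode::NOT_FOUND:
            return SpliceAction::kUploadMissingChunks;
        default:
            // RESOURCE_EXHAUSTED, INVALID_ARGUMENT, ...: splicing is an
            // optimization, so a regular full upload is always possible.
            return SpliceAction::kFallBackToFullUpload;
    }
}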
diff --git a/src/buildtool/execution_api/execution_service/cas_utils.cpp b/src/buildtool/execution_api/execution_service/cas_utils.cpp
index 48bbd124..b710dd36 100644
--- a/src/buildtool/execution_api/execution_service/cas_utils.cpp
+++ b/src/buildtool/execution_api/execution_service/cas_utils.cpp
@@ -145,7 +145,8 @@ auto CASUtils::SplitBlobFastCDC(bazel_re::Digest const& blob_digest,
auto CASUtils::SpliceBlob(bazel_re::Digest const& blob_digest,
std::vector<bazel_re::Digest> const& chunk_digests,
- Storage const& storage) noexcept
+ Storage const& storage,
+ bool check_tree_invariant) noexcept
-> std::variant<bazel_re::Digest, grpc::Status> {
// Assemble blob from chunks.
@@ -175,6 +176,20 @@ auto CASUtils::SpliceBlob(bazel_re::Digest const& blob_digest,
// Store resulting blob in according CAS.
auto const& hash = blob_digest.hash();
if (NativeSupport::IsTree(hash)) {
+ // In native mode: for trees, check whether the tree invariant holds
+ // before storing the actual tree object.
+ if (check_tree_invariant) {
+ auto tree_data = FileSystemManager::ReadFile(tmp_file);
+ if (not tree_data) {
+ return grpc::Status{
+ grpc::StatusCode::INTERNAL,
+                    fmt::format("could not read tree data {}", hash)};
+ }
+ if (auto err = EnsureTreeInvariant(*tree_data, hash, storage)) {
+ return grpc::Status{grpc::StatusCode::FAILED_PRECONDITION,
+ *err};
+ }
+ }
auto const& digest =
storage.CAS().StoreTree</* kOwner= */ true>(tmp_file);
if (not digest) {
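
Conceptually, the splice step performed by this helper boils down to concatenating the chunk contents in request order into one temporary file, which is then hashed, verified against the requested digest, and stored (with the additional tree-invariant check shown above for trees in native mode). The following standalone sketch, using only the standard library, illustrates that core step; the function name and file handling are illustrative and not the actual CASUtils implementation.

#include <filesystem>
#include <fstream>
#include <iterator>
#include <string>
#include <vector>

// Concatenate chunk files in splice order into a single target file.
auto ConcatenateChunks(std::vector<std::filesystem::path> const& chunk_files,
                       std::filesystem::path const& target) -> bool {
    std::ofstream out{target, std::ios::binary | std::ios::trunc};
    if (not out) {
        return false;
    }
    for (auto const& chunk : chunk_files) {
        std::ifstream in{chunk, std::ios::binary};
        if (not in) {
            return false;  // chunk not readable
        }
        std::string data{std::istreambuf_iterator<char>{in},
                         std::istreambuf_iterator<char>{}};
        out.write(data.data(), static_cast<std::streamsize>(data.size()));
    }
    return static_cast<bool>(out);
}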
diff --git a/src/buildtool/execution_api/execution_service/cas_utils.hpp b/src/buildtool/execution_api/execution_service/cas_utils.hpp
index c3155b17..c537bd3f 100644
--- a/src/buildtool/execution_api/execution_service/cas_utils.hpp
+++ b/src/buildtool/execution_api/execution_service/cas_utils.hpp
@@ -44,7 +44,8 @@ class CASUtils {
[[nodiscard]] static auto SpliceBlob(
bazel_re::Digest const& blob_digest,
std::vector<bazel_re::Digest> const& chunk_digests,
- Storage const& storage) noexcept
+ Storage const& storage,
+ bool check_tree_invariant) noexcept
-> std::variant<bazel_re::Digest, grpc::Status>;
};
diff --git a/src/buildtool/execution_api/local/local_api.hpp b/src/buildtool/execution_api/local/local_api.hpp
index 5880475e..3c063a84 100644
--- a/src/buildtool/execution_api/local/local_api.hpp
+++ b/src/buildtool/execution_api/local/local_api.hpp
@@ -476,8 +476,11 @@ class LocalApi final : public IExecutionApi {
[](auto const& artifact_digest) {
return static_cast<bazel_re::Digest>(artifact_digest);
});
- auto splice_result = CASUtils::SpliceBlob(
- static_cast<bazel_re::Digest>(blob_digest), digests, *storage_);
+ auto splice_result =
+ CASUtils::SpliceBlob(static_cast<bazel_re::Digest>(blob_digest),
+ digests,
+ *storage_,
+ /* check_tree_invariant= */ false);
if (std::holds_alternative<grpc::Status>(splice_result)) {
auto* status = std::get_if<grpc::Status>(&splice_result);
Logger::Log(LogLevel::Error, status->error_message());