diff options
Diffstat (limited to 'src')
5 files changed, 103 insertions, 41 deletions
diff --git a/src/buildtool/execution_api/common/bytestream_utils.cpp b/src/buildtool/execution_api/common/bytestream_utils.cpp index cd1f0d19..f1b53f05 100644 --- a/src/buildtool/execution_api/common/bytestream_utils.cpp +++ b/src/buildtool/execution_api/common/bytestream_utils.cpp @@ -102,3 +102,56 @@ auto ByteStreamUtils::ReadRequest::GetDigest() const noexcept -> bazel_re::Digest { return ToBazelDigest(hash_, size_); } + +ByteStreamUtils::WriteRequest::WriteRequest( + std::string instance_name, + std::string uuid, + bazel_re::Digest const& digest) noexcept + : instance_name_{std::move(instance_name)}, + uuid_{std::move(uuid)}, + hash_{digest.hash()}, + size_{digest.size_bytes()} {} + +auto ByteStreamUtils::WriteRequest::ToString() && noexcept -> std::string { + return fmt::format("{}/{}/{}/{}/{}/{}", + std::move(instance_name_), + ByteStreamUtils::kUploads, + std::move(uuid_), + ByteStreamUtils::kBlobs, + std::move(hash_), + size_); +} + +auto ByteStreamUtils::WriteRequest::FromString( + std::string const& request) noexcept -> std::optional<WriteRequest> { + static constexpr std::size_t kInstanceNameIndex = 0U; + static constexpr std::size_t kUploadsIndex = 1U; + static constexpr std::size_t kUUIDIndex = 2U; + static constexpr std::size_t kBlobsIndex = 3U; + static constexpr std::size_t kHashIndex = 4U; + static constexpr std::size_t kSizeIndex = 5U; + static constexpr std::size_t kWriteRequestPartsCount = 6U; + + auto const parts = ::SplitRequest(request); + if (parts.size() != kWriteRequestPartsCount or + parts.at(kUploadsIndex).compare(ByteStreamUtils::kUploads) != 0 or + parts.at(kBlobsIndex).compare(ByteStreamUtils::kBlobs) != 0) { + return std::nullopt; + } + + WriteRequest result; + result.instance_name_ = std::string(parts.at(kInstanceNameIndex)); + result.uuid_ = std::string(parts.at(kUUIDIndex)); + result.hash_ = std::string(parts.at(kHashIndex)); + try { + result.size_ = std::stoi(std::string(parts.at(kSizeIndex))); + } catch (...) { + return std::nullopt; + } + return result; +} + +auto ByteStreamUtils::WriteRequest::GetDigest() const noexcept + -> bazel_re::Digest { + return ToBazelDigest(hash_, size_); +} diff --git a/src/buildtool/execution_api/common/bytestream_utils.hpp b/src/buildtool/execution_api/common/bytestream_utils.hpp index 9b46073f..672ae017 100644 --- a/src/buildtool/execution_api/common/bytestream_utils.hpp +++ b/src/buildtool/execution_api/common/bytestream_utils.hpp @@ -27,6 +27,7 @@ namespace bazel_re = build::bazel::remote::execution::v2; class ByteStreamUtils final { static constexpr auto* kBlobs = "blobs"; + static constexpr auto* kUploads = "uploads"; public: // Chunk size for uploads (default size used by BuildBarn) @@ -61,6 +62,42 @@ class ByteStreamUtils final { ReadRequest() = default; }; + + /// \brief Create a write request for the bytestream service to be + /// transferred over the net. Handles serialization/deserialization on its + /// own. The pattern is: + /// "{instance_name}/{kUploads}/{uuid}/{kBlobs}/{digest.hash()}/{digest.size_bytes()}". + /// "instance_name_example/uploads/c4f03510-7d56-4490-8934-01bce1b1288e/blobs/62183d7a696acf7e69e218efc82c93135f8c85f895/4424712" + class WriteRequest final { + public: + explicit WriteRequest(std::string instance_name, + std::string uuid, + bazel_re::Digest const& digest) noexcept; + + [[nodiscard]] auto ToString() && noexcept -> std::string; + + [[nodiscard]] static auto FromString( + std::string const& request) noexcept -> std::optional<WriteRequest>; + + [[nodiscard]] auto GetInstanceName() const noexcept + -> std::string const& { + return instance_name_; + } + + [[nodiscard]] auto GetUUID() const noexcept -> std::string const& { + return uuid_; + } + + [[nodiscard]] auto GetDigest() const noexcept -> bazel_re::Digest; + + private: + std::string instance_name_; + std::string uuid_; + std::string hash_; + std::int64_t size_ = 0; + + WriteRequest() = default; + }; }; #endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_API_COMMON_BYTESTREAM_UTILS_HPP diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.cpp b/src/buildtool/execution_api/execution_service/bytestream_server.cpp index 789fdf95..d1e13d53 100644 --- a/src/buildtool/execution_api/execution_service/bytestream_server.cpp +++ b/src/buildtool/execution_api/execution_service/bytestream_server.cpp @@ -30,33 +30,6 @@ #include "src/buildtool/storage/garbage_collector.hpp" #include "src/utils/cpp/tmp_dir.hpp" -namespace { -auto ParseResourceName(std::string const& x) noexcept - -> std::optional<bazel_re::Digest> { - // resource name is like this - // remote-execution/uploads/c4f03510-7d56-4490-8934-01bce1b1288e/blobs/62183d7a696acf7e69e218efc82c93135f8c85f895/4424712 - auto const size_delim = x.rfind('/'); - if (size_delim == std::string::npos) { - return std::nullopt; - } - - auto const hash_delim = x.rfind('/', size_delim - 1); - if (hash_delim == std::string::npos) { - return std::nullopt; - } - - try { - bazel_re::Digest digest{}; - - digest.set_size_bytes(std::stoll(x.substr(size_delim + 1))); - digest.set_hash(x.substr(hash_delim + 1, size_delim - hash_delim - 1)); - return digest; - } catch (...) { - return std::nullopt; - } -} -} // namespace - auto BytestreamServiceImpl::Read( ::grpc::ServerContext* /*context*/, const ::google::bytestream::ReadRequest* request, @@ -129,8 +102,9 @@ auto BytestreamServiceImpl::Write( ::google::bytestream::WriteRequest request; reader->Read(&request); logger_.Emit(LogLevel::Debug, "write {}", request.resource_name()); - auto const digest = ParseResourceName(request.resource_name()); - if (not digest) { + auto const write_request = + ByteStreamUtils::WriteRequest::FromString(request.resource_name()); + if (not write_request) { auto const str = fmt::format("could not parse {}", request.resource_name()); logger_.Emit(LogLevel::Error, "{}", str); @@ -138,7 +112,7 @@ auto BytestreamServiceImpl::Write( } auto const write_digest = ArtifactDigestFactory::FromBazel( - storage_config_.hash_function.GetType(), *digest); + storage_config_.hash_function.GetType(), write_request->GetDigest()); if (not write_digest) { logger_.Emit(LogLevel::Debug, "{}", write_digest.error()); return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp index ea248810..155bbc89 100644 --- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp +++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp @@ -317,12 +317,9 @@ auto BazelCasClient::UpdateSingleBlob(std::string const& instance_name, } uuid = CreateUUIDVersion4(*id); } - auto ok = stream_->Write(fmt::format("{}/uploads/{}/blobs/{}/{}", - instance_name, - uuid, - blob.digest.hash(), - blob.digest.size_bytes()), - *blob.data); + auto ok = stream_->Write( + ByteStreamUtils::WriteRequest{instance_name, uuid, blob.digest}, + *blob.data); if (not ok) { logger_.Emit(LogLevel::Error, "Failed to write {}:{}", diff --git a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp index 02538384..a67fd904 100644 --- a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp +++ b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp @@ -106,7 +106,7 @@ class ByteStreamClient { return output; } - [[nodiscard]] auto Write(std::string const& resource_name, + [[nodiscard]] auto Write(ByteStreamUtils::WriteRequest&& write_request, std::string const& data) const noexcept -> bool { try { grpc::ClientContext ctx; @@ -114,7 +114,7 @@ class ByteStreamClient { auto writer = stub_->Write(&ctx, &response); google::bytestream::WriteRequest request{}; - request.set_resource_name(resource_name); + request.set_resource_name(std::move(write_request).ToString()); request.mutable_data()->resize(ByteStreamUtils::kChunkSize, '\0'); std::size_t pos{}; @@ -131,12 +131,13 @@ class ByteStreamClient { // the `Write()`, the client should check the status of the // `Write()` by calling `QueryWriteStatus()` and continue // writing from the returned `committed_size`. - auto const committed_size = QueryWriteStatus(resource_name); + auto const committed_size = + QueryWriteStatus(request.resource_name()); if (committed_size <= 0) { logger_.Emit( LogLevel::Warning, "broken stream for upload to resource name {}", - resource_name); + request.resource_name()); return false; } pos = gsl::narrow<std::size_t>(committed_size); @@ -148,7 +149,7 @@ class ByteStreamClient { if (not writer->WritesDone()) { logger_.Emit(LogLevel::Warning, "broken stream for upload to resource name {}", - resource_name); + request.resource_name()); return false; } |