summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildtool/execution_api')
-rw-r--r--src/buildtool/execution_api/common/bytestream_utils.cpp53
-rw-r--r--src/buildtool/execution_api/common/bytestream_utils.hpp37
-rw-r--r--src/buildtool/execution_api/execution_service/bytestream_server.cpp34
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp9
-rw-r--r--src/buildtool/execution_api/remote/bazel/bytestream_client.hpp11
5 files changed, 103 insertions, 41 deletions
diff --git a/src/buildtool/execution_api/common/bytestream_utils.cpp b/src/buildtool/execution_api/common/bytestream_utils.cpp
index cd1f0d19..f1b53f05 100644
--- a/src/buildtool/execution_api/common/bytestream_utils.cpp
+++ b/src/buildtool/execution_api/common/bytestream_utils.cpp
@@ -102,3 +102,56 @@ auto ByteStreamUtils::ReadRequest::GetDigest() const noexcept
-> bazel_re::Digest {
return ToBazelDigest(hash_, size_);
}
+
+ByteStreamUtils::WriteRequest::WriteRequest(
+ std::string instance_name,
+ std::string uuid,
+ bazel_re::Digest const& digest) noexcept
+ : instance_name_{std::move(instance_name)},
+ uuid_{std::move(uuid)},
+ hash_{digest.hash()},
+ size_{digest.size_bytes()} {}
+
+auto ByteStreamUtils::WriteRequest::ToString() && noexcept -> std::string {
+ return fmt::format("{}/{}/{}/{}/{}/{}",
+ std::move(instance_name_),
+ ByteStreamUtils::kUploads,
+ std::move(uuid_),
+ ByteStreamUtils::kBlobs,
+ std::move(hash_),
+ size_);
+}
+
+auto ByteStreamUtils::WriteRequest::FromString(
+ std::string const& request) noexcept -> std::optional<WriteRequest> {
+ static constexpr std::size_t kInstanceNameIndex = 0U;
+ static constexpr std::size_t kUploadsIndex = 1U;
+ static constexpr std::size_t kUUIDIndex = 2U;
+ static constexpr std::size_t kBlobsIndex = 3U;
+ static constexpr std::size_t kHashIndex = 4U;
+ static constexpr std::size_t kSizeIndex = 5U;
+ static constexpr std::size_t kWriteRequestPartsCount = 6U;
+
+ auto const parts = ::SplitRequest(request);
+ if (parts.size() != kWriteRequestPartsCount or
+ parts.at(kUploadsIndex).compare(ByteStreamUtils::kUploads) != 0 or
+ parts.at(kBlobsIndex).compare(ByteStreamUtils::kBlobs) != 0) {
+ return std::nullopt;
+ }
+
+ WriteRequest result;
+ result.instance_name_ = std::string(parts.at(kInstanceNameIndex));
+ result.uuid_ = std::string(parts.at(kUUIDIndex));
+ result.hash_ = std::string(parts.at(kHashIndex));
+ try {
+ result.size_ = std::stoi(std::string(parts.at(kSizeIndex)));
+ } catch (...) {
+ return std::nullopt;
+ }
+ return result;
+}
+
+auto ByteStreamUtils::WriteRequest::GetDigest() const noexcept
+ -> bazel_re::Digest {
+ return ToBazelDigest(hash_, size_);
+}
diff --git a/src/buildtool/execution_api/common/bytestream_utils.hpp b/src/buildtool/execution_api/common/bytestream_utils.hpp
index 9b46073f..672ae017 100644
--- a/src/buildtool/execution_api/common/bytestream_utils.hpp
+++ b/src/buildtool/execution_api/common/bytestream_utils.hpp
@@ -27,6 +27,7 @@ namespace bazel_re = build::bazel::remote::execution::v2;
class ByteStreamUtils final {
static constexpr auto* kBlobs = "blobs";
+ static constexpr auto* kUploads = "uploads";
public:
// Chunk size for uploads (default size used by BuildBarn)
@@ -61,6 +62,42 @@ class ByteStreamUtils final {
ReadRequest() = default;
};
+
+ /// \brief Create a write request for the bytestream service to be
+ /// transferred over the net. Handles serialization/deserialization on its
+ /// own. The pattern is:
+ /// "{instance_name}/{kUploads}/{uuid}/{kBlobs}/{digest.hash()}/{digest.size_bytes()}".
+ /// "instance_name_example/uploads/c4f03510-7d56-4490-8934-01bce1b1288e/blobs/62183d7a696acf7e69e218efc82c93135f8c85f895/4424712"
+ class WriteRequest final {
+ public:
+ explicit WriteRequest(std::string instance_name,
+ std::string uuid,
+ bazel_re::Digest const& digest) noexcept;
+
+ [[nodiscard]] auto ToString() && noexcept -> std::string;
+
+ [[nodiscard]] static auto FromString(
+ std::string const& request) noexcept -> std::optional<WriteRequest>;
+
+ [[nodiscard]] auto GetInstanceName() const noexcept
+ -> std::string const& {
+ return instance_name_;
+ }
+
+ [[nodiscard]] auto GetUUID() const noexcept -> std::string const& {
+ return uuid_;
+ }
+
+ [[nodiscard]] auto GetDigest() const noexcept -> bazel_re::Digest;
+
+ private:
+ std::string instance_name_;
+ std::string uuid_;
+ std::string hash_;
+ std::int64_t size_ = 0;
+
+ WriteRequest() = default;
+ };
};
#endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_API_COMMON_BYTESTREAM_UTILS_HPP
diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.cpp b/src/buildtool/execution_api/execution_service/bytestream_server.cpp
index 789fdf95..d1e13d53 100644
--- a/src/buildtool/execution_api/execution_service/bytestream_server.cpp
+++ b/src/buildtool/execution_api/execution_service/bytestream_server.cpp
@@ -30,33 +30,6 @@
#include "src/buildtool/storage/garbage_collector.hpp"
#include "src/utils/cpp/tmp_dir.hpp"
-namespace {
-auto ParseResourceName(std::string const& x) noexcept
- -> std::optional<bazel_re::Digest> {
- // resource name is like this
- // remote-execution/uploads/c4f03510-7d56-4490-8934-01bce1b1288e/blobs/62183d7a696acf7e69e218efc82c93135f8c85f895/4424712
- auto const size_delim = x.rfind('/');
- if (size_delim == std::string::npos) {
- return std::nullopt;
- }
-
- auto const hash_delim = x.rfind('/', size_delim - 1);
- if (hash_delim == std::string::npos) {
- return std::nullopt;
- }
-
- try {
- bazel_re::Digest digest{};
-
- digest.set_size_bytes(std::stoll(x.substr(size_delim + 1)));
- digest.set_hash(x.substr(hash_delim + 1, size_delim - hash_delim - 1));
- return digest;
- } catch (...) {
- return std::nullopt;
- }
-}
-} // namespace
-
auto BytestreamServiceImpl::Read(
::grpc::ServerContext* /*context*/,
const ::google::bytestream::ReadRequest* request,
@@ -129,8 +102,9 @@ auto BytestreamServiceImpl::Write(
::google::bytestream::WriteRequest request;
reader->Read(&request);
logger_.Emit(LogLevel::Debug, "write {}", request.resource_name());
- auto const digest = ParseResourceName(request.resource_name());
- if (not digest) {
+ auto const write_request =
+ ByteStreamUtils::WriteRequest::FromString(request.resource_name());
+ if (not write_request) {
auto const str =
fmt::format("could not parse {}", request.resource_name());
logger_.Emit(LogLevel::Error, "{}", str);
@@ -138,7 +112,7 @@ auto BytestreamServiceImpl::Write(
}
auto const write_digest = ArtifactDigestFactory::FromBazel(
- storage_config_.hash_function.GetType(), *digest);
+ storage_config_.hash_function.GetType(), write_request->GetDigest());
if (not write_digest) {
logger_.Emit(LogLevel::Debug, "{}", write_digest.error());
return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT,
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
index ea248810..155bbc89 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_cas_client.cpp
@@ -317,12 +317,9 @@ auto BazelCasClient::UpdateSingleBlob(std::string const& instance_name,
}
uuid = CreateUUIDVersion4(*id);
}
- auto ok = stream_->Write(fmt::format("{}/uploads/{}/blobs/{}/{}",
- instance_name,
- uuid,
- blob.digest.hash(),
- blob.digest.size_bytes()),
- *blob.data);
+ auto ok = stream_->Write(
+ ByteStreamUtils::WriteRequest{instance_name, uuid, blob.digest},
+ *blob.data);
if (not ok) {
logger_.Emit(LogLevel::Error,
"Failed to write {}:{}",
diff --git a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
index 02538384..a67fd904 100644
--- a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
+++ b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
@@ -106,7 +106,7 @@ class ByteStreamClient {
return output;
}
- [[nodiscard]] auto Write(std::string const& resource_name,
+ [[nodiscard]] auto Write(ByteStreamUtils::WriteRequest&& write_request,
std::string const& data) const noexcept -> bool {
try {
grpc::ClientContext ctx;
@@ -114,7 +114,7 @@ class ByteStreamClient {
auto writer = stub_->Write(&ctx, &response);
google::bytestream::WriteRequest request{};
- request.set_resource_name(resource_name);
+ request.set_resource_name(std::move(write_request).ToString());
request.mutable_data()->resize(ByteStreamUtils::kChunkSize, '\0');
std::size_t pos{};
@@ -131,12 +131,13 @@ class ByteStreamClient {
// the `Write()`, the client should check the status of the
// `Write()` by calling `QueryWriteStatus()` and continue
// writing from the returned `committed_size`.
- auto const committed_size = QueryWriteStatus(resource_name);
+ auto const committed_size =
+ QueryWriteStatus(request.resource_name());
if (committed_size <= 0) {
logger_.Emit(
LogLevel::Warning,
"broken stream for upload to resource name {}",
- resource_name);
+ request.resource_name());
return false;
}
pos = gsl::narrow<std::size_t>(committed_size);
@@ -148,7 +149,7 @@ class ByteStreamClient {
if (not writer->WritesDone()) {
logger_.Emit(LogLevel::Warning,
"broken stream for upload to resource name {}",
- resource_name);
+ request.resource_name());
return false;
}