diff options
Diffstat (limited to 'src/buildtool/execution_api/remote/bazel/bytestream_client.hpp')
-rw-r--r-- | src/buildtool/execution_api/remote/bazel/bytestream_client.hpp | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp new file mode 100644 index 00000000..b8823236 --- /dev/null +++ b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp @@ -0,0 +1,185 @@ +#ifndef INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP +#define INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP + +#include <functional> +#include <iomanip> +#include <optional> +#include <string> +#include <vector> + +#include "google/bytestream/bytestream.grpc.pb.h" +#include "src/buildtool/execution_api/remote/bazel/bazel_client_common.hpp" +#include "src/buildtool/logging/logger.hpp" + +/// Implements client side for google.bytestream.ByteStream service. +class ByteStreamClient { + public: + class IncrementalReader { + friend class ByteStreamClient; + + public: + /// \brief Read next chunk of data. + /// \returns empty string if stream finished and std::nullopt on error. + [[nodiscard]] auto Next() -> std::optional<std::string> { + google::bytestream::ReadResponse response{}; + if (reader_->Read(&response)) { + return std::move(*response.mutable_data()); + } + + if (not finished_) { + auto status = reader_->Finish(); + if (not status.ok()) { + LogStatus(logger_, LogLevel::Debug, status); + return std::nullopt; + } + finished_ = true; + } + return std::string{}; + } + + private: + Logger const* logger_; + grpc::ClientContext ctx_; + std::unique_ptr<grpc::ClientReader<google::bytestream::ReadResponse>> + reader_; + bool finished_{false}; + + IncrementalReader( + gsl::not_null<google::bytestream::ByteStream::Stub*> const& stub, + Logger const* logger, + std::string const& resource_name) + : logger_{logger} { + google::bytestream::ReadRequest request{}; + request.set_resource_name(resource_name); + reader_ = stub->Read(&ctx_, request); + } + }; + + ByteStreamClient(std::string const& server, + Port port, + std::string const& user = "", + std::string const& pwd = "") noexcept { + stub_ = google::bytestream::ByteStream::NewStub( + CreateChannelWithCredentials(server, port, user, pwd)); + } + + [[nodiscard]] auto IncrementalRead( + std::string const& resource_name) const noexcept -> IncrementalReader { + return IncrementalReader{stub_.get(), &logger_, resource_name}; + } + + [[nodiscard]] auto Read(std::string const& resource_name) const noexcept + -> std::optional<std::string> { + auto reader = IncrementalRead(resource_name); + std::string output{}; + auto data = reader.Next(); + while (data and not data->empty()) { + output.append(data->begin(), data->end()); + data = reader.Next(); + } + if (not data) { + return std::nullopt; + } + return output; + } + + [[nodiscard]] auto Write(std::string const& resource_name, + std::string const& data) const noexcept -> bool { + grpc::ClientContext ctx; + google::bytestream::WriteResponse response{}; + auto writer = stub_->Write(&ctx, &response); + + auto* allocated_data = + std::make_unique<std::string>(kChunkSize, '\0').release(); + google::bytestream::WriteRequest request{}; + request.set_resource_name(resource_name); + request.set_allocated_data(allocated_data); + std::size_t pos{}; + do { + auto const size = std::min(data.size() - pos, kChunkSize); + allocated_data->resize(size); + data.copy(allocated_data->data(), size, pos); + request.set_write_offset(static_cast<int>(pos)); + request.set_finish_write(pos + size >= data.size()); + if (not writer->Write(request)) { + // According to the docs, quote: + // If there is an error or the connection is broken during the + // `Write()`, the client should check the status of the + // `Write()` by calling `QueryWriteStatus()` and continue + // writing from the returned `committed_size`. + auto const committed_size = QueryWriteStatus(resource_name); + if (committed_size <= 0) { + logger_.Emit(LogLevel::Debug, + "broken stream for upload to resource name {}", + resource_name); + return false; + } + pos = gsl::narrow<std::size_t>(committed_size); + } + else { + pos += kChunkSize; + } + } while (pos < data.size()); + if (not writer->WritesDone()) { + logger_.Emit(LogLevel::Debug, + "broken stream for upload to resource name {}", + resource_name); + return false; + } + + auto status = writer->Finish(); + if (not status.ok()) { + LogStatus(&logger_, LogLevel::Debug, status); + return false; + } + + return gsl::narrow<std::size_t>(response.committed_size()) == + data.size(); + } + + template <class T_Input> + void ReadMany( + std::vector<T_Input> const& inputs, + std::function<std::string(T_Input const&)> const& to_resource_name, + std::function<void(std::string)> const& parse_data) const noexcept { + for (auto const& i : inputs) { + auto data = Read(to_resource_name(i)); + if (data) { + parse_data(std::move(*data)); + } + } + } + + template <class T_Input> + [[nodiscard]] auto WriteMany( + std::vector<T_Input> const& inputs, + std::function<std::string(T_Input const&)> const& to_resource_name, + std::function<std::string(T_Input const&)> const& to_data) + const noexcept -> bool { + for (auto const& i : inputs) { + if (not Write(to_resource_name(i), to_data(i))) { + return false; + } + } + return true; + } + + private: + // Chunk size for uploads (default size used by BuildBarn) + constexpr static std::size_t kChunkSize = 64 * 1024; + + std::unique_ptr<google::bytestream::ByteStream::Stub> stub_; + Logger logger_{"ByteStreamClient"}; + + [[nodiscard]] auto QueryWriteStatus( + std::string const& resource_name) const noexcept -> std::int64_t { + grpc::ClientContext ctx; + google::bytestream::QueryWriteStatusRequest request{}; + request.set_resource_name(resource_name); + google::bytestream::QueryWriteStatusResponse response{}; + stub_->QueryWriteStatus(&ctx, request, &response); + return response.committed_size(); + } +}; + +#endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP |