summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildtool/execution_api/remote/bazel/bytestream_client.hpp')
-rw-r--r--src/buildtool/execution_api/remote/bazel/bytestream_client.hpp185
1 files changed, 185 insertions, 0 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
new file mode 100644
index 00000000..b8823236
--- /dev/null
+++ b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
@@ -0,0 +1,185 @@
+#ifndef INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP
+#define INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP
+
+#include <functional>
+#include <iomanip>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "google/bytestream/bytestream.grpc.pb.h"
+#include "src/buildtool/execution_api/remote/bazel/bazel_client_common.hpp"
+#include "src/buildtool/logging/logger.hpp"
+
+/// Implements client side for google.bytestream.ByteStream service.
+class ByteStreamClient {
+ public:
+ class IncrementalReader {
+ friend class ByteStreamClient;
+
+ public:
+ /// \brief Read next chunk of data.
+ /// \returns empty string if stream finished and std::nullopt on error.
+ [[nodiscard]] auto Next() -> std::optional<std::string> {
+ google::bytestream::ReadResponse response{};
+ if (reader_->Read(&response)) {
+ return std::move(*response.mutable_data());
+ }
+
+ if (not finished_) {
+ auto status = reader_->Finish();
+ if (not status.ok()) {
+ LogStatus(logger_, LogLevel::Debug, status);
+ return std::nullopt;
+ }
+ finished_ = true;
+ }
+ return std::string{};
+ }
+
+ private:
+ Logger const* logger_;
+ grpc::ClientContext ctx_;
+ std::unique_ptr<grpc::ClientReader<google::bytestream::ReadResponse>>
+ reader_;
+ bool finished_{false};
+
+ IncrementalReader(
+ gsl::not_null<google::bytestream::ByteStream::Stub*> const& stub,
+ Logger const* logger,
+ std::string const& resource_name)
+ : logger_{logger} {
+ google::bytestream::ReadRequest request{};
+ request.set_resource_name(resource_name);
+ reader_ = stub->Read(&ctx_, request);
+ }
+ };
+
+ ByteStreamClient(std::string const& server,
+ Port port,
+ std::string const& user = "",
+ std::string const& pwd = "") noexcept {
+ stub_ = google::bytestream::ByteStream::NewStub(
+ CreateChannelWithCredentials(server, port, user, pwd));
+ }
+
+ [[nodiscard]] auto IncrementalRead(
+ std::string const& resource_name) const noexcept -> IncrementalReader {
+ return IncrementalReader{stub_.get(), &logger_, resource_name};
+ }
+
+ [[nodiscard]] auto Read(std::string const& resource_name) const noexcept
+ -> std::optional<std::string> {
+ auto reader = IncrementalRead(resource_name);
+ std::string output{};
+ auto data = reader.Next();
+ while (data and not data->empty()) {
+ output.append(data->begin(), data->end());
+ data = reader.Next();
+ }
+ if (not data) {
+ return std::nullopt;
+ }
+ return output;
+ }
+
+ [[nodiscard]] auto Write(std::string const& resource_name,
+ std::string const& data) const noexcept -> bool {
+ grpc::ClientContext ctx;
+ google::bytestream::WriteResponse response{};
+ auto writer = stub_->Write(&ctx, &response);
+
+ auto* allocated_data =
+ std::make_unique<std::string>(kChunkSize, '\0').release();
+ google::bytestream::WriteRequest request{};
+ request.set_resource_name(resource_name);
+ request.set_allocated_data(allocated_data);
+ std::size_t pos{};
+ do {
+ auto const size = std::min(data.size() - pos, kChunkSize);
+ allocated_data->resize(size);
+ data.copy(allocated_data->data(), size, pos);
+ request.set_write_offset(static_cast<int>(pos));
+ request.set_finish_write(pos + size >= data.size());
+ if (not writer->Write(request)) {
+ // According to the docs, quote:
+ // If there is an error or the connection is broken during the
+ // `Write()`, the client should check the status of the
+ // `Write()` by calling `QueryWriteStatus()` and continue
+ // writing from the returned `committed_size`.
+ auto const committed_size = QueryWriteStatus(resource_name);
+ if (committed_size <= 0) {
+ logger_.Emit(LogLevel::Debug,
+ "broken stream for upload to resource name {}",
+ resource_name);
+ return false;
+ }
+ pos = gsl::narrow<std::size_t>(committed_size);
+ }
+ else {
+ pos += kChunkSize;
+ }
+ } while (pos < data.size());
+ if (not writer->WritesDone()) {
+ logger_.Emit(LogLevel::Debug,
+ "broken stream for upload to resource name {}",
+ resource_name);
+ return false;
+ }
+
+ auto status = writer->Finish();
+ if (not status.ok()) {
+ LogStatus(&logger_, LogLevel::Debug, status);
+ return false;
+ }
+
+ return gsl::narrow<std::size_t>(response.committed_size()) ==
+ data.size();
+ }
+
+ template <class T_Input>
+ void ReadMany(
+ std::vector<T_Input> const& inputs,
+ std::function<std::string(T_Input const&)> const& to_resource_name,
+ std::function<void(std::string)> const& parse_data) const noexcept {
+ for (auto const& i : inputs) {
+ auto data = Read(to_resource_name(i));
+ if (data) {
+ parse_data(std::move(*data));
+ }
+ }
+ }
+
+ template <class T_Input>
+ [[nodiscard]] auto WriteMany(
+ std::vector<T_Input> const& inputs,
+ std::function<std::string(T_Input const&)> const& to_resource_name,
+ std::function<std::string(T_Input const&)> const& to_data)
+ const noexcept -> bool {
+ for (auto const& i : inputs) {
+ if (not Write(to_resource_name(i), to_data(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private:
+ // Chunk size for uploads (default size used by BuildBarn)
+ constexpr static std::size_t kChunkSize = 64 * 1024;
+
+ std::unique_ptr<google::bytestream::ByteStream::Stub> stub_;
+ Logger logger_{"ByteStreamClient"};
+
+ [[nodiscard]] auto QueryWriteStatus(
+ std::string const& resource_name) const noexcept -> std::int64_t {
+ grpc::ClientContext ctx;
+ google::bytestream::QueryWriteStatusRequest request{};
+ request.set_resource_name(resource_name);
+ google::bytestream::QueryWriteStatusResponse response{};
+ stub_->QueryWriteStatus(&ctx, request, &response);
+ return response.committed_size();
+ }
+};
+
+#endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP