diff options
author | Maksim Denisov <denisov.maksim@huawei.com> | 2025-02-20 11:48:11 +0100 |
---|---|---|
committer | Maksim Denisov <denisov.maksim@huawei.com> | 2025-02-21 14:46:30 +0100 |
commit | 986b6e25526e38f21a6b3f11beefbb9679a200ab (patch) | |
tree | a024210b2254453e77490e05d26dfd7fa9529182 /src/buildtool/execution_api | |
parent | 5979976e6ac3d64263e306d3c253d54f0b6748fd (diff) | |
download | justbuild-986b6e25526e38f21a6b3f11beefbb9679a200ab.tar.gz |
ByteStreamClient: Use IncrementalReader for writing
Diffstat (limited to 'src/buildtool/execution_api')
-rw-r--r-- | src/buildtool/execution_api/remote/TARGETS | 2 | ||||
-rw-r--r-- | src/buildtool/execution_api/remote/bazel/bytestream_client.hpp | 48 |
2 files changed, 37 insertions, 13 deletions
diff --git a/src/buildtool/execution_api/remote/TARGETS b/src/buildtool/execution_api/remote/TARGETS index 39c1a444..43f3f381 100644 --- a/src/buildtool/execution_api/remote/TARGETS +++ b/src/buildtool/execution_api/remote/TARGETS @@ -34,6 +34,8 @@ , ["src/buildtool/file_system", "git_repo"] , ["src/buildtool/logging", "log_level"] , ["src/buildtool/logging", "logging"] + , ["src/utils/cpp", "expected"] + , ["src/utils/cpp", "incremental_reader"] ] , "proto": [ ["@", "bazel_remote_apis", "", "remote_execution_proto"] diff --git a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp index 1e856119..4343393c 100644 --- a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp +++ b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp @@ -15,12 +15,12 @@ #ifndef INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP #define INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP -#include <algorithm> #include <cstddef> #include <cstdint> #include <memory> #include <optional> #include <string> +#include <string_view> #include <utility> // std::move #include <grpcpp/grpcpp.h> @@ -34,6 +34,8 @@ #include "src/buildtool/execution_api/common/bytestream_utils.hpp" #include "src/buildtool/logging/log_level.hpp" #include "src/buildtool/logging/logger.hpp" +#include "src/utils/cpp/expected.hpp" +#include "src/utils/cpp/incremental_reader.hpp" /// Implements client side for google.bytestream.ByteStream service. class ByteStreamClient { @@ -118,17 +120,39 @@ class ByteStreamClient { google::bytestream::WriteRequest request{}; request.set_resource_name(std::move(write_request).ToString()); - request.mutable_data()->resize(ByteStreamUtils::kChunkSize, '\0'); + request.mutable_data()->reserve(ByteStreamUtils::kChunkSize); + + auto const to_read = ::IncrementalReader::FromMemory( + ByteStreamUtils::kChunkSize, &data); + if (not to_read.has_value()) { + logger_.Emit( + LogLevel::Error, + "ByteStreamClient: Failed to create a reader for {}:\n{}", + request.resource_name(), + to_read.error()); + return false; + } std::size_t pos = 0; - do { // NOLINT(cppcoreguidelines-avoid-do-while) - auto const size = - std::min(data.size() - pos, ByteStreamUtils::kChunkSize); - request.mutable_data()->resize(size); - data.copy(request.mutable_data()->data(), size, pos); + for (auto it = to_read->begin(); it != to_read->end();) { + auto const chunk = *it; + if (not chunk.has_value()) { + logger_.Emit( + LogLevel::Error, + "ByteStreamClient: Failed to read data for {}:\n{}", + request.resource_name(), + chunk.error()); + return false; + } + *request.mutable_data() = *chunk; + request.set_write_offset(static_cast<int>(pos)); - request.set_finish_write(pos + size >= data.size()); - if (not writer->Write(request)) { + request.set_finish_write(pos + chunk->size() >= data.size()); + if (writer->Write(request)) { + pos += chunk->size(); + ++it; + } + else { // According to the docs, quote: // If there is an error or the connection is broken during // the `Write()`, the client should check the status of the @@ -144,11 +168,9 @@ class ByteStreamClient { return false; } pos = gsl::narrow<std::size_t>(committed_size); + it = to_read->make_iterator(pos); } - else { - pos += ByteStreamUtils::kChunkSize; - } - } while (pos < data.size()); + } if (not writer->WritesDone()) { logger_.Emit(LogLevel::Warning, "broken stream for upload to resource name {}", |