summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/remote
diff options
context:
space:
mode:
authorMaksim Denisov <denisov.maksim@huawei.com>2025-02-20 11:48:11 +0100
committerMaksim Denisov <denisov.maksim@huawei.com>2025-02-21 14:46:30 +0100
commit986b6e25526e38f21a6b3f11beefbb9679a200ab (patch)
treea024210b2254453e77490e05d26dfd7fa9529182 /src/buildtool/execution_api/remote
parent5979976e6ac3d64263e306d3c253d54f0b6748fd (diff)
downloadjustbuild-986b6e25526e38f21a6b3f11beefbb9679a200ab.tar.gz
ByteStreamClient: Use IncrementalReader for writing
Diffstat (limited to 'src/buildtool/execution_api/remote')
-rw-r--r--src/buildtool/execution_api/remote/TARGETS2
-rw-r--r--src/buildtool/execution_api/remote/bazel/bytestream_client.hpp48
2 files changed, 37 insertions, 13 deletions
diff --git a/src/buildtool/execution_api/remote/TARGETS b/src/buildtool/execution_api/remote/TARGETS
index 39c1a444..43f3f381 100644
--- a/src/buildtool/execution_api/remote/TARGETS
+++ b/src/buildtool/execution_api/remote/TARGETS
@@ -34,6 +34,8 @@
, ["src/buildtool/file_system", "git_repo"]
, ["src/buildtool/logging", "log_level"]
, ["src/buildtool/logging", "logging"]
+ , ["src/utils/cpp", "expected"]
+ , ["src/utils/cpp", "incremental_reader"]
]
, "proto":
[ ["@", "bazel_remote_apis", "", "remote_execution_proto"]
diff --git a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
index 1e856119..4343393c 100644
--- a/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
+++ b/src/buildtool/execution_api/remote/bazel/bytestream_client.hpp
@@ -15,12 +15,12 @@
#ifndef INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP
#define INCLUDED_SRC_BUILDTOOL_EXECUTION_API_REMOTE_BAZEL_BYTESTREAM_CLIENT_HPP
-#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
+#include <string_view>
#include <utility> // std::move
#include <grpcpp/grpcpp.h>
@@ -34,6 +34,8 @@
#include "src/buildtool/execution_api/common/bytestream_utils.hpp"
#include "src/buildtool/logging/log_level.hpp"
#include "src/buildtool/logging/logger.hpp"
+#include "src/utils/cpp/expected.hpp"
+#include "src/utils/cpp/incremental_reader.hpp"
/// Implements client side for google.bytestream.ByteStream service.
class ByteStreamClient {
@@ -118,17 +120,39 @@ class ByteStreamClient {
google::bytestream::WriteRequest request{};
request.set_resource_name(std::move(write_request).ToString());
- request.mutable_data()->resize(ByteStreamUtils::kChunkSize, '\0');
+ request.mutable_data()->reserve(ByteStreamUtils::kChunkSize);
+
+ auto const to_read = ::IncrementalReader::FromMemory(
+ ByteStreamUtils::kChunkSize, &data);
+ if (not to_read.has_value()) {
+ logger_.Emit(
+ LogLevel::Error,
+ "ByteStreamClient: Failed to create a reader for {}:\n{}",
+ request.resource_name(),
+ to_read.error());
+ return false;
+ }
std::size_t pos = 0;
- do { // NOLINT(cppcoreguidelines-avoid-do-while)
- auto const size =
- std::min(data.size() - pos, ByteStreamUtils::kChunkSize);
- request.mutable_data()->resize(size);
- data.copy(request.mutable_data()->data(), size, pos);
+ for (auto it = to_read->begin(); it != to_read->end();) {
+ auto const chunk = *it;
+ if (not chunk.has_value()) {
+ logger_.Emit(
+ LogLevel::Error,
+ "ByteStreamClient: Failed to read data for {}:\n{}",
+ request.resource_name(),
+ chunk.error());
+ return false;
+ }
+ *request.mutable_data() = *chunk;
+
request.set_write_offset(static_cast<int>(pos));
- request.set_finish_write(pos + size >= data.size());
- if (not writer->Write(request)) {
+ request.set_finish_write(pos + chunk->size() >= data.size());
+ if (writer->Write(request)) {
+ pos += chunk->size();
+ ++it;
+ }
+ else {
// According to the docs, quote:
// If there is an error or the connection is broken during
// the `Write()`, the client should check the status of the
@@ -144,11 +168,9 @@ class ByteStreamClient {
return false;
}
pos = gsl::narrow<std::size_t>(committed_size);
+ it = to_read->make_iterator(pos);
}
- else {
- pos += ByteStreamUtils::kChunkSize;
- }
- } while (pos < data.size());
+ }
if (not writer->WritesDone()) {
logger_.Emit(LogLevel::Warning,
"broken stream for upload to resource name {}",