summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildtool/execution_api/execution_service/TARGETS1
-rw-r--r--src/buildtool/execution_api/execution_service/bytestream_server.cpp36
2 files changed, 21 insertions, 16 deletions
diff --git a/src/buildtool/execution_api/execution_service/TARGETS b/src/buildtool/execution_api/execution_service/TARGETS
index 670f2694..1cca42a0 100644
--- a/src/buildtool/execution_api/execution_service/TARGETS
+++ b/src/buildtool/execution_api/execution_service/TARGETS
@@ -153,6 +153,7 @@
, ["src/buildtool/logging", "log_level"]
, ["src/buildtool/storage", "garbage_collector"]
, ["src/utils/cpp", "expected"]
+ , ["src/utils/cpp", "incremental_reader"]
, ["src/utils/cpp", "tmp_dir"]
]
}
diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.cpp b/src/buildtool/execution_api/execution_service/bytestream_server.cpp
index b4e1cb07..0a6358a0 100644
--- a/src/buildtool/execution_api/execution_service/bytestream_server.cpp
+++ b/src/buildtool/execution_api/execution_service/bytestream_server.cpp
@@ -14,12 +14,12 @@
#include "src/buildtool/execution_api/execution_service/bytestream_server.hpp"
-#include <cstddef>
#include <filesystem>
#include <fstream>
#include <memory>
#include <optional>
#include <string>
+#include <string_view>
#include "fmt/core.h"
#include "google/protobuf/stubs/port.h"
@@ -30,6 +30,7 @@
#include "src/buildtool/logging/log_level.hpp"
#include "src/buildtool/storage/garbage_collector.hpp"
#include "src/utils/cpp/expected.hpp"
+#include "src/utils/cpp/incremental_reader.hpp"
#include "src/utils/cpp/tmp_dir.hpp"
auto BytestreamServiceImpl::Read(
@@ -72,26 +73,29 @@ auto BytestreamServiceImpl::Read(
return ::grpc::Status{::grpc::StatusCode::NOT_FOUND, str};
}
- std::ifstream stream{*path, std::ios::binary};
- stream.seekg(request->read_offset(), std::ios::beg);
+ auto const to_read =
+ IncrementalReader::FromFile(ByteStreamUtils::kChunkSize, *path);
+ if (not to_read.has_value()) {
+ auto const str = fmt::format("Failed to create reader for {}:\n{}",
+ read_digest->hash(),
+ to_read.error());
+ logger_.Emit(LogLevel::Error, str);
+ return grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
::google::bytestream::ReadResponse response;
- std::string& buffer = *response.mutable_data();
- buffer.resize(ByteStreamUtils::kChunkSize);
-
- while (not stream.eof()) {
- stream.read(buffer.data(), ByteStreamUtils::kChunkSize);
- if (stream.bad()) {
- auto const str =
- fmt::format("Failed to read data for {}", read_digest->hash());
+ for (auto it = to_read->make_iterator(request->read_offset());
+ it != to_read->end();
+ ++it) {
+ auto const chunk = *it;
+ if (not chunk.has_value()) {
+ auto const str = fmt::format("Failed to read data for {}:\n{}",
+ read_digest->hash(),
+ chunk.error());
logger_.Emit(LogLevel::Error, str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
-
- if (stream.eof()) {
- // do not send random bytes
- buffer.resize(static_cast<std::size_t>(stream.gcount()));
- }
+ *response.mutable_data() = *chunk;
writer->Write(response);
}
return ::grpc::Status::OK;