summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/execution_service/bytestream_server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildtool/execution_api/execution_service/bytestream_server.cpp')
-rw-r--r--src/buildtool/execution_api/execution_service/bytestream_server.cpp48
1 files changed, 29 insertions, 19 deletions
diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.cpp b/src/buildtool/execution_api/execution_service/bytestream_server.cpp
index fd169fd8..a985e37d 100644
--- a/src/buildtool/execution_api/execution_service/bytestream_server.cpp
+++ b/src/buildtool/execution_api/execution_service/bytestream_server.cpp
@@ -48,7 +48,6 @@ auto BytestreamServiceImpl::Read(
logger_.Emit(LogLevel::Trace, "Read {}", request->resource_name());
// resource_name is of type
// remote-execution/blobs/62f408d64bca5de775c4b1dbc3288fc03afd6b19eb/0
- std::istringstream iss{request->resource_name()};
auto hash = ParseResourceName(request->resource_name());
if (not hash) {
auto str = fmt::format("could not parse {}", request->resource_name());
@@ -84,20 +83,26 @@ auto BytestreamServiceImpl::Read(
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{::grpc::StatusCode::NOT_FOUND, str};
}
- std::ifstream blob{*path};
+
+ std::ifstream stream{*path, std::ios::binary};
+ stream.seekg(request->read_offset(), std::ios::beg);
::google::bytestream::ReadResponse response;
- std::string buffer(kChunkSize, '\0');
- bool done = false;
- blob.seekg(request->read_offset());
- while (not done) {
- blob.read(buffer.data(), kChunkSize);
- if (blob.eof()) {
+ std::string& buffer = *response.mutable_data();
+ buffer.resize(kChunkSize);
+
+ while (not stream.eof()) {
+ stream.read(buffer.data(), kChunkSize);
+ if (stream.bad()) {
+ auto const str = fmt::format("Failed to read data for {}", *hash);
+ logger_.Emit(LogLevel::Error, str);
+ return grpc::Status{grpc::StatusCode::INTERNAL, str};
+ }
+
+ if (stream.eof()) {
// do not send random bytes
- buffer.resize(static_cast<std::size_t>(blob.gcount()));
- done = true;
+ buffer.resize(static_cast<std::size_t>(stream.gcount()));
}
- *(response.mutable_data()) = buffer;
writer->Write(response);
}
return ::grpc::Status::OK;
@@ -136,17 +141,22 @@ auto BytestreamServiceImpl::Write(
return ::grpc::Status{::grpc::StatusCode::INTERNAL,
"could not create TmpDir"};
}
+
auto tmp = tmp_dir->GetPath() / *hash;
{
- std::ofstream of{tmp, std::ios::binary};
- of.write(request.data().data(),
- static_cast<std::streamsize>(request.data().size()));
- }
- while (not request.finish_write() and reader->Read(&request)) {
- std::ofstream of{tmp, std::ios::binary | std::ios::app};
- of.write(request.data().data(),
- static_cast<std::streamsize>(request.data().size()));
+ std::ofstream stream{tmp, std::ios::binary};
+ do {
+ if (not stream.good()) {
+ auto const str =
+ fmt::format("Failed to write data for {}", *hash);
+ logger_.Emit(LogLevel::Error, "{}", str);
+ return ::grpc::Status{::grpc::StatusCode::INTERNAL, str};
+ }
+ stream.write(request.data().data(),
+ static_cast<std::streamsize>(request.data().size()));
+ } while (not request.finish_write() and reader->Read(&request));
}
+
if (NativeSupport::IsTree(*hash)) {
if (not storage_.CAS().StoreTree</*kOwner=*/true>(tmp)) {
auto str = fmt::format("could not store tree {}", *hash);