diff options
Diffstat (limited to 'src/buildtool/execution_api/execution_service/bytestream_server.cpp')
-rw-r--r-- | src/buildtool/execution_api/execution_service/bytestream_server.cpp | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.cpp b/src/buildtool/execution_api/execution_service/bytestream_server.cpp new file mode 100644 index 00000000..64d5e9eb --- /dev/null +++ b/src/buildtool/execution_api/execution_service/bytestream_server.cpp @@ -0,0 +1,166 @@ +// Copyright 2023 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "bytestream_server.hpp" + +#include <fstream> +#include <sstream> +#include <utility> + +#include "fmt/format.h" +#include "src/buildtool/compatibility/native_support.hpp" +#include "src/buildtool/execution_api/common/bytestream_common.hpp" +#include "src/buildtool/execution_api/local/garbage_collector.hpp" +#include "src/utils/cpp/tmp_dir.hpp" + +namespace { +auto ParseResourceName(std::string const& x) -> std::optional<std::string> { + // resource name is like this + // remote-execution/uploads/c4f03510-7d56-4490-8934-01bce1b1288e/blobs/62183d7a696acf7e69e218efc82c93135f8c85f895/4424712 + if (auto end = x.rfind('/'); end != std::string::npos) { + if (auto start = x.rfind('/', end - 1); start != std::string::npos) { + return x.substr(start + 1, end - start - 1); + } + } + return std::nullopt; +} +} // namespace + +auto BytestreamServiceImpl::Read( + ::grpc::ServerContext* /*context*/, + const ::google::bytestream::ReadRequest* request, + ::grpc::ServerWriter<::google::bytestream::ReadResponse>* writer) + -> ::grpc::Status { + logger_.Emit(LogLevel::Trace, "Read {}", request->resource_name()); + // resource_name is of type + // remote-execution/blobs/62f408d64bca5de775c4b1dbc3288fc03afd6b19eb/0 + std::istringstream iss{request->resource_name()}; + auto hash = ParseResourceName(request->resource_name()); + if (!hash) { + auto str = fmt::format("could not parse {}", request->resource_name()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; + } + + std::optional<std::filesystem::path> path{}; + auto lock = GarbageCollector::SharedLock(); + if (!lock) { + auto str = fmt::format("Could not acquire SharedLock"); + logger_.Emit(LogLevel::Error, str); + return grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + if (NativeSupport::IsTree(*hash)) { + ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, true}; + path = storage_.TreePath(static_cast<bazel_re::Digest>(dgst)); + if (!path) { + auto str = fmt::format("could not find {}", *hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::NOT_FOUND, str}; + } + } + ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, false}; + path = storage_.BlobPath(static_cast<bazel_re::Digest>(dgst), false); + if (!path) { + auto str = fmt::format("could not find {}", *hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::NOT_FOUND, str}; + } + + std::ifstream blob{*path}; + + ::google::bytestream::ReadResponse response; + std::string buffer(kChunkSize, '\0'); + bool done = false; + blob.seekg(request->read_offset()); + while (!done) { + blob.read(buffer.data(), kChunkSize); + if (blob.eof()) { + // do not send random bytes + buffer.resize(static_cast<std::size_t>(blob.gcount())); + done = true; + } + *(response.mutable_data()) = buffer; + writer->Write(response); + } + return ::grpc::Status::OK; +} + +auto BytestreamServiceImpl::Write( + ::grpc::ServerContext* /*context*/, + ::grpc::ServerReader<::google::bytestream::WriteRequest>* reader, + ::google::bytestream::WriteResponse* response) -> ::grpc::Status { + ::google::bytestream::WriteRequest request; + reader->Read(&request); + logger_.Emit(LogLevel::Debug, "write {}", request.resource_name()); + auto hash = ParseResourceName(request.resource_name()); + if (!hash) { + auto str = fmt::format("could not parse {}", request.resource_name()); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; + } + logger_.Emit(LogLevel::Trace, + "Write: {}, offset {}, finish write {}", + *hash, + request.write_offset(), + request.finish_write()); + auto tmp_dir = TmpDir::Create("execution-service"); + if (!tmp_dir) { + return ::grpc::Status{::grpc::StatusCode::INTERNAL, + "could not create TmpDir"}; + } + auto tmp = tmp_dir->GetPath() / *hash; + { + std::ofstream of{tmp, std::ios::binary}; + of.write(request.data().data(), + static_cast<std::streamsize>(request.data().size())); + } + while (!request.finish_write() && reader->Read(&request)) { + std::ofstream of{tmp, std::ios::binary | std::ios::app}; + of.write(request.data().data(), + static_cast<std::streamsize>(request.data().size())); + } + auto lock = GarbageCollector::SharedLock(); + if (!lock) { + auto str = fmt::format("Could not acquire SharedLock"); + logger_.Emit(LogLevel::Error, str); + return grpc::Status{grpc::StatusCode::INTERNAL, str}; + } + if (NativeSupport::IsTree(*hash)) { + if (not storage_.StoreTree(tmp)) { + auto str = fmt::format("could not store tree {}", *hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; + } + } + else { + if (not storage_.StoreBlob(tmp)) { + auto str = fmt::format("could not store blob {}", *hash); + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; + } + } + response->set_committed_size( + static_cast<google::protobuf::int64>(std::filesystem::file_size(tmp))); + return ::grpc::Status::OK; +} + +auto BytestreamServiceImpl::QueryWriteStatus( + ::grpc::ServerContext* /*context*/, + const ::google::bytestream::QueryWriteStatusRequest* /*request*/, + ::google::bytestream::QueryWriteStatusResponse* /*response*/) + -> ::grpc::Status { + auto const* str = "QueryWriteStatus not implemented"; + logger_.Emit(LogLevel::Error, str); + return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; +} |