diff options
Diffstat (limited to 'src/buildtool/execution_api/execution_service/bytestream_server.cpp')
-rw-r--r-- | src/buildtool/execution_api/execution_service/bytestream_server.cpp | 104 |
1 files changed, 63 insertions, 41 deletions
diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.cpp b/src/buildtool/execution_api/execution_service/bytestream_server.cpp index 4a3fa300..9f3e453f 100644 --- a/src/buildtool/execution_api/execution_service/bytestream_server.cpp +++ b/src/buildtool/execution_api/execution_service/bytestream_server.cpp @@ -21,6 +21,7 @@ #include "fmt/core.h" #include "src/buildtool/compatibility/native_support.hpp" +#include "src/buildtool/common/bazel_types.hpp" #include "src/buildtool/execution_api/common/bytestream_common.hpp" #include "src/buildtool/execution_api/execution_service/cas_utils.hpp" #include "src/buildtool/file_system/file_system_manager.hpp" @@ -30,15 +31,29 @@ #include "src/utils/cpp/verify_hash.hpp" namespace { -auto ParseResourceName(std::string const& x) -> std::optional<std::string> { +auto ParseResourceName(std::string const& x) noexcept + -> std::optional<bazel_re::Digest> { // resource name is like this // remote-execution/uploads/c4f03510-7d56-4490-8934-01bce1b1288e/blobs/62183d7a696acf7e69e218efc82c93135f8c85f895/4424712 - if (auto end = x.rfind('/'); end != std::string::npos) { - if (auto start = x.rfind('/', end - 1); start != std::string::npos) { - return x.substr(start + 1, end - start - 1); - } + auto const size_delim = x.rfind('/'); + if (size_delim == std::string::npos) { + return std::nullopt; + } + + auto const hash_delim = x.rfind('/', size_delim - 1); + if (hash_delim == std::string::npos) { + return std::nullopt; + } + + try { + bazel_re::Digest digest{}; + + digest.set_size_bytes(std::stoll(x.substr(size_delim + 1))); + digest.set_hash(x.substr(hash_delim + 1, size_delim - hash_delim - 1)); + return digest; + } catch (...) { + return std::nullopt; } - return std::nullopt; } } // namespace @@ -50,37 +65,38 @@ auto BytestreamServiceImpl::Read( logger_.Emit(LogLevel::Trace, "Read {}", request->resource_name()); // resource_name is of type // remote-execution/blobs/62f408d64bca5de775c4b1dbc3288fc03afd6b19eb/0 - auto hash = ParseResourceName(request->resource_name()); - if (not hash) { - auto str = fmt::format("could not parse {}", request->resource_name()); + auto const digest = ParseResourceName(request->resource_name()); + if (not digest) { + auto const str = + fmt::format("could not parse {}", request->resource_name()); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; } - if (auto error_msg = IsAHash(*hash); error_msg) { + if (auto error_msg = IsAHash(digest->hash())) { logger_.Emit(LogLevel::Debug, "{}", *error_msg); return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, *error_msg}; } - auto lock = GarbageCollector::SharedLock(storage_config_); + auto const lock = GarbageCollector::SharedLock(storage_config_); if (not lock) { - auto str = fmt::format("Could not acquire SharedLock"); - logger_.Emit(LogLevel::Error, str); + static constexpr auto str = "Could not acquire SharedLock"; + logger_.Emit(LogLevel::Error, "{}", str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } std::optional<std::filesystem::path> path{}; - if (NativeSupport::IsTree(*hash)) { - ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, true}; + if (NativeSupport::IsTree(digest->hash())) { + ArtifactDigest dgst{NativeSupport::Unprefix(digest->hash()), 0, true}; path = storage_.CAS().TreePath(dgst); } else { - ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, false}; + ArtifactDigest dgst{NativeSupport::Unprefix(digest->hash()), 0, false}; path = storage_.CAS().BlobPath(dgst, false); } if (not path) { - auto str = fmt::format("could not find {}", *hash); + auto const str = fmt::format("could not find {}", digest->hash()); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{::grpc::StatusCode::NOT_FOUND, str}; } @@ -95,8 +111,9 @@ auto BytestreamServiceImpl::Read( while (not stream.eof()) { stream.read(buffer.data(), kChunkSize); if (stream.bad()) { - auto const str = fmt::format("Failed to read data for {}", *hash); - logger_.Emit(LogLevel::Error, str); + auto const str = + fmt::format("Failed to read data for {}", digest->hash()); + logger_.Emit(LogLevel::Error, "{}", str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } @@ -116,40 +133,41 @@ auto BytestreamServiceImpl::Write( ::google::bytestream::WriteRequest request; reader->Read(&request); logger_.Emit(LogLevel::Debug, "write {}", request.resource_name()); - auto hash = ParseResourceName(request.resource_name()); - if (not hash) { - auto str = fmt::format("could not parse {}", request.resource_name()); + auto const digest = ParseResourceName(request.resource_name()); + if (not digest) { + auto const str = + fmt::format("could not parse {}", request.resource_name()); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; } - if (auto error_msg = IsAHash(*hash); error_msg) { + if (auto error_msg = IsAHash(digest->hash())) { logger_.Emit(LogLevel::Debug, "{}", *error_msg); return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, *error_msg}; } logger_.Emit(LogLevel::Trace, "Write: {}, offset {}, finish write {}", - *hash, + digest->hash(), request.write_offset(), request.finish_write()); - auto lock = GarbageCollector::SharedLock(storage_config_); + auto const lock = GarbageCollector::SharedLock(storage_config_); if (not lock) { - auto str = fmt::format("Could not acquire SharedLock"); - logger_.Emit(LogLevel::Error, str); + static constexpr auto str = "Could not acquire SharedLock"; + logger_.Emit(LogLevel::Error, "{}", str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } - auto tmp_dir = storage_config_.CreateTypedTmpDir("execution-service"); + auto const tmp_dir = storage_config_.CreateTypedTmpDir("execution-service"); if (not tmp_dir) { return ::grpc::Status{::grpc::StatusCode::INTERNAL, "could not create TmpDir"}; } - auto tmp = tmp_dir->GetPath() / *hash; + auto tmp = tmp_dir->GetPath() / digest->hash(); { std::ofstream stream{tmp, std::ios::binary}; do { if (not stream.good()) { auto const str = - fmt::format("Failed to write data for {}", *hash); + fmt::format("Failed to write data for {}", digest->hash()); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{::grpc::StatusCode::INTERNAL, str}; } @@ -159,19 +177,21 @@ auto BytestreamServiceImpl::Write( } // Before storing a tree, we have to verify that its parts are present - bool const is_tree = NativeSupport::IsTree(*hash); + bool const is_tree = NativeSupport::IsTree(digest->hash()); if (is_tree) { // ... unfortunately, this requires us to read the whole tree object // into memory - auto content = FileSystemManager::ReadFile(tmp); + auto const content = FileSystemManager::ReadFile(tmp); if (not content) { - auto const msg = fmt::format( - "Failed to read temporary file {} for {}", tmp.string(), *hash); + auto const msg = + fmt::format("Failed to read temporary file {} for {}", + tmp.string(), + digest->hash()); logger_.Emit(LogLevel::Error, "{}", msg); return ::grpc::Status{::grpc::StatusCode::INTERNAL, msg}; } - ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, true}; + ArtifactDigest dgst{NativeSupport::Unprefix(digest->hash()), 0, true}; if (auto err = CASUtils::EnsureTreeInvariant(dgst, *content, storage_)) { auto const str = fmt::format("Write: {}", *std::move(err)); @@ -187,16 +207,18 @@ auto BytestreamServiceImpl::Write( if (not stored) { // This is a serious problem: we have a sequence of bytes, but cannot // write them to CAS. - auto str = fmt::format("Failed to store object {}", *hash); + auto const str = + fmt::format("Failed to store object {}", digest->hash()); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{::grpc::StatusCode::INTERNAL, str}; } - if (static_cast<bazel_re::Digest>(*stored).hash() != *hash) { + if (static_cast<bazel_re::Digest>(*stored).hash() != digest->hash()) { // User error: did not get a file with the announced hash - auto str = fmt::format("In upload for {} received object with hash {}", - *hash, - stored->hash()); + auto const str = + fmt::format("In upload for {} received object with hash {}", + digest->hash(), + stored->hash()); logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str}; } @@ -211,7 +233,7 @@ auto BytestreamServiceImpl::QueryWriteStatus( const ::google::bytestream::QueryWriteStatusRequest* /*request*/, ::google::bytestream::QueryWriteStatusResponse* /*response*/) -> ::grpc::Status { - auto const* str = "QueryWriteStatus not implemented"; + static constexpr auto str = "QueryWriteStatus not implemented"; logger_.Emit(LogLevel::Error, "{}", str); return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str}; } |