summaryrefslogtreecommitdiff
path: root/src/buildtool/execution_api/execution_service/bytestream_server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/buildtool/execution_api/execution_service/bytestream_server.cpp')
-rw-r--r--src/buildtool/execution_api/execution_service/bytestream_server.cpp104
1 files changed, 63 insertions, 41 deletions
diff --git a/src/buildtool/execution_api/execution_service/bytestream_server.cpp b/src/buildtool/execution_api/execution_service/bytestream_server.cpp
index 4a3fa300..9f3e453f 100644
--- a/src/buildtool/execution_api/execution_service/bytestream_server.cpp
+++ b/src/buildtool/execution_api/execution_service/bytestream_server.cpp
@@ -21,6 +21,7 @@
#include "fmt/core.h"
#include "src/buildtool/compatibility/native_support.hpp"
+#include "src/buildtool/common/bazel_types.hpp"
#include "src/buildtool/execution_api/common/bytestream_common.hpp"
#include "src/buildtool/execution_api/execution_service/cas_utils.hpp"
#include "src/buildtool/file_system/file_system_manager.hpp"
@@ -30,15 +31,29 @@
#include "src/utils/cpp/verify_hash.hpp"
namespace {
-auto ParseResourceName(std::string const& x) -> std::optional<std::string> {
+auto ParseResourceName(std::string const& x) noexcept
+ -> std::optional<bazel_re::Digest> {
// resource name is like this
// remote-execution/uploads/c4f03510-7d56-4490-8934-01bce1b1288e/blobs/62183d7a696acf7e69e218efc82c93135f8c85f895/4424712
- if (auto end = x.rfind('/'); end != std::string::npos) {
- if (auto start = x.rfind('/', end - 1); start != std::string::npos) {
- return x.substr(start + 1, end - start - 1);
- }
+ auto const size_delim = x.rfind('/');
+ if (size_delim == std::string::npos) {
+ return std::nullopt;
+ }
+
+ auto const hash_delim = x.rfind('/', size_delim - 1);
+ if (hash_delim == std::string::npos) {
+ return std::nullopt;
+ }
+
+ try {
+ bazel_re::Digest digest{};
+
+ digest.set_size_bytes(std::stoll(x.substr(size_delim + 1)));
+ digest.set_hash(x.substr(hash_delim + 1, size_delim - hash_delim - 1));
+ return digest;
+ } catch (...) {
+ return std::nullopt;
}
- return std::nullopt;
}
} // namespace
@@ -50,37 +65,38 @@ auto BytestreamServiceImpl::Read(
logger_.Emit(LogLevel::Trace, "Read {}", request->resource_name());
// resource_name is of type
// remote-execution/blobs/62f408d64bca5de775c4b1dbc3288fc03afd6b19eb/0
- auto hash = ParseResourceName(request->resource_name());
- if (not hash) {
- auto str = fmt::format("could not parse {}", request->resource_name());
+ auto const digest = ParseResourceName(request->resource_name());
+ if (not digest) {
+ auto const str =
+ fmt::format("could not parse {}", request->resource_name());
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str};
}
- if (auto error_msg = IsAHash(*hash); error_msg) {
+ if (auto error_msg = IsAHash(digest->hash())) {
logger_.Emit(LogLevel::Debug, "{}", *error_msg);
return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, *error_msg};
}
- auto lock = GarbageCollector::SharedLock(storage_config_);
+ auto const lock = GarbageCollector::SharedLock(storage_config_);
if (not lock) {
- auto str = fmt::format("Could not acquire SharedLock");
- logger_.Emit(LogLevel::Error, str);
+ static constexpr auto str = "Could not acquire SharedLock";
+ logger_.Emit(LogLevel::Error, "{}", str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
std::optional<std::filesystem::path> path{};
- if (NativeSupport::IsTree(*hash)) {
- ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, true};
+ if (NativeSupport::IsTree(digest->hash())) {
+ ArtifactDigest dgst{NativeSupport::Unprefix(digest->hash()), 0, true};
path = storage_.CAS().TreePath(dgst);
}
else {
- ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, false};
+ ArtifactDigest dgst{NativeSupport::Unprefix(digest->hash()), 0, false};
path = storage_.CAS().BlobPath(dgst, false);
}
if (not path) {
- auto str = fmt::format("could not find {}", *hash);
+ auto const str = fmt::format("could not find {}", digest->hash());
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{::grpc::StatusCode::NOT_FOUND, str};
}
@@ -95,8 +111,9 @@ auto BytestreamServiceImpl::Read(
while (not stream.eof()) {
stream.read(buffer.data(), kChunkSize);
if (stream.bad()) {
- auto const str = fmt::format("Failed to read data for {}", *hash);
- logger_.Emit(LogLevel::Error, str);
+ auto const str =
+ fmt::format("Failed to read data for {}", digest->hash());
+ logger_.Emit(LogLevel::Error, "{}", str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
@@ -116,40 +133,41 @@ auto BytestreamServiceImpl::Write(
::google::bytestream::WriteRequest request;
reader->Read(&request);
logger_.Emit(LogLevel::Debug, "write {}", request.resource_name());
- auto hash = ParseResourceName(request.resource_name());
- if (not hash) {
- auto str = fmt::format("could not parse {}", request.resource_name());
+ auto const digest = ParseResourceName(request.resource_name());
+ if (not digest) {
+ auto const str =
+ fmt::format("could not parse {}", request.resource_name());
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str};
}
- if (auto error_msg = IsAHash(*hash); error_msg) {
+ if (auto error_msg = IsAHash(digest->hash())) {
logger_.Emit(LogLevel::Debug, "{}", *error_msg);
return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, *error_msg};
}
logger_.Emit(LogLevel::Trace,
"Write: {}, offset {}, finish write {}",
- *hash,
+ digest->hash(),
request.write_offset(),
request.finish_write());
- auto lock = GarbageCollector::SharedLock(storage_config_);
+ auto const lock = GarbageCollector::SharedLock(storage_config_);
if (not lock) {
- auto str = fmt::format("Could not acquire SharedLock");
- logger_.Emit(LogLevel::Error, str);
+ static constexpr auto str = "Could not acquire SharedLock";
+ logger_.Emit(LogLevel::Error, "{}", str);
return grpc::Status{grpc::StatusCode::INTERNAL, str};
}
- auto tmp_dir = storage_config_.CreateTypedTmpDir("execution-service");
+ auto const tmp_dir = storage_config_.CreateTypedTmpDir("execution-service");
if (not tmp_dir) {
return ::grpc::Status{::grpc::StatusCode::INTERNAL,
"could not create TmpDir"};
}
- auto tmp = tmp_dir->GetPath() / *hash;
+ auto tmp = tmp_dir->GetPath() / digest->hash();
{
std::ofstream stream{tmp, std::ios::binary};
do {
if (not stream.good()) {
auto const str =
- fmt::format("Failed to write data for {}", *hash);
+ fmt::format("Failed to write data for {}", digest->hash());
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{::grpc::StatusCode::INTERNAL, str};
}
@@ -159,19 +177,21 @@ auto BytestreamServiceImpl::Write(
}
// Before storing a tree, we have to verify that its parts are present
- bool const is_tree = NativeSupport::IsTree(*hash);
+ bool const is_tree = NativeSupport::IsTree(digest->hash());
if (is_tree) {
// ... unfortunately, this requires us to read the whole tree object
// into memory
- auto content = FileSystemManager::ReadFile(tmp);
+ auto const content = FileSystemManager::ReadFile(tmp);
if (not content) {
- auto const msg = fmt::format(
- "Failed to read temporary file {} for {}", tmp.string(), *hash);
+ auto const msg =
+ fmt::format("Failed to read temporary file {} for {}",
+ tmp.string(),
+ digest->hash());
logger_.Emit(LogLevel::Error, "{}", msg);
return ::grpc::Status{::grpc::StatusCode::INTERNAL, msg};
}
- ArtifactDigest dgst{NativeSupport::Unprefix(*hash), 0, true};
+ ArtifactDigest dgst{NativeSupport::Unprefix(digest->hash()), 0, true};
if (auto err =
CASUtils::EnsureTreeInvariant(dgst, *content, storage_)) {
auto const str = fmt::format("Write: {}", *std::move(err));
@@ -187,16 +207,18 @@ auto BytestreamServiceImpl::Write(
if (not stored) {
// This is a serious problem: we have a sequence of bytes, but cannot
// write them to CAS.
- auto str = fmt::format("Failed to store object {}", *hash);
+ auto const str =
+ fmt::format("Failed to store object {}", digest->hash());
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{::grpc::StatusCode::INTERNAL, str};
}
- if (static_cast<bazel_re::Digest>(*stored).hash() != *hash) {
+ if (static_cast<bazel_re::Digest>(*stored).hash() != digest->hash()) {
// User error: did not get a file with the announced hash
- auto str = fmt::format("In upload for {} received object with hash {}",
- *hash,
- stored->hash());
+ auto const str =
+ fmt::format("In upload for {} received object with hash {}",
+ digest->hash(),
+ stored->hash());
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{::grpc::StatusCode::INVALID_ARGUMENT, str};
}
@@ -211,7 +233,7 @@ auto BytestreamServiceImpl::QueryWriteStatus(
const ::google::bytestream::QueryWriteStatusRequest* /*request*/,
::google::bytestream::QueryWriteStatusResponse* /*response*/)
-> ::grpc::Status {
- auto const* str = "QueryWriteStatus not implemented";
+ static constexpr auto str = "QueryWriteStatus not implemented";
logger_.Emit(LogLevel::Error, "{}", str);
return ::grpc::Status{grpc::StatusCode::UNIMPLEMENTED, str};
}