From 55ba09ec97d2449b39d7fcc38c346969168d899b Mon Sep 17 00:00:00 2001 From: Alberto Sartori Date: Mon, 27 Feb 2023 10:27:52 +0100 Subject: execution service: implement WaitExecution and google::longrunning::Operations::GetOperation For each action that is executed, an entry is added to a shared thread safe cache. Once the number of operations stored exceeds twice 2^n, where n is given by the option --log-operations-threshold, at most 2^n operations will be removed, in a FIFO scheme. --- .../execution_api/execution_service/cas_server.cpp | 47 +++++++++++++--------- 1 file changed, 29 insertions(+), 18 deletions(-) (limited to 'src/buildtool/execution_api/execution_service/cas_server.cpp') diff --git a/src/buildtool/execution_api/execution_service/cas_server.cpp b/src/buildtool/execution_api/execution_service/cas_server.cpp index fcc34f9c..07b9f3bf 100644 --- a/src/buildtool/execution_api/execution_service/cas_server.cpp +++ b/src/buildtool/execution_api/execution_service/cas_server.cpp @@ -21,21 +21,35 @@ static constexpr std::size_t kJustHashLength = 42; static constexpr std::size_t kSHA256Length = 64; +static auto IsValidHash(std::string const& x) -> bool { + auto const& length = x.size(); + return (Compatibility::IsCompatible() and length == kSHA256Length) or + length == kJustHashLength; +} + auto CASServiceImpl::FindMissingBlobs( ::grpc::ServerContext* /*context*/, const ::bazel_re::FindMissingBlobsRequest* request, ::bazel_re::FindMissingBlobsResponse* response) -> ::grpc::Status { auto lock = GarbageCollector::SharedLock(); if (!lock) { - auto str = fmt::format("Could not acquire SharedLock"); + auto str = + fmt::format("FindMissingBlobs: could not acquire SharedLock"); logger_.Emit(LogLevel::Error, str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } for (auto const& x : request->blob_digests()) { auto const& hash = x.hash(); - logger_.Emit(LogLevel::Trace, - "FindMissingBlobs: {}", - NativeSupport::Unprefix(hash)); + + if (!IsValidHash(hash)) { + logger_.Emit(LogLevel::Error, + "FindMissingBlobs: unsupported digest {}", + hash); + auto* d = response->add_missing_blob_digests(); + d->CopyFrom(x); + continue; + } + logger_.Emit(LogLevel::Trace, "FindMissingBlobs: {}", hash); if (NativeSupport::IsTree(hash)) { if (!storage_.TreePath(x)) { auto* d = response->add_missing_blob_digests(); @@ -68,29 +82,27 @@ auto CASServiceImpl::BatchUpdateBlobs( ::bazel_re::BatchUpdateBlobsResponse* response) -> ::grpc::Status { auto lock = GarbageCollector::SharedLock(); if (!lock) { - auto str = fmt::format("Could not acquire SharedLock"); + auto str = + fmt::format("BatchUpdateBlobs: could not acquire SharedLock"); logger_.Emit(LogLevel::Error, str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } for (auto const& x : request->requests()) { auto const& hash = x.digest().hash(); - auto const& hash_lenght = hash.size(); - if (hash_lenght != kJustHashLength and hash_lenght != kSHA256Length) { - auto const& str = fmt::format("Unsupported digest {}", hash); + if (!IsValidHash(hash)) { + auto const& str = + fmt::format("BatchUpdateBlobs: unsupported digest {}", hash); logger_.Emit(LogLevel::Error, str); return ::grpc::Status{grpc::StatusCode::INVALID_ARGUMENT, str}; } - logger_.Emit(LogLevel::Trace, - "BatchUpdateBlobs: {}", - NativeSupport::Unprefix(hash)); + logger_.Emit(LogLevel::Trace, "BatchUpdateBlobs: {}", hash); auto* r = response->add_responses(); r->mutable_digest()->CopyFrom(x.digest()); if (NativeSupport::IsTree(hash)) { auto const& dgst = storage_.StoreTree(x.data()); if (!dgst) { - auto const& str = - fmt::format("BatchUpdateBlobs: could not upload tree {}", - NativeSupport::Unprefix(hash)); + auto const& str = fmt::format( + "BatchUpdateBlobs: could not upload tree {}", hash); logger_.Emit(LogLevel::Error, str); return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; } @@ -101,9 +113,8 @@ auto CASServiceImpl::BatchUpdateBlobs( else { auto const& dgst = storage_.StoreBlob(x.data(), false); if (!dgst) { - auto const& str = - fmt::format("BatchUpdateBlobs: could not upload blob {}", - NativeSupport::Unprefix(hash)); + auto const& str = fmt::format( + "BatchUpdateBlobs: could not upload blob {}", hash); logger_.Emit(LogLevel::Error, str); return ::grpc::Status{grpc::StatusCode::INTERNAL, str}; } @@ -121,7 +132,7 @@ auto CASServiceImpl::BatchReadBlobs( ::bazel_re::BatchReadBlobsResponse* response) -> ::grpc::Status { auto lock = GarbageCollector::SharedLock(); if (!lock) { - auto str = fmt::format("Could not acquire SharedLock"); + auto str = fmt::format("BatchReadBlobs: Could not acquire SharedLock"); logger_.Emit(LogLevel::Error, str); return grpc::Status{grpc::StatusCode::INTERNAL, str}; } -- cgit v1.2.3