-rw-r--r--  src/buildtool/execution_api/remote/bazel/bazel_api.cpp | 56
1 file changed, 44 insertions(+), 12 deletions(-)
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
index 6fbd19f3..8f6e07b4 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
@@ -18,6 +18,7 @@
 #include <atomic>
 #include <cstdint>
 #include <iterator>
+#include <mutex>
 #include <sstream>
 #include <unordered_map>
 #include <unordered_set>
@@ -408,25 +409,56 @@ auto BazelApi::CreateAction(
     // Recursively process trees.
     std::atomic_bool failure{false};
+    std::vector<Artifact::ObjectInfo> prerequisites{};
+    std::mutex prerequisites_lock{};
     try {
         auto ts = TaskSystem{jobs};
         for (auto const& dgst : missing_artifacts_info->digests) {
             auto const& info = missing_artifacts_info->back_map[dgst];
             if (IsTreeObject(info.type)) {
-                auto reader =
-                    TreeReader<BazelNetworkReader>{network_->CreateReader()};
-                auto const result = reader.ReadDirectTreeEntries(
-                    info.digest, std::filesystem::path{});
-                if (not result or
-                    not ParallelRetrieveToCasWithCache(
-                        result->infos, api, jobs, use_blob_splitting, done)) {
-                    return false;
-                }
+                ts.QueueTask([this,
+                              &info,
+                              &failure,
+                              &prerequisites,
+                              &prerequisites_lock]() {
+                    auto reader = TreeReader<BazelNetworkReader>{
+                        network_->CreateReader()};
+                    auto const result = reader.ReadDirectTreeEntries(
+                        info.digest, std::filesystem::path{});
+                    if (not result) {
+                        failure = true;
+                        return;
+                    }
+                    {
+                        std::unique_lock lock{prerequisites_lock};
+                        prerequisites.insert(prerequisites.end(),
+                                             result->infos.begin(),
+                                             result->infos.end());
+                    }
+                });
             }
+        }
+    } catch (std::exception const& ex) {
+        Logger::Log(LogLevel::Warning,
+                    "Artifact synchronization failed: {}",
+                    ex.what());
+        return false;
+    }
+
+    if (failure) {
+        return false;
+    }
-
-            // Object infos created by network_->ReadTreeInfos() will contain 0
-            // as size, but this is handled by the remote execution engine, so
-            // no need to regenerate the digest.
+    if (not ParallelRetrieveToCasWithCache(
+            prerequisites, api, jobs, use_blob_splitting, done)) {
+        return false;
+    }
+
+    // In parallel process all the requested artifacts
+    try {
+        auto ts = TaskSystem{jobs};
+        for (auto const& dgst : missing_artifacts_info->digests) {
+            auto const& info = missing_artifacts_info->back_map[dgst];
             ts.QueueTask([this,
                          &info,
                          &api,
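
Note on the pattern: the change restructures sequential per-tree synchronization into a two-phase fan-out/collect shape. Phase one queues one task per tree; each task reads the tree's direct entries and appends them to the shared prerequisites vector under prerequisites_lock, recording errors in the atomic failure flag (presumably because an exception cannot usefully propagate out of a queued task). Phase two then issues a single ParallelRetrieveToCasWithCache call over everything collected. The sketch below illustrates the same shape in isolation; it is a minimal approximation, not justbuild code: plain std::thread stands in for justbuild's TaskSystem, std::string for Artifact::ObjectInfo, and the hypothetical read_tree() for TreeReader::ReadDirectTreeEntries().

#include <atomic>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for TreeReader::ReadDirectTreeEntries(): yields the
// direct entries of one tree, or std::nullopt on a read error.
auto read_tree(std::string const& digest)
    -> std::optional<std::vector<std::string>> {
    return std::vector<std::string>{digest + "/a", digest + "/b"};
}

// Phase one of the pattern: fan out one task per tree, collect all entries
// into one shared vector, and report errors via an atomic flag.
auto collect_prerequisites(std::vector<std::string> const& tree_digests)
    -> std::optional<std::vector<std::string>> {
    std::atomic_bool failure{false};
    std::vector<std::string> prerequisites{};
    std::mutex prerequisites_lock{};
    {
        // std::thread stands in for the TaskSystem; joining every thread
        // plays the role of the TaskSystem going out of scope, which waits
        // for all queued work to finish.
        std::vector<std::thread> tasks{};
        tasks.reserve(tree_digests.size());
        for (auto const& digest : tree_digests) {
            tasks.emplace_back(
                [&digest, &failure, &prerequisites, &prerequisites_lock]() {
                    // The expensive read runs concurrently, outside the lock.
                    auto const entries = read_tree(digest);
                    if (not entries) {
                        failure = true;  // flag instead of throwing
                        return;
                    }
                    // Serialize only the cheap append into the shared vector.
                    std::unique_lock lock{prerequisites_lock};
                    prerequisites.insert(prerequisites.end(),
                                         entries->begin(),
                                         entries->end());
                });
        }
        for (auto& task : tasks) {
            task.join();
        }
    }
    if (failure) {
        return std::nullopt;
    }
    // Phase two (one bulk retrieval over 'prerequisites') would follow here.
    return prerequisites;
}

As far as the diff shows, the payoff of collecting first and retrieving once is that the network-bound tree reads now run concurrently while only the vector append is serialized, and the bulk retrieval is no longer nested inside the tree-reading loop.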