summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/buildtool/execution_api/remote/bazel/bazel_api.cpp56
1 files changed, 44 insertions, 12 deletions
diff --git a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
index 6fbd19f3..8f6e07b4 100644
--- a/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
+++ b/src/buildtool/execution_api/remote/bazel/bazel_api.cpp
@@ -18,6 +18,7 @@
#include <atomic>
#include <cstdint>
#include <iterator>
+#include <mutex>
#include <sstream>
#include <unordered_map>
#include <unordered_set>
@@ -408,25 +409,56 @@ auto BazelApi::CreateAction(
// Recursively process trees.
std::atomic_bool failure{false};
+ std::vector<Artifact::ObjectInfo> prerequisites{};
+ std::mutex prerequisites_lock{};
try {
auto ts = TaskSystem{jobs};
for (auto const& dgst : missing_artifacts_info->digests) {
auto const& info = missing_artifacts_info->back_map[dgst];
if (IsTreeObject(info.type)) {
- auto reader =
- TreeReader<BazelNetworkReader>{network_->CreateReader()};
- auto const result = reader.ReadDirectTreeEntries(
- info.digest, std::filesystem::path{});
- if (not result or
- not ParallelRetrieveToCasWithCache(
- result->infos, api, jobs, use_blob_splitting, done)) {
- return false;
- }
+ ts.QueueTask([this,
+ &info,
+ &failure,
+ &prerequisites,
+ &prerequisites_lock]() {
+ auto reader = TreeReader<BazelNetworkReader>{
+ network_->CreateReader()};
+ auto const result = reader.ReadDirectTreeEntries(
+ info.digest, std::filesystem::path{});
+ if (not result) {
+ failure = true;
+ return;
+ }
+ {
+ std::unique_lock lock{prerequisites_lock};
+ prerequisites.insert(prerequisites.end(),
+ result->infos.begin(),
+ result->infos.end());
+ }
+ });
}
+ }
+ } catch (std::exception const& ex) {
+ Logger::Log(LogLevel::Warning,
+ "Artifact synchronization failed: {}",
+ ex.what());
+ return false;
+ }
+
+ if (failure) {
+ return false;
+ }
- // Object infos created by network_->ReadTreeInfos() will contain 0
- // as size, but this is handled by the remote execution engine, so
- // no need to regenerate the digest.
+ if (not ParallelRetrieveToCasWithCache(
+ prerequisites, api, jobs, use_blob_splitting, done)) {
+ return false;
+ }
+
+ // In parallel process all the requested artifacts
+ try {
+ auto ts = TaskSystem{jobs};
+ for (auto const& dgst : missing_artifacts_info->digests) {
+ auto const& info = missing_artifacts_info->back_map[dgst];
ts.QueueTask([this,
&info,
&api,