diff options
-rw-r--r-- | src/buildtool/execution_engine/executor/executor.hpp | 427 |
1 files changed, 238 insertions, 189 deletions
diff --git a/src/buildtool/execution_engine/executor/executor.hpp b/src/buildtool/execution_engine/executor/executor.hpp index 018a1536..cfdbf9fc 100644 --- a/src/buildtool/execution_engine/executor/executor.hpp +++ b/src/buildtool/execution_engine/executor/executor.hpp @@ -143,116 +143,127 @@ class ExecutorImpl { std::chrono::milliseconds const& timeout, IExecutionAction::CacheFlag cache_flag, gsl::not_null<Statistics*> const& stats, - gsl::not_null<Progress*> const& progress) + gsl::not_null<Progress*> const& progress) noexcept -> std::optional<IExecutionResponse::Ptr> { - if (action->Content().IsTreeOverlayAction()) { - return ExecuteTreeOverlayAction(logger, action, api, progress); - } - auto const& inputs = action->Dependencies(); - auto const tree_action = action->Content().IsTreeAction(); + try { + if (action->Content().IsTreeOverlayAction()) { + return ExecuteTreeOverlayAction(logger, action, api, progress); + } + auto const& inputs = action->Dependencies(); + auto const tree_action = action->Content().IsTreeAction(); - logger.Emit(LogLevel::Trace, [&inputs, tree_action]() { - std::ostringstream oss{}; - oss << "execute " << (tree_action ? "tree " : "") << "action" - << std::endl; - for (auto const& [local_path, artifact] : inputs) { - auto const& info = artifact->Content().Info(); - oss << fmt::format( - " - needs {} {}", - local_path, - info ? info->ToString() : std::string{"[???]"}) + logger.Emit(LogLevel::Trace, [&inputs, tree_action]() { + std::ostringstream oss{}; + oss << "execute " << (tree_action ? "tree " : "") << "action" << std::endl; + for (auto const& [local_path, artifact] : inputs) { + auto const& info = artifact->Content().Info(); + oss << fmt::format( + " - needs {} {}", + local_path, + info ? info->ToString() : std::string{"[???]"}) + << std::endl; + } + return oss.str(); + }); + + auto const root_digest = CreateRootDigest(api, inputs); + if (not root_digest) { + logger.Emit( + LogLevel::Error, + "failed to create root digest for input artifacts."); + return nullptr; } - return oss.str(); - }); - auto const root_digest = CreateRootDigest(api, inputs); - if (not root_digest) { - logger.Emit(LogLevel::Error, - "failed to create root digest for input artifacts."); - return nullptr; - } + if (tree_action) { + auto const& tree_artifact = + action->OutputDirs()[0].node->Content(); + bool failed_inputs = false; + for (auto const& [local_path, artifact] : inputs) { + failed_inputs |= artifact->Content().Info()->failed; + } + tree_artifact.SetObjectInfo( + *root_digest, ObjectType::Tree, failed_inputs); + return std::nullopt; + } - if (tree_action) { - auto const& tree_artifact = action->OutputDirs()[0].node->Content(); - bool failed_inputs = false; - for (auto const& [local_path, artifact] : inputs) { - failed_inputs |= artifact->Content().Info()->failed; + // do not count statistics for rebuilder fetching from cache + if (cache_flag != IExecutionAction::CacheFlag::FromCacheOnly) { + progress->TaskTracker().Start(action->Content().Id()); + stats->IncrementActionsQueuedCounter(); } - tree_artifact.SetObjectInfo( - *root_digest, ObjectType::Tree, failed_inputs); - return std::nullopt; - } - // do not count statistics for rebuilder fetching from cache - if (cache_flag != IExecutionAction::CacheFlag::FromCacheOnly) { - progress->TaskTracker().Start(action->Content().Id()); - stats->IncrementActionsQueuedCounter(); - } + // get the alternative endpoint + auto alternative_api = GetAlternativeEndpoint(merged_properties, + remote_context, + api.GetHashType(), + api.GetTempSpace()); + if (alternative_api) { + if (not api.ParallelRetrieveToCas( + std::vector<Artifact::ObjectInfo>{ + Artifact::ObjectInfo{*root_digest, + ObjectType::Tree, + /* failed= */ false}}, + *alternative_api, + /* jobs= */ 1, + /* use_blob_splitting= */ true)) { + logger.Emit(LogLevel::Error, + "Failed to sync tree {} to dispatch endpoint", + root_digest->hash()); + return nullptr; + } + } - // get the alternative endpoint - auto alternative_api = GetAlternativeEndpoint(merged_properties, - remote_context, - api.GetHashType(), - api.GetTempSpace()); - if (alternative_api) { - if (not api.ParallelRetrieveToCas( - std::vector<Artifact::ObjectInfo>{Artifact::ObjectInfo{ - *root_digest, ObjectType::Tree, /* failed= */ false}}, - *alternative_api, - /* jobs= */ 1, - /* use_blob_splitting= */ true)) { + auto base = action->Content().Cwd(); + auto cwd_relative_output_files = + RebasePathStringsRelativeTo(base, action->OutputFilePaths()); + auto cwd_relative_output_dirs = + RebasePathStringsRelativeTo(base, action->OutputDirPaths()); + auto remote_action = (alternative_api ? *alternative_api : api) + .CreateAction(*root_digest, + action->Command(), + base, + cwd_relative_output_files, + cwd_relative_output_dirs, + action->Env(), + merged_properties); + + if (remote_action == nullptr) { logger.Emit(LogLevel::Error, - "Failed to sync tree {} to dispatch endpoint", - root_digest->hash()); + "failed to create action for execution."); return nullptr; } - } - - auto base = action->Content().Cwd(); - auto cwd_relative_output_files = - RebasePathStringsRelativeTo(base, action->OutputFilePaths()); - auto cwd_relative_output_dirs = - RebasePathStringsRelativeTo(base, action->OutputDirPaths()); - auto remote_action = (alternative_api ? *alternative_api : api) - .CreateAction(*root_digest, - action->Command(), - base, - cwd_relative_output_files, - cwd_relative_output_dirs, - action->Env(), - merged_properties); - - if (remote_action == nullptr) { - logger.Emit(LogLevel::Error, - "failed to create action for execution."); - return nullptr; - } - // set action options - remote_action->SetCacheFlag(cache_flag); - remote_action->SetTimeout(timeout); - auto result = remote_action->Execute(&logger); - if (alternative_api) { - if (result) { - auto const artifacts = result->Artifacts(); - if (not artifacts) { - logger.Emit(LogLevel::Error, artifacts.error()); - return nullptr; - } - std::vector<Artifact::ObjectInfo> object_infos{}; - object_infos.reserve(artifacts.value()->size()); - for (auto const& [path, info] : *artifacts.value()) { - object_infos.emplace_back(info); - } - if (not alternative_api->RetrieveToCas(object_infos, api)) { - logger.Emit(LogLevel::Warning, - "Failed to retrieve back artifacts from " - "dispatch endpoint"); + // set action options + remote_action->SetCacheFlag(cache_flag); + remote_action->SetTimeout(timeout); + auto result = remote_action->Execute(&logger); + if (alternative_api) { + if (result) { + auto const artifacts = result->Artifacts(); + if (not artifacts) { + logger.Emit(LogLevel::Error, artifacts.error()); + return nullptr; + } + std::vector<Artifact::ObjectInfo> object_infos{}; + object_infos.reserve(artifacts.value()->size()); + for (auto const& [path, info] : *artifacts.value()) { + object_infos.emplace_back(info); + } + if (not alternative_api->RetrieveToCas(object_infos, api)) { + logger.Emit(LogLevel::Warning, + "Failed to retrieve back artifacts from " + "dispatch endpoint"); + } } } + return result; + } catch (std::exception const& ex) { + logger.Emit(LogLevel::Error, + "Unexpectedly failed to execute action with:\n{}", + ex.what()); + return nullptr; } - return result; } /// \brief Ensures the artifact is available to the CAS, either checking @@ -597,8 +608,8 @@ class ExecutorImpl { /// \param artifacts The artifacts to create the root tree digest from [[nodiscard]] static auto CreateRootDigest( IExecutionApi const& api, - std::vector<DependencyGraph::NamedArtifactNodePtr> const& - artifacts) noexcept -> std::optional<ArtifactDigest> { + std::vector<DependencyGraph::NamedArtifactNodePtr> const& artifacts) + -> std::optional<ArtifactDigest> { if (artifacts.size() == 1 and (artifacts.at(0).path == "." or artifacts.at(0).path.empty())) { auto const& info = artifacts.at(0).node->Content().Info(); @@ -615,7 +626,7 @@ class ExecutorImpl { [[nodiscard]] static auto CheckOutputsExist( IExecutionResponse::ArtifactInfos const& artifacts, std::vector<Action::LocalPath> const& outputs, - std::string base) noexcept -> bool { + std::string base) -> bool { return std::all_of( outputs.begin(), outputs.end(), @@ -736,7 +747,7 @@ class ExecutorImpl { void static PrintInfo( Logger const& logger, gsl::not_null<DependencyGraph::ActionNode const*> const& action, - IExecutionResponse::Ptr const& response) noexcept { + IExecutionResponse::Ptr const& response) { if (not response) { logger.Emit(LogLevel::Error, "response is empty"); return; @@ -780,7 +791,7 @@ class ExecutorImpl { void static PrintError( Logger const& logger, gsl::not_null<DependencyGraph::ActionNode const*> const& action, - gsl::not_null<Progress*> const& progress) noexcept { + gsl::not_null<Progress*> const& progress) { std::ostringstream msg{}; if (action->Content().IsTreeOverlayAction() or action->Content().IsTreeAction()) { @@ -904,11 +915,45 @@ class Executor { [[nodiscard]] auto Process( gsl::not_null<DependencyGraph::ActionNode const*> const& action) const noexcept -> bool { - // to avoid always creating a logger we might not need, which is a - // non-copyable and non-movable object, we need some code duplication - if (logger_ != nullptr) { + try { + // to avoid always creating a logger we might not need, which is a + // non-copyable and non-movable object, we need some code + // duplication + if (logger_ != nullptr) { + auto const response = Impl::ExecuteAction( + *logger_, + action, + *context_.apis->remote, + Impl::MergeProperties(context_.remote_context->exec_config + ->platform_properties, + action->ExecutionProperties()), + context_.remote_context, + Impl::ScaleTime(timeout_, action->TimeoutScale()), + action->NoCache() ? CF::DoNotCacheOutput : CF::CacheOutput, + context_.statistics, + context_.progress); + // check response and save digests of results + if (not response) { + return true; + } + auto result = Impl::ParseResponse(*logger_, + *response, + action, + context_.statistics, + context_.progress); + if (context_.profile) { + (*context_.profile) + ->NoteActionCompleted(action->Content().Id(), + *response, + action->Content().Cwd()); + } + return result; + } + + Logger logger("action:" + action->Content().Id()); + auto const response = Impl::ExecuteAction( - *logger_, + logger, action, *context_.apis->remote, Impl::MergeProperties( @@ -919,11 +964,12 @@ class Executor { action->NoCache() ? CF::DoNotCacheOutput : CF::CacheOutput, context_.statistics, context_.progress); + // check response and save digests of results if (not response) { return true; } - auto result = Impl::ParseResponse(*logger_, + auto result = Impl::ParseResponse(logger, *response, action, context_.statistics, @@ -935,35 +981,13 @@ class Executor { action->Content().Cwd()); } return result; + } catch (std::exception const& ex) { + Logger::Log( + LogLevel::Error, + "Executor: Unexpected failure processing action with:\n{}", + ex.what()); + return false; } - - Logger logger("action:" + action->Content().Id()); - - auto const response = Impl::ExecuteAction( - logger, - action, - *context_.apis->remote, - Impl::MergeProperties( - context_.remote_context->exec_config->platform_properties, - action->ExecutionProperties()), - context_.remote_context, - Impl::ScaleTime(timeout_, action->TimeoutScale()), - action->NoCache() ? CF::DoNotCacheOutput : CF::CacheOutput, - context_.statistics, - context_.progress); - - // check response and save digests of results - if (not response) { - return true; - } - auto result = Impl::ParseResponse( - logger, *response, action, context_.statistics, context_.progress); - if (context_.profile) { - (*context_.profile) - ->NoteActionCompleted( - action->Content().Id(), *response, action->Content().Cwd()); - } - return result; } /// \brief Check artifact is available to the CAS or upload it. @@ -973,16 +997,25 @@ class Executor { [[nodiscard]] auto Process( gsl::not_null<DependencyGraph::ArtifactNode const*> const& artifact) const noexcept -> bool { - // to avoid always creating a logger we might not need, which is a - // non-copyable and non-movable object, we need some code duplication - if (logger_ != nullptr) { + try { + // to avoid always creating a logger we might not need, which is a + // non-copyable and non-movable object, we need some code + // duplication + if (logger_ != nullptr) { + return Impl::VerifyOrUploadArtifact( + *logger_, artifact, context_.repo_config, *context_.apis); + } + + Logger logger("artifact:" + ToHexString(artifact->Content().Id())); return Impl::VerifyOrUploadArtifact( - *logger_, artifact, context_.repo_config, *context_.apis); + logger, artifact, context_.repo_config, *context_.apis); + } catch (std::exception const& ex) { + Logger::Log( + LogLevel::Error, + "Executor: Unexpected failure checking artifact with:\n{}", + ex.what()); + return false; } - - Logger logger("artifact:" + ToHexString(artifact->Content().Id())); - return Impl::VerifyOrUploadArtifact( - logger, artifact, context_.repo_config, *context_.apis); } private: @@ -1016,64 +1049,80 @@ class Rebuilder { [[nodiscard]] auto Process( gsl::not_null<DependencyGraph::ActionNode const*> const& action) const noexcept -> bool { - auto const& action_id = action->Content().Id(); - Logger logger("rebuild:" + action_id); - auto response = Impl::ExecuteAction( - logger, - action, - *context_.apis->remote, - Impl::MergeProperties( - context_.remote_context->exec_config->platform_properties, - action->ExecutionProperties()), - context_.remote_context, - Impl::ScaleTime(timeout_, action->TimeoutScale()), - CF::PretendCached, - context_.statistics, - context_.progress); + try { + auto const& action_id = action->Content().Id(); + Logger logger("rebuild:" + action_id); + auto response = Impl::ExecuteAction( + logger, + action, + *context_.apis->remote, + Impl::MergeProperties( + context_.remote_context->exec_config->platform_properties, + action->ExecutionProperties()), + context_.remote_context, + Impl::ScaleTime(timeout_, action->TimeoutScale()), + CF::PretendCached, + context_.statistics, + context_.progress); - if (not response) { - return true; // action without response (e.g., tree action) - } + if (not response) { + return true; // action without response (e.g., tree action) + } - Logger logger_cached("cached:" + action_id); - auto response_cached = Impl::ExecuteAction( - logger_cached, - action, - *api_cached_, - Impl::MergeProperties( - context_.remote_context->exec_config->platform_properties, - action->ExecutionProperties()), - context_.remote_context, - Impl::ScaleTime(timeout_, action->TimeoutScale()), - CF::FromCacheOnly, - context_.statistics, - context_.progress); - - if (not response_cached) { - logger_cached.Emit(LogLevel::Error, - "expected regular action with response"); - return false; - } + Logger logger_cached("cached:" + action_id); + auto response_cached = Impl::ExecuteAction( + logger_cached, + action, + *api_cached_, + Impl::MergeProperties( + context_.remote_context->exec_config->platform_properties, + action->ExecutionProperties()), + context_.remote_context, + Impl::ScaleTime(timeout_, action->TimeoutScale()), + CF::FromCacheOnly, + context_.statistics, + context_.progress); - if (auto error = DetectFlakyAction( - *response, *response_cached, action->Content())) { - logger_cached.Emit(LogLevel::Error, *error); + if (not response_cached) { + logger_cached.Emit(LogLevel::Error, + "expected regular action with response"); + return false; + } + + if (auto error = DetectFlakyAction( + *response, *response_cached, action->Content())) { + logger_cached.Emit(LogLevel::Error, *error); + return false; + } + return Impl::ParseResponse(logger, + *response, + action, + context_.statistics, + context_.progress, + /*count_as_executed=*/true); + } catch (std::exception const& ex) { + Logger::Log( + LogLevel::Error, + "Rebuilder: Unexpected failure processing action with:\n{}", + ex.what()); return false; } - return Impl::ParseResponse(logger, - *response, - action, - context_.statistics, - context_.progress, - /*count_as_executed=*/true); } [[nodiscard]] auto Process( gsl::not_null<DependencyGraph::ArtifactNode const*> const& artifact) const noexcept -> bool { - Logger logger("artifact:" + ToHexString(artifact->Content().Id())); - return Impl::VerifyOrUploadArtifact( - logger, artifact, context_.repo_config, *context_.apis); + try { + Logger logger("artifact:" + ToHexString(artifact->Content().Id())); + return Impl::VerifyOrUploadArtifact( + logger, artifact, context_.repo_config, *context_.apis); + } catch (std::exception const& ex) { + Logger::Log( + LogLevel::Error, + "Rebuilder: Unexpected failure checking artifact with:\n{}", + ex.what()); + return false; + } } [[nodiscard]] auto DumpFlakyActions() const -> nlohmann::json { @@ -1104,7 +1153,7 @@ class Rebuilder { [[nodiscard]] auto DetectFlakyAction( IExecutionResponse::Ptr const& response, IExecutionResponse::Ptr const& response_cached, - Action const& action) const noexcept -> std::optional<std::string> { + Action const& action) const -> std::optional<std::string> { auto& stats = *context_.statistics; if (response and response_cached and response_cached->ActionDigest() == response->ActionDigest()) { |