#ifndef INCLUDED_SRC_BUILDTOOL_EXECUTION_ENGINE_EXECUTOR_EXECUTOR_HPP #define INCLUDED_SRC_BUILDTOOL_EXECUTION_ENGINE_EXECUTOR_EXECUTOR_HPP #include #include #include #include #include #include #include #include #include "gsl-lite/gsl-lite.hpp" #include "src/buildtool/common/repository_config.hpp" #include "src/buildtool/common/statistics.hpp" #include "src/buildtool/common/tree.hpp" #include "src/buildtool/compatibility/compatibility.hpp" #include "src/buildtool/execution_api/common/execution_api.hpp" #include "src/buildtool/execution_engine/dag/dag.hpp" #include "src/buildtool/file_system/file_system_manager.hpp" #include "src/buildtool/logging/logger.hpp" #include "src/buildtool/progress_reporting/progress.hpp" #include "src/utils/cpp/hex_string.hpp" /// \brief Implementations for executing actions and uploading artifacts. class ExecutorImpl { public: /// \brief Execute action and obtain response. /// \returns std::nullopt for actions without response (e.g., tree actions). /// \returns nullptr on error. [[nodiscard]] static auto ExecuteAction( Logger const& logger, gsl::not_null const& action, gsl::not_null const& api, std::map const& properties, std::chrono::milliseconds const& timeout, IExecutionAction::CacheFlag cache_flag) -> std::optional { auto const& inputs = action->Dependencies(); auto const tree_action = action->Content().IsTreeAction(); logger.Emit(LogLevel::Trace, [&inputs, tree_action]() { std::ostringstream oss{}; oss << "execute " << (tree_action ? "tree " : "") << "action" << std::endl; for (auto const& [local_path, artifact] : inputs) { auto const& info = artifact->Content().Info(); oss << fmt::format( " - needs {} {}", local_path, info ? info->ToString() : std::string{"[???]"}) << std::endl; } return oss.str(); }); auto const root_digest = CreateRootDigest(api, inputs); if (not root_digest) { Logger::Log(LogLevel::Error, "failed to create root digest for input artifacts."); return nullptr; } if (tree_action) { auto const& tree_artifact = action->OutputDirs()[0].node->Content(); bool failed_inputs = false; for (auto const& [local_path, artifact] : inputs) { failed_inputs |= artifact->Content().Info()->failed; } tree_artifact.SetObjectInfo( *root_digest, ObjectType::Tree, failed_inputs); return std::nullopt; } // do not count statistics for rebuilder fetching from cache if (cache_flag != IExecutionAction::CacheFlag::FromCacheOnly) { Progress::Instance().Start(action->Content().Id()); Statistics::Instance().IncrementActionsQueuedCounter(); } auto remote_action = api->CreateAction(*root_digest, action->Command(), action->OutputFilePaths(), action->OutputDirPaths(), action->Env(), properties); if (remote_action == nullptr) { logger.Emit(LogLevel::Error, "failed to create action for execution."); return nullptr; } // set action options remote_action->SetCacheFlag(cache_flag); remote_action->SetTimeout(timeout); return remote_action->Execute(&logger); } /// \brief Ensures the artifact is available to the CAS, either checking /// that its existing digest corresponds to that of an object already /// available or by uploading it if there is no digest in the artifact. In /// the later case, the new digest is saved in the artifact /// \param[in] artifact The artifact to process. /// \returns True if artifact is available at the point of return, false /// otherwise [[nodiscard]] static auto VerifyOrUploadArtifact( Logger const& logger, gsl::not_null const& artifact, gsl::not_null const& remote_api, gsl::not_null const& local_api) noexcept -> bool { auto const object_info_opt = artifact->Content().Info(); auto const file_path_opt = artifact->Content().FilePath(); // If there is no object info and no file path, the artifact can not be // processed: it means its definition is ill-formed or that it is the // output of an action, in which case it shouldn't have reached here if (not object_info_opt and not file_path_opt) { Logger::Log(LogLevel::Error, "artifact {} can not be processed.", ToHexString(artifact->Content().Id())); return false; } // If the artifact has digest, we check that an object with this digest // is available to the execution API if (object_info_opt) { logger.Emit(LogLevel::Trace, [&object_info_opt]() { std::ostringstream oss{}; oss << fmt::format("upload KNOWN artifact: {}", object_info_opt->ToString()) << std::endl; return oss.str(); }); if (not remote_api->IsAvailable(object_info_opt->digest)) { // Check if requested artifact is available in local CAS and // upload to remote CAS in case it is. if (local_api->IsAvailable(object_info_opt->digest) and local_api->RetrieveToCas({*object_info_opt}, remote_api)) { return true; } if (not VerifyOrUploadKnownArtifact( remote_api, artifact->Content().Repository(), object_info_opt->digest)) { Logger::Log( LogLevel::Error, "artifact {} should be present in CAS but is missing.", ToHexString(artifact->Content().Id())); return false; } } return true; } // Otherwise, we upload the new file to make it available to the // execution API // Note that we can be sure now that file_path_opt has a value and // that the path stored is relative to the workspace dir, so we need to // prepend it logger.Emit(LogLevel::Trace, [&file_path_opt]() { std::ostringstream oss{}; oss << fmt::format("upload LOCAL artifact: {}", file_path_opt->string()) << std::endl; return oss.str(); }); auto repo = artifact->Content().Repository(); auto new_info = UploadFile(remote_api, repo, *file_path_opt); if (not new_info) { Logger::Log(LogLevel::Error, "artifact in {} could not be uploaded to CAS.", file_path_opt->string()); return false; } // And we save the digest object type in the artifact artifact->Content().SetObjectInfo(*new_info, false); return true; } /// \brief Uploads the content of a git tree recursively to the CAS. It is /// first checked which elements of a directory are not available in the /// CAS and the missing elements are uploaded accordingly. This ensures the /// invariant that if a git tree is known to the CAS all its content is also /// existing in the CAS. /// \param[in] api The remote execution API of the CAS. /// \param[in] tree The git tree to be uploaded. /// \returns True if the upload was successful, False in case of any error. // NOLINTNEXTLINE(misc-no-recursion) [[nodiscard]] static auto VerifyOrUploadTree( gsl::not_null const& api, GitTree const& tree) noexcept -> bool { // create list of digests for batch check of CAS availability std::vector digests; std::unordered_map> entry_map; for (auto const& [path, entry] : tree) { auto digest = ArtifactDigest{entry->Hash(), *entry->Size(), entry->IsTree()}; digests.emplace_back(digest); try { entry_map.emplace(std::move(digest), entry); } catch (...) { return false; } } Logger::Log(LogLevel::Trace, [&tree]() { std::ostringstream oss{}; oss << "upload directory content" << std::endl; for (auto const& [path, entry] : tree) { oss << fmt::format(" - {}: {}", path, entry->Hash()) << std::endl; } return oss.str(); }); // find missing digests auto missing_digests = api->IsAvailable(digests); // process missing trees for (auto const& digest : missing_digests) { if (auto it = entry_map.find(digest); it != entry_map.end()) { auto const& entry = it->second; if (entry->IsTree()) { if (not VerifyOrUploadTree(api, *entry->Tree())) { return false; } } } } // upload missing entries (blobs or trees) BlobContainer container; for (auto const& digest : missing_digests) { if (auto it = entry_map.find(digest); it != entry_map.end()) { auto const& entry = it->second; auto content = entry->RawData(); if (not content) { return false; } try { container.Emplace( std::move(BazelBlob{digest, std::move(*content)})); } catch (std::exception const& ex) { Logger::Log(LogLevel::Error, "failed to create blob with: ", ex.what()); return false; } } } return api->Upload(container, /*skip_find_missing=*/true); } /// \brief Lookup blob via digest in local git repositories and upload. /// \param api The endpoint used for uploading /// \param repo The global repository name, the artifact belongs to /// \param digest The digest of the object /// \param hash The git-sha1 hash of the object /// \returns true on success [[nodiscard]] static auto VerifyOrUploadGitArtifact( gsl::not_null const& api, std::string const& repo, ArtifactDigest const& digest, std::string const& hash) noexcept -> bool { std::optional content; if (NativeSupport::IsTree( static_cast(digest).hash())) { // if known tree is not available, recursively upload its content auto tree = ReadGitTree(repo, hash); if (not tree) { return false; } if (not VerifyOrUploadTree(api, *tree)) { return false; } content = tree->RawData(); } else { // if known blob is not available, read and upload it content = ReadGitBlob(repo, hash); } if (not content) { return false; } // upload artifact content auto container = BlobContainer{{BazelBlob{digest, std::move(*content)}}}; return api->Upload(container, /*skip_find_missing=*/true); } [[nodiscard]] static auto ReadGitBlob(std::string const& repo, std::string const& hash) noexcept -> std::optional { auto const& repo_config = RepositoryConfig::Instance(); std::optional blob{}; if (auto const* ws_root = repo_config.WorkspaceRoot(repo)) { // try to obtain blob from local workspace's Git CAS, if any blob = ws_root->ReadBlob(hash); } if (not blob) { // try to obtain blob from global Git CAS, if any blob = repo_config.ReadBlobFromGitCAS(hash); } return blob; } [[nodiscard]] static auto ReadGitTree(std::string const& repo, std::string const& hash) noexcept -> std::optional { auto const& repo_config = RepositoryConfig::Instance(); std::optional tree{}; if (auto const* ws_root = repo_config.WorkspaceRoot(repo)) { // try to obtain tree from local workspace's Git CAS, if any tree = ws_root->ReadTree(hash); } if (not tree) { // try to obtain tree from global Git CAS, if any tree = repo_config.ReadTreeFromGitCAS(hash); } return tree; } /// \brief Lookup blob via digest in local git repositories and upload. /// \param api The endpoint used for uploading /// \param repo The global repository name, the artifact belongs to /// \param digest The digest of the object /// \returns true on success [[nodiscard]] static auto VerifyOrUploadKnownArtifact( gsl::not_null const& api, std::string const& repo, ArtifactDigest const& digest) noexcept -> bool { if (Compatibility::IsCompatible()) { auto opt = Compatibility::GetGitEntry(digest.hash()); if (opt) { auto const& [git_sha1_hash, comp_repo] = *opt; return VerifyOrUploadGitArtifact( api, comp_repo, digest, git_sha1_hash); } return false; } return VerifyOrUploadGitArtifact(api, repo, digest, digest.hash()); } /// \brief Lookup file via path in local workspace root and upload. /// \param api The endpoint used for uploading /// \param repo The global repository name, the artifact belongs to /// \param file_path The path of the file to be read /// \returns The computed object info on success [[nodiscard]] static auto UploadFile( gsl::not_null const& api, std::string const& repo, std::filesystem::path const& file_path) noexcept -> std::optional { auto const* ws_root = RepositoryConfig::Instance().WorkspaceRoot(repo); if (ws_root == nullptr) { return std::nullopt; } auto const object_type = ws_root->FileType(file_path); if (not object_type) { return std::nullopt; } auto content = ws_root->ReadFile(file_path); if (not content.has_value()) { return std::nullopt; } auto digest = ArtifactDigest::Create(*content); if (not api->Upload( BlobContainer{{BazelBlob{digest, std::move(*content)}}})) { return std::nullopt; } return Artifact::ObjectInfo{std::move(digest), *object_type}; } /// \brief Add digests and object type to artifact nodes for all outputs of /// the action that was run void static SaveObjectInfo( IExecutionResponse::ArtifactInfos const& artifacts, gsl::not_null const& action, bool fail_artifacts) noexcept { for (auto const& [name, node] : action->OutputFiles()) { node->Content().SetObjectInfo(artifacts.at(name), fail_artifacts); } for (auto const& [name, node] : action->OutputDirs()) { node->Content().SetObjectInfo(artifacts.at(name), fail_artifacts); } } /// \brief Create root tree digest for input artifacts. /// \param api The endpoint required for uploading /// \param artifacts The artifacts to create the root tree digest from [[nodiscard]] static auto CreateRootDigest( gsl::not_null const& api, std::vector const& artifacts) noexcept -> std::optional { if (artifacts.size() == 1 and (artifacts.at(0).path == "." or artifacts.at(0).path.empty())) { auto const& info = artifacts.at(0).node->Content().Info(); if (info and IsTreeObject(info->type)) { // Artifact list contains single tree with path "." or "". Reuse // the existing tree artifact by returning its digest. return info->digest; } } return api->UploadTree(artifacts); } /// \brief Check that all outputs expected from the action description /// are present in the artifacts map [[nodiscard]] static auto CheckOutputsExist( IExecutionResponse::ArtifactInfos const& artifacts, std::vector const& outputs) noexcept -> bool { return std::all_of( outputs.begin(), outputs.end(), [&artifacts](auto const& output) { return artifacts.contains(output); }); } /// \brief Parse response and write object info to DAG's artifact nodes. /// \returns false on non-zero exit code or if output artifacts are missing [[nodiscard]] static auto ParseResponse( Logger const& logger, IExecutionResponse::Ptr const& response, gsl::not_null const& action, bool count_as_executed = false) -> bool { logger.Emit(LogLevel::Trace, "finished execution"); if (!response) { logger.Emit(LogLevel::Trace, "response is empty"); return false; } if (not count_as_executed and response->IsCached()) { logger.Emit(LogLevel::Trace, " - served from cache"); Statistics::Instance().IncrementActionsCachedCounter(); } else { Statistics::Instance().IncrementActionsExecutedCounter(); } Progress::Instance().Stop(action->Content().Id()); PrintInfo(logger, action->Command(), response); bool should_fail_outputs = false; for (auto const& [local_path, node] : action->Dependencies()) { should_fail_outputs |= node->Content().Info()->failed; } if (response->ExitCode() != 0) { if (action->MayFail()) { logger.Emit(LogLevel::Warning, "{} (exit code {})", *(action->MayFail()), response->ExitCode()); should_fail_outputs = true; } else { logger.Emit(LogLevel::Error, "action returned non-zero exit code {}", response->ExitCode()); PrintError(logger, action->Command()); return false; } } auto artifacts = response->Artifacts(); auto output_files = action->OutputFilePaths(); auto output_dirs = action->OutputDirPaths(); if (artifacts.empty() or not CheckOutputsExist(artifacts, output_files) or not CheckOutputsExist(artifacts, output_dirs)) { logger.Emit(LogLevel::Error, [&] { std::string message{ "action executed with missing outputs.\n" " Action outputs should be the following artifacts:"}; for (auto const& output : output_files) { message += "\n - " + output; } return message; }); PrintError(logger, action->Command()); return false; } SaveObjectInfo(artifacts, action, should_fail_outputs); return true; } /// \brief Write out if response is empty and otherwise, write out /// standard error/output if they are present void static PrintInfo(Logger const& logger, std::vector const& command, IExecutionResponse::Ptr const& response) noexcept { if (!response) { logger.Emit(LogLevel::Error, "response is empty"); return; } auto const has_err = response->HasStdErr(); auto const has_out = response->HasStdOut(); auto build_message = [has_err, has_out, &logger, &command, &response]() { using namespace std::string_literals; auto message = ""s; if (has_err or has_out) { message += (has_err and has_out ? "Stdout and stderr"s : has_out ? "Stdout"s : "Stderr"s) + " of command: "; } message += nlohmann::json(command).dump() + "\n"; if (response->HasStdOut()) { message += response->StdOut(); } if (response->HasStdErr()) { message += response->StdErr(); } return message; }; logger.Emit((has_err or has_out) ? LogLevel::Info : LogLevel::Debug, std::move(build_message)); } void static PrintError(Logger const& logger, std::vector const& command) noexcept { logger.Emit(LogLevel::Error, "Failed to execute command {}", nlohmann::json(command).dump()); } }; /// \brief Executor for using concrete Execution API. class Executor { using Impl = ExecutorImpl; using CF = IExecutionAction::CacheFlag; public: explicit Executor( IExecutionApi* local_api, IExecutionApi* remote_api, std::map properties, std::chrono::milliseconds timeout = IExecutionAction::kDefaultTimeout) : local_api_{local_api}, remote_api_{remote_api}, properties_{std::move(properties)}, timeout_{timeout} {} /// \brief Run an action in a blocking manner /// This method must be thread-safe as it could be called in parallel /// \param[in] action The action to execute. /// \returns True if execution was successful, false otherwise [[nodiscard]] auto Process( gsl::not_null const& action) const noexcept -> bool { Logger logger("action:" + action->Content().Id()); auto const response = Impl::ExecuteAction( logger, action, remote_api_, properties_, timeout_, action->NoCache() ? CF::DoNotCacheOutput : CF::CacheOutput); // check response and save digests of results return not response or Impl::ParseResponse(logger, *response, action); } /// \brief Check artifact is available to the CAS or upload it. /// \param[in] artifact The artifact to process. /// \returns True if artifact is available or uploaded, false otherwise [[nodiscard]] auto Process( gsl::not_null const& artifact) const noexcept -> bool { Logger logger("artifact:" + ToHexString(artifact->Content().Id())); return Impl::VerifyOrUploadArtifact( logger, artifact, remote_api_, local_api_); } private: gsl::not_null local_api_; gsl::not_null remote_api_; std::map properties_; std::chrono::milliseconds timeout_; }; /// \brief Rebuilder for running and comparing actions of two API endpoints. class Rebuilder { using Impl = ExecutorImpl; using CF = IExecutionAction::CacheFlag; public: /// \brief Create rebuilder for action comparision of two endpoints. /// \param api Rebuild endpoint, executes without action cache. /// \param api_cached Reference endpoint, serves everything from cache. /// \param properties Platform properties for execution. /// \param timeout Timeout for action execution. Rebuilder( IExecutionApi* local_api, IExecutionApi* remote_api, IExecutionApi* api_cached, std::map properties, std::chrono::milliseconds timeout = IExecutionAction::kDefaultTimeout) : local_api_{local_api}, remote_api_{remote_api}, api_cached_{api_cached}, properties_{std::move(properties)}, timeout_{timeout} {} [[nodiscard]] auto Process( gsl::not_null const& action) const noexcept -> bool { auto const& action_id = action->Content().Id(); Logger logger("rebuild:" + action_id); auto response = Impl::ExecuteAction(logger, action, remote_api_, properties_, timeout_, CF::PretendCached); if (not response) { return true; // action without response (e.g., tree action) } Logger logger_cached("cached:" + action_id); auto response_cached = Impl::ExecuteAction(logger_cached, action, api_cached_, properties_, timeout_, CF::FromCacheOnly); if (not response_cached) { logger_cached.Emit(LogLevel::Error, "expected regular action with response"); return false; } DetectFlakyAction(*response, *response_cached, action->Content()); return Impl::ParseResponse( logger, *response, action, /*count_as_executed=*/true); } [[nodiscard]] auto Process( gsl::not_null const& artifact) const noexcept -> bool { Logger logger("artifact:" + ToHexString(artifact->Content().Id())); return Impl::VerifyOrUploadArtifact( logger, artifact, remote_api_, local_api_); } [[nodiscard]] auto DumpFlakyActions() const noexcept -> nlohmann::json { std::unique_lock lock{m_}; auto actions = nlohmann::json::object(); for (auto const& [action_id, outputs] : flaky_actions_) { for (auto const& [path, infos] : outputs) { actions[action_id][path]["rebuilt"] = infos.first.ToJson(); actions[action_id][path]["cached"] = infos.second.ToJson(); } } return {{"flaky actions", actions}, {"cache misses", cache_misses_}}; } private: gsl::not_null local_api_; gsl::not_null remote_api_; gsl::not_null api_cached_; std::map properties_; std::chrono::milliseconds timeout_; mutable std::mutex m_; mutable std::vector cache_misses_{}; mutable std::unordered_map< std::string, std::unordered_map< std::string, std::pair>> flaky_actions_{}; void DetectFlakyAction(IExecutionResponse::Ptr const& response, IExecutionResponse::Ptr const& response_cached, Action const& action) const noexcept { if (response and response_cached and response_cached->ActionDigest() == response->ActionDigest()) { Statistics::Instance().IncrementRebuiltActionComparedCounter(); auto artifacts = response->Artifacts(); auto artifacts_cached = response_cached->Artifacts(); std::ostringstream msg{}; for (auto const& [path, info] : artifacts) { auto const& info_cached = artifacts_cached[path]; if (info != info_cached) { RecordFlakyAction(&msg, action, path, info, info_cached); } } if (msg.tellp() > 0) { Statistics::Instance().IncrementActionsFlakyCounter(); bool tainted = action.MayFail() or action.NoCache(); if (tainted) { Statistics::Instance() .IncrementActionsFlakyTaintedCounter(); } Logger::Log(tainted ? LogLevel::Debug : LogLevel::Warning, "{}", msg.str()); } } else { Statistics::Instance().IncrementRebuiltActionMissingCounter(); std::unique_lock lock{m_}; cache_misses_.emplace_back(action.Id()); } } void RecordFlakyAction(gsl::not_null const& msg, Action const& action, std::string const& path, Artifact::ObjectInfo const& rebuilt, Artifact::ObjectInfo const& cached) const noexcept { auto const& action_id = action.Id(); if (msg->tellp() <= 0) { bool tainted = action.MayFail() or action.NoCache(); auto cmd = GetCmdString(action); (*msg) << "Found flaky " << (tainted ? "tainted " : "") << "action:" << std::endl << " - id: " << action_id << std::endl << " - cmd: " << cmd << std::endl; } (*msg) << " - output '" << path << "' differs:" << std::endl << " - " << rebuilt.ToString() << " (rebuilt)" << std::endl << " - " << cached.ToString() << " (cached)" << std::endl; std::unique_lock lock{m_}; auto& object_map = flaky_actions_[action_id]; try { object_map.emplace(path, std::make_pair(rebuilt, cached)); } catch (std::exception const& ex) { Logger::Log(LogLevel::Error, "recoding flaky action failed with: {}", ex.what()); } } static auto GetCmdString(Action const& action) noexcept -> std::string { try { return nlohmann::json(action.Command()).dump(); } catch (std::exception const& ex) { return fmt::format("", ex.what()); } } }; #endif // INCLUDED_SRC_BUILDTOOL_EXECUTION_ENGINE_EXECUTOR_EXECUTOR_HPP