diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/buildtool/serve_api/serve_service/source_tree.cpp | 377 | ||||
-rw-r--r-- | src/buildtool/serve_api/serve_service/source_tree.hpp | 18 |
2 files changed, 258 insertions, 137 deletions
diff --git a/src/buildtool/serve_api/serve_service/source_tree.cpp b/src/buildtool/serve_api/serve_service/source_tree.cpp index 40478aeb..6d51221a 100644 --- a/src/buildtool/serve_api/serve_service/source_tree.cpp +++ b/src/buildtool/serve_api/serve_service/source_tree.cpp @@ -375,109 +375,90 @@ auto SourceTreeService::ResolveContentTree( return SyncArchive(tree_id, repo_path, sync_tree, response); } -auto SourceTreeService::ImportToGit( - std::filesystem::path const& unpack_path, - std::filesystem::path const& archive_tree_id_file, - std::string const& content, - std::string const& archive_type, - std::string const& subdir, - std::optional<PragmaSpecial> const& resolve_special, - bool sync_tree, - ServeArchiveTreeResponse* response) -> ::grpc::Status { +auto SourceTreeService::CommonImportToGit( + std::filesystem::path const& root_path, + std::string const& commit_message) + -> std::variant<std::string, std::string> { + using result_t = std::variant<std::string, std::string>; // do the initial commit; no need to guard, as the tmp location is unique - auto git_repo = GitRepo::InitAndOpen(unpack_path, + auto git_repo = GitRepo::InitAndOpen(root_path, /*is_bare=*/false); if (not git_repo) { auto str = fmt::format("Could not initialize repository {}", - unpack_path.string()); - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); - return ::grpc::Status::OK; + root_path.string()); + return result_t(std::in_place_index<0>, str); } // wrap logger for GitRepo call + std::string err; auto wrapped_logger = std::make_shared<GitRepo::anon_logger_t>( - [logger = logger_, unpack_path](auto const& msg, bool fatal) { + [root_path, &err](auto const& msg, bool fatal) { if (fatal) { - auto err = fmt::format( - "ServeArchiveTree: While staging and committing all in " - "repository {}:\n{}", - unpack_path.string(), + err = fmt::format( + "While staging and committing all in repository {}:\n{}", + root_path.string(), msg); - logger->Emit(LogLevel::Error, err); } }); // stage and commit all - // Important: message must be consistent with just-mr! - auto mess = fmt::format("Content of {} {}", archive_type, content); auto commit_hash = - git_repo->StageAndCommitAllAnonymous(mess, wrapped_logger); + git_repo->StageAndCommitAllAnonymous(commit_message, wrapped_logger); if (not commit_hash) { auto str = - fmt::format("Failed to create initial commit in repository {}", - unpack_path.string()); - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); - return ::grpc::Status::OK; + fmt::format("Failed to create initial commit in repository {}\n{}", + root_path.string(), + err); + return result_t(std::in_place_index<0>, str); } // create a tmp directory for the fetch to Git CAS auto tmp_dir = StorageUtils::CreateTypedTmpDir("import-to-git"); if (not tmp_dir) { - logger_->Emit(LogLevel::Error, - "Failed to create tmp path for git import"); - response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); - return ::grpc::Status::OK; + return result_t( + std::in_place_index<0>, + std::string("Failed to create tmp path for git import")); } // open the Git CAS repo auto just_git_cas = GitCAS::Open(StorageConfig::GitRoot()); if (not just_git_cas) { auto str = fmt::format("Failed to open Git ODB at {}", StorageConfig::GitRoot().string()); - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); - return ::grpc::Status::OK; + return result_t(std::in_place_index<0>, str); } auto just_git_repo = GitRepo::Open(just_git_cas); if (not just_git_repo) { auto str = fmt::format("Failed to open Git repository {}", StorageConfig::GitRoot().string()); - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); - return ::grpc::Status::OK; + return result_t(std::in_place_index<0>, str); } // wrap logger for GitRepo call + err.clear(); wrapped_logger = std::make_shared<GitRepo::anon_logger_t>( - [logger = logger_](auto const& msg, bool fatal) { + [&err](auto const& msg, bool fatal) { if (fatal) { - auto err = fmt::format( - "ServeArchiveTree: While fetching in repository {}:\n{}", - StorageConfig::GitRoot().string(), - msg); - logger->Emit(LogLevel::Error, err); + err = fmt::format("While fetching in repository {}:\n{}", + StorageConfig::GitRoot().string(), + msg); } }); // fetch the new commit into the Git CAS via tmp directory; the call is // thread-safe, so it needs no guarding if (not just_git_repo->LocalFetchViaTmpRepo(tmp_dir->GetPath(), - unpack_path.string(), + root_path.string(), /*branch=*/std::nullopt, wrapped_logger)) { - auto str = - fmt::format("Failed to fetch commit {} into Git CAS", *commit_hash); - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); - return ::grpc::Status::OK; + auto str = fmt::format( + "Failed to fetch commit {} into Git CAS\n{}", *commit_hash, err); + return result_t(std::in_place_index<0>, str); } // wrap logger for GitRepo call + err.clear(); wrapped_logger = std::make_shared<GitRepo::anon_logger_t>( - [logger = logger_, commit_hash](auto const& msg, bool fatal) { + [commit_hash, &err](auto const& msg, bool fatal) { if (fatal) { - auto err = fmt::format( - "ServeArchiveTree: While tagging commit {} in repository " - "{}:\n{}", - *commit_hash, - StorageConfig::GitRoot().string(), - msg); - logger->Emit(LogLevel::Error, err); + err = + fmt::format("While tagging commit {} in repository {}:\n{}", + *commit_hash, + StorageConfig::GitRoot().string(), + msg); } }); // tag commit and keep it in Git CAS @@ -489,70 +470,102 @@ auto SourceTreeService::ImportToGit( if (not git_repo) { auto str = fmt::format("Failed to open Git CAS repository {}", StorageConfig::GitRoot().string()); - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); - return ::grpc::Status::OK; + return result_t(std::in_place_index<0>, str); } // Important: message must be consistent with just-mr! if (not git_repo->KeepTag(*commit_hash, "Keep referenced tree alive", // message wrapped_logger)) { - auto str = - fmt::format("Failed to tag and keep commit {}", *commit_hash); - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); - return ::grpc::Status::OK; + auto str = fmt::format( + "Failed to tag and keep commit {}\n{}", *commit_hash, err); + return result_t(std::in_place_index<0>, str); } } // wrap logger for GitRepo call + err.clear(); wrapped_logger = std::make_shared<GitRepo::anon_logger_t>( - [logger = logger_, commit_hash](auto const& msg, bool fatal) { + [commit_hash, &err](auto const& msg, bool fatal) { if (fatal) { - auto err = fmt::format( - "ServeArchiveTree: While retrieving tree id of commit " - "{}:\n{}", - *commit_hash, - msg); - logger->Emit(LogLevel::Error, err); + err = fmt::format("While retrieving tree id of commit {}:\n{}", + *commit_hash, + msg); } }); - // get the root tree of this commit, to store in file; this is thread-safe + // get the root tree of this commit; this is thread-safe auto tree_id = just_git_repo->GetSubtreeFromCommit(*commit_hash, ".", wrapped_logger); if (not tree_id) { - auto str = fmt::format("Failed to retrieve tree id of commit {}", - *commit_hash); - logger_->Emit(LogLevel::Error, str); + auto str = fmt::format( + "Failed to retrieve tree id of commit {}\n{}", *commit_hash, err); + return result_t(std::in_place_index<0>, str); + } + // return the root tree id + return result_t(std::in_place_index<1>, *tree_id); +} + +auto SourceTreeService::ArchiveImportToGit( + std::filesystem::path const& unpack_path, + std::filesystem::path const& archive_tree_id_file, + std::string const& content, + std::string const& archive_type, + std::string const& subdir, + std::optional<PragmaSpecial> const& resolve_special, + bool sync_tree, + ServeArchiveTreeResponse* response) -> ::grpc::Status { + // Important: commit message must match that in just-mr! + auto commit_message = + fmt::format("Content of {} {}", archive_type, content); + auto res = CommonImportToGit(unpack_path, commit_message); + if (res.index() == 0) { + // report the error + logger_->Emit(LogLevel::Error, std::get<0>(res)); response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); return ::grpc::Status::OK; } + auto const& tree_id = std::get<1>(res); // write to tree id file - if (not StorageUtils::WriteTreeIDFile(archive_tree_id_file, *tree_id)) { + if (not StorageUtils::WriteTreeIDFile(archive_tree_id_file, tree_id)) { auto str = fmt::format("Failed to write tree id to file {}", archive_tree_id_file.string()); logger_->Emit(LogLevel::Error, str); response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); return ::grpc::Status::OK; } + // open the Git CAS repo + auto just_git_cas = GitCAS::Open(StorageConfig::GitRoot()); + if (not just_git_cas) { + auto str = fmt::format("Failed to open Git ODB at {}", + StorageConfig::GitRoot().string()); + logger_->Emit(LogLevel::Error, str); + response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); + return ::grpc::Status::OK; + } + auto just_git_repo = GitRepo::Open(just_git_cas); + if (not just_git_repo) { + auto str = fmt::format("Failed to open Git repository {}", + StorageConfig::GitRoot().string()); + logger_->Emit(LogLevel::Error, str); + response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); + return ::grpc::Status::OK; + } // wrap logger for GitRepo call - wrapped_logger = std::make_shared<GitRepo::anon_logger_t>( + auto wrapped_logger = std::make_shared<GitRepo::anon_logger_t>( [logger = logger_, subdir, tree_id](auto const& msg, bool fatal) { if (fatal) { - auto err = fmt::format( - "ServeArchiveTree: While retrieving subtree {} of tree " - "{}:\n{}", - subdir, - *tree_id, - msg); + auto err = + fmt::format("While retrieving subtree {} of tree {}:\n{}", + subdir, + tree_id, + msg); logger->Emit(LogLevel::Error, err); } }); // get the subtree id; this is thread-safe auto subtree_id = - just_git_repo->GetSubtreeFromTree(*tree_id, subdir, wrapped_logger); + just_git_repo->GetSubtreeFromTree(tree_id, subdir, wrapped_logger); if (not subtree_id) { - auto str = fmt::format("Failed to retrieve tree id of commit {}", - *commit_hash); + auto str = fmt::format( + "Failed to retrieve subtree {} of tree {}", subdir, tree_id); logger_->Emit(LogLevel::Error, str); response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR); return ::grpc::Status::OK; @@ -718,14 +731,94 @@ auto SourceTreeService::ServeArchiveTree( return ::grpc::Status::OK; } // import to git - return ImportToGit(tmp_dir->GetPath(), - archive_tree_id_file, - content, - archive_type, - subdir, - resolve_special, - request->sync_tree(), - response); + return ArchiveImportToGit(tmp_dir->GetPath(), + archive_tree_id_file, + content, + archive_type, + subdir, + resolve_special, + request->sync_tree(), + response); +} + +auto SourceTreeService::DistdirImportToGit( + std::string const& distdir_tree_id, + std::string const& content_id, + std::unordered_map<std::string, std::string> const& content_list, + bool sync_tree, + ServeDistdirTreeResponse* response) -> ::grpc::Status { + // create tmp directory for the distdir + auto distdir_tmp_dir = StorageUtils::CreateTypedTmpDir("distdir"); + if (not distdir_tmp_dir) { + auto str = fmt::format( + "Failed to create tmp path for distdir target {}", content_id); + logger_->Emit(LogLevel::Error, str); + response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR); + return ::grpc::Status::OK; + } + auto const& tmp_path = distdir_tmp_dir->GetPath(); + // link the CAS blobs into the tmp dir + auto const& cas = Storage::Instance().CAS(); + if (not std::all_of(content_list.begin(), + content_list.end(), + [&cas, tmp_path](auto const& kv) { + auto content_path = cas.BlobPath( + ArtifactDigest(kv.second, 0, /*is_tree=*/false), + /*is_executable=*/false); + if (content_path) { + return FileSystemManager::CreateFileHardlink( + *content_path, // from: cas_path/content_id + tmp_path / kv.first); // to: tmp_path/name + } + return false; + })) { + auto str = + fmt::format("Failed to create links to CAS content {}", content_id); + logger_->Emit(LogLevel::Error, str); + response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR); + return ::grpc::Status::OK; + } + // Important: commit message must match that in just-mr! + auto commit_message = fmt::format("Content of distdir {}", content_id); + auto res = CommonImportToGit(tmp_path, commit_message); + if (res.index() == 0) { + // report the error + logger_->Emit(LogLevel::Error, std::get<0>(res)); + response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR); + return ::grpc::Status::OK; + } + auto const& tree_id = std::get<1>(res); + // check the committed tree matches what we expect + if (tree_id != distdir_tree_id) { + // something is very wrong... + auto str = fmt::format( + "Unexpected mismatch for tree of committed distdir:\nexpected {} " + "but got {}", + distdir_tree_id, + tree_id); + logger_->Emit(LogLevel::Error, str); + response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR); + return ::grpc::Status::OK; + } + // if asked, sync tree (and implicitly all blobs) with remote CAS + if (sync_tree) { + if (not local_api_->RetrieveToCas( + {Artifact::ObjectInfo{ + .digest = + ArtifactDigest{distdir_tree_id, 0, /*is_tree=*/true}, + .type = ObjectType::Tree}}, + &(*remote_api_))) { + auto str = fmt::format("Failed to sync tree {} from local CAS", + distdir_tree_id); + logger_->Emit(LogLevel::Error, str); + response->set_status(ServeDistdirTreeResponse::SYNC_ERROR); + return ::grpc::Status::OK; + } + } + // set response on success + response->set_tree(distdir_tree_id); + response->set_status(ServeDistdirTreeResponse::OK); + return ::grpc::Status::OK; } auto SourceTreeService::ServeDistdirTree( @@ -746,10 +839,8 @@ auto SourceTreeService::ServeDistdirTree( bool blob_found{}; auto const& cas = Storage::Instance().CAS(); - std::vector<Artifact::ObjectInfo> objects{}; // to be able to sync in batch - if (request->sync_tree()) { - objects.reserve(request->distfiles().size()); - } + std::unordered_map<std::string, std::string> content_list{}; + content_list.reserve(request->distfiles().size()); for (auto const& kv : request->distfiles()) { auto const& content = kv.content(); @@ -759,11 +850,6 @@ auto SourceTreeService::ServeDistdirTree( if (blob_found = static_cast<bool>( cas.BlobPath(digest, /*is_executable=*/false)); blob_found) { - // store digest for later sync with remote, if needed - if (request->sync_tree()) { - objects.emplace_back(Artifact::ObjectInfo{ - .digest = std::move(digest), .type = ObjectType::File}); - } } else { // check local Git cache @@ -780,11 +866,6 @@ auto SourceTreeService::ServeDistdirTree( ServeDistdirTreeResponse::INTERNAL_ERROR); return ::grpc::Status::OK; } - // store digest for later sync with remote, if needed - if (request->sync_tree()) { - objects.emplace_back(Artifact::ObjectInfo{ - .digest = std::move(digest), .type = ObjectType::File}); - } blob_found = true; } else { @@ -804,12 +885,6 @@ auto SourceTreeService::ServeDistdirTree( ServeDistdirTreeResponse::INTERNAL_ERROR); return ::grpc::Status::OK; } - // store digest for later sync with remote, if needed - if (request->sync_tree()) { - objects.emplace_back(Artifact::ObjectInfo{ - .digest = std::move(digest), - .type = ObjectType::File}); - } blob_found = true; break; } @@ -864,7 +939,13 @@ auto SourceTreeService::ServeDistdirTree( response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR); return ::grpc::Status::OK; } + // store to content_list for import-to-git hardlinking + content_list.insert_or_assign(kv.name(), kv.content()); } + // get hash of distdir content; this must match with that in just-mr + auto content_id = + HashFunction::ComputeBlobHash(nlohmann::json(content_list).dump()) + .HexString(); // create in-memory tree of the distdir, now that we know we have all blobs auto tree = GitRepo::CreateShallowTree(entries); if (not tree) { @@ -885,30 +966,54 @@ auto SourceTreeService::ServeDistdirTree( response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR); return ::grpc::Status::OK; } - // if asked, sync tree and all blobs with remote CAS - if (request->sync_tree()) { - if (not local_api_->RetrieveToCas( - {Artifact::ObjectInfo{.digest = ArtifactDigest(*tree_digest), - .type = ObjectType::Tree}}, - &(*remote_api_))) { - auto str = - fmt::format("Failed to sync tree {} from local CAS", tree_id); - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeDistdirTreeResponse::SYNC_ERROR); - return ::grpc::Status::OK; + // check if tree is already in Git cache + if (IsTreeInRepo(tree_id, StorageConfig::GitRoot(), logger_)) { + // if asked, sync tree and all blobs with remote CAS + if (request->sync_tree()) { + if (not local_api_->RetrieveToCas( + {Artifact::ObjectInfo{ + .digest = ArtifactDigest{tree_id, 0, /*is_tree=*/true}, + .type = ObjectType::Tree}}, + &(*remote_api_))) { + auto str = fmt::format("Failed to sync tree {} from local CAS", + tree_id); + logger_->Emit(LogLevel::Error, str); + response->set_status(ServeDistdirTreeResponse::SYNC_ERROR); + return ::grpc::Status::OK; + } } - if (not local_api_->RetrieveToCas(objects, &(*remote_api_))) { - auto str = - std::string{"Failed to bulk sync content blobs from local CAS"}; - logger_->Emit(LogLevel::Error, str); - response->set_status(ServeDistdirTreeResponse::SYNC_ERROR); + // set response on success + response->set_tree(tree_id); + response->set_status(ServeDistdirTreeResponse::OK); + return ::grpc::Status::OK; + } + // check if tree is in a known repository + for (auto const& path : RemoteServeConfig::KnownRepositories()) { + if (IsTreeInRepo(tree_id, path, logger_)) { + // if asked, sync tree and all blobs with remote CAS + if (request->sync_tree()) { + if (not local_api_->RetrieveToCas( + {Artifact::ObjectInfo{ + .digest = + ArtifactDigest{tree_id, 0, /*is_tree=*/true}, + .type = ObjectType::Tree}}, + &(*remote_api_))) { + auto str = fmt::format( + "Failed to sync tree {} from local CAS", tree_id); + logger_->Emit(LogLevel::Error, str); + response->set_status(ServeDistdirTreeResponse::SYNC_ERROR); + return ::grpc::Status::OK; + } + } + // set response on success + response->set_tree(tree_id); + response->set_status(ServeDistdirTreeResponse::OK); return ::grpc::Status::OK; } } - // set response on success - response->set_tree(tree_id); - response->set_status(ServeDistdirTreeResponse::OK); - return ::grpc::Status::OK; + // otherwise, we import the tree from CAS ourselves + return DistdirImportToGit( + tree_id, content_id, content_list, request->sync_tree(), response); } auto SourceTreeService::ServeContent( diff --git a/src/buildtool/serve_api/serve_service/source_tree.hpp b/src/buildtool/serve_api/serve_service/source_tree.hpp index 763762f6..8dd4021e 100644 --- a/src/buildtool/serve_api/serve_service/source_tree.hpp +++ b/src/buildtool/serve_api/serve_service/source_tree.hpp @@ -20,6 +20,7 @@ #include <mutex> #include <optional> #include <string> +#include <variant> #include <vector> #include "gsl/gsl" @@ -135,7 +136,15 @@ class SourceTreeService final bool sync_tree, ServeArchiveTreeResponse* response) -> ::grpc::Status; - [[nodiscard]] auto ImportToGit( + /// \brief Common import-to-git utility, used by both archives and distdirs. + /// \returns An error + data union, where at index 0 is the error message on + /// failure and at index 1 is the root tree id of the committed directory on + /// success. + [[nodiscard]] auto CommonImportToGit(std::filesystem::path const& root_path, + std::string const& commit_message) + -> std::variant<std::string, std::string>; + + [[nodiscard]] auto ArchiveImportToGit( std::filesystem::path const& unpack_path, std::filesystem::path const& archive_tree_id_file, std::string const& content, @@ -149,6 +158,13 @@ class SourceTreeService final std::string const& tree_id, std::filesystem::path const& repo_path, std::shared_ptr<Logger> const& logger) -> bool; + + [[nodiscard]] auto DistdirImportToGit( + std::string const& tree_id, + std::string const& content_id, + std::unordered_map<std::string, std::string> const& content_list, + bool sync_tree, + ServeDistdirTreeResponse* response) -> ::grpc::Status; }; #endif // INCLUDED_SRC_BUILDTOOL_SERVE_API_SERVE_SERVICE_SOURCE_TREE_HPP |