summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/buildtool/serve_api/serve_service/source_tree.cpp 377
-rw-r--r-- src/buildtool/serve_api/serve_service/source_tree.hpp 18
2 files changed, 258 insertions, 137 deletions
diff --git a/src/buildtool/serve_api/serve_service/source_tree.cpp b/src/buildtool/serve_api/serve_service/source_tree.cpp
index 40478aeb..6d51221a 100644
--- a/src/buildtool/serve_api/serve_service/source_tree.cpp
+++ b/src/buildtool/serve_api/serve_service/source_tree.cpp
@@ -375,109 +375,90 @@ auto SourceTreeService::ResolveContentTree(
return SyncArchive(tree_id, repo_path, sync_tree, response);
}
-auto SourceTreeService::ImportToGit(
- std::filesystem::path const& unpack_path,
- std::filesystem::path const& archive_tree_id_file,
- std::string const& content,
- std::string const& archive_type,
- std::string const& subdir,
- std::optional<PragmaSpecial> const& resolve_special,
- bool sync_tree,
- ServeArchiveTreeResponse* response) -> ::grpc::Status {
+auto SourceTreeService::CommonImportToGit(
+ std::filesystem::path const& root_path,
+ std::string const& commit_message)
+ -> std::variant<std::string, std::string> {
+ using result_t = std::variant<std::string, std::string>;
// do the initial commit; no need to guard, as the tmp location is unique
- auto git_repo = GitRepo::InitAndOpen(unpack_path,
+ auto git_repo = GitRepo::InitAndOpen(root_path,
/*is_bare=*/false);
if (not git_repo) {
auto str = fmt::format("Could not initialize repository {}",
- unpack_path.string());
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
- return ::grpc::Status::OK;
+ root_path.string());
+ return result_t(std::in_place_index<0>, str);
}
// wrap logger for GitRepo call
+ std::string err;
auto wrapped_logger = std::make_shared<GitRepo::anon_logger_t>(
- [logger = logger_, unpack_path](auto const& msg, bool fatal) {
+ [root_path, &err](auto const& msg, bool fatal) {
if (fatal) {
- auto err = fmt::format(
- "ServeArchiveTree: While staging and committing all in "
- "repository {}:\n{}",
- unpack_path.string(),
+ err = fmt::format(
+ "While staging and committing all in repository {}:\n{}",
+ root_path.string(),
msg);
- logger->Emit(LogLevel::Error, err);
}
});
// stage and commit all
- // Important: message must be consistent with just-mr!
- auto mess = fmt::format("Content of {} {}", archive_type, content);
auto commit_hash =
- git_repo->StageAndCommitAllAnonymous(mess, wrapped_logger);
+ git_repo->StageAndCommitAllAnonymous(commit_message, wrapped_logger);
if (not commit_hash) {
auto str =
- fmt::format("Failed to create initial commit in repository {}",
- unpack_path.string());
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
- return ::grpc::Status::OK;
+ fmt::format("Failed to create initial commit in repository {}\n{}",
+ root_path.string(),
+ err);
+ return result_t(std::in_place_index<0>, str);
}
// create a tmp directory for the fetch to Git CAS
auto tmp_dir = StorageUtils::CreateTypedTmpDir("import-to-git");
if (not tmp_dir) {
- logger_->Emit(LogLevel::Error,
- "Failed to create tmp path for git import");
- response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
- return ::grpc::Status::OK;
+ return result_t(
+ std::in_place_index<0>,
+ std::string("Failed to create tmp path for git import"));
}
// open the Git CAS repo
auto just_git_cas = GitCAS::Open(StorageConfig::GitRoot());
if (not just_git_cas) {
auto str = fmt::format("Failed to open Git ODB at {}",
StorageConfig::GitRoot().string());
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
- return ::grpc::Status::OK;
+ return result_t(std::in_place_index<0>, str);
}
auto just_git_repo = GitRepo::Open(just_git_cas);
if (not just_git_repo) {
auto str = fmt::format("Failed to open Git repository {}",
StorageConfig::GitRoot().string());
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
- return ::grpc::Status::OK;
+ return result_t(std::in_place_index<0>, str);
}
// wrap logger for GitRepo call
+ err.clear();
wrapped_logger = std::make_shared<GitRepo::anon_logger_t>(
- [logger = logger_](auto const& msg, bool fatal) {
+ [&err](auto const& msg, bool fatal) {
if (fatal) {
- auto err = fmt::format(
- "ServeArchiveTree: While fetching in repository {}:\n{}",
- StorageConfig::GitRoot().string(),
- msg);
- logger->Emit(LogLevel::Error, err);
+ err = fmt::format("While fetching in repository {}:\n{}",
+ StorageConfig::GitRoot().string(),
+ msg);
}
});
// fetch the new commit into the Git CAS via tmp directory; the call is
// thread-safe, so it needs no guarding
if (not just_git_repo->LocalFetchViaTmpRepo(tmp_dir->GetPath(),
- unpack_path.string(),
+ root_path.string(),
/*branch=*/std::nullopt,
wrapped_logger)) {
- auto str =
- fmt::format("Failed to fetch commit {} into Git CAS", *commit_hash);
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
- return ::grpc::Status::OK;
+ auto str = fmt::format(
+ "Failed to fetch commit {} into Git CAS\n{}", *commit_hash, err);
+ return result_t(std::in_place_index<0>, str);
}
// wrap logger for GitRepo call
+ err.clear();
wrapped_logger = std::make_shared<GitRepo::anon_logger_t>(
- [logger = logger_, commit_hash](auto const& msg, bool fatal) {
+ [commit_hash, &err](auto const& msg, bool fatal) {
if (fatal) {
- auto err = fmt::format(
- "ServeArchiveTree: While tagging commit {} in repository "
- "{}:\n{}",
- *commit_hash,
- StorageConfig::GitRoot().string(),
- msg);
- logger->Emit(LogLevel::Error, err);
+ err =
+ fmt::format("While tagging commit {} in repository {}:\n{}",
+ *commit_hash,
+ StorageConfig::GitRoot().string(),
+ msg);
}
});
// tag commit and keep it in Git CAS
@@ -489,70 +470,102 @@ auto SourceTreeService::ImportToGit(
if (not git_repo) {
auto str = fmt::format("Failed to open Git CAS repository {}",
StorageConfig::GitRoot().string());
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
- return ::grpc::Status::OK;
+ return result_t(std::in_place_index<0>, str);
}
// Important: message must be consistent with just-mr!
if (not git_repo->KeepTag(*commit_hash,
"Keep referenced tree alive", // message
wrapped_logger)) {
- auto str =
- fmt::format("Failed to tag and keep commit {}", *commit_hash);
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
- return ::grpc::Status::OK;
+ auto str = fmt::format(
+ "Failed to tag and keep commit {}\n{}", *commit_hash, err);
+ return result_t(std::in_place_index<0>, str);
}
}
// wrap logger for GitRepo call
+ err.clear();
wrapped_logger = std::make_shared<GitRepo::anon_logger_t>(
- [logger = logger_, commit_hash](auto const& msg, bool fatal) {
+ [commit_hash, &err](auto const& msg, bool fatal) {
if (fatal) {
- auto err = fmt::format(
- "ServeArchiveTree: While retrieving tree id of commit "
- "{}:\n{}",
- *commit_hash,
- msg);
- logger->Emit(LogLevel::Error, err);
+ err = fmt::format("While retrieving tree id of commit {}:\n{}",
+ *commit_hash,
+ msg);
}
});
- // get the root tree of this commit, to store in file; this is thread-safe
+ // get the root tree of this commit; this is thread-safe
auto tree_id =
just_git_repo->GetSubtreeFromCommit(*commit_hash, ".", wrapped_logger);
if (not tree_id) {
- auto str = fmt::format("Failed to retrieve tree id of commit {}",
- *commit_hash);
- logger_->Emit(LogLevel::Error, str);
+ auto str = fmt::format(
+ "Failed to retrieve tree id of commit {}\n{}", *commit_hash, err);
+ return result_t(std::in_place_index<0>, str);
+ }
+ // return the root tree id
+ return result_t(std::in_place_index<1>, *tree_id);
+}
+
+auto SourceTreeService::ArchiveImportToGit(
+ std::filesystem::path const& unpack_path,
+ std::filesystem::path const& archive_tree_id_file,
+ std::string const& content,
+ std::string const& archive_type,
+ std::string const& subdir,
+ std::optional<PragmaSpecial> const& resolve_special,
+ bool sync_tree,
+ ServeArchiveTreeResponse* response) -> ::grpc::Status {
+ // Important: commit message must match that in just-mr!
+ auto commit_message =
+ fmt::format("Content of {} {}", archive_type, content);
+ auto res = CommonImportToGit(unpack_path, commit_message);
+ if (res.index() == 0) {
+ // report the error
+ logger_->Emit(LogLevel::Error, std::get<0>(res));
response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
return ::grpc::Status::OK;
}
+ auto const& tree_id = std::get<1>(res);
// write to tree id file
- if (not StorageUtils::WriteTreeIDFile(archive_tree_id_file, *tree_id)) {
+ if (not StorageUtils::WriteTreeIDFile(archive_tree_id_file, tree_id)) {
auto str = fmt::format("Failed to write tree id to file {}",
archive_tree_id_file.string());
logger_->Emit(LogLevel::Error, str);
response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
return ::grpc::Status::OK;
}
+ // open the Git CAS repo
+ auto just_git_cas = GitCAS::Open(StorageConfig::GitRoot());
+ if (not just_git_cas) {
+ auto str = fmt::format("Failed to open Git ODB at {}",
+ StorageConfig::GitRoot().string());
+ logger_->Emit(LogLevel::Error, str);
+ response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
+ return ::grpc::Status::OK;
+ }
+ auto just_git_repo = GitRepo::Open(just_git_cas);
+ if (not just_git_repo) {
+ auto str = fmt::format("Failed to open Git repository {}",
+ StorageConfig::GitRoot().string());
+ logger_->Emit(LogLevel::Error, str);
+ response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
+ return ::grpc::Status::OK;
+ }
// wrap logger for GitRepo call
- wrapped_logger = std::make_shared<GitRepo::anon_logger_t>(
+ auto wrapped_logger = std::make_shared<GitRepo::anon_logger_t>(
[logger = logger_, subdir, tree_id](auto const& msg, bool fatal) {
if (fatal) {
- auto err = fmt::format(
- "ServeArchiveTree: While retrieving subtree {} of tree "
- "{}:\n{}",
- subdir,
- *tree_id,
- msg);
+ auto err =
+ fmt::format("While retrieving subtree {} of tree {}:\n{}",
+ subdir,
+ tree_id,
+ msg);
logger->Emit(LogLevel::Error, err);
}
});
// get the subtree id; this is thread-safe
auto subtree_id =
- just_git_repo->GetSubtreeFromTree(*tree_id, subdir, wrapped_logger);
+ just_git_repo->GetSubtreeFromTree(tree_id, subdir, wrapped_logger);
if (not subtree_id) {
- auto str = fmt::format("Failed to retrieve tree id of commit {}",
- *commit_hash);
+ auto str = fmt::format(
+ "Failed to retrieve subtree {} of tree {}", subdir, tree_id);
logger_->Emit(LogLevel::Error, str);
response->set_status(ServeArchiveTreeResponse::INTERNAL_ERROR);
return ::grpc::Status::OK;
@@ -718,14 +731,94 @@ auto SourceTreeService::ServeArchiveTree(
return ::grpc::Status::OK;
}
// import to git
- return ImportToGit(tmp_dir->GetPath(),
- archive_tree_id_file,
- content,
- archive_type,
- subdir,
- resolve_special,
- request->sync_tree(),
- response);
+ return ArchiveImportToGit(tmp_dir->GetPath(),
+ archive_tree_id_file,
+ content,
+ archive_type,
+ subdir,
+ resolve_special,
+ request->sync_tree(),
+ response);
+}
+
+// Import a distdir into the Git cache: hardlink the CAS blobs named in
+// content_list into a fresh tmp directory, commit that directory via
+// CommonImportToGit and verify that the committed root tree is the expected
+// distdir_tree_id. On success the tree id is set in the response; every
+// failure is reported through the response status, the gRPC status returned
+// is always OK.
+auto SourceTreeService::DistdirImportToGit(
+    std::string const& distdir_tree_id,
+    std::string const& content_id,
+    std::unordered_map<std::string, std::string> const& content_list,
+    bool sync_tree,
+    ServeDistdirTreeResponse* response) -> ::grpc::Status {
+    // The keys of content_list are appended to the tmp path below and are
+    // taken from the request by the caller, so accept only plain file names.
+    // An absolute name would replace the tmp path altogether and a name with
+    // ".." or a separator could place the hardlink outside the tmp directory.
+    auto const is_plain_name = [](auto const& kv) {
+        auto const name = std::filesystem::path{kv.first};
+        return not kv.first.empty() and kv.first != "." and
+               kv.first != ".." and name.filename() == name;
+    };
+    if (not std::all_of(
+            content_list.begin(), content_list.end(), is_plain_name)) {
+        auto str = fmt::format(
+            "Invalid distfile name in content list of distdir target {}",
+            content_id);
+        logger_->Emit(LogLevel::Error, str);
+        response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR);
+        return ::grpc::Status::OK;
+    }
+    // create tmp directory for the distdir
+    auto distdir_tmp_dir = StorageUtils::CreateTypedTmpDir("distdir");
+    if (not distdir_tmp_dir) {
+        auto str = fmt::format(
+            "Failed to create tmp path for distdir target {}", content_id);
+        logger_->Emit(LogLevel::Error, str);
+        response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR);
+        return ::grpc::Status::OK;
+    }
+    auto const& tmp_path = distdir_tmp_dir->GetPath();
+    // link the CAS blobs into the tmp dir; the lambda is only used within
+    // this call, so capturing the path by reference avoids a copy
+    auto const& cas = Storage::Instance().CAS();
+    if (not std::all_of(content_list.begin(),
+                        content_list.end(),
+                        [&cas, &tmp_path](auto const& kv) {
+                            auto content_path = cas.BlobPath(
+                                ArtifactDigest(kv.second, 0, /*is_tree=*/false),
+                                /*is_executable=*/false);
+                            if (content_path) {
+                                return FileSystemManager::CreateFileHardlink(
+                                    *content_path,  // from: cas_path/content_id
+                                    tmp_path / kv.first);  // to: tmp_path/name
+                            }
+                            return false;
+                        })) {
+        auto str =
+            fmt::format("Failed to create links to CAS content {}", content_id);
+        logger_->Emit(LogLevel::Error, str);
+        response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR);
+        return ::grpc::Status::OK;
+    }
+    // Important: commit message must match that in just-mr!
+    auto commit_message = fmt::format("Content of distdir {}", content_id);
+    auto res = CommonImportToGit(tmp_path, commit_message);
+    if (res.index() == 0) {
+        // report the error
+        logger_->Emit(LogLevel::Error, std::get<0>(res));
+        response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR);
+        return ::grpc::Status::OK;
+    }
+    auto const& tree_id = std::get<1>(res);
+    // check the committed tree matches what we expect
+    if (tree_id != distdir_tree_id) {
+        // something is very wrong...
+        auto str = fmt::format(
+            "Unexpected mismatch for tree of committed distdir:\nexpected {} "
+            "but got {}",
+            distdir_tree_id,
+            tree_id);
+        logger_->Emit(LogLevel::Error, str);
+        response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR);
+        return ::grpc::Status::OK;
+    }
+    // if asked, sync tree (and implicitly all blobs) with remote CAS
+    if (sync_tree) {
+        if (not local_api_->RetrieveToCas(
+                {Artifact::ObjectInfo{
+                    .digest =
+                        ArtifactDigest{distdir_tree_id, 0, /*is_tree=*/true},
+                    .type = ObjectType::Tree}},
+                &(*remote_api_))) {
+            auto str = fmt::format("Failed to sync tree {} from local CAS",
+                                   distdir_tree_id);
+            logger_->Emit(LogLevel::Error, str);
+            response->set_status(ServeDistdirTreeResponse::SYNC_ERROR);
+            return ::grpc::Status::OK;
+        }
+    }
+    // set response on success
+    response->set_tree(distdir_tree_id);
+    response->set_status(ServeDistdirTreeResponse::OK);
+    return ::grpc::Status::OK;
}
auto SourceTreeService::ServeDistdirTree(
@@ -746,10 +839,8 @@ auto SourceTreeService::ServeDistdirTree(
bool blob_found{};
auto const& cas = Storage::Instance().CAS();
- std::vector<Artifact::ObjectInfo> objects{}; // to be able to sync in batch
- if (request->sync_tree()) {
- objects.reserve(request->distfiles().size());
- }
+ std::unordered_map<std::string, std::string> content_list{};
+ content_list.reserve(request->distfiles().size());
for (auto const& kv : request->distfiles()) {
auto const& content = kv.content();
@@ -759,11 +850,6 @@ auto SourceTreeService::ServeDistdirTree(
if (blob_found = static_cast<bool>(
cas.BlobPath(digest, /*is_executable=*/false));
blob_found) {
- // store digest for later sync with remote, if needed
- if (request->sync_tree()) {
- objects.emplace_back(Artifact::ObjectInfo{
- .digest = std::move(digest), .type = ObjectType::File});
- }
}
else {
// check local Git cache
@@ -780,11 +866,6 @@ auto SourceTreeService::ServeDistdirTree(
ServeDistdirTreeResponse::INTERNAL_ERROR);
return ::grpc::Status::OK;
}
- // store digest for later sync with remote, if needed
- if (request->sync_tree()) {
- objects.emplace_back(Artifact::ObjectInfo{
- .digest = std::move(digest), .type = ObjectType::File});
- }
blob_found = true;
}
else {
@@ -804,12 +885,6 @@ auto SourceTreeService::ServeDistdirTree(
ServeDistdirTreeResponse::INTERNAL_ERROR);
return ::grpc::Status::OK;
}
- // store digest for later sync with remote, if needed
- if (request->sync_tree()) {
- objects.emplace_back(Artifact::ObjectInfo{
- .digest = std::move(digest),
- .type = ObjectType::File});
- }
blob_found = true;
break;
}
@@ -864,7 +939,13 @@ auto SourceTreeService::ServeDistdirTree(
response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR);
return ::grpc::Status::OK;
}
+ // store to content_list for import-to-git hardlinking
+ content_list.insert_or_assign(kv.name(), kv.content());
}
+ // get hash of distdir content; this must match with that in just-mr
+ auto content_id =
+ HashFunction::ComputeBlobHash(nlohmann::json(content_list).dump())
+ .HexString();
// create in-memory tree of the distdir, now that we know we have all blobs
auto tree = GitRepo::CreateShallowTree(entries);
if (not tree) {
@@ -885,30 +966,54 @@ auto SourceTreeService::ServeDistdirTree(
response->set_status(ServeDistdirTreeResponse::INTERNAL_ERROR);
return ::grpc::Status::OK;
}
- // if asked, sync tree and all blobs with remote CAS
- if (request->sync_tree()) {
- if (not local_api_->RetrieveToCas(
- {Artifact::ObjectInfo{.digest = ArtifactDigest(*tree_digest),
- .type = ObjectType::Tree}},
- &(*remote_api_))) {
- auto str =
- fmt::format("Failed to sync tree {} from local CAS", tree_id);
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeDistdirTreeResponse::SYNC_ERROR);
- return ::grpc::Status::OK;
+ // check if tree is already in Git cache
+ if (IsTreeInRepo(tree_id, StorageConfig::GitRoot(), logger_)) {
+ // if asked, sync tree and all blobs with remote CAS
+ if (request->sync_tree()) {
+ if (not local_api_->RetrieveToCas(
+ {Artifact::ObjectInfo{
+ .digest = ArtifactDigest{tree_id, 0, /*is_tree=*/true},
+ .type = ObjectType::Tree}},
+ &(*remote_api_))) {
+ auto str = fmt::format("Failed to sync tree {} from local CAS",
+ tree_id);
+ logger_->Emit(LogLevel::Error, str);
+ response->set_status(ServeDistdirTreeResponse::SYNC_ERROR);
+ return ::grpc::Status::OK;
+ }
}
- if (not local_api_->RetrieveToCas(objects, &(*remote_api_))) {
- auto str =
- std::string{"Failed to bulk sync content blobs from local CAS"};
- logger_->Emit(LogLevel::Error, str);
- response->set_status(ServeDistdirTreeResponse::SYNC_ERROR);
+ // set response on success
+ response->set_tree(tree_id);
+ response->set_status(ServeDistdirTreeResponse::OK);
+ return ::grpc::Status::OK;
+ }
+ // check if tree is in a known repository
+ for (auto const& path : RemoteServeConfig::KnownRepositories()) {
+ if (IsTreeInRepo(tree_id, path, logger_)) {
+ // if asked, sync tree and all blobs with remote CAS
+ if (request->sync_tree()) {
+ if (not local_api_->RetrieveToCas(
+ {Artifact::ObjectInfo{
+ .digest =
+ ArtifactDigest{tree_id, 0, /*is_tree=*/true},
+ .type = ObjectType::Tree}},
+ &(*remote_api_))) {
+ auto str = fmt::format(
+ "Failed to sync tree {} from local CAS", tree_id);
+ logger_->Emit(LogLevel::Error, str);
+ response->set_status(ServeDistdirTreeResponse::SYNC_ERROR);
+ return ::grpc::Status::OK;
+ }
+ }
+ // set response on success
+ response->set_tree(tree_id);
+ response->set_status(ServeDistdirTreeResponse::OK);
return ::grpc::Status::OK;
}
}
- // set response on success
- response->set_tree(tree_id);
- response->set_status(ServeDistdirTreeResponse::OK);
- return ::grpc::Status::OK;
+ // otherwise, we import the tree from CAS ourselves
+ return DistdirImportToGit(
+ tree_id, content_id, content_list, request->sync_tree(), response);
}
auto SourceTreeService::ServeContent(
diff --git a/src/buildtool/serve_api/serve_service/source_tree.hpp b/src/buildtool/serve_api/serve_service/source_tree.hpp
index 763762f6..8dd4021e 100644
--- a/src/buildtool/serve_api/serve_service/source_tree.hpp
+++ b/src/buildtool/serve_api/serve_service/source_tree.hpp
@@ -20,6 +20,7 @@
#include <mutex>
#include <optional>
#include <string>
+#include <unordered_map>
+#include <variant>
#include <vector>
#include "gsl/gsl"
@@ -135,7 +136,15 @@ class SourceTreeService final
bool sync_tree,
ServeArchiveTreeResponse* response) -> ::grpc::Status;
- [[nodiscard]] auto ImportToGit(
+ /// \brief Common import-to-git utility, used by both archives and distdirs.
+ /// Initializes a non-bare Git repository at the given path, stages and
+ /// commits everything found there, and fetches the new commit into the Git
+ /// CAS located at StorageConfig::GitRoot(). Errors are not logged here but
+ /// handed back to the caller as a message.
+ /// \param root_path Directory whose content is to be committed.
+ /// \param commit_message Message to use for the created commit.
+ /// \returns An error + data union, where at index 0 is the error message on
+ /// failure and at index 1 is the root tree id of the committed directory on
+ /// success.
+ [[nodiscard]] auto CommonImportToGit(std::filesystem::path const& root_path,
+ std::string const& commit_message)
+ -> std::variant<std::string, std::string>;
+
+ [[nodiscard]] auto ArchiveImportToGit(
std::filesystem::path const& unpack_path,
std::filesystem::path const& archive_tree_id_file,
std::string const& content,
@@ -149,6 +158,13 @@ class SourceTreeService final
std::string const& tree_id,
std::filesystem::path const& repo_path,
std::shared_ptr<Logger> const& logger) -> bool;
+
+ /// \brief Import a distdir into the Git cache and fill in the response.
+ /// Hardlinks the CAS blobs listed in content_list into a fresh tmp
+ /// directory, commits that directory via CommonImportToGit, and checks that
+ /// the root tree id of the commit equals the expected tree id.
+ /// \param tree_id Expected Git tree id of the distdir.
+ /// \param content_id Identifier of the distdir content; used in the commit
+ /// message and in log messages.
+ /// \param content_list Map from file name to the content id of its blob.
+ /// \param sync_tree If set, also sync the tree with the remote CAS.
+ /// \param response Response whose status (and, on success, tree) is set.
+ /// \returns Always ::grpc::Status::OK; failures are reported through the
+ /// status field of the response.
+ [[nodiscard]] auto DistdirImportToGit(
+ std::string const& tree_id,
+ std::string const& content_id,
+ std::unordered_map<std::string, std::string> const& content_list,
+ bool sync_tree,
+ ServeDistdirTreeResponse* response) -> ::grpc::Status;
};
#endif // INCLUDED_SRC_BUILDTOOL_SERVE_API_SERVE_SERVICE_SOURCE_TREE_HPP