diff options
-rw-r--r-- | src/buildtool/storage/TARGETS | 3 | ||||
-rw-r--r-- | src/buildtool/storage/compactification_task.cpp | 147 | ||||
-rw-r--r-- | src/buildtool/storage/compactification_task.hpp | 95 |
3 files changed, 245 insertions, 0 deletions
diff --git a/src/buildtool/storage/TARGETS b/src/buildtool/storage/TARGETS index 9161c398..0346e3d6 100644 --- a/src/buildtool/storage/TARGETS +++ b/src/buildtool/storage/TARGETS @@ -35,11 +35,13 @@ , "large_object_cas.tpp" , "compactifier.hpp" ] + , "private-hdrs": ["compactification_task.hpp"] , "srcs": [ "target_cache_key.cpp" , "target_cache_entry.cpp" , "garbage_collector.cpp" , "compactifier.cpp" + , "compactification_task.cpp" ] , "deps": [ "config" @@ -70,6 +72,7 @@ [ ["src/buildtool/execution_api/remote", "config"] , ["src/buildtool/logging", "log_level"] , ["src/buildtool/execution_api/common", "message_limits"] + , ["src/buildtool/multithreading", "task_system"] ] } , "fs_utils": diff --git a/src/buildtool/storage/compactification_task.cpp b/src/buildtool/storage/compactification_task.cpp new file mode 100644 index 00000000..ee9411ba --- /dev/null +++ b/src/buildtool/storage/compactification_task.cpp @@ -0,0 +1,147 @@ +// Copyright 2024 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/buildtool/storage/compactification_task.hpp" + +#include <atomic> +#include <optional> +#include <unordered_map> +#include <utility> //std::move +#include <vector> + +#include "gsl/gsl" +#include "src/buildtool/file_system/file_system_manager.hpp" +#include "src/buildtool/file_system/object_type.hpp" +#include "src/buildtool/multithreading/task_system.hpp" + +namespace { +[[nodiscard]] auto GetObjectTask(CompactificationTask const& task, + ObjectType type) noexcept + -> CompactificationTask::ObjectTask const&; + +[[nodiscard]] auto GetFilterTypes(CompactificationTask const& task) noexcept + -> std::vector<ObjectType>; + +using FilterResult = std::optional<std::vector<std::filesystem::path>>; +[[nodiscard]] auto FilterEntries(CompactificationTask const& task, + ObjectType type) noexcept -> FilterResult; +} // namespace + +[[nodiscard]] auto CompactifyConcurrently( + CompactificationTask const& task) noexcept -> bool { + std::atomic_bool failed = false; + std::unordered_map<ObjectType, FilterResult> scan_results; + { + TaskSystem ts; + // Filter entries to create execution tasks: + for (auto type : GetFilterTypes(task)) { + try { + auto tstask = + [result = &scan_results[type], &failed, type, &task] { + *result = ::FilterEntries(task, type); + if (not *result) { + failed = true; + } + }; + ts.QueueTask(std::move(tstask)); + } catch (...) { + ts.Shutdown(); + return false; + } + } + } + + // Init compactification tasks: + if (not failed) { + TaskSystem ts; + for (auto const& [type, subtasks] : scan_results) { + auto const& task_callback = GetObjectTask(task, type); + for (auto const& entry : *subtasks) { + try { + auto tstask = [&failed, &task, &task_callback, &entry] { + if (not failed and + not std::invoke(task_callback, task, entry)) { + failed = true; + } + }; + ts.QueueTask(std::move(tstask)); + } catch (...) { + ts.Shutdown(); + return false; + } + } + } + } + return not failed; +} + +namespace { +[[nodiscard]] auto GetObjectTask(CompactificationTask const& task, + ObjectType type) noexcept + -> CompactificationTask::ObjectTask const& { + switch (type) { + case ObjectType::Symlink: + case ObjectType::File: + return task.f_task; + case ObjectType::Executable: + return task.x_task; + case ObjectType::Tree: + return task.t_task; + } + Ensures(false); // unreachable +} + +[[nodiscard]] auto GetFilterTypes(CompactificationTask const& task) noexcept + -> std::vector<ObjectType> { + return task.large ? std::vector{ObjectType::File, ObjectType::Tree} + : std::vector{ObjectType::File, + ObjectType::Tree, + ObjectType::Executable}; +} + +[[nodiscard]] auto FilterEntries(CompactificationTask const& task, + ObjectType type) noexcept -> FilterResult { + std::vector<std::filesystem::path> result; + auto const& storage_root = task.cas.StorageRoot(type, task.large); + // Check there are entries to process: + if (not FileSystemManager::IsDirectory(storage_root)) { + return result; + } + + FileSystemManager::UseDirEntryFunc callback = + [&task, &storage_root, &result](std::filesystem::path const& entry, + bool /*unused*/) -> bool { + // Filter entries: + try { + if (std::invoke(task.filter, storage_root / entry)) { + result.push_back(entry); + } + } catch (...) { + return false; + } + return true; + }; + + // Read the ObjectType storage directory: + if (not FileSystemManager::ReadDirectoryEntriesRecursive(storage_root, + callback)) { + result.clear(); + task.Log(LogLevel::Error, + "Scanning: Failed to read {}", + storage_root.string()); + return std::nullopt; + } + return result; +} +} // namespace diff --git a/src/buildtool/storage/compactification_task.hpp b/src/buildtool/storage/compactification_task.hpp new file mode 100644 index 00000000..e0f7e492 --- /dev/null +++ b/src/buildtool/storage/compactification_task.hpp @@ -0,0 +1,95 @@ +// Copyright 2024 Huawei Cloud Computing Technology Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef INCLUDED_SRC_BUILDTOOL_STORAGE_COMPACTIFICATION_TASK_HPP +#define INCLUDED_SRC_BUILDTOOL_STORAGE_COMPACTIFICATION_TASK_HPP + +#include <filesystem> +#include <functional> +#include <string> + +#include "fmt/core.h" +#include "src/buildtool/logging/log_level.hpp" +#include "src/buildtool/logging/logger.hpp" +#include "src/buildtool/storage/local_cas.hpp" + +/// \brief Set of data that defines a compactification task. +/// \param cas CAS to be scanned. +/// \param large True if large storages need to be scanned. +/// \param f_task A handler for ObjectType::File entries. +/// \param x_task A handler for ObjectType::Executable entries. +/// It is never called during scanning of large storages. +/// \param t_task A handler for ObjectType::Tree entries. +/// \param logger A callback for logging. +/// \param filter A callback that defines which files must be processed. +struct CompactificationTask final { + using Logger = std::function<void(LogLevel, std::string const&)>; + using Filter = std::function<bool(std::filesystem::path const&)>; + using ObjectTask = std::function<bool(CompactificationTask const&, + std::filesystem::path const&)>; + + static inline const Logger kLoggerDefault = [](LogLevel level, + std::string const& message) { + ::Logger::Log(level, "{}", message); + }; + static inline const Filter kFilterDefault = + [](std::filesystem::path const& /*unused*/) { return false; }; + static inline const ObjectTask kObjectTaskDefault = + [](CompactificationTask const& /*unused*/, + std::filesystem::path const& /*unused*/) { + kLoggerDefault(LogLevel::Error, "Default ObjectTask was called"); + return false; + }; + + LocalCAS<false> const& cas; + bool large = false; + Logger logger = kLoggerDefault; + Filter filter = kFilterDefault; + ObjectTask f_task = kObjectTaskDefault; + ObjectTask x_task = kObjectTaskDefault; + ObjectTask t_task = kObjectTaskDefault; + + /// \brief Log a formatted error. + template <typename... Args> + void Log(LogLevel level, + std::string const& message, + Args&&... args) const noexcept; +}; + +/// \brief Execute the comapctification task using multiple threads. +/// Execution of the CompactificationTask-defined logic begins only after the +/// entire storage has been scanned, otherwise the storage may be invalidated. +/// Example: casf is scanned while another thread puts new split chunks there. +/// \return True if execution was successful. +[[nodiscard]] auto CompactifyConcurrently( + CompactificationTask const& task) noexcept -> bool; + +template <typename... Args> +void CompactificationTask::Log(LogLevel level, + std::string const& message, + Args&&... args) const noexcept { + if (not logger) { + ::Logger::Log(LogLevel::Error, "Logger is missing."); + return; + } + auto msg = fmt::vformat(message, fmt::make_format_args(args...)); + try { + std::invoke(logger, level, msg); + } catch (...) { + ::Logger::Log(LogLevel::Error, "Failed to invoke a logger"); + ::Logger::Log(level, "{}", msg); + } +} + +#endif // INCLUDED_SRC_BUILDTOOL_STORAGE_COMPACTIFICATION_TASK_HPP |