diff options
Diffstat (limited to 'src/buildtool/multithreading/notification_queue.hpp')
-rw-r--r-- | src/buildtool/multithreading/notification_queue.hpp | 188 |
1 files changed, 188 insertions, 0 deletions
diff --git a/src/buildtool/multithreading/notification_queue.hpp b/src/buildtool/multithreading/notification_queue.hpp new file mode 100644 index 00000000..7e79aa43 --- /dev/null +++ b/src/buildtool/multithreading/notification_queue.hpp @@ -0,0 +1,188 @@ +#ifndef INCLUDED_SRC_BUILDTOOL_MULTITHREADING_NOTIFICATION_QUEUE_HPP +#define INCLUDED_SRC_BUILDTOOL_MULTITHREADING_NOTIFICATION_QUEUE_HPP + +#include <condition_variable> +#include <deque> +#include <mutex> +#include <optional> +#include <utility> // std::forward + +#include "gsl-lite/gsl-lite.hpp" +#include "src/buildtool/multithreading/task.hpp" +#include "src/utils/cpp/atomic.hpp" + +// Flag that can block the caller until it is set. Cannot be cleared after set. +class WaitableOneWayFlag { + public: + // Clear flag. Essentially a noop, if it was set before. + void Clear() { + if (not was_set_) { + initial_ = false; + } + } + + // Set flag. Essentially a noop, if it was set before. + void Set() { + if (not was_set_) { + was_set_ = true; + was_set_.notify_all(); + } + } + + // Blocks caller until it is set, if it was ever cleared. + void WaitForSet() { + if (not was_set_ and not initial_) { + was_set_.wait(false); + } + } + + private: + atomic<bool> was_set_{}; + bool initial_{true}; +}; + +// Counter that can block the caller until it reaches zero. +class WaitableZeroCounter { + enum class Status { Init, Wait, Reached }; + + public: + explicit WaitableZeroCounter(std::size_t init = 0) : count_{init} {} + + // Essentially a noop, if count reached zero since last wait call. + void Decrement() { + if (status_ != Status::Reached and --count_ == 0) { + if (status_ == Status::Wait) { + status_ = Status::Reached; + status_.notify_all(); + } + } + } + + // Essentially a noop, if count reached zero since last wait call. + void Increment() { + if (status_ != Status::Reached) { + ++count_; + } + } + + // Blocks caller until count reached zero, since last call to this method. + void WaitForZero() { + status_ = Status::Wait; + if (count_ != 0) { + status_.wait(Status::Wait); + } + status_ = Status::Reached; + } + + private: + std::atomic<std::size_t> count_{}; + atomic<Status> status_{Status::Init}; +}; + +class NotificationQueue { + public: + NotificationQueue(gsl::not_null<WaitableOneWayFlag*> queues_read, + gsl::not_null<WaitableZeroCounter*> num_threads_running) + : queues_read_{std::move(queues_read)}, + num_threads_running_{std::move(num_threads_running)} {} + + NotificationQueue(NotificationQueue const& other) = delete; + NotificationQueue(NotificationQueue&& other) noexcept + : queue_{std::move(other.queue_)}, + done_{other.done_}, + queues_read_{std::move(other.queues_read_)}, + num_threads_running_{std::move(other.num_threads_running_)} {} + ~NotificationQueue() = default; + + [[nodiscard]] auto operator=(NotificationQueue const& other) + -> NotificationQueue& = delete; + [[nodiscard]] auto operator=(NotificationQueue&& other) + -> NotificationQueue& = delete; + + // Blocks the thread until it's possible to pop or we are done. + // Note that the lock releases ownership of the mutex while waiting + // for the queue to have some element or for the notification queue + // state to be set to "done". + // Returns task popped or nullopt if no task was popped + [[nodiscard]] auto pop() -> std::optional<Task> { + std::unique_lock lock{mutex_}; + auto there_is_something_to_pop_or_we_are_done = [&]() { + return !queue_.empty() || done_; + }; + if (not there_is_something_to_pop_or_we_are_done()) { + num_threads_running_->Decrement(); + ready_.wait(lock, there_is_something_to_pop_or_we_are_done); + num_threads_running_->Increment(); + } + + if (queue_.empty()) { + return std::nullopt; + } + auto t = std::move(queue_.front()); + queue_.pop_front(); + queues_read_->Set(); + return t; + } + + // Returns nullopt if the mutex is already locked or the queue is empty, + // otherwise pops the front element of the queue and returns it + [[nodiscard]] auto try_pop() -> std::optional<Task> { + std::unique_lock lock{mutex_, std::try_to_lock}; + if (!lock || queue_.empty()) { + return std::nullopt; + } + auto t = std::move(queue_.front()); + queue_.pop_front(); + queues_read_->Set(); + return t; + } + + // Push task once the mutex is available (locking it until addition is + // finished) + template <typename FunctionType> + void push(FunctionType&& f) { + { + std::unique_lock lock{mutex_}; + queue_.emplace_back(std::forward<FunctionType>(f)); + } + queues_read_->Clear(); + ready_.notify_one(); + } + + // Returns false if mutex is locked without pushing the task, pushes task + // and returns true otherwise + template <typename FunctionType> + [[nodiscard]] auto try_push(FunctionType&& f) -> bool { + { + std::unique_lock lock{mutex_, std::try_to_lock}; + if (!lock) { + return false; + } + queue_.emplace_back(std::forward<FunctionType>(f)); + } + queues_read_->Clear(); + ready_.notify_one(); + return true; + } + + // Method to communicate to the notification queue that there will not be + // any more queries. Queries after calling this method are not guaratied to + // work as expected + void done() { + { + std::unique_lock lock{mutex_}; + done_ = true; + } + ready_.notify_all(); + } + + private: + std::deque<Task> queue_{}; + bool done_{false}; + std::mutex mutex_{}; + std::condition_variable ready_{}; + gsl::not_null<WaitableOneWayFlag*> queues_read_; + gsl::not_null<WaitableZeroCounter*> num_threads_running_; +}; + +#endif // INCLUDED_SRC_BUILDTOOL_MULTITHREADING_NOTIFICATION_QUEUE_HPP |