diff options
Diffstat (limited to 'src/buildtool/multithreading/notification_queue.hpp')
-rw-r--r-- | src/buildtool/multithreading/notification_queue.hpp | 84 |
1 files changed, 19 insertions, 65 deletions
diff --git a/src/buildtool/multithreading/notification_queue.hpp b/src/buildtool/multithreading/notification_queue.hpp index 7e79aa43..5965fde0 100644 --- a/src/buildtool/multithreading/notification_queue.hpp +++ b/src/buildtool/multithreading/notification_queue.hpp @@ -11,87 +11,42 @@ #include "src/buildtool/multithreading/task.hpp" #include "src/utils/cpp/atomic.hpp" -// Flag that can block the caller until it is set. Cannot be cleared after set. -class WaitableOneWayFlag { - public: - // Clear flag. Essentially a noop, if it was set before. - void Clear() { - if (not was_set_) { - initial_ = false; - } - } - - // Set flag. Essentially a noop, if it was set before. - void Set() { - if (not was_set_) { - was_set_ = true; - was_set_.notify_all(); - } - } - - // Blocks caller until it is set, if it was ever cleared. - void WaitForSet() { - if (not was_set_ and not initial_) { - was_set_.wait(false); - } - } - - private: - atomic<bool> was_set_{}; - bool initial_{true}; -}; - // Counter that can block the caller until it reaches zero. class WaitableZeroCounter { - enum class Status { Init, Wait, Reached }; - public: explicit WaitableZeroCounter(std::size_t init = 0) : count_{init} {} - // Essentially a noop, if count reached zero since last wait call. void Decrement() { - if (status_ != Status::Reached and --count_ == 0) { - if (status_ == Status::Wait) { - status_ = Status::Reached; - status_.notify_all(); - } + if (--count_ == 0) { + count_.notify_all(); } } - // Essentially a noop, if count reached zero since last wait call. - void Increment() { - if (status_ != Status::Reached) { - ++count_; - } - } + void Increment() { ++count_; } - // Blocks caller until count reached zero, since last call to this method. void WaitForZero() { - status_ = Status::Wait; - if (count_ != 0) { - status_.wait(Status::Wait); + auto val = count_.load(); + while (val != 0) { // loop to protect against spurious wakeups + count_.wait(val); + val = count_.load(); } - status_ = Status::Reached; } private: - std::atomic<std::size_t> count_{}; - atomic<Status> status_{Status::Init}; + atomic<std::size_t> count_{}; }; class NotificationQueue { public: - NotificationQueue(gsl::not_null<WaitableOneWayFlag*> queues_read, - gsl::not_null<WaitableZeroCounter*> num_threads_running) - : queues_read_{std::move(queues_read)}, - num_threads_running_{std::move(num_threads_running)} {} + explicit NotificationQueue( + gsl::not_null<WaitableZeroCounter*> total_workload) + : total_workload_{std::move(total_workload)} {} NotificationQueue(NotificationQueue const& other) = delete; NotificationQueue(NotificationQueue&& other) noexcept : queue_{std::move(other.queue_)}, done_{other.done_}, - queues_read_{std::move(other.queues_read_)}, - num_threads_running_{std::move(other.num_threads_running_)} {} + total_workload_{std::move(other.total_workload_)} {} ~NotificationQueue() = default; [[nodiscard]] auto operator=(NotificationQueue const& other) @@ -110,9 +65,9 @@ class NotificationQueue { return !queue_.empty() || done_; }; if (not there_is_something_to_pop_or_we_are_done()) { - num_threads_running_->Decrement(); + total_workload_->Decrement(); ready_.wait(lock, there_is_something_to_pop_or_we_are_done); - num_threads_running_->Increment(); + total_workload_->Increment(); } if (queue_.empty()) { @@ -120,7 +75,7 @@ class NotificationQueue { } auto t = std::move(queue_.front()); queue_.pop_front(); - queues_read_->Set(); + total_workload_->Decrement(); return t; } @@ -133,7 +88,7 @@ class NotificationQueue { } auto t = std::move(queue_.front()); queue_.pop_front(); - queues_read_->Set(); + total_workload_->Decrement(); return t; } @@ -145,7 +100,7 @@ class NotificationQueue { std::unique_lock lock{mutex_}; queue_.emplace_back(std::forward<FunctionType>(f)); } - queues_read_->Clear(); + total_workload_->Increment(); ready_.notify_one(); } @@ -160,7 +115,7 @@ class NotificationQueue { } queue_.emplace_back(std::forward<FunctionType>(f)); } - queues_read_->Clear(); + total_workload_->Increment(); ready_.notify_one(); return true; } @@ -181,8 +136,7 @@ class NotificationQueue { bool done_{false}; std::mutex mutex_{}; std::condition_variable ready_{}; - gsl::not_null<WaitableOneWayFlag*> queues_read_; - gsl::not_null<WaitableZeroCounter*> num_threads_running_; + gsl::not_null<WaitableZeroCounter*> total_workload_; }; #endif // INCLUDED_SRC_BUILDTOOL_MULTITHREADING_NOTIFICATION_QUEUE_HPP |